Get driver from Jenkins cloud by name
This commit is contained in:
parent
d5df4b8f3d
commit
910b678ffe
|
@ -0,0 +1,110 @@
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
class DriverUrl:
|
||||||
|
def __init__(self, name: str, url: str):
|
||||||
|
self.name = name
|
||||||
|
self.url = url
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_drivers() -> [DriverUrl]:
|
||||||
|
URL = "https://jenkins.cellario.cloud/job/drivers/api/json"
|
||||||
|
user, token = read_credentials()
|
||||||
|
|
||||||
|
res = requests.get(URL, auth=(user, token))
|
||||||
|
|
||||||
|
drivers: [DriverUrl] = []
|
||||||
|
|
||||||
|
if res.status_code != 200:
|
||||||
|
raise Exception("Failed to get driver list", res.status_code)
|
||||||
|
|
||||||
|
res_json = res.json()
|
||||||
|
jobs = res_json["jobs"]
|
||||||
|
|
||||||
|
for job in jobs:
|
||||||
|
driver_url = DriverUrl(job["name"], job["url"])
|
||||||
|
drivers.append(driver_url)
|
||||||
|
|
||||||
|
return drivers
|
||||||
|
|
||||||
|
|
||||||
|
def get_driver_by_exact_name(name: str) -> DriverUrl:
|
||||||
|
drivers = get_all_drivers()
|
||||||
|
|
||||||
|
try:
|
||||||
|
res = next(driver for driver in drivers if driver.name == name)
|
||||||
|
except StopIteration:
|
||||||
|
raise Exception("Driver not found")
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
def find_master_branch(driver: DriverUrl) -> str:
|
||||||
|
user, token = read_credentials()
|
||||||
|
driver_jobs_res = requests.get(
|
||||||
|
driver.url + "/api/json", auth=(user, token))
|
||||||
|
|
||||||
|
if driver_jobs_res.status_code != 200:
|
||||||
|
raise Exception("Failed to read driver jobs page",
|
||||||
|
driver_jobs_res.status_code)
|
||||||
|
|
||||||
|
driver_jobs_json = driver_jobs_res.json()
|
||||||
|
|
||||||
|
master_branch_url = ""
|
||||||
|
for job in driver_jobs_json["jobs"]:
|
||||||
|
if job["name"] == "master":
|
||||||
|
master_branch_url = job["url"]
|
||||||
|
|
||||||
|
return master_branch_url
|
||||||
|
|
||||||
|
|
||||||
|
def get_hrb_artifact_url(master_url: str) -> str:
|
||||||
|
user, token = read_credentials()
|
||||||
|
res = requests.get(master_url + "/lastSuccessfulBuild" +
|
||||||
|
"/api/json", auth=(user, token))
|
||||||
|
|
||||||
|
if res.status_code != 200:
|
||||||
|
raise Exception("Failed to read master branch", res.status_code)
|
||||||
|
|
||||||
|
json = res.json()
|
||||||
|
|
||||||
|
try:
|
||||||
|
hrb_url = next(artifact["relativePath"]
|
||||||
|
for artifact in json["artifacts"]
|
||||||
|
if ".hrb" in artifact["relativePath"])
|
||||||
|
except StopIteration:
|
||||||
|
raise Exception("Failed to find an hrb archive")
|
||||||
|
|
||||||
|
return master_url + "/lastSuccessfulBuild/artifact/" + hrb_url
|
||||||
|
|
||||||
|
|
||||||
|
def download_driver(driver: DriverUrl, output: str):
|
||||||
|
user, token = read_credentials()
|
||||||
|
|
||||||
|
master_branch_url = find_master_branch(driver)
|
||||||
|
hrb_url = get_hrb_artifact_url(master_branch_url)
|
||||||
|
|
||||||
|
res = requests.get(hrb_url, auth=(user, token))
|
||||||
|
|
||||||
|
if res.status_code != 200:
|
||||||
|
raise Exception("Failed to download file", res.status_code)
|
||||||
|
|
||||||
|
with open(output, 'xb') as file:
|
||||||
|
for chunk in res.iter_content(chunk_size=128):
|
||||||
|
file.write(chunk)
|
||||||
|
|
||||||
|
|
||||||
|
def read_credentials() -> (str, str):
|
||||||
|
token = ""
|
||||||
|
user = ""
|
||||||
|
with open("token", "r") as file:
|
||||||
|
for line in file:
|
||||||
|
spl = line.strip().split('=')
|
||||||
|
if spl[0] == "token":
|
||||||
|
token = spl[1]
|
||||||
|
elif spl[0] == "user":
|
||||||
|
user = spl[1]
|
||||||
|
if not (token or user):
|
||||||
|
raise Exception("Failed to read credentials")
|
||||||
|
|
||||||
|
return (user, token)
|
Loading…
Reference in New Issue