From c776582b415efba4018101e2d0737b9655c66820 Mon Sep 17 00:00:00 2001 From: Emilia Allison Date: Tue, 9 Jan 2024 14:16:14 -0500 Subject: [PATCH] Initial adg_control Only implements database creation, system creation, normal resources, and platehotels --- .gitignore | 7 +++ adg_control.py | 134 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 .gitignore create mode 100644 adg_control.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3a062b --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +# Tokens! +token +.token +.env + +# ADG itself +AutoDatabaseGenerator/ diff --git a/adg_control.py b/adg_control.py new file mode 100644 index 0000000..9216ba9 --- /dev/null +++ b/adg_control.py @@ -0,0 +1,134 @@ +import subprocess +from itertools import dropwhile, takewhile +from enum import StrEnum + +ADG_EXE = "HRB.Tools.CRA.CommandLine.exe" +ADG_DIR = "AutoDatabaseGenerator" + + +class StorageType(StrEnum): + RANDOM = "random", + SERIAL = "serial" + + +class AdgClient: + def __init__(self, adg_exe: str, adg_dir: str): + self.adg_exe = adg_exe + self.adg_dir = adg_dir + + def _run(self, args: list[str]): + return subprocess.run(["./"+self.adg_exe, *args], + cwd=self.adg_dir, capture_output=True) + + def set_system_name(self, name: str): + self.system_name = name + + def get_databases(self): + res = self._run(["ld"]) + if res.returncode > 0: + raise Exception("Failed to get databases") + + res_lines = iter(str(res.stdout, encoding="utf-8").splitlines()) + skip_header = dropwhile(lambda x: "Started" not in x, res_lines) + next(skip_header) # Advance to one positon past the header + + skip_footer = takewhile(lambda x: "Completed" not in x, skip_header) + + return [x.replace("INFO|", "") for x in skip_footer] + + def create_database(self, name: str): + res = self._run(["ccdb", "-u system", "-p oracle", f"-d {name}"]) + if res.returncode > 0: + raise Exception("Failed to create database") + + def create_system(self, name: str = "", switch_ip: str = "", + host_name: str = ""): + if not name: + name = self.system_name + + extra_args = [] + if switch_ip: + extra_args.append(f"-a {switch_ip}") + if host_name: + extra_args.append(f"-h {host_name}") + + res = self._run(["csd", + f"-n {name}", + f"-d {name}", + *extra_args]) + if res.returncode > 0: + raise Exception("Failed to create system") + + def create_normal_resource(self, name: str, + location: int, envelope: int, + driver_path: str, package_path: str = "", + driver_description: str = "", + connection_parameters: str = "", + system: str = ""): + if not (package_path or driver_description): + raise Exception( + "Either package path or driver description must be passed") + if package_path and driver_description: + raise Exception( + "Package path and driver description are mutually exclusive") + + if not system: + system = self.system_name + + extra_args = [] + + if package_path: + extra_args.append(f"-p {package_path}") + elif driver_description: + extra_args.append(f"-r {driver_description}") + + if connection_parameters: + extra_args.append(f"-c {connection_parameters}") + + res = self._run(["cnr", + f"-s {system}", + f"-n {name}", + f"-l {location}", + f"-e {envelope}", + f"-d {driver_path}", *extra_args]) + if res.returncode > 0: + raise Exception("Failed to add resource") + + def create_platehotel(self, name: str, + location: int, envelope: int, + system: str = "", + rows: int = -1, + type: StorageType = None): + if not system: + system = self.system_name + + res = self._run(["csr", + f"-s {system}", + f"-n {name}", + f"-l {location}", + f"-e {envelope}", + "-p platehotel"]) + if res.returncode > 0: + raise Exception("Failed to create platehotel") + + if type and rows > 0: + storage_res = self._run(["crs", + f"-s {system}", + f"-n {name}", + f"-r {rows}", + f"-t {type}"]) + if storage_res.returncode > 0: + raise Exception("Failed to add storage to platehotel") + + def _create_unique_database_name(self, prefix: str) -> str: + MAX_ATTEMPTS = 100 + databases = self.get_databases() + for i in range(1, MAX_ATTEMPTS): + potential = f"{prefix}{i}" + if potential not in databases: + return potential + + def create_unique_database(self, prefix: str) -> str: + name = self._create_unique_database_name(prefix) + self.create_database(name) + return name