import subprocess from itertools import dropwhile, takewhile from enum import StrEnum ADG_EXE = "HRB.Tools.CRA.CommandLine.exe" ADG_DIR = "AutoDatabaseGenerator" class StorageType(StrEnum): RANDOM = "random", SERIAL = "serial" class AdgClient: def __init__(self, adg_exe: str, adg_dir: str): self.adg_exe = adg_exe self.adg_dir = adg_dir def _run(self, args: list[str]): return subprocess.run(["./"+self.adg_exe, *args], cwd=self.adg_dir, capture_output=True) def set_system_name(self, name: str): self.system_name = name def get_databases(self): res = self._run(["ld"]) if res.returncode > 0: raise Exception("Failed to get databases") res_lines = iter(str(res.stdout, encoding="utf-8").splitlines()) skip_header = dropwhile(lambda x: "Started" not in x, res_lines) next(skip_header) # Advance to one positon past the header skip_footer = takewhile(lambda x: "Completed" not in x, skip_header) return [x.replace("INFO|", "") for x in skip_footer] def create_database(self, name: str): res = self._run(["ccdb", "-u", "system", "-p", "oracle", "-v", "4", "-d", f"{name.upper()}"]) if res.returncode > 0: raise Exception("Failed to create database", res.stdout, res.args) self.set_system_name(name) def create_system(self, name: str = "", switch_ip: str = "", host_name: str = ""): if not name: name = self.system_name else: self.set_system_name(name) extra_args = [] if switch_ip: extra_args += ["-a", f"{switch_ip}"] if host_name: extra_args += ["-h", f"{host_name}"] res = self._run(["csd", "-s", f"{name}", "-d", f"{name}", *extra_args]) if res.returncode > 0: raise Exception("Failed to create system", res.stdout, res.args) def create_normal_resource(self, name: str, location: int, envelope: int, driver_path: str = "Drivers\\", package_path: str = "", driver_description: str = "", connection_parameters: str = "", system: str = ""): if not (package_path or driver_description): raise Exception( "Either package path or driver description must be passed") if package_path and driver_description: raise Exception( "Package path and driver description are mutually exclusive") if not system: system = self.system_name if not driver_path: driver_path = "Drivers\\" extra_args = [] if package_path: extra_args += ["-p", f"{package_path}"] elif driver_description: extra_args += ["-r", f"{driver_description}"] if connection_parameters: extra_args.append(f"-c {connection_parameters}") res = self._run(["cnr", "-s", f"{system}", "-n", f"{name}", "-l", f"{location}", "-e", f"{envelope}", "-d", f"{driver_path}", *extra_args]) if res.returncode > 0: raise Exception("Failed to add resource", res.stdout, res.args) def create_platehotel(self, name: str, location: int, envelope: int, system: str = "", rows: int = -1, type: StorageType = None): if not system: system = self.system_name res = self._run(["csr", "-s", f"{system}", "-n", f"{name}", "-l", f"{location}", "-e", f"{envelope}", "-p", "platehotel"]) if res.returncode > 0: raise Exception("Failed to create platehotel", res.stdout, res.args) if type and rows > 0: storage_res = self._run(["crs", "-s", f"{system}", "-n", f"{name}", "-r", f"{rows}", "-t", f"{type}"]) if storage_res.returncode > 0: raise Exception("Failed to add storage to platehotel", storage_res.stdout, storage_res.args) def create_deadlock_resolver(self, name: str, location: int, envelope: int, system: str = ""): if not system: system = self.system_name res = self._run(["csr", "-s", f"{system}", "-n", f"{name}", "-l", f"{location}", "-e", f"{envelope}", "-p", "deadlockresolver"]) if res.returncode > 0: raise Exception("Failed to create deadock resolver", res.stdout, res.args) def create_resource_nest(self, name: str, child_nest_name: str, location: int, envelope: int, system: str = ""): if not system: system = self.system_name res = self._run(["crn", "-s", f"{system}", "-n", f"{name}", "-c", f"{child_nest_name}", "-l", f"{location}", "-e", f"{envelope}"]) if res.returncode > 0: raise Exception("Failed to create resource nest", res.stdout, res.args) def _create_unique_database_name(self, prefix: str) -> str: MAX_ATTEMPTS = 100 databases = self.get_databases() for i in range(1, MAX_ATTEMPTS): potential = f"{prefix}{i}" if potential not in databases: return potential def create_unique_database(self, prefix: str) -> str: name = self._create_unique_database_name(prefix) self.create_database(name) return name