adg_script/adg_control.py

182 lines
6.4 KiB
Python

import subprocess
from itertools import dropwhile, takewhile
from enum import StrEnum
ADG_EXE = "HRB.Tools.CRA.CommandLine.exe"
ADG_DIR = "AutoDatabaseGenerator"
class StorageType(StrEnum):
RANDOM = "random",
SERIAL = "serial"
class AdgClient:
def __init__(self, adg_exe: str, adg_dir: str):
self.adg_exe = adg_exe
self.adg_dir = adg_dir
def _run(self, args: list[str]):
return subprocess.run(["./"+self.adg_exe, *args],
cwd=self.adg_dir, capture_output=True)
def set_system_name(self, name: str):
self.system_name = name
def get_databases(self):
res = self._run(["ld"])
if res.returncode > 0:
raise Exception("Failed to get databases")
res_lines = iter(str(res.stdout, encoding="utf-8").splitlines())
skip_header = dropwhile(lambda x: "Started" not in x, res_lines)
next(skip_header) # Advance to one positon past the header
skip_footer = takewhile(lambda x: "Completed" not in x, skip_header)
return [x.replace("INFO|", "") for x in skip_footer]
def create_database(self, name: str):
res = self._run(["ccdb",
"-u", "system",
"-p", "oracle",
"-v", "4",
"-d", f"{name.upper()}"])
if res.returncode > 0:
raise Exception("Failed to create database", res.stdout, res.args)
self.set_system_name(name)
def create_system(self, name: str = "", switch_ip: str = "",
host_name: str = ""):
if not name:
name = self.system_name
else:
self.set_system_name(name)
extra_args = []
if switch_ip:
extra_args += ["-a", f"{switch_ip}"]
if host_name:
extra_args += ["-h", f"{host_name}"]
res = self._run(["csd",
"-s", f"{name}",
"-d", f"{name}",
*extra_args])
if res.returncode > 0:
raise Exception("Failed to create system", res.stdout, res.args)
def create_normal_resource(self, name: str,
location: int, envelope: int,
driver_path: str = "Drivers\\",
package_path: str = "",
driver_description: str = "",
connection_parameters: str = "",
system: str = ""):
if not (package_path or driver_description):
raise Exception(
"Either package path or driver description must be passed")
if package_path and driver_description:
raise Exception(
"Package path and driver description are mutually exclusive")
if not system:
system = self.system_name
if not driver_path:
driver_path = "Drivers\\"
extra_args = []
if package_path:
extra_args += ["-p", f"{package_path}"]
elif driver_description:
extra_args += ["-r", f"{driver_description}"]
if connection_parameters:
extra_args.append(f"-c {connection_parameters}")
res = self._run(["cnr",
"-s", f"{system}",
"-n", f"{name}",
"-l", f"{location}",
"-e", f"{envelope}",
"-d", f"{driver_path}", *extra_args])
if res.returncode > 0:
raise Exception("Failed to add resource", res.stdout, res.args)
def create_platehotel(self, name: str,
location: int, envelope: int,
system: str = "",
rows: int = -1,
type: StorageType = None):
if not system:
system = self.system_name
res = self._run(["csr",
"-s", f"{system}",
"-n", f"{name}",
"-l", f"{location}",
"-e", f"{envelope}",
"-p", "platehotel"])
if res.returncode > 0:
raise Exception("Failed to create platehotel",
res.stdout, res.args)
if type and rows > 0:
storage_res = self._run(["crs",
"-s", f"{system}",
"-n", f"{name}",
"-r", f"{rows}",
"-t", f"{type}"])
if storage_res.returncode > 0:
raise Exception("Failed to add storage to platehotel",
storage_res.stdout,
storage_res.args)
def create_deadlock_resolver(self, name: str,
location: int,
envelope: int,
system: str = ""):
if not system:
system = self.system_name
res = self._run(["csr",
"-s", f"{system}",
"-n", f"{name}",
"-l", f"{location}",
"-e", f"{envelope}",
"-p", "deadlockresolver"])
if res.returncode > 0:
raise Exception("Failed to create deadock resolver",
res.stdout, res.args)
def create_resource_nest(self, name: str, child_nest_name: str, location: int, envelope: int, system: str = ""):
if not system:
system = self.system_name
res = self._run(["crn",
"-s", f"{system}",
"-n", f"{name}",
"-c", f"{child_nest_name}",
"-l", f"{location}",
"-e", f"{envelope}"])
if res.returncode > 0:
raise Exception("Failed to create resource nest",
res.stdout, res.args)
def _create_unique_database_name(self, prefix: str) -> str:
MAX_ATTEMPTS = 100
databases = self.get_databases()
for i in range(1, MAX_ATTEMPTS):
potential = f"{prefix}{i}"
if potential not in databases:
return potential
def create_unique_database(self, prefix: str) -> str:
name = self._create_unique_database_name(prefix)
self.create_database(name)
return name