from enum import StrEnum, auto from adg_control import AdgClient from cache import Cache import ntpath from adg_control import ADG_DIR, StorageType class CommandType(StrEnum): NEWDB = auto(), NORMAL = auto(), NEWSYSTEM = auto(), HOTEL = auto(), DEADLOCK = auto() class Command(): def __init__(self, type: CommandType, **kwargs): self.type = type try: if self.type == CommandType.NEWDB: self._init_newdb(**kwargs) elif self.type == CommandType.NORMAL: self._init_normal(**kwargs) elif self.type == CommandType.NEWSYSTEM: self._init_system(**kwargs) elif self.type == CommandType.HOTEL: self._init_hotel(**kwargs) elif self.type == CommandType.DEADLOCK: self._init_deadlock(**kwargs) except KeyError as e: raise Exception("Required argument not found!", e) def execute(self, client: AdgClient, cache: Cache): match self.type: case CommandType.NEWDB: self._execute_newdb(client, cache) case CommandType.NORMAL: self._execute_normal(client, cache) case CommandType.NEWSYSTEM: self._execute_newsystem(client, cache) case CommandType.HOTEL: self._execute_hotel(client, cache) case CommandType.DEADLOCK: self._execute_deadlock(client, cache) def _init_newdb(self, **kwargs): self.name = kwargs["name"] def _init_normal(self, **kwargs): self.name = kwargs["name"] self.location = kwargs["location"] self.envelope = kwargs["envelope"] self.device = kwargs["device"] if "driver_path" in kwargs: self.driver_path = kwargs["driver_path"] if "package_path" in kwargs: self.package_path = kwargs["package_path"] if "driver_description" in kwargs: self.driver_description = kwargs["package_path"] if "connection_parameters" in kwargs: self.connection_parameters = kwargs["connection_parameters"] if "system" in kwargs: self.system = kwargs["system"] def _init_hotel(self, **kwargs): self.name = kwargs["name"] self.location = kwargs["location"] self.envelope = kwargs["location"] if "rows" in kwargs: self.rows = int(kwargs["rows"]) if "storage_type" in kwargs: storage_type_str = kwargs["storage_type"] if storage_type_str in list(StorageType): self.storage_type = StorageType(storage_type_str) else: raise ValueError("StorageType passed was invalid: " + f"{storage_type_str} not in " + "{RANDOM,SERIAL}") if "system" in kwargs: self.system = kwargs["system"] def _init_deadlock(self, **kwargs): self.name = kwargs["name"] self.location = kwargs["location"] self.envelope = kwargs["location"] if "system" in kwargs: self.system = kwargs["system"] def _init_system(self, **kwargs): if "name" in kwargs: self.name = kwargs["name"] if "switch_ip" in kwargs: self.switch_ip = kwargs["switch_ip"] if "host_name" in kwargs: self.host_name = kwargs["host_name"] def _execute_newdb(self, client: AdgClient, cache: Cache): client.create_database(self.name) def _execute_newsystem(self, client: AdgClient, cache: Cache): client.create_system( name=self.name if hasattr(self, "name") else "", switch_ip=self.switch_ip if hasattr(self, "switch_ip") else "", host_name=self.host_name if hasattr(self, "host_name") else "") def _execute_normal(self, client: AdgClient, cache: Cache): if hasattr(self, "package_path"): package_path = self.package_path else: abs_path = cache.get(self.device) package_path = ntpath.relpath(abs_path, start=ADG_DIR) print(abs_path, package_path) client.create_normal_resource( self.name, self.location, self.envelope, driver_path=self.driver_path if hasattr(self, "driver_path") else "", package_path=package_path if not hasattr(self, "driver_description") else "", driver_description=self.driver_description if hasattr(self, "driver_description") else "", connection_parameters=self.connection_parameters if hasattr(self, "connection_parameters") else "", system=self.system if hasattr(self, "system") else "") def _execute_hotel(self, client: AdgClient, cache: Cache): client.create_platehotel( self.name, self.location, self.envelope, rows=self.rows if hasattr(self, "rows") else -1, type=self.storage_type if hasattr(self, "storage_type") else None, system=self.system if hasattr(self, "system") else "" ) def _execute_deadlock(self, client: AdgClient, cache: Cache): client.create_deadlock_resolver( self.name, self.location, self.envelope, system=self.system if hasattr(self, "system") else "" ) class Parser(): def __init__(self, path: str): self.path = path self._read_lines() def _read_lines(self): with open(self.path) as file: commands = [] for line in file: commands.append(Parser._read_line(line.strip())) self.commands = commands def _read_line(line: str) -> Command: args = line.split(';') command_type_str = args[0].lower().strip() if command_type_str not in list(CommandType): raise KeyError("Invalid command type found!") type = CommandType(command_type_str) args_dict = dict() for arg in args[1:]: if not arg: continue key, value = [x.strip() for x in arg.split('=')] args_dict[key.lower()] = value return Command(type, **args_dict)