# adg_script/parser.py
# (157 lines, 5.4 KiB, Python)

from enum import StrEnum, auto
from adg_control import AdgClient
from cache import Cache
import ntpath
from adg_control import ADG_DIR, StorageType
class CommandType(StrEnum):
NEWDB = auto(),
NORMAL = auto(),
NEWSYSTEM = auto(),
HOTEL = auto()
class Command():
def __init__(self, type: CommandType, **kwargs):
self.type = type
try:
if self.type == CommandType.NEWDB:
self._init_newdb(**kwargs)
elif self.type == CommandType.NORMAL:
self._init_normal(**kwargs)
elif self.type == CommandType.NEWSYSTEM:
self._init_system(**kwargs)
elif self.type == CommandType.HOTEL:
self._init_hotel(**kwargs)
except KeyError as e:
raise Exception("Required argument not found!", e)
def execute(self, client: AdgClient, cache: Cache):
match self.type:
case CommandType.NEWDB:
self._execute_newdb(client, cache)
case CommandType.NORMAL:
self._execute_normal(client, cache)
case CommandType.NEWSYSTEM:
self._execute_newsystem(client, cache)
case CommandType.HOTEL:
self._execute_hotel(client, cache)
def _init_newdb(self, **kwargs):
self.name = kwargs["name"]
def _init_normal(self, **kwargs):
self.name = kwargs["name"]
self.location = kwargs["location"]
self.envelope = kwargs["envelope"]
self.device = kwargs["device"]
if "driver_path" in kwargs:
self.driver_path = kwargs["driver_path"]
if "package_path" in kwargs:
self.package_path = kwargs["package_path"]
if "driver_description" in kwargs:
self.driver_description = kwargs["package_path"]
if "connection_parameters" in kwargs:
self.connection_parameters = kwargs["connection_parameters"]
if "system" in kwargs:
self.system = kwargs["system"]
def _init_hotel(self, **kwargs):
self.name = kwargs["name"]
self.location = kwargs["location"]
self.envelope = kwargs["location"]
if "rows" in kwargs:
self.rows = int(kwargs["rows"])
if "storage_type" in kwargs:
storage_type_str = kwargs["storage_type"]
if storage_type_str in list(StorageType):
self.storage_type = StorageType(storage_type_str)
else:
raise ValueError("StorageType passed was invalid: "
+ f"{storage_type_str} not in "
+ "{RANDOM,SERIAL}")
def _init_system(self, **kwargs):
if "name" in kwargs:
self.name = kwargs["name"]
if "switch_ip" in kwargs:
self.switch_ip = kwargs["switch_ip"]
if "host_name" in kwargs:
self.host_name = kwargs["host_name"]
def _execute_newdb(self, client: AdgClient, cache: Cache):
client.create_database(self.name)
def _execute_newsystem(self, client: AdgClient, cache: Cache):
client.create_system(
name=self.name if hasattr(self, "name") else "",
switch_ip=self.switch_ip
if hasattr(self, "switch_ip") else "",
host_name=self.host_name
if hasattr(self, "host_name") else "")
def _execute_normal(self, client: AdgClient, cache: Cache):
if hasattr(self, "package_path"):
package_path = self.package_path
else:
abs_path = cache.get(self.device)
package_path = ntpath.relpath(abs_path, start=ADG_DIR)
client.create_normal_resource(
self.name,
self.location,
self.envelope,
driver_path=self.driver_path
if hasattr(self, "driver_path") else "",
package_path=package_path
if not hasattr(self, "driver_description") else "",
driver_description=self.driver_description
if hasattr(self, "driver_description") else "",
connection_parameters=self.connection_parameters
if hasattr(self, "connection_parameters") else "",
system=self.system if hasattr(self, "system") else "")
def _execute_hotel(self, client: AdgClient, cache: Cache):
client.create_platehotel(
self.name,
self.location,
self.envelope,
rows=self.rows if hasattr(self, "rows") else -1,
type=self.storage_type if hasattr(self, "storage_type") else None
)
class Parser:
    """Read a command script and build one ``Command`` per non-blank line.

    Each line has the form ``<type>;key=value;key=value...`` where
    ``<type>`` is matched case-insensitively against ``CommandType``.

    Attributes:
        path: The script path passed to the constructor.
        commands: The parsed commands, in file order.

    Args:
        path: Path of the script file; it is read immediately.
    """

    def __init__(self, path: str):
        self.path = path
        self._read_lines()

    def _read_lines(self):
        """Parse every non-blank line of ``self.path`` into ``self.commands``."""
        commands = []
        with open(self.path) as file:
            for raw_line in file:
                line = raw_line.strip()
                # Blank lines carry no command; skip them rather than
                # failing with "Invalid command type found!".
                if not line:
                    continue
                commands.append(Parser._read_line(line))
        self.commands = commands

    @staticmethod
    def _read_line(line: str) -> Command:
        """Parse one script line into a ``Command``.

        Args:
            line: A single ``<type>;key=value;...`` line.

        Returns:
            The ``Command`` built from the line's type and arguments.
            Argument keys are lower-cased; keys and values are stripped of
            surrounding whitespace.

        Raises:
            KeyError: If the first field is not a valid ``CommandType``.
            ValueError: If an argument field contains no ``=``.
        """
        fields = line.split(';')
        command_type_str = fields[0].lower().strip()
        try:
            command_type = CommandType(command_type_str)
        except ValueError:
            raise KeyError("Invalid command type found!") from None
        args_dict = {}
        for field in fields[1:]:
            field = field.strip()
            if not field:
                # Tolerates trailing or doubled ';' separators.
                continue
            # Split on the first '=' only so that values may contain '='.
            key, separator, value = field.partition('=')
            if not separator:
                raise ValueError(
                    f"Malformed argument (expected key=value): {field!r}")
            args_dict[key.strip().lower()] = value.strip()
        return Command(command_type, **args_dict)