adg_script/parser.py

206 lines
7.1 KiB
Python

from enum import StrEnum, auto
from adg_control import AdgClient
from cache import Cache
import ntpath
from adg_control import ADG_DIR, StorageType
class CommandType(StrEnum):
NEWDB = auto(),
NORMAL = auto(),
NEWSYSTEM = auto(),
HOTEL = auto(),
DEADLOCK = auto(),
RESOURCENEST = auto(),
class Command():
def __init__(self, type: CommandType, **kwargs):
self.type = type
try:
if self.type == CommandType.NEWDB:
self._init_newdb(**kwargs)
elif self.type == CommandType.NORMAL:
self._init_normal(**kwargs)
elif self.type == CommandType.NEWSYSTEM:
self._init_system(**kwargs)
elif self.type == CommandType.HOTEL:
self._init_hotel(**kwargs)
elif self.type == CommandType.DEADLOCK:
self._init_deadlock(**kwargs)
elif self.type == CommandType.RESOURCENEST:
self._init_resourcenest(**kwargs)
except KeyError as e:
raise Exception("Required argument not found!", e)
def execute(self, client: AdgClient, cache: Cache):
match self.type:
case CommandType.NEWDB:
self._execute_newdb(client, cache)
case CommandType.NORMAL:
self._execute_normal(client, cache)
case CommandType.NEWSYSTEM:
self._execute_newsystem(client, cache)
case CommandType.HOTEL:
self._execute_hotel(client, cache)
case CommandType.DEADLOCK:
self._execute_deadlock(client, cache)
case CommandType.RESOURCENEST:
self._execute_resourcenest(client, cache)
def _init_newdb(self, **kwargs):
self.name = kwargs["name"]
def _init_normal(self, **kwargs):
self.name = kwargs["name"]
self.location = kwargs["location"]
self.envelope = kwargs["envelope"]
self.device = kwargs["device"]
if "driver_path" in kwargs:
self.driver_path = kwargs["driver_path"]
if "package_path" in kwargs:
self.package_path = kwargs["package_path"]
if "driver_description" in kwargs:
self.driver_description = kwargs["package_path"]
if "connection_parameters" in kwargs:
self.connection_parameters = kwargs["connection_parameters"]
if "system" in kwargs:
self.system = kwargs["system"]
def _init_hotel(self, **kwargs):
self.name = kwargs["name"]
self.location = kwargs["location"]
self.envelope = kwargs["location"]
if "rows" in kwargs:
self.rows = int(kwargs["rows"])
if "storage_type" in kwargs:
storage_type_str = kwargs["storage_type"]
if storage_type_str in list(StorageType):
self.storage_type = StorageType(storage_type_str)
else:
raise ValueError("StorageType passed was invalid: "
+ f"{storage_type_str} not in "
+ "{RANDOM,SERIAL}")
if "system" in kwargs:
self.system = kwargs["system"]
def _init_deadlock(self, **kwargs):
self.name = kwargs["name"]
self.location = kwargs["location"]
self.envelope = kwargs["location"]
if "system" in kwargs:
self.system = kwargs["system"]
def _init_system(self, **kwargs):
if "name" in kwargs:
self.name = kwargs["name"]
if "switch_ip" in kwargs:
self.switch_ip = kwargs["switch_ip"]
if "host_name" in kwargs:
self.host_name = kwargs["host_name"]
def _init_resourcenest(self, **kwargs):
print(kwargs)
self.name = kwargs["name"]
self.child_nest_name = kwargs["child_nest_name"]
self.location = kwargs["location"]
self.envelope = kwargs["envelope"]
if "system" in kwargs:
self.system = kwargs["system"]
def _execute_newdb(self, client: AdgClient, cache: Cache):
client.create_database(self.name)
def _execute_newsystem(self, client: AdgClient, cache: Cache):
client.create_system(
name=self.name if hasattr(self, "name") else "",
switch_ip=self.switch_ip
if hasattr(self, "switch_ip") else "",
host_name=self.host_name
if hasattr(self, "host_name") else "")
def _execute_normal(self, client: AdgClient, cache: Cache):
if hasattr(self, "package_path"):
package_path = self.package_path
else:
abs_path = cache.get(self.device)
package_path = ntpath.relpath(abs_path, start=ADG_DIR)
print(abs_path, package_path)
client.create_normal_resource(
self.name,
self.location,
self.envelope,
driver_path=self.driver_path
if hasattr(self, "driver_path") else "",
package_path=package_path
if not hasattr(self, "driver_description") else "",
driver_description=self.driver_description
if hasattr(self, "driver_description") else "",
connection_parameters=self.connection_parameters
if hasattr(self, "connection_parameters") else "",
system=self.system if hasattr(self, "system") else "")
def _execute_hotel(self, client: AdgClient, cache: Cache):
client.create_platehotel(
self.name,
self.location,
self.envelope,
rows=self.rows if hasattr(self, "rows") else -1,
type=self.storage_type if hasattr(self, "storage_type") else None,
system=self.system if hasattr(self, "system") else ""
)
def _execute_deadlock(self, client: AdgClient, cache: Cache):
client.create_deadlock_resolver(
self.name,
self.location,
self.envelope,
system=self.system if hasattr(self, "system") else ""
)
def _execute_resourcenest(self, client: AdgClient, cache: Cache):
client.create_resource_nest(
self.name,
self.child_nest_name,
self.location,
self.envelope,
system=self.system if hasattr(self, "system") else ""
)
class Parser():
def __init__(self, path: str):
self.path = path
self._read_lines()
def _read_lines(self):
with open(self.path) as file:
commands = []
for line in file:
commands.append(Parser._read_line(line.strip()))
self.commands = commands
def _read_line(line: str) -> Command:
args = line.split(';')
command_type_str = args[0].lower().strip()
if command_type_str not in list(CommandType):
raise KeyError("Invalid command type found!")
type = CommandType(command_type_str)
args_dict = dict()
for arg in args[1:]:
if not arg:
continue
key, value = [x.strip() for x in arg.split('=')]
args_dict[key.lower()] = value
return Command(type, **args_dict)