2024-01-09 22:20:55 +00:00
|
|
|
from enum import StrEnum, auto
|
|
|
|
from adg_control import AdgClient
|
2024-01-11 14:49:34 +00:00
|
|
|
from cache import Cache
|
|
|
|
import ntpath
|
|
|
|
from adg_control import ADG_DIR
|
2024-01-09 22:20:55 +00:00
|
|
|
|
|
|
|
|
|
|
|
class CommandType(StrEnum):
|
|
|
|
NEWDB = auto(),
|
|
|
|
NORMAL = auto(),
|
|
|
|
NEWSYSTEM = auto(),
|
|
|
|
HOTEL = auto()
|
|
|
|
|
|
|
|
|
|
|
|
class Command():
|
|
|
|
def __init__(self, type: CommandType, **kwargs):
|
|
|
|
self.type = type
|
|
|
|
|
|
|
|
try:
|
|
|
|
if self.type == CommandType.NEWDB:
|
|
|
|
self._init_newdb(**kwargs)
|
|
|
|
elif self.type == CommandType.NORMAL:
|
|
|
|
self._init_normal(**kwargs)
|
|
|
|
elif self.type == CommandType.NEWSYSTEM:
|
|
|
|
self._init_system(**kwargs)
|
|
|
|
elif self.type == CommandType.HOTEL:
|
|
|
|
pass
|
|
|
|
except KeyError as e:
|
|
|
|
raise Exception("Required argument not found!", e)
|
|
|
|
|
2024-01-11 14:49:34 +00:00
|
|
|
def execute(self, client: AdgClient, cache: Cache):
|
2024-01-09 22:20:55 +00:00
|
|
|
match self.type:
|
|
|
|
case CommandType.NEWDB:
|
2024-01-11 14:54:34 +00:00
|
|
|
self._execute_newdb(client, cache)
|
2024-01-09 22:20:55 +00:00
|
|
|
case CommandType.NORMAL:
|
2024-01-11 14:54:34 +00:00
|
|
|
self._execute_normal(client, cache)
|
2024-01-09 22:20:55 +00:00
|
|
|
case CommandType.NEWSYSTEM:
|
2024-01-11 14:54:34 +00:00
|
|
|
self._execute_newsystem(client, cache)
|
2024-01-09 22:20:55 +00:00
|
|
|
case CommandType.HOTEL:
|
|
|
|
pass
|
|
|
|
|
|
|
|
def _init_newdb(self, **kwargs):
|
|
|
|
self.name = kwargs["name"]
|
|
|
|
|
|
|
|
def _init_normal(self, **kwargs):
|
|
|
|
self.name = kwargs["name"]
|
|
|
|
self.location = kwargs["location"]
|
|
|
|
self.envelope = kwargs["envelope"]
|
2024-01-11 14:49:34 +00:00
|
|
|
self.device = kwargs["device"]
|
2024-01-09 22:20:55 +00:00
|
|
|
|
|
|
|
if "driver_path" in kwargs:
|
|
|
|
self.driver_path = kwargs["driver_path"]
|
|
|
|
if "package_path" in kwargs:
|
|
|
|
self.package_path = kwargs["package_path"]
|
|
|
|
if "driver_description" in kwargs:
|
|
|
|
self.driver_description = kwargs["package_path"]
|
|
|
|
if "connection_parameters" in kwargs:
|
|
|
|
self.connection_parameters = kwargs["connection_parameters"]
|
|
|
|
if "system" in kwargs:
|
|
|
|
self.system = kwargs["system"]
|
|
|
|
|
|
|
|
def _init_system(self, **kwargs):
|
|
|
|
if "name" in kwargs:
|
|
|
|
self.name = kwargs["name"]
|
|
|
|
if "switch_ip" in kwargs:
|
|
|
|
self.switch_ip = kwargs["switch_ip"]
|
|
|
|
if "host_name" in kwargs:
|
|
|
|
self.host_name = kwargs["host_name"]
|
|
|
|
|
2024-01-11 14:54:34 +00:00
|
|
|
def _execute_newdb(self, client: AdgClient, cache: Cache):
|
|
|
|
client.create_database(self.name)
|
|
|
|
|
|
|
|
def _execute_newsystem(self, client: AdgClient, cache: Cache):
|
|
|
|
client.create_system(
|
|
|
|
name=self.name if hasattr(self, "name") else "",
|
|
|
|
switch_ip=self.switch_ip
|
|
|
|
if hasattr(self, "switch_ip") else "",
|
|
|
|
host_name=self.host_name
|
|
|
|
if hasattr(self, "host_name") else "")
|
|
|
|
|
|
|
|
def _execute_normal(self, client: AdgClient, cache: Cache):
|
|
|
|
if hasattr(self, "package_path"):
|
|
|
|
package_path = self.package_path
|
|
|
|
else:
|
|
|
|
abs_path = cache.get(self.device)
|
|
|
|
package_path = ntpath.relpath(abs_path, start=ADG_DIR)
|
|
|
|
|
|
|
|
client.create_normal_resource(
|
|
|
|
self.name,
|
|
|
|
self.location,
|
|
|
|
self.envelope,
|
|
|
|
driver_path=self.driver_path
|
|
|
|
if hasattr(self, "driver_path") else "",
|
|
|
|
package_path=package_path
|
|
|
|
if not hasattr(self, "driver_description") else "",
|
|
|
|
driver_description=self.driver_description
|
|
|
|
if hasattr(self, "driver_description") else "",
|
|
|
|
connection_parameters=self.connection_parameters
|
|
|
|
if hasattr(self, "connection_parameters") else "",
|
|
|
|
system=self.system if hasattr(self, "system") else "")
|
|
|
|
|
2024-01-09 22:20:55 +00:00
|
|
|
|
|
|
|
class Parser:
    """Read a command file and turn each line into a ``Command``.

    Each line has the form ``<type>;key=value;key=value...`` where
    ``<type>`` is matched case-insensitively against ``CommandType`` and
    keys are lower-cased.  The parsed commands are available as
    ``self.commands`` after construction.
    """

    def __init__(self, path: str):
        """Remember ``path`` and parse the file immediately.

        Args:
            path: Location of the command file.
        """
        self.path = path
        self._read_lines()

    def _read_lines(self):
        """Parse every non-blank line of the file into ``self.commands``."""
        with open(self.path) as file:
            stripped_lines = (raw.strip() for raw in file)
            # Blank lines carry no command; skip them instead of failing
            # with "Invalid command type found!".
            self.commands = [
                Parser._read_line(text) for text in stripped_lines if text
            ]

    @staticmethod
    def _read_line(line: str) -> Command:
        """Parse one ``;``-separated line into a ``Command``.

        Args:
            line: A single line of the command file.

        Returns:
            The ``Command`` built from the type and the key/value pairs.

        Raises:
            KeyError: If the first field is not a valid command type.
            ValueError: If an argument is not of the form ``key=value``.
        """
        fields = line.split(';')

        command_type_str = fields[0].lower().strip()
        try:
            command_type = CommandType(command_type_str)
        except ValueError:
            raise KeyError("Invalid command type found!") from None

        args_dict = {}
        for field in fields[1:]:
            field = field.strip()
            if not field:
                # Empty or whitespace-only segment, e.g. after a trailing ';'.
                continue
            # Split on the first '=' only so values may contain '='.
            key, separator, value = field.partition('=')
            if not separator:
                raise ValueError(
                    f"Malformed argument (expected key=value): {field!r}")
            args_dict[key.strip().lower()] = value.strip()

        return Command(command_type, **args_dict)
|