Initial
This commit is contained in:
		
						commit
						107eedd1c3
					
				| 
						 | 
					@ -0,0 +1,2 @@
 | 
				
			||||||
 | 
					**/__pycache__/
 | 
				
			||||||
 | 
					**.toml
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,2 @@
 | 
				
			||||||
 | 
					@echo on
 | 
				
			||||||
 | 
					python %~dp0\src\py-auto-cel-switch.py
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,75 @@
 | 
				
			||||||
 | 
					from instance import Instance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from typing import Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					CONN_STRING_PATH = r"SOFTWARE\HRE\Cellario"
 | 
				
			||||||
 | 
					JSON_DEFAULT_PATH = r"C:\Program Files\HighRes Biosolutions\Cellario\appsettings.Production.json"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_postgres_conn_string(instance: Instance) -> Optional[str]:
 | 
				
			||||||
 | 
					    if instance.db_type != "postgres":
 | 
				
			||||||
 | 
					        # Why??
 | 
				
			||||||
 | 
					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return f"Server=localhost;Database={instance.user};" + \
 | 
				
			||||||
 | 
					        f"Username={instance.user};Password={instance.password};" + \
 | 
				
			||||||
 | 
					        "Port=5432;SearchPath=cellario;Application Name=CellarioPostgres"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_oracle_conn_string(instance: Instance) -> Optional[str]:
 | 
				
			||||||
 | 
					    if instance.db_type != "oracle":
 | 
				
			||||||
 | 
					        # Again, why??
 | 
				
			||||||
 | 
					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if instance.version_is_43_or_higher():
 | 
				
			||||||
 | 
					        return "Data Source=XE;" + \
 | 
				
			||||||
 | 
					            f"User ID={instance.user};" + \
 | 
				
			||||||
 | 
					            f"Password={instance.password}"
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        return "Provider=OraOLEDB.Oracle;Data Source=XE;" + \
 | 
				
			||||||
 | 
					            f"User ID={instance.user};Password={instance.password}"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def set_conn_string(conn_string: str, db_type: str, is_43_or_higher: bool):
 | 
				
			||||||
 | 
					    if is_43_or_higher:
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        set_conn_string_regkey(conn_string)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def set_conn_string_regkey(conn_string: str):
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        import winreg
 | 
				
			||||||
 | 
					    except ImportError:
 | 
				
			||||||
 | 
					        print("You may not be running on Windows. Aborting.")
 | 
				
			||||||
 | 
					        import sys
 | 
				
			||||||
 | 
					        sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, CONN_STRING_PATH)
 | 
				
			||||||
 | 
					    winreg.SetValueEx(key, "ConnectionString", 0, winreg.REG_SZ, conn_string)
 | 
				
			||||||
 | 
					    winreg.CloseKey(key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def set_conn_string_json(conn_string: str,
 | 
				
			||||||
 | 
					                         db_type: str,
 | 
				
			||||||
 | 
					                         json_path: str = JSON_DEFAULT_PATH):
 | 
				
			||||||
 | 
					    import json
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        with open(json_path, 'r') as f:
 | 
				
			||||||
 | 
					            app_settings = json.load(f)
 | 
				
			||||||
 | 
					    except FileNotFoundError as e:
 | 
				
			||||||
 | 
					        # Should we generate a default? Probably not?
 | 
				
			||||||
 | 
					        raise e
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Title case correction smh
 | 
				
			||||||
 | 
					    if "postgres" in db_type.lower():
 | 
				
			||||||
 | 
					        proper_db_type = "Postgres"
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        proper_db_type = "Oracle"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    app_settings["ConnectionStrings"] = conn_string
 | 
				
			||||||
 | 
					    app_settings["DatabaseType"] = proper_db_type
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    with open(json_path, 'w') as f:
 | 
				
			||||||
 | 
					        json.dump(app_settings, f)
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,8 @@
 | 
				
			||||||
 | 
					DEFAULT_TOML_STRING = """# CellarioScheduler Auto Switcher Config
 | 
				
			||||||
 | 
					[ExampleDatabase]
 | 
				
			||||||
 | 
					DatabaseUser = "MC02015"
 | 
				
			||||||
 | 
					DatabasePassword = "postgres"
 | 
				
			||||||
 | 
					DatabaseType = "postgres"
 | 
				
			||||||
 | 
					CellarioDirectory = "C:\\\\Program Files\\\\HighRes Biosolutions\\\\MC02015_CS"
 | 
				
			||||||
 | 
					Version = "4.2"
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,105 @@
 | 
				
			||||||
 | 
					from typing import Dict, Callable
 | 
				
			||||||
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import subprocess
 | 
				
			||||||
 | 
					from tkinter import messagebox
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import connection_strings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_REQUIRED_ATTRS = ["user", "password", "db_type", "dir", "version", "disable"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def is_cellario_running() -> bool:
 | 
				
			||||||
 | 
					    return bool(len([x for x in subprocess.check_output(['tasklist']).split(b'\r\n') if b"Cellario.exe" in x]))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Instance:
 | 
				
			||||||
 | 
					    def __init__(self, name: str, input_dict: Dict):
 | 
				
			||||||
 | 
					        self.name = name
 | 
				
			||||||
 | 
					        self.disable = False
 | 
				
			||||||
 | 
					        for (key, value) in input_dict.items():
 | 
				
			||||||
 | 
					            match key.lower():
 | 
				
			||||||
 | 
					                case "databaseuser" | "user":
 | 
				
			||||||
 | 
					                    self.user = value
 | 
				
			||||||
 | 
					                case "databasepassword" | "password":
 | 
				
			||||||
 | 
					                    self.password = value
 | 
				
			||||||
 | 
					                case "databasetype" | "type":
 | 
				
			||||||
 | 
					                    self.db_type = value
 | 
				
			||||||
 | 
					                case "cellariodirectory" | "directory" | "dir":
 | 
				
			||||||
 | 
					                    self.dir = value
 | 
				
			||||||
 | 
					                case "version":
 | 
				
			||||||
 | 
					                    self.version = value
 | 
				
			||||||
 | 
					                case "disable":
 | 
				
			||||||
 | 
					                    if value.lower() in ("true", "1", "y", "yes"):
 | 
				
			||||||
 | 
					                        self.disable = True
 | 
				
			||||||
 | 
					                case _:
 | 
				
			||||||
 | 
					                    print(f"Unexpected key: {key}, dropping this value.")
 | 
				
			||||||
 | 
					        if not self._validate():
 | 
				
			||||||
 | 
					            raise ValueError("Instance missing one or more valid keys.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def version_is_43_or_higher(self) -> bool:
 | 
				
			||||||
 | 
					        version_numbers = self.version.split('.')
 | 
				
			||||||
 | 
					        if version_numbers[0] < 4:
 | 
				
			||||||
 | 
					            return False
 | 
				
			||||||
 | 
					        if version_numbers[1] >= 3:
 | 
				
			||||||
 | 
					            return True
 | 
				
			||||||
 | 
					        return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _validate(self) -> bool:
 | 
				
			||||||
 | 
					        for attr in _REQUIRED_ATTRS:
 | 
				
			||||||
 | 
					            if not hasattr(self, attr):
 | 
				
			||||||
 | 
					                return False
 | 
				
			||||||
 | 
					        return True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __repr__(self):
 | 
				
			||||||
 | 
					        attrs = ', '.join(f'{key}={value!r}' for key,
 | 
				
			||||||
 | 
					                          value in self.__dict__.items())
 | 
				
			||||||
 | 
					        return f'{self.__class__.__name__}({attrs})'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def set_symlink_for_instance(instance: Instance):
 | 
				
			||||||
 | 
					    target_path = r"C:\Program Files\HighRes Biosolutions\Cellario"
 | 
				
			||||||
 | 
					    if Path(target_path).is_dir() and not Path(target_path).is_symlink():
 | 
				
			||||||
 | 
					        print("Real CellarioScheduler directory already exists; it will be moved")
 | 
				
			||||||
 | 
					        import shutil
 | 
				
			||||||
 | 
					        import random
 | 
				
			||||||
 | 
					        import string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        hrb_dir = r"C:\Program Files\HighRes Biosolutions"
 | 
				
			||||||
 | 
					        new_dir_name = "Cellario_old_" + \
 | 
				
			||||||
 | 
					            ''.join(random.choices(string.ascii_lowercase, k=8))
 | 
				
			||||||
 | 
					        shutil.move(target_path, hrb_dir + "\\" + new_dir_name)
 | 
				
			||||||
 | 
					    if Path(target_path).is_dir() and Path(target_path).is_symlink():
 | 
				
			||||||
 | 
					        os.unlink(target_path)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    os.symlink(instance.dir, target_path, target_is_directory=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def gen_function_for_instance(instance: Instance) -> Callable:
 | 
				
			||||||
 | 
					    match instance.db_type:
 | 
				
			||||||
 | 
					        case "postgres":
 | 
				
			||||||
 | 
					            conn_string = connection_strings.generate_postgres_conn_string(
 | 
				
			||||||
 | 
					                instance)
 | 
				
			||||||
 | 
					        case "oracle":
 | 
				
			||||||
 | 
					            conn_string = connection_strings.generate_oracle_conn_string(
 | 
				
			||||||
 | 
					                instance)
 | 
				
			||||||
 | 
					        case _:
 | 
				
			||||||
 | 
					            # What???
 | 
				
			||||||
 | 
					            raise ValueError()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def retval():
 | 
				
			||||||
 | 
					        if not is_cellario_running():
 | 
				
			||||||
 | 
					            # Run symlink first, makes editing json easier in 4.3+ case
 | 
				
			||||||
 | 
					            set_symlink_for_instance(instance)
 | 
				
			||||||
 | 
					            connection_strings.set_conn_string(
 | 
				
			||||||
 | 
					                conn_string,
 | 
				
			||||||
 | 
					                instance.db_type,
 | 
				
			||||||
 | 
					                instance.version_is_43_or_higher())
 | 
				
			||||||
 | 
					            subprocess.Popen([r"C:\Program Files\HighRes Biosolutions\Cellario\Cellario.exe"],
 | 
				
			||||||
 | 
					                             cwd=r"C:\Program Files\HighRes Biosolutions\Cellario")
 | 
				
			||||||
 | 
					            # exit(0)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            messagebox.showerror(
 | 
				
			||||||
 | 
					                title="Cellario already running", message="Cellario is already running!")
 | 
				
			||||||
 | 
					    return retval
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,26 @@
 | 
				
			||||||
 | 
					from tkinter import Tk
 | 
				
			||||||
 | 
					from tkinter import ttk
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from instance import Instance, gen_function_for_instance
 | 
				
			||||||
 | 
					from settings import MainSettings, SortType
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def start_ui(instances: [Instance], settings):
 | 
				
			||||||
 | 
					    root = Tk()
 | 
				
			||||||
 | 
					    frame = ttk.Frame(root, padding=50)
 | 
				
			||||||
 | 
					    frame.grid()
 | 
				
			||||||
 | 
					    ttk.Label(frame, text="Available Databases").grid(column=0, row=0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Alphanumeric sorting
 | 
				
			||||||
 | 
					    if MainSettings.Sort == SortType.ALPHA:
 | 
				
			||||||
 | 
					        instances = list(sorted(instances))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for row, instance in enumerate(instances):
 | 
				
			||||||
 | 
					        if instance.disable and not MainSettings.ShowAll:
 | 
				
			||||||
 | 
					            continue
 | 
				
			||||||
 | 
					        ttk.Button(frame,
 | 
				
			||||||
 | 
					                   text=instance.name,
 | 
				
			||||||
 | 
					                   command=gen_function_for_instance(instance)).grid(column=0,
 | 
				
			||||||
 | 
					                                                                     row=row+1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    root.mainloop()
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,53 @@
 | 
				
			||||||
 | 
					import tomllib
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from typing import List, Tuple, Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import default_toml
 | 
				
			||||||
 | 
					from instance import Instance
 | 
				
			||||||
 | 
					from interface import start_ui
 | 
				
			||||||
 | 
					from settings import MainSettings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def resolve_config_path() -> str:
 | 
				
			||||||
 | 
					    if '__file__' in globals():
 | 
				
			||||||
 | 
					        script_path = os.path.dirname(os.path.realpath(__file__))
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        script_path = os.getcwd()
 | 
				
			||||||
 | 
					    return script_path + "/Cellario_Switcher_DB_List.toml"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def write_default_config_file():
 | 
				
			||||||
 | 
					    if Path(resolve_config_path()).is_file():
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					    with open(resolve_config_path(), "x") as f:
 | 
				
			||||||
 | 
					        f.write(default_toml.DEFAULT_TOML_STRING)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def read_config_file() -> Tuple[List[Instance], Optional[MainSettings]]:
 | 
				
			||||||
 | 
					    with open(resolve_config_path(), "rb") as f:
 | 
				
			||||||
 | 
					        unparsed = tomllib.load(f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    instances = []
 | 
				
			||||||
 | 
					    settings = None
 | 
				
			||||||
 | 
					    for name, value in unparsed.items():
 | 
				
			||||||
 | 
					        if name == "SETTINGS":
 | 
				
			||||||
 | 
					            settings = MainSettings(**value)
 | 
				
			||||||
 | 
					            continue
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            new_instance = Instance(name, value)
 | 
				
			||||||
 | 
					            instances.append(new_instance)
 | 
				
			||||||
 | 
					        except ValueError:
 | 
				
			||||||
 | 
					            # Potentially log on bad entries
 | 
				
			||||||
 | 
					            pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if settings is not None:
 | 
				
			||||||
 | 
					        return (instances, settings)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        return (instances, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__":
 | 
				
			||||||
 | 
					    write_default_config_file()
 | 
				
			||||||
 | 
					    start_ui(**read_config_file())
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,13 @@
 | 
				
			||||||
 | 
					from dataclasses import dataclass
 | 
				
			||||||
 | 
					from enum import Enum, auto
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class SortType(Enum):
 | 
				
			||||||
 | 
					    ALPHA = auto()
 | 
				
			||||||
 | 
					    NOSORT = auto()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@dataclass(frozen=True)
 | 
				
			||||||
 | 
					class MainSettings:
 | 
				
			||||||
 | 
					    Sort: SortType = SortType.NOSORT
 | 
				
			||||||
 | 
					    ShowAll: bool = False
 | 
				
			||||||
		Loading…
	
		Reference in New Issue