Files
nosys_libs/app/BKP/utils.py
2026-01-25 13:55:46 +10:00

256 lines
8.3 KiB
Python

import os, sys
import uuid
import pathlib
import json
import subprocess
root_dir = os.path.normpath(__file__.split("libs")[0])
node_id = str(uuid.uuid4())
### LOGGER ###
import logging, inspect
class CustomLoggingFormatter(logging.Formatter):
grey = "\x1b[0;37m"
green = "\x1b[1;32m"
yellow = "\x1b[1;33m"
red = "\x1b[1;31m"
purple = "\x1b[1;35m"
blue = "\x1b[1;34m"
light_blue = "\x1b[1;36m"
bold_red = "\x1b[31;1m"
blink_red = "\x1b[5m\x1b[1;31m"
reset = "\x1b[0m"
prefix = light_blue + '%(asctime)s' + reset + ' |'
colored_level = '%(levelname)-8s'
message = '| %(message)s' + ''
suffix = purple + ' (%(name)s %(filename)s:%(lineno)d)' + reset
FORMATS = {
logging.DEBUG: prefix + grey + colored_level + reset + message + suffix,
logging.INFO: prefix + blue + colored_level + reset + message + suffix,
logging.WARNING: prefix + yellow + colored_level + reset + message + suffix,
logging.ERROR: prefix + red + colored_level + reset + message + suffix,
logging.CRITICAL: prefix + bold_red + colored_level + reset + message + suffix
}
file_format = '%(asctime)s | %(levelname)-8s | %(message)s | %(name)s (%(filename)s:%(lineno)d)'
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
# class FileFilter(logging.Filter):
# def filter(self, record):
# file_path = os.path.normpath(inspect.stack()[1].filename)
# head = os.path.split(file_path)[0]
# tail = os.path.split(file_path)[1]
# if head == os.path.normpath(root_dir):
# name = "app"
# else:
# after_lib = head.replace(os.path.join(root_dir, "libs"), "")
# name = after_lib.split(os.sep)[1]
# record.name = name
# print(name)
# return True
def add_logging_console_handler(logger:logging.Logger, logging_level=logging.DEBUG):
sh = logging.StreamHandler()
sh.setLevel(logging_level)
sh.setFormatter(CustomLoggingFormatter())
logger.addHandler(sh)
def add_logging_file_handler(logger:logging.Logger, directory, file_name, logging_level=logging.DEBUG):
file_path = os.path.join(root_dir, "logs", directory, f"{file_name}.log")
pathlib.Path(file_path).parent.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(file_path)
fh.setLevel(logging_level)
fh.setFormatter(logging.Formatter(CustomLoggingFormatter().file_format))
logger.addHandler(fh)
def config_root_logger():
logger = get_logger(name="root")
# TODO Get lib that is calling logging
# logger.addFilter(FileFilter())
def get_logger(name=""):
file_path = os.path.normpath(inspect.stack()[1].filename)
head = os.path.split(file_path)[0]
tail = os.path.split(file_path)[1]
if head == os.path.normpath(root_dir):
lib = "root"
else:
after_lib = head.replace(os.path.join(root_dir, "libs"), "")
lib = after_lib.split(os.sep)[1]
if name == "root":
logger_name = name
elif name:
logger_name = f"{lib}.{name}"
else:
logger_name = lib
file_name = name if name else lib
logger = logging.getLogger(logger_name)
logger.propagate = False
if not logger.hasHandlers():
logger.setLevel(logging.DEBUG)
add_logging_console_handler(logger=logger, logging_level=logging.DEBUG)
add_logging_file_handler(logger=logger, directory=lib, file_name=file_name, logging_level=logging.DEBUG)
return logger
logger = get_logger()
logger.debug(f"Root dir: {root_dir}")
### VENV ###
import venv
def create_venv():
venv_dir = os.path.join(root_dir, ".venv")
if not os.path.exists(venv_dir):
logger.debug(f"Creating python venv: {venv_dir}")
venv.create(venv_dir, with_pip=True)
### DOWNLOADS ###
import urllib.request
def download_file(url, path):
head = os.path.split(path)[0]
if not os.path.exists(head):
logger.debug(f"Creating repository {head}")
pathlib.Path(head).mkdir(parents=True, exist_ok=True)
logger.debug(f"Downloading from URL: {url} to {path}")
urllib.request.urlretrieve(url, path)
### KARGS ###
def read_kargs():
kargs = {}
params = sys.argv
for param in params:
if "=" in param:
split = param.split("=")
key = split[0]
value = split[1]
if value == "true" or value == "True":
value = True
elif value == "false" or value == "False":
value = False
kargs[key] = value
kargs["rootDir"] = root_dir
return kargs
def kargs_to_array(kargs={}):
if not kargs:
kargs = read_kargs()
array = []
for key, value in kargs.items():
array.append(f"{key}={value}")
return array
### HTTP UTILS ###
def is_http_running(url):
try:
u:urllib.request.URLopener = urllib.request.urlopen(url)
u.close()
return True
except:
return False
### UTILS ###
class Utils:
def __init__(self):
self.kargs = read_kargs()
self.configs = Config()
self.flags = {}
# def get_env():
# return os.environ.get('NOSYS_ENV', "PROD")
# def download_file(self, package, file, destination, repository=None):
# if not repository:
# repository = self.default_repository
# url = f"{repository}/{package}/{file}"
# logger.debug(f"Downloading {file} from URL: {url}")
# urllib.request.urlretrieve(url, destination)
# def download_main_file(self, ignoreIfExists=True):
# main_path = os.path.join(self.root_dir, "libs","app", "main.py")
# if (not os.path.exists(main_path)) or (os.path.exists(main_path) and not ignoreIfExists):
# self.download_file(package="app", file="main.py", destination=main_path)
def is_terminal_visible(self):
try:
return self.configs.data["main"]["terminalWindow"]
except Exception as e:
return True
# def _create_log_dir(self):
# logs_path = os.path.join(self.root_dir, "logs")
# if not os.path.exists(logs_path):
# logger.debug(f"Creating logs directory: {logs_path}")
# pathlib.Path(logs_path).mkdir(parents=True, exist_ok=True)
def new_python_process(self, file_path, args):
python_executable = "python" if self.is_terminal_visible() else "pythonw"
args = [f"{root_dir}/.venv/scripts/{python_executable}", file_path] + args
logger.debug(f"Starting a new process: {args}")
process = subprocess.Popen(args, cwd=root_dir, creationflags=subprocess.CREATE_NEW_CONSOLE)
logger.debug(f"Process PID {process.pid} - {args}")
return process.pid
def restart_app(self):
args = [sys.executable, "start.py"] + kargs_to_array(self.kargs)
subprocess.Popen(args, cwd=root_dir, creationflags=subprocess.CREATE_NEW_CONSOLE)
logger.info(f"Restarting app: {args}")
self.exit_app()
def exit_app(self, exit_code=1):
os._exit(exit_code)
# sys.exit(exit_code)
class Config:
def __init__(self):
self.data = None
self.packages:dict[str, Package] = {}
self.read_config()
def read_config(self):
config_path = os.path.join(root_dir, "libs", "app", "config.json")
if(os.path.exists(config_path)):
with open(config_path) as f:
data = json.load(f)
self.data = data
self.read_packages()
else:
raise Exception(f"Config file {config_path} not exist")
def read_packages(self):
for package in self.data["packages"]:
try:
with open(os.path.join(root_dir,'libs',package["id"],'info.json')) as f:
self.packages[package["id"]] = Package(config=package, info=json.load(f))
except Exception as e:
self.packages[package["id"]] = Package(config=package, info={})
logger.error(e)
class Package:
def __init__(self, config={}, info={}):
self.config = config
self.info = info
self.modules:dict[str, any] = {}
self.read_modules()
def read_modules(self):
if "modules" in self.info:
for module in self.info["modules"]:
self.modules[module["id"]] = module