from typing import Dict, List, Optional, Any
from libs.app.common.store import DataStore
from libs.fspn.utils.sha256_util import hash_string


class DataManager:
    """CRUD facade over a ``DataStore`` holding users, rendezvous and networks.

    All records are plain dicts identified by their ``"id"`` field and live in
    the three top-level collections ``"users"``, ``"rendezvous"`` and
    ``"networks"`` of the store.
    """

    def __init__(self, nosys_core):
        """Bind the manager to its owning core and open the backing store.

        Args:
            nosys_core: The owning ``NoSysCore`` instance.
        """
        # Imported locally, as in the original layout (presumably to avoid a
        # circular import between this module and noSysCore -- TODO confirm).
        from .noSysCore import NoSysCore

        self.nosys_core:NoSysCore = nosys_core
        self.store = DataStore(
            path="noSys/data.json",
            default_data={"users": [], "rendezvous": [], "networks": []},
        )

    # ------------------- USER CRUD -------------------
    def add_user(self, user_id: str) -> Dict[str, Any]:
        """Create a user record with an empty network list and return it.

        Raises:
            ValueError: If a user with ``user_id`` already exists.
        """
        if self.get_user(user_id):
            raise ValueError(f"User {user_id} already exists")
        user = {"id": user_id, "networks": []}
        self.store.add_item("users", user, unique=True, id_field="id", id=user_id)
        return user

    def get_user(self, user_id: str):
        """Return the store's lookup result for the user whose ``"id"`` is ``user_id``.

        Callers in this class treat a falsy result as "not found".
        """
        return self.store.get_item("users", "id", user_id)

    def list_users(self):
        """Return whatever the store lists for the ``"users"`` collection."""
        return self.store.list_items("users")

    def delete_user(self, user_id: str) -> bool:
        """Remove the user with ``user_id``; returns the store's result."""
        return self.store.remove_item("users", "id", user_id)

    def update_user(self, user_id: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the user with ``user_id``; returns the store's result."""
        return self.store.update_item("users", "id", user_id, updates)

    # ------------------- USER NETWORK MANAGEMENT -------------------
    def user_add_network(self, user_id: str, network_id: str) -> bool:
        """Attach ``network_id`` to the user's network list.

        Returns:
            ``False`` if the user does not exist or already lists the network;
            otherwise the result of ``update_user``.
        """
        user = self.get_user(user_id)
        if not user:
            return False
        # setdefault needs an explicit [] default: without it a record lacking
        # the key would get None stored and the append below would fail.
        networks = user.setdefault("networks", [])
        if network_id in networks:
            return False
        networks.append(network_id)
        return self.update_user(user_id, {"networks": networks})

    def user_remove_network(self, user_id: str, network_id: str) -> bool:
        """Detach ``network_id`` from the user's network list.

        Returns:
            ``False`` if the user does not exist or does not list the network;
            otherwise the result of ``update_user``.
        """
        user = self.get_user(user_id)
        if not user:
            return False
        networks = user.get("networks") or []
        # Only write back when there is actually something to remove.
        if network_id not in networks:
            return False
        remaining = [n for n in networks if n != network_id]
        return self.update_user(user_id, {"networks": remaining})

    def user_list_networks(self, user_id: str) -> List[str]:
        """Return the user's network ids, or ``[]`` if the user is unknown."""
        user = self.get_user(user_id)
        if not user:
            return []
        return user.get("networks", [])

    # ------------------- RENDEZVOUS CRUD -------------------
    def add_rendezvous(self, rv_id: str, address: str) -> Dict[str, Any]:
        """Create a rendezvous record and return it.

        Raises:
            ValueError: If a rendezvous with ``rv_id`` already exists.
        """
        if self.get_rendezvous(rv_id):
            raise ValueError(f"Rendezvous {rv_id} already exists")
        rv = {"id": rv_id, "address": address}
        self.store.add_item("rendezvous", rv, unique=True, id_field="id", id=rv_id)
        # Return the record, consistent with add_user and create_network.
        return rv

    def get_rendezvous(self, rv_id: str):
        """Return the store's lookup result for the rendezvous with ``rv_id``."""
        return self.store.get_item("rendezvous", "id", rv_id)

    def list_rendezvous(self):
        """Return whatever the store lists for the ``"rendezvous"`` collection."""
        return self.store.list_items("rendezvous")

    def delete_rendezvous(self, rv_id: str) -> bool:
        """Remove the rendezvous with ``rv_id``; returns the store's result."""
        return self.store.remove_item("rendezvous", "id", rv_id)

    def update_rendezvous(self, rv_id: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the rendezvous with ``rv_id``; returns the store's result."""
        return self.store.update_item("rendezvous", "id", rv_id, updates)

    # ------------------- NETWORK CRUD -------------------
    def _default_config(self, net_type: str) -> Dict[str, Any]:
        """Return the built-in config for ``net_type``, or ``{}`` if none is defined."""
        defaults = {
            "test_network": {
                "auto_connect": True,
                "min_connections": 10,
                "max_connections": 20,
                "max_store_size": 1000,
                "max_message_size": 64_000,
                "message_ttl": 86400,
                "ack_required": True,
                "min_pow": 4,
            }
        }
        return defaults.get(net_type, {})

    def _generate_id(self, name, net_type):
        """Derive a network id by hashing ``"<name>:<net_type>"``.

        The same name/type pair always feeds the same payload to the hash
        helper, which is what lets ``create_network`` detect duplicates.
        """
        # NOTE(review): the payload is encoded to bytes before being handed to
        # hash_string -- confirm that helper accepts bytes rather than str.
        payload = f"{name}:{net_type}".encode()
        return hash_string(payload)

    def create_network(
        self,
        name: str,
        description: str,
        net_type: str,
        modules: Optional[List[str]] = None,
        config: Optional[Dict[str, Any]] = None,
    ):
        """Create a network record and return it.

        The id is derived from ``name`` and ``net_type``. The stored config is
        the type's default config overlaid with any keys given in ``config``.

        Args:
            name: Network name.
            description: Free-form description stored on the record.
            net_type: Network type; selects the default config.
            modules: Module names for the network; defaults to an empty list.
            config: Overrides applied on top of the default config.

        Raises:
            ValueError: If a network with the derived id already exists.
        """
        network_id = self._generate_id(name, net_type)
        if self.get_network(network_id):
            raise ValueError(f"Network {network_id} already exists")

        # Caller-supplied keys win over the per-type defaults.
        final_config = {**self._default_config(net_type), **(config or {})}
        network = {
            "id": network_id,
            "name": name,
            "description": description,
            "type": net_type,
            "modules": modules or [],
            "peers": [],
            "rendezvous": [],
            "config": final_config,
        }
        self.store.add_item("networks", network, unique=True, id_field="id", id=network_id)
        return network

    def get_network(self, network_id: str) -> Optional[Dict[str, Any]]:
        """Return the store's lookup result for the network with ``network_id``."""
        return self.store.get_item("networks", "id", network_id)

    def list_networks(self) -> List[Dict[str, Any]]:
        """Return whatever the store lists for the ``"networks"`` collection."""
        return self.store.list_items("networks")

    def delete_network(self, network_id: str) -> bool:
        """Remove the network with ``network_id``; returns the store's result."""
        return self.store.remove_item("networks", "id", network_id)

    def update_network(self, network_id: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the network with ``network_id``; returns the store's result."""
        return self.store.update_item("networks", "id", network_id, updates)

    def network_assign_rendezvous(self, network_id: str, rv_id: str):
        """Attach rendezvous ``rv_id`` to the network's rendezvous list.

        Returns:
            ``False`` if the network does not exist or already lists the
            rendezvous; otherwise the result of the store update.
        """
        network = self.store.get_item("networks", "id", network_id)
        if not network:
            return False
        rendezvous = network.setdefault("rendezvous", [])
        if rv_id in rendezvous:
            return False
        rendezvous.append(rv_id)
        return self.store.update_item(
            "networks", "id", network_id, {"rendezvous": rendezvous}
        )

    # ------------------- NETWORK QUERY -------------------
    def network_find_by_module(self, module_name: str) -> List[Dict[str, Any]]:
        """Return all networks that use a given module"""
        return [
            n for n in self.list_networks() if module_name in n.get("modules", [])
        ]