import json

from libs.app.common.logging import get_logger
from .events import DynamicEvents

logger = get_logger("dispatcher")


class Dispatcher:
    """Send messages/binary payloads to peers, broadcast, and dispatch inbound messages.

    Outbound payloads are wrapped in an application envelope that names the
    target ``(lib, module)`` pair. Inbound events are unwrapped and re-fired
    on the core as a per-module dynamic event.
    """

    def __init__(self, nosys_core):
        """Store the core object used for peer lookup and event firing.

        Args:
            nosys_core: Core object; this class reads ``nosys_core.peers`` and
                calls ``nosys_core.fire_event``.
        """
        # Function-scope import kept as in the original; presumably it avoids
        # a circular import with noSysCore -- TODO confirm.
        from .noSysCore import NoSysCore

        self.nosys_core: NoSysCore = nosys_core

    # -------- Sending --------

    @staticmethod
    def _app_header(to_module: tuple[str, str]) -> dict:
        """Build the ``app`` part of the envelope for a ``(lib, module)`` pair."""
        return {"lib": to_module[0], "module": to_module[1]}

    def send_binary(self, data: bytes, peer_id, to_module: tuple[str, str], meta=None, encrypted: bool = True):
        """Send a binary payload to one module of one peer.

        The payload is dropped, with a warning logged, when the peer is
        unknown or does not report the target module via ``has_module``.

        Args:
            data: Raw bytes handed to the peer's connection.
            peer_id: Identifier passed to ``peers.get_peer``.
            to_module: ``(lib, module)`` pair identifying the receiver.
            meta: Optional module-level metadata; a falsy value becomes ``{}``.
            encrypted: Forwarded unchanged to the connection.
        """
        meta = meta or {}
        peer = self.nosys_core.peers.get_peer(peer_id)
        if not peer:
            # Previously a silent no-op; make the drop visible in the logs.
            logger.warning("Dropping binary payload for module %s: unknown peer %s", to_module, peer_id)
            return
        if not peer.has_module(to_module):
            logger.warning("Dropping binary payload for peer %s: module %s not available", peer_id, to_module)
            return

        app_body = {
            "app": self._app_header(to_module),
            "meta": meta,
        }
        peer.connection.send_binary(data=data, meta=app_body, encrypted=encrypted)

    def send_message(self, message, peer_id, to_module: tuple[str, str], encrypted: bool = True):
        """Send a JSON-serialised message to one module of one peer.

        An unknown peer is skipped with a warning logged.

        Args:
            message: Payload stored under ``"data"``; it must be serialisable
                by ``json.dumps``.
            peer_id: Identifier passed to ``peers.get_peer``.
            to_module: ``(lib, module)`` pair identifying the receiver.
            encrypted: Forwarded unchanged to the connection.

        Raises:
            Exception: If the peer exists but ``is_connected()`` is false.
        """
        peer = self.nosys_core.peers.get_peer(peer_id)
        if not peer:
            # Previously a silent no-op; make the drop visible in the logs.
            logger.warning("Dropping message for module %s: unknown peer %s", to_module, peer_id)
            return
        if not peer.is_connected():
            raise Exception(f"Cannot send message to a disconnected peer: {peer_id}")

        app_body = {
            "app": self._app_header(to_module),
            "data": message,
        }
        peer.connection.send_message(message=json.dumps(app_body), encrypted=encrypted)

    def broadcast_message(self, message, to_module: tuple[str, str], exclude: list[str] | None = None, encrypted: bool = True):
        """Send ``message`` to every online peer whose id is not in ``exclude``.

        Each delivery goes through ``send_message``, so an exception raised
        for one peer propagates and the remaining peers are not contacted.

        Args:
            message: Payload forwarded to ``send_message``.
            to_module: ``(lib, module)`` pair identifying the receiver.
            exclude: Peer ids to skip; ``None`` means skip nobody.
            encrypted: Forwarded unchanged to ``send_message``.
        """
        exclude = exclude or []
        for peer in self.nosys_core.peers.get_online_peers():
            if peer.peer_id in exclude:
                continue
            self.send_message(message, peer.peer_id, to_module, encrypted)

    # -------- Receiving --------

    def on_message(self, event):
        """Unwrap an inbound event and fire the matching per-module dynamic event.

        Two envelope shapes are handled:

        * ``event.message`` has a ``"meta"`` key: the envelope is
          ``event.message["meta"]`` and the payload is ``event.message["data"]``
          as received (this mirrors what ``send_binary`` emits).
        * otherwise: ``event.message["data"]`` is a JSON string holding the
          envelope, and the payload is its ``"data"`` entry (this mirrors what
          ``send_message`` emits).

        Any exception raised while unwrapping or dispatching is logged with
        its traceback and not re-raised.
        """
        try:
            if "meta" in event.message:
                app_body = event.message["meta"]
                module_meta = app_body.get("meta", {})
                data = event.message["data"]
            else:
                app_body = json.loads(event.message["data"])
                module_meta = {}
                data = app_body["data"]

            app = app_body["app"]
            module_key = (app["lib"], app["module"])
            logger.debug(
                "%s receiving from %s\n Module: %s\n Meta: %s\n Data: %s",
                event.source.bind_address,
                event.source.address,
                module_key,
                module_meta,
                data,
            )

            evt_name = DynamicEvents.module_message(module_key[0], module_key[1])
            peer = self.nosys_core.peers.get_peer(event.source.id)
            self.nosys_core.fire_event(evt_name, peer=peer, module=module_key, data=data, meta=module_meta)
        except Exception:
            # Boundary handler: one malformed message must not propagate into
            # whatever delivered the event.
            logger.exception("ERROR while dispatching incoming message")