64 lines
2.1 KiB
Python
64 lines
2.1 KiB
Python
from typing import TYPE_CHECKING
|
|
if TYPE_CHECKING:
|
|
from .noSysCore import NoSysCore
|
|
|
|
from libs.fspn.utils.observable import Observable
|
|
from .events import Events, DynamicEvents
|
|
|
|
|
|
class NoSysModule(Observable):
|
|
"""Base class for all modules in NoSys."""
|
|
def __init__(self, nosys_core:"NoSysCore"):
|
|
super().__init__()
|
|
self.nosys_core:NoSysCore = nosys_core
|
|
self.id = self._get_module_id()
|
|
self.config = self.nosys_core.config.get(self.package_id)
|
|
self.networks = self._get_module_networks()
|
|
|
|
self.nosys_core.subscribe_event(Events.READY, self.on_nosys_ready)
|
|
self.nosys_core.subscribe_event(DynamicEvents.module_connection(self.package_id, self.module_id), self.on_module_connection)
|
|
self.nosys_core.subscribe_event(DynamicEvents.module_disconnection(self.package_id, self.module_id), self.on_module_disconnection)
|
|
self.nosys_core.subscribe_event(DynamicEvents.module_message(self.package_id, self.module_id), self.on_module_message)
|
|
|
|
for network in self.networks:
|
|
self.nosys_core.subscribe_event(DynamicEvents.network_connection(network.get("id")), self.on_network_connection)
|
|
self.nosys_core.subscribe_event(DynamicEvents.network_disconnection(network.get("id")), self.on_network_disconnection)
|
|
|
|
|
|
def setup(self):
|
|
pass
|
|
|
|
def on_nosys_ready(self, event):
|
|
pass
|
|
|
|
def on_module_connection(self, event):
|
|
pass
|
|
|
|
def on_module_disconnection(self, event):
|
|
pass
|
|
|
|
def on_module_message(self, event):
|
|
pass
|
|
|
|
def on_network_connection(self, event):
|
|
pass
|
|
|
|
def on_network_disconnection(self, event):
|
|
pass
|
|
|
|
def teardown_module(self):
|
|
pass
|
|
|
|
|
|
def _get_module_id(self):
|
|
module_path = self.__class__.__module__
|
|
parts = module_path.split(".")
|
|
|
|
self.package_id = parts[1]
|
|
self.module_id = parts[2]
|
|
self.name = self.package_id+"_"+self.module_id
|
|
|
|
return (self.package_id, self.module_id)
|
|
|
|
def _get_module_networks(self):
|
|
return self.nosys_core.data.network_find_by_module(self.name) |