Files
nosys_libs/fspn/utils/observable.py
2026-01-25 13:55:46 +10:00

44 lines
1.2 KiB
Python

from .wrapper_util import threaded
import logging, traceback
class Event(object):
def __init__(self):
self.source = self
class Observable(object):
"""
A simple publish-subscribe system.
"""
def __init__(self, events = []):
self.events: dict[str, list] = {}
for event in events:
self.register_event(event)
def register_event(self, event):
if(event not in self.events):
self.events[event] = []
def subscribe_event(self, event, callback):
if(event not in self.events):
self.register_event(event)
self.events[event].append(callback)
@threaded
def fire_event(self, event, **kwargs):
e = Event()
e.source = self
for k, v in kwargs.items():
setattr(e, k, v)
if(event in self.events):
for fn in self.events[event]:
self.call_observer(fn, e)
else:
logging.warning(f"Event {event} without callback")
@threaded
def call_observer(self, function, event):
try:
function(event)
except Exception as ex:
logging.exception(f'Error in event {event} to function {function.__name__}')