Files
nosys_libs/noSys/peers.py
2026-01-25 13:55:46 +10:00

85 lines
2.5 KiB
Python

import enum
class ConnectionState(enum.Enum):
DISCONNECTED = 0
CONNECTING = 1
CONNECTED = 2
from libs.fspn.protocol.connection import Connection
class Peer:
"""Represents a peer in the network."""
def __init__(self, id: str, connection: Connection):
self.id = id
self.connection:Connection = connection
self.state = ConnectionState.DISCONNECTED
self.modules = {}
self.networks = {}
def has_module(self, module_id:str):
return module_id in self.modules
def has_network(self, network_id:str):
return network_id in self.networks
def is_connected(self):
return self.state == ConnectionState.CONNECTED
def get_my_user(self):
return self.connection.security.user
def get_peer_user(self):
return self.connection.security.peer_user
class PeerManager:
"""Handles connection and storage of peers."""
def __init__(self, nosys_core):
from .noSysCore import NoSysCore
self.nosys_core:NoSysCore = nosys_core
self.peers:dict[str, Peer] = {}
def create_peer(self, connection:Connection):
return Peer(connection.id, connection)
def add_peer(self, peer: Peer):
self.peers[peer.id] = peer
def get_peer(self, peer_id:str) -> Peer:
return self.peers.get(peer_id, None)
def remove_peer(self, peer_id: str):
self.peers.pop(peer_id, None)
def get_online_peers(self) -> list[Peer]:
return self.get_peers_in_state(ConnectionState.CONNECTED)
def get_connecting_peers(self) -> list[Peer]:
return self.get_peers_in_state(ConnectionState.CONNECTING)
def get_offline_peers(self) -> list[Peer]:
return self.get_peers_in_state(ConnectionState.DISCONNECTED)
def get_peers_in_state(self, state):
peers:list[Peer] = []
for id, peer in self.peers.items():
if peer.state == state:
peers.append(peer)
return peers
def get_by_peer_user_id(self, user_id):
peers:list[Peer] = []
for id, peer in self.peers.items():
if peer.get_peer_user() == user_id and peer.is_connected():
peers.append(peer)
return peers
def get_by_network_id(self, networks):
peers:list[Peer] = []
for id, peer in self.peers.items():
for peer_network_id in peer.networks:
if peer_network_id in networks:
peers.append(peer)
return peers