Files
nosys_libs/p2post/dataManager.py
2026-01-25 13:55:46 +10:00

109 lines
3.9 KiB
Python

from typing import Dict, List, Optional, Any
from libs.app.common.store import DataStore
from libs.fspn.utils.sha256_util import hash_string
class DataManager:
    """CRUD facade over the p2post ``DataStore``.

    The store is opened at ``p2post/data.json`` and holds four collections:
    ``users`` (keyed by ``pubkey``) and ``networks``, ``posts`` and ``medias``
    (each keyed by ``hash``). Every ``add_*`` method refuses duplicates by
    raising ``ValueError``; the ``get_*``, ``delete_*``, ``list_*`` and
    ``update_*`` methods forward directly to the store and return whatever
    the store returns.

    NOTE: several parameters are named ``hash`` and therefore shadow the
    builtin inside those methods. The name is kept because callers may pass
    it by keyword.
    """

    def __init__(self):
        """Open the backing store, seeding empty collections as defaults."""
        self.store = DataStore(
            path="p2post/data.json",
            default_data={"users": [], "networks": [], "posts": [], "medias": []},
        )
        # TODO Load networks from noSys

    # ------------------- INTERNAL -------------------
    def _insert_unique(
        self,
        collection: str,
        id_field: str,
        id_value: str,
        label: str,
        record: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Insert ``record`` into ``collection`` unless its id is already present.

        Raises:
            ValueError: if the store already returns a truthy item for
                ``id_field == id_value``. The message is
                ``"<label> <id_value> already exists"``.
        """
        if self.store.get_item(collection, id_field, id_value):
            raise ValueError(f"{label} {id_value} already exists")
        self.store.add_item(
            collection, record, unique=True, id_field=id_field, id=id_value
        )
        return record

    # ------------------- USER CRUD -------------------
    def add_user(
        self,
        pubkey: str,
        nickname: str,
        avatar: Optional[str] = None,
        bio: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create and store a user record, returning the stored dict.

        Missing or empty ``avatar`` / ``bio`` values are stored as ``""``.

        Raises:
            ValueError: if a user with this ``pubkey`` already exists.
        """
        user = {
            "pubkey": pubkey,
            "nickname": nickname,
            "avatar": avatar or "",
            "bio": bio or "",
        }
        return self._insert_unique("users", "pubkey", pubkey, "User", user)

    def get_user(self, pubkey: str):
        """Return the store's item for ``pubkey`` (falsy when not found)."""
        return self.store.get_item("users", "pubkey", pubkey)

    def delete_user(self, pubkey: str) -> bool:
        """Remove the user with ``pubkey``; returns the store's result."""
        return self.store.remove_item("users", "pubkey", pubkey)

    def list_users(self):
        """Return the store's listing of the ``users`` collection."""
        return self.store.list_items("users")

    def update_user(self, pubkey: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the user with ``pubkey``; returns the store's result."""
        return self.store.update_item("users", "pubkey", pubkey, updates)

    # ---------------- NETWORK CRUD ----------------
    def add_network(self, hash: str, name: str) -> Dict[str, Any]:
        """Create and store a network record with an empty ``posts`` list.

        The ``name`` argument is persisted on the record (previously it was
        accepted but silently dropped).

        Raises:
            ValueError: if a network with this ``hash`` already exists.
        """
        network = {
            "hash": hash,
            "name": name,
            "posts": [],
        }
        return self._insert_unique("networks", "hash", hash, "Network", network)

    def get_network(self, hash: str):
        """Return the store's item for network ``hash`` (falsy when not found)."""
        return self.store.get_item("networks", "hash", hash)

    def delete_network(self, hash: str) -> bool:
        """Remove the network with ``hash``; returns the store's result."""
        return self.store.remove_item("networks", "hash", hash)

    def list_networks(self):
        """Return the store's listing of the ``networks`` collection."""
        return self.store.list_items("networks")

    def update_network(self, hash: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the network with ``hash``; returns the store's result."""
        return self.store.update_item("networks", "hash", hash, updates)

    # ---------------- POST CRUD ----------------
    def add_post(
        self,
        hash: str,
        timestamp: int,
        author: str,
        content: str,
        signature: str,
        medias: Optional[list[Dict]] = None,
    ) -> Dict[str, Any]:
        """Create and store a post record, returning the stored dict.

        ``medias`` is always stored as a list: an omitted or empty value
        becomes ``[]`` and a supplied list is shallow-copied so later
        mutation of the caller's list does not alter the stored record.

        Raises:
            ValueError: if a post with this ``hash`` already exists.
        """
        post = {
            "hash": hash,
            "timestamp": timestamp,
            "author": author,
            "content": content,
            "medias": list(medias) if medias else [],
            "signature": signature,
        }
        return self._insert_unique("posts", "hash", hash, "Post", post)

    def get_post(self, hash: str):
        """Return the store's item for post ``hash`` (falsy when not found)."""
        return self.store.get_item("posts", "hash", hash)

    def delete_post(self, hash: str) -> bool:
        """Remove the post with ``hash``; returns the store's result."""
        return self.store.remove_item("posts", "hash", hash)

    def list_posts(self):
        """Return the store's listing of the ``posts`` collection."""
        return self.store.list_items("posts")

    def update_post(self, hash: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the post with ``hash``; returns the store's result."""
        return self.store.update_item("posts", "hash", hash, updates)

    # ---------------- Media ----------------
    # TODO Add field status. Create method to verify media
    def add_media(self, hash: str, file_path: str) -> Dict[str, Any]:
        """Create and store a media record mapping ``hash`` to ``file_path``.

        Raises:
            ValueError: if a media entry with this ``hash`` already exists.
        """
        media = {
            "hash": hash,
            "file_path": file_path,
        }
        return self._insert_unique("medias", "hash", hash, "Media", media)

    def get_media(self, hash: str):
        """Return the store's item for media ``hash`` (falsy when not found)."""
        return self.store.get_item("medias", "hash", hash)

    def delete_media(self, hash: str) -> bool:
        """Remove the media entry with ``hash``; returns the store's result."""
        return self.store.remove_item("medias", "hash", hash)

    def list_medias(self):
        """Return the store's listing of the ``medias`` collection."""
        return self.store.list_items("medias")

    def update_media(self, hash: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to the media entry with ``hash``; returns the store's result."""
        return self.store.update_item("medias", "hash", hash, updates)