Files
nosys_libs/lockbox/lockboxService.py
2026-01-25 13:55:46 +10:00

169 lines
5.9 KiB
Python

import os, sys
root_dir = os.path.normpath(__file__.split("libs")[0])
sys.path.append(root_dir)
from threading import Thread
from libs.fspn.utils import sha256_util, aes_util, ecdh_util, ecdsa_util
from libs.fspn.utils.wrapper_util import singleton, threaded
from libs.app.common.logging import get_logger, config_root_logger
from libs.lockbox.utils import save_credential_data, get_credential_data, credential_exists, delete_credential, load_history, save_history, delete_from_history, add_to_history
import argparse
import base64
import subprocess
import secrets
import os
import time, datetime
import webbrowser
config_root_logger()
logger = get_logger("service")
class UserData:
def __init__(self, id=None, proof_of_work=None, time_to_live=43200, added_at=None, logged=False):
self.id = id
self.proof_of_work = proof_of_work
self.added_at = added_at
self.time_to_live = time_to_live
self.logged = logged
class User:
def __init__(self, verifying_key, signing_key, data:UserData=None):
self.verifying_key = verifying_key
self.signing_key = signing_key
self.data = data
thread_ws = Thread(target=self.wait_time_to_live, args=[self.data.time_to_live,])
thread_ws.start()
def sign(self, data):
return base64.b64encode(ecdsa_util.sign_message(data, self.signing_key)).decode()
# TODO Fix it
def wait_time_to_live(self, seconds):
time.sleep(seconds)
self.signing_key = None
user_data = UserData(id=self.data.id)
self.data = user_data
class LockboxService:
def __init__(self):
logger.info("Started")
self.users:dict[str, User] = {}
self.signature_requests = {}
self.mining = {}
self.load_user_history()
from libs.lockbox.lockboxServiceApi import LockboxServiceApi
self.api = LockboxServiceApi(self)
# webbrowser.open("http://localhost:5001", new=0, autoraise=True)
def load_user_history(self):
user_history = load_history()
for user in user_history:
if credential_exists(user):
user_data = UserData(id=user)
self.users[user] = User(user, None, user_data)
else:
delete_from_history(user)
def set_user(self, password:bytes, data:UserData):
verifying_key, signing_key = ecdsa_util.create_keys(password)
vk_b64 = base64.b64encode(verifying_key.to_string('compressed')).decode()
data.id = vk_b64
data.added_at = datetime.datetime.now()
data.logged = True
if vk_b64 not in self.users or self.users[vk_b64].signing_key == None:
self.users[vk_b64] = User(verifying_key, signing_key, data)
return vk_b64
def user_add(self, password_str, password_encode="b64", data=UserData(), credential_password=None):
if(password_encode == "b64"):
password = base64.b64decode(password_str)
else:
raise Exception(f"Password encode {password_encode} not supported")
vk_b64 = self.set_user(password, data)
if credential_password:
if credential_exists(vk_b64):
delete_credential(vk_b64)
credential_data = {"password":password_str,"pof":data.proof_of_work}
save_credential_data(credential_data, vk_b64, credential_password)
logger.debug(f"User added: {vk_b64}")
add_to_history(vk_b64)
return vk_b64
def login_user(self, verifying_key, credential_password):
credential_data = get_credential_data(verifying_key, credential_password)
if credential_data:
password = base64.b64decode(credential_data["password"])
return self.set_user(password, data=UserData(proof_of_work=credential_data["pof"]))
return None
def requested_user_sign(self, verifying_key, data, info):
if verifying_key not in self.users:
return None
request_id = sha256_util.hash_string(data)
self.signature_requests[request_id] = (verifying_key, data, info)
logger.debug(f"Signature {request_id} waiting approvement by {verifying_key}\nInfo:{info}\nData: {data}")
# TODO !!!!!! REMOVE TEST
# self.test(request_id)
return request_id
@threaded
def test(self, request_id):
time.sleep(1)
signature = self.user_sign(request_id, True)
self.api.socketio.emit("signatureResponse",{"requestId":request_id, "signature":signature})
def user_sign(self, request_id, approved):
if request_id not in self.signature_requests:
return None
verifying_key, data, info = self.signature_requests[request_id]
signature = None
if approved:
signature = self.users[verifying_key].sign(data)
logger.debug(f"Request {request_id} signed by {verifying_key} \nSignature: {signature}")
else:
logger.debug(f"Request {request_id} not signed by {verifying_key}")
# TODO Maybe create history
self.signature_requests.pop(request_id)
return signature
def user_list(self):
users = []
for user in self.users.values():
users.append(user.data)
logger.debug(f"Listing users: {users}")
return users
def user_get(self, verifying_key):
data = None
if verifying_key in self.users:
data = self.users[verifying_key].data
logger.debug(f"User {verifying_key} data: {data}")
return data
def user_delete(self, verifying_key):
if verifying_key in self.users:
self.users.pop(verifying_key)
logger.info(f"User {verifying_key} removed")
else:
logger.info(f"User {verifying_key} not exists")
return verifying_key
if __name__ == "__main__":
try:
lockbox = LockboxService()
except Exception as e:
logger.exception("ERROR")