Added libs

This commit is contained in:
Lucas
2026-01-25 13:55:46 +10:00
parent 575c682afc
commit f70af3c4ea
229 changed files with 26983 additions and 0 deletions

112
fspn/protocol/security.py Normal file
View File

@@ -0,0 +1,112 @@
from ..utils.observable import Observable
from ..utils.wrapper_util import singleton
from ..utils import sha256_util, aes_util, ecdh_util, ecdsa_util
import base64
import logging
import importlib
# class EcdsaKey:
# def __init__(self) -> None:
# self.verifying:ecdsa_util.VerifyingKey = None
# self.signing:ecdsa_util.SigningKey = None
# def create_key_from_string(self, password:str):
# self.verifying, self.signing = ecdsa_util.create_keys(password.encode())
# def create_key_from_bytes(self, password:bytes):
# self.verifying, self.signing = ecdsa_util.create_keys(password)
# def load_verifying(self, key:str):
# self.verifying = ecdsa_util.load_verifying_key(base64.b64decode(key.encode()))
# def verifying_key_to_str(self):
# return base64.b64encode(self.verifying.to_string('compressed')).decode()
class UserData:
def __init__(self):
self.proof_of_work = None
# Password Manager Client
class Pmc:
def raiseException(self):
raise Exception("Missing Password Manager Client")
def get(self, user) -> UserData:
self.raiseException()
# Returns a request_id. Callback receives str:request_id str:signature
def sign(self, data, user, callback, info=None) -> str:
self.raiseException()
class EcdhKey:
def __init__(self):
self.public, self.private = ecdh_util.generate_keys()
self.derived_key = None
def generate_derived_key(self, peer_key:str):
ecdh_pk = ecdh_util.load_public_key_str(peer_key, True)
shared_key = ecdh_util.generate_shared_key(self.private, ecdh_pk)
self.derived_key = ecdh_util.generate_derived_key(shared_key)
def update_derived_key(self):
self.derived_key = ecdh_util.generate_derived_key(self.derived_key)
def public_key_to_str(self):
return ecdh_util.public_key_to_str(self.public, True)
class Security():
def __init__(self, user, pmc:Pmc):
self.pmc = pmc
self.user = user
self.proof_of_work = self.pmc.get(user).proof_of_work
self.peer_user = None
self.peer_ecdsa = None
self.ecdh = EcdhKey()
self.peer_pof_level = None
self.min_proof_of_work_level = 4
def encrypt_message(self, message:bytes):
return aes_util.encrypt(message, self.ecdh.derived_key)
def decrypt_message(self, nonce:bytes, message:bytes, mac:bytes):
return aes_util.decrypt_and_verify(nonce, message, mac, self.ecdh.derived_key)
def sign_message_ecdsa(self, message:str, callback, info=None):
hash_message = sha256_util.hash_string(message)
return self.pmc.sign(hash_message, self.user, callback, info)
# return base64.b64encode(ecdsa_util.sign_message(message, self.my_ecdsa.signing)).decode()
def check_signature_ecdsa(self, message:str, signature:str):
hash_message = sha256_util.hash_string(message)
return ecdsa_util.verify_message(hash_message.encode(), base64.b64decode(signature.encode()), self.peer_ecdsa)
def check_signature_ecdsa_vk(self, verifying_key:str, message:str, signature:str):
hash_message = sha256_util.hash_string(message)
return ecdsa_util.verify_message(hash_message.encode(), base64.b64decode(signature.encode()), ecdsa_util.load_verifying_key(base64.b64decode(verifying_key.encode())))
# def encrypt_message_ecdsa(self, message:str):
# encrypted = ecdsa_util.encrypt_message(base64.b64decode(self.peer_ecdsa.verifying_key_to_str().encode()), message.encode())
# return base64.b64encode(encrypted).decode()
# def decrypt_message_ecdsa(self, message:str):
# return ecdsa_util.decrypt_message(self.my_ecdsa.signing.to_string(), base64.b64decode(message.encode()))
def verify_proof_of_work(self, ecdsa:str, proof_of_work:str):
hash = sha256_util.hash_string(f'{ecdsa}{proof_of_work}')
logging.debug(f'ECDSA {ecdsa}, POF {proof_of_work}, HASH {hash}')
if hash[0:self.min_proof_of_work_level] != "0"*self.min_proof_of_work_level:
raise Exception(f'Proof of work below minimum level {self.min_proof_of_work_level}')
# TODO validate date/time
def handshake_validation(self, peer_ecdsa, payload, payload_signature, ecdh, proof_of_work, date):
self.peer_user = peer_ecdsa
self.peer_ecdsa = ecdsa_util.load_verifying_key(base64.b64decode(peer_ecdsa.encode()))
self.check_signature_ecdsa(payload, payload_signature)
self.verify_proof_of_work(peer_ecdsa, proof_of_work)
self.ecdh.generate_derived_key(ecdh)