from ..utils.observable import Observable
from ..utils.wrapper_util import singleton
from ..utils import sha256_util, aes_util, ecdh_util, ecdsa_util
import base64
import logging
import importlib

# Disabled (commented-out) implementation, kept as found in the original file.
# class EcdsaKey:
#     def __init__(self) -> None:
#         self.verifying:ecdsa_util.VerifyingKey = None
#         self.signing:ecdsa_util.SigningKey = None
#
#     def create_key_from_string(self, password:str):
#         self.verifying, self.signing = ecdsa_util.create_keys(password.encode())
#
#     def create_key_from_bytes(self, password:bytes):
#         self.verifying, self.signing = ecdsa_util.create_keys(password)
#
#     def load_verifying(self, key:str):
#         self.verifying = ecdsa_util.load_verifying_key(base64.b64decode(key.encode()))
#
#     def verifying_key_to_str(self):
#         return base64.b64encode(self.verifying.to_string('compressed')).decode()


class UserData:
    """Record returned by ``Pmc.get``; carries the user's proof of work."""

    def __init__(self):
        # Left as None here; Security.__init__ reads this attribute from the
        # object returned by Pmc.get.
        self.proof_of_work = None


# Password Manager Client
class Pmc:
    """Base password manager client whose operations all raise.

    Every method here raises ``Exception("Missing Password Manager Client")``.
    Presumably a concrete client overrides ``get`` and ``sign`` -- confirm
    against the rest of the project.
    """

    def raiseException(self):
        """Raise the "missing client" error used by every method of this class."""
        raise Exception("Missing Password Manager Client")

    def get(self, user) -> UserData:
        """Return the ``UserData`` for ``user``; this base version always raises."""
        self.raiseException()

    def sign(self, data, user, callback, info=None) -> str:
        """Request a signature of ``data`` for ``user``.

        Returns a request_id.
        Callback receives str:request_id str:signature.
        This base version always raises.
        """
        self.raiseException()


class EcdhKey:
    """ECDH key pair plus the key derived from a peer's public key."""

    def __init__(self):
        self.public, self.private = ecdh_util.generate_keys()
        # Set by generate_derived_key(); None until then.
        self.derived_key = None

    def generate_derived_key(self, peer_key: str):
        """Derive ``self.derived_key`` from the peer's public key string.

        The peer key is loaded with ``ecdh_util.load_public_key_str``, combined
        with our private key into a shared key, and that shared key is passed
        through ``ecdh_util.generate_derived_key``.
        """
        # NOTE(review): the meaning of the second argument (True) is defined by
        # ecdh_util, which is not visible here -- confirm before changing.
        peer_public = ecdh_util.load_public_key_str(peer_key, True)
        shared_key = ecdh_util.generate_shared_key(self.private, peer_public)
        self.derived_key = ecdh_util.generate_derived_key(shared_key)

    def update_derived_key(self):
        """Replace the derived key with one derived from the current derived key."""
        self.derived_key = ecdh_util.generate_derived_key(self.derived_key)

    def public_key_to_str(self):
        """Return our public key serialized by ``ecdh_util.public_key_to_str``."""
        return ecdh_util.public_key_to_str(self.public, True)


class Security:
    """Per-connection security state: signing, verification and encryption.

    Signing is delegated to the password manager client (``pmc``); signature
    verification uses the peer's ECDSA verifying key; message encryption uses
    the key derived by ``EcdhKey``.
    """

    def __init__(self, user, pmc: Pmc):
        self.pmc = pmc
        self.user = user
        self.proof_of_work = self.pmc.get(user).proof_of_work
        # Peer identity; both are filled in by handshake_validation().
        self.peer_user = None
        self.peer_ecdsa = None
        self.ecdh = EcdhKey()
        self.peer_pof_level = None
        # Number of leading "0" characters required in the proof-of-work hash.
        self.min_proof_of_work_level = 4

    def encrypt_message(self, message: bytes):
        """Encrypt ``message`` with the current ECDH-derived key via ``aes_util``."""
        return aes_util.encrypt(message, self.ecdh.derived_key)

    def decrypt_message(self, nonce: bytes, message: bytes, mac: bytes):
        """Decrypt and verify ``message`` with the current ECDH-derived key."""
        return aes_util.decrypt_and_verify(nonce, message, mac, self.ecdh.derived_key)

    def sign_message_ecdsa(self, message: str, callback, info=None):
        """Hash ``message`` and ask the password manager client to sign the hash.

        Returns whatever ``pmc.sign`` returns (a request id, per ``Pmc.sign``);
        the signature itself is delivered through ``callback``.
        """
        hash_message = sha256_util.hash_string(message)
        return self.pmc.sign(hash_message, self.user, callback, info)
        # return base64.b64encode(ecdsa_util.sign_message(message, self.my_ecdsa.signing)).decode()

    def check_signature_ecdsa(self, message: str, signature: str):
        """Verify a base64 ``signature`` of ``message`` against the stored peer key.

        Returns the result of ``ecdsa_util.verify_message``.
        """
        hash_message = sha256_util.hash_string(message)
        return ecdsa_util.verify_message(
            hash_message.encode(),
            base64.b64decode(signature.encode()),
            self.peer_ecdsa,
        )

    def check_signature_ecdsa_vk(self, verifying_key: str, message: str, signature: str):
        """Verify a base64 ``signature`` of ``message`` against a base64 verifying key.

        Same check as ``check_signature_ecdsa`` but with an explicitly supplied
        key instead of the stored peer key.
        """
        hash_message = sha256_util.hash_string(message)
        peer_key = ecdsa_util.load_verifying_key(base64.b64decode(verifying_key.encode()))
        return ecdsa_util.verify_message(
            hash_message.encode(),
            base64.b64decode(signature.encode()),
            peer_key,
        )

    # def encrypt_message_ecdsa(self, message:str):
    #     encrypted = ecdsa_util.encrypt_message(base64.b64decode(self.peer_ecdsa.verifying_key_to_str().encode()), message.encode())
    #     return base64.b64encode(encrypted).decode()

    # def decrypt_message_ecdsa(self, message:str):
    #     return ecdsa_util.decrypt_message(self.my_ecdsa.signing.to_string(), base64.b64decode(message.encode()))

    def verify_proof_of_work(self, ecdsa: str, proof_of_work: str):
        """Check that hashing ``ecdsa + proof_of_work`` meets the minimum level.

        Raises:
            Exception: if the hash does not start with
                ``min_proof_of_work_level`` "0" characters.
        """
        # Named "digest" rather than "hash" so the builtin is not shadowed.
        digest = sha256_util.hash_string(f'{ecdsa}{proof_of_work}')
        # Lazy %-style args: the message is only built when DEBUG is enabled.
        logging.debug('ECDSA %s, POF %s, HASH %s', ecdsa, proof_of_work, digest)
        required_prefix = "0" * self.min_proof_of_work_level
        if not digest.startswith(required_prefix):
            raise Exception(f'Proof of work below minimum level {self.min_proof_of_work_level}')

    # TODO validate date/time
    def handshake_validation(self, peer_ecdsa, payload, payload_signature, ecdh, proof_of_work, date):
        """Validate a peer's handshake and derive the shared encryption key.

        Stores the peer's identity, verifies the payload signature and the
        proof of work, then derives the ECDH key from the peer's public key.
        ``date`` is currently unused (see the TODO above).

        Raises:
            Exception: if the payload signature is reported invalid or the
                proof of work is below the minimum level.
        """
        self.peer_user = peer_ecdsa
        self.peer_ecdsa = ecdsa_util.load_verifying_key(base64.b64decode(peer_ecdsa.encode()))
        # The verification result used to be discarded. ecdsa_util is not
        # visible here: if verify_message reports failure by returning False
        # instead of raising, a forged signature would have been accepted.
        # Checking "is False" keeps behaviour unchanged when it raises itself.
        signature_ok = self.check_signature_ecdsa(payload, payload_signature)
        if signature_ok is False:
            raise Exception('Invalid payload signature')
        self.verify_proof_of_work(peer_ecdsa, proof_of_work)
        self.ecdh.generate_derived_key(ecdh)