from flask import Flask, jsonify, request, abort, render_template from flask_cors import CORS, cross_origin from flask_socketio import SocketIO from libs.app.common.logging import get_logger from libs.app.common.paths import ROOT_DIR from libs.lockbox.lockboxService import User, UserData import os from threading import Thread import secrets logger = get_logger("service_api") FLASK_HOST = "127.0.0.1" FLASK_PORT = 5001 # TODO Fix JSON to CamelCase not SnakeCase # TODO Fix websocket events name class LockboxServiceApi: def __init__(self, service): from libs.lockbox.lockboxService import LockboxService self.service:LockboxService = service # self.app = Flask(__name__, static_folder=os.path.join(ROOT_DIR, "libs/lockbox/frontend/dist/assets"), template_folder=os.path.join(ROOT_DIR, "libs/lockbox/frontend/dist")) self.app = Flask(__name__) self.app.secret_key = secrets.token_urlsafe(16) self.socketio = SocketIO(self.app, cors_allowed_origins="*", ping_timeout=120, ping_interval=30) self.valid_tokens = {} thread_ws = Thread(target=self.run) thread_ws.start() def run(self): self.routes() self.events() CORS(self.app) logger.debug(f"LOCKBOX Flask Running on {FLASK_HOST}:{FLASK_PORT}") certs_path = os.path.join(ROOT_DIR, "libs", "api", "certs") cert_path = os.path.join(certs_path, "cert.pem") key_path = os.path.join(certs_path, "key.pem") try: self.socketio.run(self.app, host=FLASK_HOST, port=FLASK_PORT, debug=False, allow_unsafe_werkzeug=True, ssl_context=(cert_path, key_path)) logger.info("DEVERIA ESTAR RODANDO") except: logger.exception("DEU ERRO IRMAO") def events(self): @self.socketio.on("message") def on_message(*args, **kwargs): logger.debug(f'Lockbox message{args}, {kwargs}') self.socketio.send(f"Message Received") def generate_token(self, verifying_key): token = secrets.token_urlsafe(32) self.valid_tokens[token] = verifying_key return token # TODO Separate token for actions. For example approve sign a message def check_user_token(self, verifying_key): auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Bearer '): abort(403) token = auth_header.split(' ')[1] user = self.valid_tokens.get(token) if user != verifying_key: abort(403) def routes(self): @self.app.route('/') @self.app.route('/') def index(path=''): return render_template('index.html') @self.app.route("/healthCheck", methods=["GET"]) def health_check(): return jsonify({"status":"running"}) @self.app.route("/users", methods=["GET", "POST"]) def users(): if request.method == "GET": response = [] for data in self.service.user_list(): if data: response.append(data.__dict__) return response elif request.method == "POST": # TODO FIX IT!! content:dict = request.json password = content["password"] data = content["data"] user_data = UserData(proof_of_work=data["proof_of_work"]) credential_password = content.get("credentialPassword") verifying_key = self.service.user_add(password_str=password, data=user_data, credential_password=credential_password) if not verifying_key: return jsonify({"verifying_key":None, "token":None}) token = self.generate_token(verifying_key) self.socketio.emit("userAdded",f"{verifying_key}") body = {"verifying_key":verifying_key, "token":token} return jsonify(body) @self.app.route("/users//login", methods=["POST"]) def user_login(verifying_key): content:dict = request.json verifying_key = self.service.login_user(verifying_key, content["credentialPassword"]) if not verifying_key: return jsonify({"verifying_key":None, "token":None}) token = self.generate_token(verifying_key) self.socketio.emit("userAdded",f"{verifying_key}") body = {"verifying_key":verifying_key, "token":token} return jsonify(body) @self.app.route("/users/", methods=["GET", "POST", "DELETE"]) def user(verifying_key): self.check_user_token(verifying_key) if request.method == "GET": data = self.service.user_get(verifying_key) if data: return jsonify(data.__dict__) else: return jsonify({}) elif request.method == "POST": return jsonify({}) elif request.method == "DELETE": result = self.service.user_delete(verifying_key) self.socketio.emit("userRemoved",f"{verifying_key}") return jsonify(result) @self.app.route("/users//signatures", methods=["GET", "POST"]) def user_signatures(verifying_key): if request.method == "GET": self.check_user_token(verifying_key) # TODO Create list_signature_requests(verifying_key) in lockbox service requests = [] for request_id in self.service.signature_requests: user, data, info = self.service.signature_requests[request_id] if user == verifying_key: requests.append({"requestId":request_id, "user":user, "data":data, "info":info}) return requests elif request.method == "POST": content = request.json info = content["info"] data = content["data"] request_id = self.service.requested_user_sign(verifying_key, data, info) if request_id: self.socketio.emit("signatureWaiting",{"requestId":request_id, "verifyingKey":verifying_key, "info":info}) return jsonify({"requestId":request_id}) @self.app.route("/users//signatures/", methods=["GET", "POST"]) def user_signatures_request(verifying_key, request_id): self.check_user_token(verifying_key) if request.method == "GET": # TODO Create get_signature_request(request_id) in lockbox service user, data, info = self.service.signature_requests[request_id] return jsonify({"data":data, "info":info}) elif request.method == "POST": content = request.json approved = content["approved"] signature = self.service.user_sign(request_id, approved) self.socketio.emit("signatureResponse",{"requestId":request_id, "signature":signature}) return jsonify({"signature":signature})