109 lines
3.8 KiB
Python
109 lines
3.8 KiB
Python
import sys, os
|
|
import math
|
|
from enum import Enum, auto
|
|
|
|
from libs.app.common.logging import get_logger
|
|
from libs.fspn.utils.observable import Observable
|
|
from libs.fspn.utils.wrapper_util import threaded
|
|
from libs.fspn.utils.sha256_util import hash_bytes, hash_file
|
|
|
|
logger = get_logger()
|
|
|
|
class FileEvents(Enum):
    """Events emitted by a ``File`` while a transfer progresses.

    ``File`` passes the member's ``name`` string (not the member itself)
    as the first argument to ``fire_event``.
    """

    # Fired once every part is present and the whole-file hash matches.
    ON_FILE_COMPLETED = auto()
    # Fired once every part is present but the whole-file hash differs.
    ON_FILE_ERROR = auto()
    # Fired after a part is stored while other parts are still missing.
    ON_FILE_UPDATE = auto()
    # Fired after a transfer is approved or rejected (``approved`` kwarg).
    ON_FILE_APPROVED = auto()
|
|
|
|
# TODO Add try except and retries
|
|
class File(Observable):
    """One chunked file transfer (sending or receiving) on one connection.

    The file is split into ``ceil(size / chunk_size)`` parts. When sending,
    each part is read from ``folder/name``, hashed and handed to
    ``file_transfer.send_file_part``. When receiving, parts are written into
    ``folder/name + '.download'``; once all parts are present the file is
    renamed to ``final_path`` and its whole-file hash is compared to ``hash``.

    ``status`` takes the values "WAITING", "TRANSFERING", "CANCELED",
    "COMPLETED" and "CORRUPTED". Progress is reported through
    ``fire_event`` using the names of ``FileEvents`` members.
    """

    def __init__(
        self,
        folder,
        name,
        size,
        chunk_size,
        hash,
        connection_id,
        sending,
        file_transfer,
        to_module,
    ) -> None:
        """Store the transfer description and derive the on-disk paths.

        Args:
            folder: Directory that holds (sending) or will hold (receiving)
                the file.
            name: File name inside ``folder``.
            size: Total file size; used to size the placeholder file and to
                compute the number of parts.
            chunk_size: Size of one part; part ``i`` starts at offset
                ``i * chunk_size``.
            hash: Expected whole-file hash, compared with ``hash_file``.
            connection_id: Identifier of the connection; together with
                ``hash`` it forms ``self.id``.
            sending: Truthy when this side is the sender.
            file_transfer: Object whose ``send_file_part`` is called for
                every outgoing part.
            to_module: Stored as-is; not used inside this class.
        """
        # NOTE(review): Observable.__init__ is not invoked here -- confirm
        # Observable needs no per-instance setup before fire_event is used.
        self.id = (hash, connection_id)
        self.name = name
        self.folder = folder
        self.size = size
        self.chunk_size = chunk_size
        # One slot per part: None until the part has been written.
        self.parts = [None] * math.ceil(size / chunk_size)
        self.hash = hash
        self.status = "WAITING"
        self.sending = sending
        self.connection_id = connection_id
        # Imported locally rather than at module level -- presumably to
        # avoid a circular import; TODO confirm.
        from libs.fileTransfer.fileTransfer import FileTransfer
        self.file_transfer: FileTransfer = file_transfer
        self.to_module = to_module

        # Incoming data is written here until the transfer completes.
        self.output_path = os.path.join(self.folder, self.name + '.download')
        self.final_path = self.get_final_path()

    def get_final_path(self):
        """Return a path in ``folder`` for ``name`` that does not exist yet.

        Tries ``name`` first, then ``base(1).ext``, ``base(2).ext``, ...
        until a free path is found.
        """
        base_name, ext = os.path.splitext(self.name)
        count = 0
        while True:
            if count == 0:
                filename = f"{base_name}{ext}"
            else:
                filename = f"{base_name}({count}){ext}"
            final_path = os.path.join(self.folder, filename)
            if not os.path.exists(final_path):
                return final_path
            count += 1

    def approve_transfer(self, approved):
        """Apply the approval decision and fire ``ON_FILE_APPROVED``.

        When approved, the sender starts sending parts and the receiver
        prepares the ``.download`` placeholder. When rejected, the status
        becomes "CANCELED". The event is fired in both cases with the
        ``approved`` keyword argument.
        """
        if approved:
            self.status = "TRANSFERING"
            if self.sending:
                self.start_send()
            else:
                # TODO Check if file exists and same hash
                self._prepare_output_file()
        else:
            self.status = "CANCELED"

        self.fire_event(FileEvents.ON_FILE_APPROVED.name, approved=approved)

    def _prepare_output_file(self):
        """Create or resize the ``.download`` placeholder to ``size`` bytes."""
        # os.makedirs('') raises, so skip it when folder is the current dir.
        if self.folder:
            os.makedirs(self.folder, exist_ok=True)
        # A leftover placeholder may be larger than this transfer; trailing
        # bytes would survive the part writes and break the final hash check.
        # 'r+b' keeps existing content, 'wb' creates the file when missing.
        mode = 'r+b' if os.path.exists(self.output_path) else 'wb'
        with open(self.output_path, mode) as f:
            f.truncate(self.size)

    @threaded
    def start_send(self):
        """Read every part of ``folder/name`` and pass it to the transport."""
        logger.warning("Start Sending")
        # Context manager guarantees the handle is closed even if hashing
        # or send_file_part raises part-way through.
        with open(os.path.join(self.folder, self.name), 'rb') as f:
            for part in range(len(self.parts)):
                f.seek(part * self.chunk_size)
                data = f.read(self.chunk_size)
                part_hash = hash_bytes(data)
                logger.debug(f"Sending part {part}")
                self.file_transfer.send_file_part(self, part, part_hash, data)

    @threaded
    def write_part(self, data, part, hash):
        """Verify one received part and write it at its offset.

        Args:
            data: Raw bytes of the part.
            part: Zero-based part index.
            hash: Expected hash of ``data`` as computed by ``hash_bytes``.

        Parts with an out-of-range index or a mismatching hash are logged
        and dropped without touching the output file.
        """
        # The index comes from the peer: reject it before seeking, otherwise
        # data would be written outside the file and parts[part] would raise
        # (or, for a negative index, silently mark the wrong slot).
        if not 0 <= part < len(self.parts):
            logger.error(f"Part index {part} out of range for file {self.name}")
            return

        check_hash = hash_bytes(data)
        logger.debug(f"Writing part {part}")
        if check_hash != hash:
            # TODO Request part again
            logger.error("HASH PART ERROR")
            return

        part_offset = part * self.chunk_size
        with open(self.output_path, 'r+b') as f:
            f.seek(part_offset)
            f.write(data)

        self.parts[part] = True
        # TODO Send ack
        self.update_status()

    def update_status(self):
        """Finalize the download when all parts are present, else report progress.

        Fires ``ON_FILE_UPDATE`` while parts are missing. Once complete, the
        placeholder is renamed to ``final_path`` and hashed: a match sets
        status "COMPLETED" and fires ``ON_FILE_COMPLETED`` (with
        ``final_path``); a mismatch sets "CORRUPTED" and fires
        ``ON_FILE_ERROR``.
        """
        if not all(self.parts):
            self.fire_event(FileEvents.ON_FILE_UPDATE.name)
            return

        # NOTE(review): write_part is decorated with @threaded; if that runs
        # calls concurrently, two of them could both reach this point and
        # rename twice. Consider guarding finalization with a lock.

        # final_path was picked at construction time; if something has taken
        # that name since, pick a fresh one instead of overwriting it (POSIX)
        # or failing the rename (Windows).
        if os.path.exists(self.final_path):
            self.final_path = self.get_final_path()
        os.rename(self.output_path, self.final_path)

        if hash_file(self.final_path) == self.hash:
            logger.info(f'File {self.name} downloaded!')
            self.status = "COMPLETED"
            self.fire_event(FileEvents.ON_FILE_COMPLETED.name, final_path=self.final_path)
        else:
            self.status = "CORRUPTED"
            logger.warning(f'File {self.name} corrupted')
            self.fire_event(FileEvents.ON_FILE_ERROR.name)