diff options
author | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2018-01-04 08:57:25 +0100 |
---|---|---|
committer | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2018-01-04 08:57:25 +0100 |
commit | 8ce494b17065c724187dd3f9faec1e419496f871 (patch) | |
tree | fa0c7fb1296f30bfd0cdc241c7556cec8d1e8ba1 /raphodo/interprocess.py | |
parent | 18afe3e2ebdb10bbc542d79280344d9adf923d2f (diff) | |
parent | eba0a9bd6f142cdb299cc070060723d00e81205f (diff) |
Merge branch 'feature/upstream' into develop
Diffstat (limited to 'raphodo/interprocess.py')
-rw-r--r-- | raphodo/interprocess.py | 1748 |
1 files changed, 1748 insertions, 0 deletions
diff --git a/raphodo/interprocess.py b/raphodo/interprocess.py new file mode 100644 index 0000000..2b877ef --- /dev/null +++ b/raphodo/interprocess.py @@ -0,0 +1,1748 @@ +# Copyright (C) 2015-2016 Damon Lynch <damonlynch@gmail.com> + +# This file is part of Rapid Photo Downloader. +# +# Rapid Photo Downloader is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rapid Photo Downloader is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Rapid Photo Downloader. If not, +# see <http://www.gnu.org/licenses/>. + +__author__ = 'Damon Lynch' +__copyright__ = "Copyright 2015-2016, Damon Lynch" + +import argparse +import sys +import logging +import pickle +import os +import shlex +import time +from collections import deque, namedtuple +from typing import Optional, Set, List, Dict, Sequence, Any, Tuple + + +import psutil + +from PyQt5.QtCore import (pyqtSignal, QObject, pyqtSlot) +from PyQt5.QtGui import (QPixmap, QImage) + +import zmq +import zmq.log.handlers +from zmq.eventloop.ioloop import IOLoop +from zmq.eventloop.zmqstream import ZMQStream + +from raphodo.rpdfile import (RPDFile, FileTypeCounter, FileSizeSum, Photo, Video) +from raphodo.devices import Device +from raphodo.utilities import CacheDirs, set_pdeathsig +from raphodo.constants import ( + RenameAndMoveStatus, ExtractionTask, ExtractionProcessing, CameraErrorCode, FileType, + FileExtension, BackupStatus +) +from raphodo.proximity import TemporalProximityGroups +from raphodo.storage import StorageSpace +from raphodo.iplogging import ZeroMQSocketHandler +from raphodo.viewutils import ThumbnailDataForProximity +from raphodo.folderspreview import DownloadDestination, FoldersPreview +from raphodo.problemnotification import ( + ScanProblems, CopyingProblems, RenamingProblems, BackingUpProblems +) + +logger = logging.getLogger() + + +def make_filter_from_worker_id(worker_id) -> bytes: + r""" + Returns a python byte string from an integer or string + + >>> make_filter_from_worker_id(54) + b'54' + + >>> make_filter_from_worker_id('54') + b'54' + """ + if isinstance(worker_id, int): + return str(worker_id).encode() + if isinstance(worker_id, str): + return worker_id.encode() + raise(TypeError) + +def create_identity(worker_type: str, identity: str) -> bytes: + r"""Generate identity for a worker's 0mq socket. + + >>> create_identity('Worker', '1') + b'Worker-1' + >>> create_identity('Thumbnail Extractor', '2') + b'Thumbnail-Extractor-2' + >>> create_identity('Thumbnail Extractor Plus', '22 2') + b'Thumbnail-Extractor-Plus-22-2' + """ + + # Replace any whitespace in the strings with a hyphen + return '{}-{}'.format('-'.join(worker_type.split()), '-'.join(identity.split())).encode() + +def get_worker_id_from_identity(identity: bytes) -> int: + r"""Extract worker id from the identity used in a 0mq socket + + >>> get_worker_id_from_identity(b'Worker-1') + 1 + >>> get_worker_id_from_identity(b'Thumbnail-Extractor-2') + 2 + >>> get_worker_id_from_identity(b'Thumbnail-Extractor-Plus-22-2') + 2 + """ + return int(identity.decode().split('-')[-1]) + + +def create_inproc_msg(cmd: bytes, + worker_id: Optional[int]=None, + data: Optional[Any]=None) -> List[bytes]: + """ + Create a list of three values to be sent via a PAIR socket + between main and child threads using 0MQ. + """ + + if worker_id is not None: + worker_id = make_filter_from_worker_id(worker_id) + else: + worker_id = b'' + + if data is None: + data = b'' + else: + data = pickle.dumps(data, pickle.HIGHEST_PROTOCOL) + + return [cmd, worker_id, data] + + +class ThreadNames: + rename = 'rename' + scan = 'scan' + copy = 'copy' + backup = 'backup' + thumbnail_daemon = 'thumbnail_daemon' + thumbnailer = 'thumbnailer' + offload = 'offload' + logger = 'logger' + load_balancer = 'load_balancer' + new_version = 'new_version' + + +class ProcessManager: + def __init__(self, logging_port: int, + thread_name: str) -> None: + + super().__init__() + + self.logging_port = logging_port + + self.processes = {} # type: Dict[int, psutil.Process] + self._process_to_run = '' # Implement in subclass + + self.thread_name = thread_name + + # Monitor which workers we have running + self.workers = [] # type: List[int] + + def _get_cmd(self) -> str: + return '{} {}'.format(sys.executable, + os.path.join(os.path.abspath(os.path.dirname(__file__)), + self._process_to_run)) + + def _get_command_line(self, worker_id: int) -> str: + """ + Implement in sublcass + """ + return '' + + def add_worker(self, worker_id: int) -> None: + + command_line = self._get_command_line(worker_id) + args = shlex.split(command_line) + + # run command immediately, without waiting a reply, and instruct the Linux + # kernel to send a terminate signal should this process unexpectedly die + try: + proc = psutil.Popen(args, preexec_fn=set_pdeathsig()) + except OSError as e: + logging.critical("Failed to start process: %s", command_line) + logging.critical('OSError [Errno %s]: %s', e.errno, e.strerror) + if e.errno == 8: + logging.critical("Script shebang line might be malformed or missing: %s", + self._get_cmd()) + sys.exit(1) + logging.debug("Started '%s' with pid %s", command_line, proc.pid) + + # Add to list of running workers + self.workers.append(worker_id) + self.processes[worker_id] = proc + + def forcefully_terminate(self) -> None: + """ + Forcefully terminate any running child processes. + """ + + + zombie_processes = [p for p in self.processes.values() + if p.is_running() and p.status() == psutil.STATUS_ZOMBIE] + running_processes = [p for p in self.processes.values() + if p.is_running() and p.status() != psutil.STATUS_ZOMBIE] + if hasattr(self, '_process_name'): + logging.debug("Forcefully terminating processes for %s: %s zombies, %s running.", + self._process_name, len(zombie_processes), len(running_processes)) + + for p in zombie_processes: # type: psutil.Process + try: + logging.debug("Killing zombie process %s with pid %s", p.name(), p.pid) + p.kill() + except: + logging.error("Failed to kill process with pid %s", p.pid) + for p in running_processes: # type: psutil.Process + try: + logging.debug("Terminating process %s with pid %s", p.name(), p.pid) + p.terminate() + except: + logging.error("Terminating process with pid %s failed", p.pid) + gone, alive = psutil.wait_procs(running_processes, timeout=2) + for p in alive: + try: + logging.debug("Killing zombie process %s with pid %s", p.name(), p.pid) + p.kill() + except: + logging.error("Failed to kill process with pid %s", p.pid) + + def process_alive(self, worker_id: int) -> bool: + """ + Process IDs are reused by the system. Check to make sure + a new process has not been created with the same process id. + + :param worker_id: the process to check + :return True if the process is the same, False otherwise + """ + + return self.processes[worker_id].is_running() + + +class PullPipelineManager(ProcessManager, QObject): + """ + Base class from which more specialized 0MQ classes are derived. + + Receives data into its sink via a ZMQ PULL socket, but does not + specify how workers should be sent data. + + Outputs signals using Qt. + """ + + message = pyqtSignal(str) # Derived class will change this + sinkStarted = pyqtSignal() + workerFinished = pyqtSignal(int) + workerStopped = pyqtSignal(int) + receiverPortSignal = pyqtSignal(int) + + def __init__(self, logging_port: int, + thread_name: str) -> None: + super().__init__(logging_port=logging_port, thread_name=thread_name) + + def _start_sockets(self) -> None: + + context = zmq.Context.instance() + + # Subclasses must define the type of port they need to send messages + self.ventilator_socket = None + self.ventilator_port = None + + # Sink socket to receive results of the workers + self.receiver_socket = context.socket(zmq.PULL) + self.receiver_port = self.receiver_socket.bind_to_random_port('tcp://*') + + # Socket to communicate directly with the sink, bypassing the workers + self.terminate_socket = context.socket(zmq.PUSH) + self.terminate_socket.connect("tcp://localhost:{}".format(self.receiver_port)) + + # Socket to receive commands from main thread + self.thread_controller = context.socket(zmq.PAIR) + self.thread_controller.connect('inproc://{}'.format(self.thread_name)) + + self.terminating = False + + @pyqtSlot() + def run_sink(self) -> None: + logging.debug("Running sink for %s", self._process_name) + + self._start_sockets() + + poller = zmq.Poller() + poller.register(self.receiver_socket, zmq.POLLIN) + poller.register(self.thread_controller, zmq.POLLIN) + + self.receiverPortSignal.emit(self.receiver_port) + self.sinkStarted.emit() + + while True: + try: + socks = dict(poller.poll()) + except KeyboardInterrupt: + break + if self.receiver_socket in socks: + # Receive messages from the workers + # (or the terminate socket) + worker_id, directive, content = self.receiver_socket.recv_multipart() + + if directive == b'cmd': + command = content + assert command in [b"STOPPED", b"FINISHED", b"KILL"] + if command == b"KILL": + # Terminate immediately, without regard for any + # incoming messages. This message is only sent + # from this manager to itself, using the + # self.terminate_socket + logging.debug("{} is terminating".format(self._process_name)) + break + # This worker is done; remove from monitored workers and + # continue + worker_id = int(worker_id) + if command == b"STOPPED": + logging.debug("%s worker %s has stopped", self._process_name, worker_id) + self.workerStopped.emit(worker_id) + else: + # Worker has finished its work + self.workerFinished.emit(worker_id) + self.workers.remove(worker_id) + del self.processes[worker_id] + if not self.workers: + logging.debug("{} currently has no workers".format(self._process_name)) + if not self.workers and self.terminating: + logging.debug("{} is exiting".format(self._process_name)) + break + else: + assert directive == b'data' + self.content = content + self.process_sink_data() + + if self.thread_controller in socks: + # Receive messages from the main Rapid Photo Downloader thread + self.process_thread_directive() + + def process_thread_directive(self) -> None: + directive, worker_id, data = self.thread_controller.recv_multipart() + + # Directives: START, STOP, TERMINATE, SEND_TO_WORKER, STOP_WORKER, START_WORKER + if directive == b'START': + self.start() + elif directive == b'START_WORKER': + self.start_worker(worker_id=worker_id, data=data) + elif directive == b'SEND_TO_WORKER': + self.send_message_to_worker(worker_id=worker_id, data=data) + elif directive == b'STOP': + self.stop() + elif directive == b'STOP_WORKER': + self.stop_worker(worker_id=worker_id) + elif directive == b'PAUSE': + self.pause() + elif directive == b'RESUME': + self.resume(worker_id=worker_id) + elif directive == b'TERMINATE': + self.forcefully_terminate() + else: + logging.critical("%s received unknown directive %s", directive.decode()) + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) + self.message.emit(data) + + def terminate_sink(self) -> None: + self.terminate_socket.send_multipart([b'0', b'cmd', b'KILL']) + + def _get_ventilator_start_message(self, worker_id: bytes) -> list: + return [worker_id, b'cmd', b'START'] + + def start(self) -> None: + logging.critical("Member function start() not implemented in child class of %s", + self._process_name) + + def start_worker(self, worker_id: bytes, data: bytes) -> None: + logging.critical("Member function start_worker() not implemented in child class of %s", + self._process_name) + + def stop(self) -> None: + logging.critical("Member function stop() not implemented in child class of %s", + self._process_name) + + def stop_worker(self, worker_id: int) -> None: + logging.critical("Member function stop_worker() not implemented in child class of %s", + self._process_name) + + def pause(self) -> None: + logging.critical("Member function pause() not implemented in child class of %s", + self._process_name) + + def resume(self, worker_id: Optional[bytes]) -> None: + logging.critical("Member function stop_worker() not implemented in child class of %s", + self._process_name) + + def send_message_to_worker(self, data: bytes, worker_id:Optional[bytes]=None) -> None: + if self.terminating: + logging.debug("%s not sending message to worker because manager is terminated", + self._process_name) + return + if not self.workers: + logging.debug("%s not sending message to worker because there are no workers", + self._process_name) + return + + assert isinstance(data, bytes) + + if worker_id: + message = [worker_id, b'data', data] + else: + message = [b'data', data] + self.ventilator_socket.send_multipart(message) + + def forcefully_terminate(self) -> None: + """ + Forcefully terminate any child processes and clean up. + + Shuts down the sink too. + """ + + super().forcefully_terminate() + self.terminate_sink() + + +class LoadBalancerWorkerManager(ProcessManager): + def __init__(self, no_workers: int, + backend_port: int, + sink_port: int, + logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name='') + self.no_workers = no_workers + self.backend_port = backend_port + self.sink_port = sink_port + + def _get_command_line(self, worker_id: int) -> str: + cmd = self._get_cmd() + + return '{} --request {} --send {} --identity {} --logging {}'.format( + cmd, + self.backend_port, + self.sink_port, + worker_id, + self.logging_port) + + def start_workers(self) -> None: + for worker_id in range(self.no_workers): + self.add_worker(worker_id) + + +class LRUQueue: + """LRUQueue class using ZMQStream/IOLoop for event dispatching""" + + def __init__(self, backend_socket: zmq.Socket, + frontend_socket: zmq.Socket, + controller_socket: zmq.Socket, + worker_type: str, + process_manager: LoadBalancerWorkerManager) -> None: + + self.worker_type = worker_type + self.process_manager = process_manager + self.workers = deque() + self.terminating = False + self.terminating_workers = set() # type: Set[bytes] + self.stopped_workers = set() # type: Set[int] + + self.backend = ZMQStream(backend_socket) + self.frontend = ZMQStream(frontend_socket) + self.controller = ZMQStream(controller_socket) + self.backend.on_recv(self.handle_backend) + self.controller.on_recv(self.handle_controller) + + self.loop = IOLoop.instance() + + def handle_controller(self, msg): + self.terminating = True + # logging.debug("%s load balancer requesting %s workers to stop", self.worker_type, + # len(self.workers)) + + while len(self.workers): + worker_identity = self.workers.popleft() + + logging.debug("%s load balancer sending stop cmd to worker %s", self.worker_type, + worker_identity.decode()) + self.backend.send_multipart([worker_identity, b'', b'cmd', b'STOP']) + self.terminating_workers.add(worker_identity) + + self.loop.add_timeout(time.time()+3, self.loop.stop) + + def handle_backend(self, msg): + # Queue worker address for LRU routing + worker_identity, empty, client_addr = msg[:3] + + # add worker back to the list of workers + self.workers.append(worker_identity) + + # Second frame is empty + assert empty == b'' + + if msg[-1] == b'STOPPED' and self.terminating: + worker_id = get_worker_id_from_identity(worker_identity) + self.stopped_workers.add(worker_id) + self.terminating_workers.remove(worker_identity) + if len(self.terminating_workers) == 0: + for worker_id in self.stopped_workers: + p = self.process_manager.processes[worker_id] # type: psutil.Process + if p.is_running(): + pid = p.pid + if p.status() != psutil.STATUS_SLEEPING: + logging.debug("Waiting on %s process %s...", p.status(), pid) + os.waitpid(pid, 0) + logging.debug("...process %s is finished", pid) + else: + logging.debug("Process %s is sleeping", pid) + self.loop.add_timeout(time.time()+0.5, self.loop.stop) + + if len(self.workers) == 1: + # on first recv, start accepting frontend messages + self.frontend.on_recv(self.handle_frontend) + + def handle_frontend(self, request): + # Dequeue and drop the next worker address + worker_identity = self.workers.popleft() + + message = [worker_identity, b''] + request + self.backend.send_multipart(message) + if len(self.workers) == 0: + # stop receiving until workers become available again + self.frontend.stop_on_recv() + + +class LoadBalancer: + def __init__(self, worker_type: str, process_manager) -> None: + + self.parser = argparse.ArgumentParser() + self.parser.add_argument("--receive", required=True) + self.parser.add_argument("--send", required=True) + self.parser.add_argument("--controller", required=True) + self.parser.add_argument("--logging", required=True) + + args = self.parser.parse_args() + self.controller_port = args.controller + + context = zmq.Context() + frontend = context.socket(zmq.PULL) + frontend_port = frontend.bind_to_random_port('tcp://*') + + backend = context.socket(zmq.ROUTER) + backend_port = backend.bind_to_random_port('tcp://*') + + reply = context.socket(zmq.REP) + reply.connect("tcp://localhost:{}".format(args.receive)) + + controller = context.socket(zmq.PULL) + controller.connect('tcp://localhost:{}'.format(self.controller_port)) + + sink_port = args.send + logging_port = args.logging + + self.logger_publisher = ProcessLoggerPublisher( + context=context, name=worker_type, notification_port=args.logging + ) + + logging.debug( + "{} load balancer waiting to be notified how many workers to initialize...".format( + worker_type + ) + ) + no_workers = int(reply.recv()) + logging.debug("...{} load balancer will use {} workers".format(worker_type, no_workers)) + reply.send(str(frontend_port).encode()) + + process_manager = process_manager(no_workers, backend_port, sink_port, logging_port) + process_manager.start_workers() + + # create queue with the sockets + queue = LRUQueue(backend, frontend, controller, worker_type, process_manager) + + # start reactor, which is an infinite loop + IOLoop.instance().start() + + # Finished infinite loop: do some housekeeping + logging.debug("Forcefully terminating load balancer child processes") + process_manager.forcefully_terminate() + + frontend.close() + backend.close() + + +class LoadBalancerManager(ProcessManager, QObject): + """ + Launches and requests termination of the Load Balancer process + """ + + load_balancer_started = pyqtSignal(int) + def __init__(self, context: zmq.Context, + no_workers: int, + sink_port: int, + logging_port: int, + thread_name: str) -> None: + super().__init__(logging_port=logging_port, thread_name=thread_name) + self.no_workers = no_workers + self.sink_port = sink_port + self.context = context + + @pyqtSlot() + def start_load_balancer(self) -> None: + + self.controller_socket = self.context.socket(zmq.PUSH) + self.controller_port = self.controller_socket.bind_to_random_port('tcp://*') + + self.requester = self.context.socket(zmq.REQ) + self.requester_port = self.requester.bind_to_random_port('tcp://*') + + self.thread_controller = self. context.socket(zmq.PAIR) + self.thread_controller.connect('inproc://{}'.format(self.thread_name)) + + worker_id = 0 + self.add_worker(worker_id) + self.requester.send(str(self.no_workers).encode()) + self.frontend_port = int(self.requester.recv()) + self.load_balancer_started.emit(self.frontend_port) + + # wait for stop signal + directive, worker_id, data = self.thread_controller.recv_multipart() + assert directive == b'STOP' + self.stop() + + def stop(self): + self.controller_socket.send(b'STOP') + + def _get_command_line(self, worker_id: int) -> str: + cmd = self._get_cmd() + + return '{} --receive {} --send {} --controller {} --logging {}'.format( + cmd, + self.requester_port, + self.sink_port, + self.controller_port, + self.logging_port) + +DAEMON_WORKER_ID = 0 + + +class PushPullDaemonManager(PullPipelineManager): + """ + Manage a single instance daemon worker process that waits to work on data + issued by this manager. The data to be worked on is issued in sequence, + one after the other. + + Because this is a single daemon process, a Push-Pull model is most + suitable for sending the data. + """ + + def _start_sockets(self) -> None: + + super()._start_sockets() + + context = zmq.Context.instance() + + # Ventilator socket to send message to worker + self.ventilator_socket = context.socket(zmq.PUSH) + self.ventilator_port = self.ventilator_socket.bind_to_random_port('tcp://*') + + def stop(self) -> None: + """ + Permanently stop the daemon process and terminate + """ + + logging.debug("{} halting".format(self._process_name)) + self.terminating = True + + # Only send stop command if the process is still running + if self.process_alive(DAEMON_WORKER_ID): + try: + self.ventilator_socket.send_multipart([b'cmd', b'STOP'], zmq.DONTWAIT) + except zmq.Again: + logging.debug( + "Terminating %s sink because child process did not receive message", + self._process_name) + self.terminate_sink() + else: + # The process may have crashed. Stop the sink. + self.terminate_sink() + + def _get_command_line(self, worker_id: int) -> str: + cmd = self._get_cmd() + + return '{} --receive {} --send {} --logging {}'.format( + cmd, + self.ventilator_port, + self.receiver_port, + self.logging_port) + + def _get_ventilator_start_message(self, worker_id: int) -> List[bytes]: + return [b'cmd', b'START'] + + def start(self) -> None: + logging.debug("Starting worker for %s", self._process_name) + self.add_worker(worker_id=DAEMON_WORKER_ID) + + +class PublishPullPipelineManager(PullPipelineManager): + """ + Manage a collection of worker processes that wait to work on data + issued by this manager. The data to be worked on is issued in sequence, + one after the other, either once, or many times. + + Because there are multiple worker process, a Publish-Subscribe model is + most suitable for sending data to workers. + """ + + def _start_sockets(self) -> None: + + super()._start_sockets() + + context = zmq.Context.instance() + + # Ventilator socket to send messages to workers on + self.ventilator_socket = context.socket(zmq.PUB) + self.ventilator_port= self.ventilator_socket.bind_to_random_port('tcp://*') + + # Socket to synchronize the start of each worker + self.sync_service_socket = context.socket(zmq.REP) + self.sync_service_port = self.sync_service_socket.bind_to_random_port("tcp://*") + + # Socket for worker control: pause, resume, stop + self.controller_socket = context.socket(zmq.PUB) + self.controller_port = self.controller_socket.bind_to_random_port("tcp://*") + + def stop(self) -> None: + """ + Permanently stop all the workers and terminate + """ + + logging.debug("{} halting".format(self._process_name)) + self.terminating = True + if self.workers: + # Signal workers they must immediately stop + termination_signal_sent = False + alive_workers = [worker_id for worker_id in self.workers if + self.process_alive(worker_id)] + for worker_id in alive_workers: + + message = [make_filter_from_worker_id(worker_id),b'STOP'] + self.controller_socket.send_multipart(message) + + message = [make_filter_from_worker_id(worker_id), b'cmd', b'STOP'] + self.ventilator_socket.send_multipart(message) + termination_signal_sent = True + + if not termination_signal_sent: + self.terminate_sink() + else: + self.terminate_sink() + + def stop_worker(self, worker_id: bytes) -> None: + """ + Permanently stop one worker + """ + + if int(worker_id) in self.workers: + message = [worker_id, b'STOP'] + self.controller_socket.send_multipart(message) + message = [worker_id, b'cmd', b'STOP'] + self.ventilator_socket.send_multipart(message) + + def start_worker(self, worker_id: bytes, data: bytes) -> None: + + self.add_worker(int(worker_id)) + + # Send START commands until scan worker indicates it is ready to + # receive data + # Worker ID must be in bytes format + while True: + self.ventilator_socket.send_multipart( + self._get_ventilator_start_message(worker_id)) + try: + # look for synchronization request + self.sync_service_socket.recv(zmq.DONTWAIT) + # send synchronization reply + self.sync_service_socket.send(b'') + break + except zmq.Again: + # Briefly pause sending out START messages + # There is no point flooding the network + time.sleep(.01) + + # Send data to process to tell it what to work on + self.send_message_to_worker(data=data, worker_id=worker_id) + + def _get_command_line(self, worker_id: int) -> str: + cmd = self._get_cmd() + + return '{} --receive {} --send {} --controller {} --syncclient {} ' \ + '--filter {} --logging {}'.format( + cmd, + self.ventilator_port, + self.receiver_port, + self.controller_port, + self.sync_service_port, + worker_id, + self.logging_port) + + def __len__(self) -> int: + return len(self.workers) + + def __contains__(self, item) -> bool: + return item in self.workers + + def pause(self) -> None: + for worker_id in self.workers: + message = [make_filter_from_worker_id(worker_id), b'PAUSE'] + self.controller_socket.send_multipart(message) + + def resume(self, worker_id: bytes) -> None: + if worker_id: + workers = [int(worker_id)] + else: + workers = self.workers + for worker_id in workers: + message = [make_filter_from_worker_id(worker_id), b'RESUME'] + self.controller_socket.send_multipart(message) + + +class ProcessLoggerPublisher: + """ + Setup the sockets for worker processes to send log messages to the + main process. + + Two tasks: set up the PUB socket, and then tell the main process + what port we're using via a second socket, and when we're closing it. + """ + + def __init__(self, context: zmq.Context, name: str, notification_port: int) -> None: + + self.logger_pub = context.socket(zmq.PUB) + self.logger_pub_port = self.logger_pub.bind_to_random_port("tcp://*") + self.handler = ZeroMQSocketHandler(self.logger_pub) + self.handler.setLevel(logging.DEBUG) + + self.logger = logging.getLogger() + self.logger.setLevel(logging.DEBUG) + self.logger.addHandler(self.handler) + + self.logger_socket = context.socket(zmq.PUSH) + self.logger_socket.connect("tcp://localhost:{}".format(notification_port)) + self.logger_socket.send_multipart([b'CONNECT', str(self.logger_pub_port).encode()]) + + def close(self): + self.logger.removeHandler(self.handler) + self.logger_socket.send_multipart([b'DISCONNECT', str(self.logger_pub_port).encode()]) + self.logger_pub.close() + self.logger_socket.close() + + +class WorkerProcess(): + def __init__(self, worker_type: str) -> None: + super().__init__() + self.parser = argparse.ArgumentParser() + self.parser.add_argument("--receive", required=True) + self.parser.add_argument("--send", required=True) + self.parser.add_argument("--logging", required=True) + + def cleanup_pre_stop(self) -> None: + """ + Operations to run if process is stopped. + + Implement in child class if needed. + """ + + pass + + def setup_logging_pub(self, notification_port: int, name: str) -> None: + """ + Sets up the 0MQ socket that sends out logging messages + + :param notification_port: port that should be notified about + the new logging publisher + :param name: descriptive name to place in the log messages + """ + + if self.worker_id is not None: + name = '{}-{}'.format(name, self.worker_id.decode()) + self.logger_publisher = ProcessLoggerPublisher(context=self.context, + name=name, + notification_port=notification_port) + + def send_message_to_sink(self) -> None: + + self.sender.send_multipart([self.worker_id, b'data', self.content]) + + def initialise_process(self) -> None: + # Wait to receive "START" message + worker_id, directive, content = self.receiver.recv_multipart() + assert directive == b'cmd' + assert content == b'START' + + # send a synchronization request + self.sync_client.send(b'') + + # wait for synchronization reply + self.sync_client.recv() + + # Receive next "START" message and discard, looking for data message + while True: + worker_id, directive, content = self.receiver.recv_multipart() + if directive == b'data': + break + else: + assert directive == b'cmd' + assert content == b'START' + + self.content = content + + def do_work(self): + pass + + +class DaemonProcess(WorkerProcess): + """ + Single instance + """ + def __init__(self, worker_type: str) -> None: + super().__init__(worker_type) + + args = self.parser.parse_args() + + self.context = zmq.Context() + # Socket to send messages along the pipe to + self.sender = self.context.socket(zmq.PUSH) + self.sender.set_hwm(10) + self.sender.connect("tcp://localhost:{}".format(args.send)) + + self.receiver = self.context.socket(zmq.PULL) + self.receiver.connect("tcp://localhost:{}".format(args.receive)) + + self.worker_id = None + + self.setup_logging_pub(notification_port=args.logging, name=worker_type) + + def run(self) -> None: + pass + + def check_for_command(self, directive: bytes, content: bytes) -> None: + if directive == b'cmd': + assert content == b'STOP' + self.cleanup_pre_stop() + # signal to sink that we've terminated before finishing + self.sender.send_multipart([make_filter_from_worker_id( + DAEMON_WORKER_ID), b'cmd', b'STOPPED']) + sys.exit(0) + + def send_message_to_sink(self) -> None: + # Must use a dummy value for the worker id, as there is only ever one + # instance. + self.sender.send_multipart([make_filter_from_worker_id( + DAEMON_WORKER_ID), b'data', self.content]) + + +class WorkerInPublishPullPipeline(WorkerProcess): + """ + Worker counterpart to PublishPullPipelineManager; multiple instance. + """ + def __init__(self, worker_type: str) -> None: + super().__init__(worker_type) + self.add_args() + + args = self.parser.parse_args() + + subscription_filter = self.worker_id = args.filter.encode() + self.context = zmq.Context() + + self.setup_sockets(args, subscription_filter) + self.setup_logging_pub(notification_port=args.logging, name=worker_type) + + self.initialise_process() + self.do_work() + + def add_args(self) -> None: + self.parser.add_argument("--filter", required=True) + self.parser.add_argument("--syncclient", required=True) + self.parser.add_argument("--controller", required=True) + + def setup_sockets(self, args, subscription_filter: bytes) -> None: + + # Socket to send messages along the pipe to + self.sender = self.context.socket(zmq.PUSH) + self.sender.set_hwm(10) + self.sender.connect("tcp://localhost:{}".format(args.send)) + + # Socket to receive messages from the pipe + self.receiver = self.context.socket(zmq.SUB) + self.receiver.connect("tcp://localhost:{}".format(args.receive)) + self.receiver.setsockopt(zmq.SUBSCRIBE, subscription_filter) + + # Socket to receive controller messages: stop, pause, resume + self.controller = self.context.socket(zmq.SUB) + self.controller.connect("tcp://localhost:{}".format(args.controller)) + self.controller.setsockopt(zmq.SUBSCRIBE, subscription_filter) + + # Socket to synchronize the start of receiving data from upstream + self.sync_client = self.context.socket(zmq.REQ) + self.sync_client.connect("tcp://localhost:{}".format(args.syncclient)) + + def check_for_command(self, directive: bytes, content) -> None: + if directive == b'cmd': + try: + assert content == b'STOP' + except AssertionError: + logging.critical("Expected STOP command but instead got %s", content.decode()) + else: + self.cleanup_pre_stop() + self.disconnect_logging() + # signal to sink that we've terminated before finishing + self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED']) + sys.exit(0) + + def check_for_controller_directive(self) -> None: + try: + # Don't block if process is running regularly + # If there is no command,exception will occur + worker_id, command = self.controller.recv_multipart(zmq.DONTWAIT) + assert command in [b'PAUSE', b'STOP'] + assert worker_id == self.worker_id + + if command == b'PAUSE': + # Because the process is paused, do a blocking read to + # wait for the next command + worker_id, command = self.controller.recv_multipart() + assert (command in [b'RESUME', b'STOP']) + if command == b'STOP': + self.cleanup_pre_stop() + # before finishing, signal to sink that we've terminated + self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED']) + sys.exit(0) + except zmq.Again: + pass # Continue working + + def resume_work(self) -> None: + worker_id, command = self.controller.recv_multipart() + assert (command in [b'RESUME', b'STOP']) + if command == b'STOP': + self.cleanup_pre_stop() + self.disconnect_logging() + # before finishing, signal to sink that we've terminated + self.sender.send_multipart([self.worker_id, b'cmd', b'STOPPED']) + sys.exit(0) + + def disconnect_logging(self) -> None: + self.logger_publisher.close() + + def send_finished_command(self) -> None: + self.sender.send_multipart([self.worker_id, b'cmd', b'FINISHED']) + + +class LoadBalancerWorker: + def __init__(self, worker_type: str) -> None: + super().__init__() + self.parser = argparse.ArgumentParser() + self.parser.add_argument("--request", required=True) + self.parser.add_argument("--send", required=True) + self.parser.add_argument("--identity", required=True) + self.parser.add_argument("--logging", required=True) + + args = self.parser.parse_args() + + self.context = zmq.Context() + + self.requester = self.context.socket(zmq.REQ) + self.identity = create_identity(worker_type, args.identity) + self.requester.identity = self.identity + self.requester.connect("tcp://localhost:{}".format(args.request)) + + # Sender is located in the main process. It is where output (messages) + # from this process are are sent to. + self.sender = self.context.socket(zmq.PUSH) + self.sender.connect("tcp://localhost:{}".format(args.send)) + + self.logger_publisher = ProcessLoggerPublisher(context=self.context, + name=worker_type, + notification_port=args.logging) + + # Tell the load balancer we are ready for work + self.requester.send(b"READY") + self.do_work() + + def do_work(self) -> None: + # Implement in subclass + pass + + def cleanup_pre_stop(self) -> None: + """ + Operations to run if process is stopped. + + Implement in child class if needed. + """ + + pass + + def exit(self): + self.cleanup_pre_stop() + identity = self.requester.identity.decode() + # signal to load balancer that we've terminated before finishing + self.requester.send_multipart([b'', b'', b'STOPPED']) + self.requester.close() + self.sender.close() + self.logger_publisher.close() + self.context.term() + logging.debug("%s with pid %s stopped", identity, os.getpid()) + sys.exit(0) + + def check_for_command(self, directive: bytes, content: bytes): + if directive == b'cmd': + assert content == b'STOP' + self.exit() + + +class ProcessLoggingManager(QObject): + """ + Receive and log logging messages from workers. + + An alternative might be using python logging's QueueListener, which + like this code, runs on its own thread. + """ + + ready = pyqtSignal(int) + + @pyqtSlot() + def startReceiver(self) -> None: + context = zmq.Context.instance() + self.receiver = context.socket(zmq.SUB) + # Subscribe to all variates of logging messages + self.receiver.setsockopt(zmq.SUBSCRIBE, b'') + + # Socket to receive subscription information, and the stop command + info_socket = context.socket(zmq.PULL) + self.info_port = info_socket.bind_to_random_port('tcp://*') + + poller = zmq.Poller() + poller.register(self.receiver, zmq.POLLIN) + poller.register(info_socket, zmq.POLLIN) + + self.ready.emit(self.info_port) + + while True: + try: + socks = dict(poller.poll()) + except KeyboardInterrupt: + break + + if self.receiver in socks: + message = self.receiver.recv() + record = logging.makeLogRecord(pickle.loads(message)) + logger.handle(record) + + if info_socket in socks: + directive, content = info_socket.recv_multipart() + if directive == b'STOP': + break + elif directive == b'CONNECT': + self.addSubscription(content) + else: + assert directive == b'DISCONNECT' + self.removeSubscription(content) + + def addSubscription(self, port: bytes) -> None: + try: + port = int(port) + except ValueError: + logging.critical('Incorrect port value in add logging subscription: %s', port) + else: + logging.debug("Subscribing to logging on port %s", port) + self.receiver.connect("tcp://localhost:{}".format(port)) + + def removeSubscription(self, port: bytes): + try: + port = int(port) + except ValueError: + logging.critical('Incorrect port value in remove logging subscription: %s', port) + else: + logging.debug("Unsubscribing to logging on port %s", port) + self.receiver.disconnect("tcp://localhost:{}".format(port)) + + +def stop_process_logging_manager(info_port: int) -> None: + """ + Stop ProcessLoggingManager thread + + :param info_port: the port number the manager uses + """ + + context = zmq.Context.instance() + command = context.socket(zmq.PUSH) + command.connect("tcp://localhost:{}".format(info_port)) + command.send_multipart([b'STOP', b'']) + + +class ScanArguments: + """ + Pass arguments to the scan process + """ + def __init__(self, device: Device, + ignore_other_types: bool, + log_gphoto2: bool) -> None: + """ + Pass arguments to the scan process + + :param device: the device to scan + :param ignore_other_types: ignore file types like TIFF + :param log_gphoto2: whether to generate detailed gphoto2 log + messages + :param scan_only_DCIM: if the device is an auto-detected volume, + then if True, scan only in it's DCIM folder + :param warn_unknown_file: whether to issue a warning when + encountering an unknown (unrecognized) file + """ + + self.device = device + self.ignore_other_types = ignore_other_types + self.log_gphoto2 = log_gphoto2 + + +class ScanResults: + """ + Receive results from the scan process + """ + + def __init__(self, rpd_files: Optional[List[RPDFile]]=None, + file_type_counter: Optional[FileTypeCounter]=None, + file_size_sum: Optional[FileSizeSum]=None, + error_code: Optional[CameraErrorCode]=None, + scan_id: Optional[int]=None, + optimal_display_name: Optional[str]=None, + storage_space: Optional[List[StorageSpace]]=None, + storage_descriptions: Optional[List[str]]=None, + sample_photo: Optional[Photo]=None, + sample_video: Optional[Video]=None, + problems: Optional[ScanProblems]=None, + fatal_error: Optional[bool]=None, + entire_video_required: Optional[bool]=None) -> None: + self.rpd_files = rpd_files + self.file_type_counter = file_type_counter + self.file_size_sum = file_size_sum + self.error_code = error_code + self.scan_id = scan_id + self.optimal_display_name = optimal_display_name + self.storage_space = storage_space + self.storage_descriptions = storage_descriptions + self.sample_photo = sample_photo + self.sample_video = sample_video + self.problems = problems + self.fatal_error = fatal_error + self.entire_video_required = entire_video_required + + +class CopyFilesArguments: + """ + Pass arguments to the copyfiles process + """ + + def __init__(self, scan_id: int, + device: Device, + photo_download_folder: str, + video_download_folder: str, + files: List[RPDFile], + verify_file: bool, + generate_thumbnails: bool, + log_gphoto2: bool) -> None: + self.scan_id = scan_id + self.device = device + self.photo_download_folder = photo_download_folder + self.video_download_folder = video_download_folder + self.files = files + self.generate_thumbnails = generate_thumbnails + self.verify_file = verify_file + self.log_gphoto2 = log_gphoto2 + + +class CopyFilesResults: + """ + Receive results from the copyfiles process + """ + + def __init__(self, scan_id: Optional[int]=None, + photo_temp_dir: Optional[str]=None, + video_temp_dir: Optional[str]=None, + total_downloaded: Optional[int]=None, + chunk_downloaded: Optional[int]=None, + copy_succeeded: Optional[bool]=None, + rpd_file: Optional[RPDFile]=None, + download_count: Optional[int]=None, + mdata_exceptions: Optional[Tuple]=None, + problems: Optional[CopyingProblems]=None) -> None: + """ + + :param scan_id: scan id of the device the files are being + downloaded from + :param photo_temp_dir: temp directory path, used to copy + photos into until they're renamed + :param video_temp_dir: temp directory path, used to copy + videos into until they're renamed + :param total_downloaded: how many bytes in total have been + downloaded + :param chunk_downloaded: how many bytes were downloaded since + the last message + :param copy_succeeded: whether the copy was successful or not + :param rpd_file: details of the file that was copied + :param download_count: a running count of how many files + have been copied. Used in download tracking. + :param mdata_exceptions: details of errors setting file metadata + :param problems: details of any problems encountered copying files, + not including metedata write problems. + """ + + self.scan_id = scan_id + + self.photo_temp_dir = photo_temp_dir + self.video_temp_dir = video_temp_dir + + self.total_downloaded = total_downloaded + self.chunk_downloaded = chunk_downloaded + + self.copy_succeeded = copy_succeeded + self.rpd_file = rpd_file + self.download_count = download_count + self.mdata_exceptions = mdata_exceptions + self.problems = problems + + + +class ThumbnailDaemonData: + """ + Pass arguments to the thumbnail daemon process. + + Occurs after a file is downloaded & renamed, and also + after a file is backed up. + """ + + def __init__(self, frontend_port: Optional[int]=None, + rpd_file: Optional[RPDFile]=None, + write_fdo_thumbnail: Optional[bool]=None, + use_thumbnail_cache: Optional[bool]=None, + backup_full_file_names: Optional[List[str]]=None, + fdo_name: Optional[str]=None) -> None: + self.frontend_port = frontend_port + self.rpd_file = rpd_file + self.write_fdo_thumbnail = write_fdo_thumbnail + self.use_thumbnail_cache = use_thumbnail_cache + self.backup_full_file_names = backup_full_file_names + self.fdo_name = fdo_name + + +class RenameAndMoveFileData: + """ + Pass arguments to the renameandmovefile process + """ + + def __init__(self, rpd_file: RPDFile=None, + download_count: int=None, + download_succeeded: bool=None, + message: RenameAndMoveStatus=None) -> None: + self.rpd_file = rpd_file + self.download_count = download_count + self.download_succeeded = download_succeeded + self.message = message + + +class RenameAndMoveFileResults: + def __init__(self, move_succeeded: bool=None, + rpd_file: RPDFile=None, + download_count: int=None, + stored_sequence_no: int=None, + downloads_today: List[str]=None, + problems: Optional[RenamingProblems]=None) -> None: + self.move_succeeded = move_succeeded + self.rpd_file = rpd_file + self.download_count = download_count + self.stored_sequence_no = stored_sequence_no + self.downloads_today = downloads_today + self.problems = problems + + +class OffloadData: + def __init__(self, thumbnail_rows: Optional[Sequence[ThumbnailDataForProximity]]=None, + proximity_seconds: int=None, + rpd_files: Optional[Sequence[RPDFile]]=None, + strip_characters: Optional[bool]=None, + folders_preview: Optional[FoldersPreview]=None) -> None: + self.thumbnail_rows = thumbnail_rows + self.proximity_seconds = proximity_seconds + self.rpd_files = rpd_files + self.strip_characters = strip_characters + self.folders_preview = folders_preview + + +class OffloadResults: + def __init__(self, proximity_groups: Optional[TemporalProximityGroups]=None, + folders_preview: Optional[FoldersPreview]=None) -> None: + self.proximity_groups = proximity_groups + self.folders_preview = folders_preview + + +class BackupArguments: + """ + Pass start up data to the back up process + """ + def __init__(self, path: str, device_name: str) -> None: + self.path = path + self.device_name = device_name + + +class BackupFileData: + """ + Pass file data to the backup process + """ + def __init__(self, rpd_file: Optional[RPDFile]=None, + move_succeeded: Optional[bool]=None, + do_backup: Optional[bool]=None, + path_suffix: Optional[str]=None, + backup_duplicate_overwrite: Optional[bool]=None, + verify_file: Optional[bool]=None, + download_count: Optional[int]=None, + save_fdo_thumbnail: Optional[int]=None, + message: Optional[BackupStatus]=None) -> None: + self.rpd_file = rpd_file + self.move_succeeded = move_succeeded + self.do_backup = do_backup + self.path_suffix = path_suffix + self.backup_duplicate_overwrite = backup_duplicate_overwrite + self.verify_file = verify_file + self.download_count = download_count + self.save_fdo_thumbnail = save_fdo_thumbnail + self.message = message + + +class BackupResults: + def __init__(self, scan_id: int, + device_id: int, + total_downloaded: Optional[int]=None, + chunk_downloaded: Optional[int]=None, + backup_succeeded: Optional[bool]=None, + do_backup: Optional[bool]=None, + rpd_file: Optional[RPDFile] = None, + backup_full_file_name: Optional[str]=None, + mdata_exceptions: Optional[Tuple] = None, + problems: Optional[BackingUpProblems]=None) -> None: + self.scan_id = scan_id + self.device_id = device_id + self.total_downloaded = total_downloaded + self.chunk_downloaded = chunk_downloaded + self.backup_succeeded = backup_succeeded + self.do_backup = do_backup + self.rpd_file = rpd_file + self.backup_full_file_name = backup_full_file_name + self.mdata_exceptions = mdata_exceptions + self.problems = problems + + +class GenerateThumbnailsArguments: + def __init__(self, scan_id: int, + rpd_files: List[RPDFile], + name: str, + proximity_seconds: int, + cache_dirs: CacheDirs, + need_video_cache_dir: bool, + frontend_port: int, + log_gphoto2: bool, + camera: Optional[str]=None, + port: Optional[str]=None, + entire_video_required: Optional[bool]=None) -> None: + """ + List of files for which thumbnails are to be generated. + All files are assumed to have the same scan id. + :param scan_id: id of the scan + :param rpd_files: files from which to extract thumbnails + :param name: name of the device + :param proximity_seconds: the time elapsed between consecutive + shots that is used to prioritize the order of thumbnail + generation + :param cache_dirs: the location where the cache directories + should be created + :param need_video_cache_dir: if True, must use cache dir + to extract video thumbnail + :param frontend_port: port to use to send to load balancer's + front end + :param log_gphoto2: if True, log libgphoto2 logging messages + :param camera: If the thumbnails are being downloaded from a + camera, this is the name of the camera, else None + :param port: If the thumbnails are being downloaded from a + camera, this is the port of the camera, else None + :param entire_video_required: if the entire video is + required to extract the thumbnail + """ + + self.rpd_files = rpd_files + self.scan_id = scan_id + self.name = name + self.proximity_seconds = proximity_seconds + self.cache_dirs = cache_dirs + self.need_video_cache_dir = need_video_cache_dir + self.frontend_port = frontend_port + if camera is not None: + assert port is not None + assert entire_video_required is not None + self.camera = camera + self.port = port + self.log_gphoto2 = log_gphoto2 + self.entire_video_required = entire_video_required + + +class GenerateThumbnailsResults: + def __init__(self, rpd_file: Optional[RPDFile]=None, + thumbnail_bytes: Optional[bytes]=None, + scan_id: Optional[int]=None, + cache_dirs: Optional[CacheDirs]=None) -> None: + self.rpd_file = rpd_file + # If thumbnail_bytes is None, there is no thumbnail + self.thumbnail_bytes = thumbnail_bytes + self.scan_id = scan_id + self.cache_dirs = cache_dirs + + +class ThumbnailExtractorArgument: + def __init__(self, rpd_file: RPDFile, + task: ExtractionTask, + processing: Set[ExtractionProcessing], + full_file_name_to_work_on: str, + secondary_full_file_name: str, + exif_buffer: Optional[bytearray], + thumbnail_bytes: bytes, + use_thumbnail_cache: bool, + file_to_work_on_is_temporary: bool, + write_fdo_thumbnail: bool, + send_thumb_to_main: bool) -> None: + self.rpd_file = rpd_file + self.task = task + self.processing = processing + self.full_file_name_to_work_on = full_file_name_to_work_on + self.secondary_full_file_name = secondary_full_file_name + self.file_to_work_on_is_temporary = file_to_work_on_is_temporary + self.exif_buffer = exif_buffer + self.thumbnail_bytes = thumbnail_bytes + self.use_thumbnail_cache = use_thumbnail_cache + self.write_fdo_thumbnail = write_fdo_thumbnail + self.send_thumb_to_main = send_thumb_to_main + + +class RenameMoveFileManager(PushPullDaemonManager): + """ + Manages the single instance daemon process that renames and moves + files that have just been downloaded + """ + + message = pyqtSignal(bool, RPDFile, int) + sequencesUpdate = pyqtSignal(int, list) + renameProblems = pyqtSignal('PyQt_PyObject') + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.rename) + self._process_name = 'Rename and Move File Manager' + self._process_to_run = 'renameandmovefile.py' + + def process_sink_data(self): + data = pickle.loads(self.content) # type: RenameAndMoveFileResults + if data.move_succeeded is not None: + + self.message.emit(data.move_succeeded, data.rpd_file, data.download_count) + + elif data.problems is not None: + self.renameProblems.emit(data.problems) + else: + assert data.stored_sequence_no is not None + assert data.downloads_today is not None + assert isinstance(data.downloads_today, list) + self.sequencesUpdate.emit(data.stored_sequence_no, + data.downloads_today) + + +class ThumbnailDaemonManager(PushPullDaemonManager): + """ + Manages the process that extracts thumbnails after the file + has already been downloaded and that writes FreeDesktop.org + thumbnails. Not to be confused with ThumbnailManagerPara, which + manages thumbnailing using processes that run in parallel, + one for each device. + """ + + message = pyqtSignal(RPDFile, QPixmap) + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.thumbnail_daemon) + self._process_name = 'Thumbnail Daemon Manager' + self._process_to_run = 'thumbnaildaemon.py' + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) # type: GenerateThumbnailsResults + if data.thumbnail_bytes is None: + thumbnail = QPixmap() + else: + thumbnail = QImage.fromData(data.thumbnail_bytes) + if thumbnail.isNull(): + thumbnail = QPixmap() + else: + thumbnail = QPixmap.fromImage(thumbnail) + self.message.emit(data.rpd_file, thumbnail) + + +class OffloadManager(PushPullDaemonManager): + """ + Handles tasks best run in a separate process + """ + + message = pyqtSignal(TemporalProximityGroups) + downloadFolders = pyqtSignal(FoldersPreview) + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.offload) + self._process_name = 'Offload Manager' + self._process_to_run = 'offload.py' + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) # type: OffloadResults + if data.proximity_groups is not None: + self.message.emit(data.proximity_groups) + elif data.folders_preview is not None: + self.downloadFolders.emit(data.folders_preview) + + +class ScanManager(PublishPullPipelineManager): + """ + Handles the processes that scan devices (cameras, external devices, + this computer path) + """ + scannedFiles = pyqtSignal( + 'PyQt_PyObject', 'PyQt_PyObject', FileTypeCounter, 'PyQt_PyObject', bool + ) + deviceError = pyqtSignal(int, CameraErrorCode) + deviceDetails = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject', str) + scanProblems = pyqtSignal(int, 'PyQt_PyObject') + fatalError = pyqtSignal(int) + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.scan) + self._process_name = 'Scan Manager' + self._process_to_run = 'scan.py' + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) # type: ScanResults + if data.rpd_files is not None: + assert data.file_type_counter + assert data.file_size_sum + assert data.entire_video_required is not None + self.scannedFiles.emit( + data.rpd_files, + (data.sample_photo, data.sample_video), + data.file_type_counter, + data.file_size_sum, + data.entire_video_required + ) + else: + assert data.scan_id is not None + if data.error_code is not None: + self.deviceError.emit(data.scan_id, data.error_code) + elif data.optimal_display_name is not None: + self.deviceDetails.emit( + data.scan_id, data.storage_space, data.storage_descriptions, + data.optimal_display_name + ) + elif data.problems is not None: + self.scanProblems.emit(data.scan_id, data.problems) + else: + assert data.fatal_error + self.fatalError.emit(data.scan_id) + + +class BackupManager(PublishPullPipelineManager): + """ + Each backup "device" (it could be an external drive, or a user- + specified path on the local file system) has associated with it one + worker process. For example if photos and videos are both being + backed up to the same external hard drive, one worker process + handles both the photos and the videos. However if photos are being + backed up to one drive, and videos to another, there would be a + worker process for each drive (2 in total). + """ + message = pyqtSignal(int, bool, bool, RPDFile, str, 'PyQt_PyObject') + bytesBackedUp = pyqtSignal('PyQt_PyObject', 'PyQt_PyObject') + backupProblems = pyqtSignal(int, 'PyQt_PyObject') + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.backup) + self._process_name = 'Backup Manager' + self._process_to_run = 'backupfile.py' + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) # type: BackupResults + if data.total_downloaded is not None: + assert data.scan_id is not None + assert data.chunk_downloaded >= 0 + assert data.total_downloaded >= 0 + self.bytesBackedUp.emit(data.scan_id, data.chunk_downloaded) + elif data.backup_succeeded is not None: + assert data.do_backup is not None + assert data.rpd_file is not None + self.message.emit( + data.device_id, data.backup_succeeded, data.do_backup, data.rpd_file, + data.backup_full_file_name, data.mdata_exceptions) + else: + assert data.problems is not None + self.backupProblems.emit(data.device_id, data.problems) + + +class CopyFilesManager(PublishPullPipelineManager): + """ + Manage the processes that copy files from devices to the computer + during the download process + """ + + message = pyqtSignal(bool, RPDFile, int, 'PyQt_PyObject') + tempDirs = pyqtSignal(int, str,str) + bytesDownloaded = pyqtSignal(int, 'PyQt_PyObject', 'PyQt_PyObject') + copyProblems = pyqtSignal(int, 'PyQt_PyObject') + + def __init__(self, logging_port: int) -> None: + super().__init__(logging_port=logging_port, thread_name=ThreadNames.copy) + self._process_name = 'Copy Files Manager' + self._process_to_run = 'copyfiles.py' + + def process_sink_data(self) -> None: + data = pickle.loads(self.content) # type: CopyFilesResults + if data.total_downloaded is not None: + assert data.scan_id is not None + if data.chunk_downloaded < 0: + logging.critical("Chunk downloaded is less than zero: %s", data.chunk_downloaded) + if data.total_downloaded < 0: + logging.critical("Chunk downloaded is less than zero: %s", data.total_downloaded) + + self.bytesDownloaded.emit(data.scan_id, data.total_downloaded, data.chunk_downloaded) + + elif data.copy_succeeded is not None: + assert data.rpd_file is not None + assert data.download_count is not None + self.message.emit(data.copy_succeeded, data.rpd_file, data.download_count, + data.mdata_exceptions) + + elif data.problems is not None: + self.copyProblems.emit(data.scan_id, data.problems) + + else: + assert (data.photo_temp_dir is not None and + data.video_temp_dir is not None) + assert data.scan_id is not None + self.tempDirs.emit(data.scan_id, data.photo_temp_dir, data.video_temp_dir)
\ No newline at end of file |