From 0a297829eaf3912c939e1b43a3ef6ddeb7607b38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20Beaupr=C3=A9?= Date: Wed, 14 Mar 2018 12:24:17 -0400 Subject: New upstream version 0.9.9 --- raphodo/interprocess.py | 184 +++++++++++++++++++++++++++++------------------- 1 file changed, 110 insertions(+), 74 deletions(-) (limited to 'raphodo/interprocess.py') diff --git a/raphodo/interprocess.py b/raphodo/interprocess.py index 2b877ef..76d9aa4 100644 --- a/raphodo/interprocess.py +++ b/raphodo/interprocess.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2016 Damon Lynch +# Copyright (C) 2015-2018 Damon Lynch # This file is part of Rapid Photo Downloader. # @@ -17,7 +17,7 @@ # see . __author__ = 'Damon Lynch' -__copyright__ = "Copyright 2015-2016, Damon Lynch" +__copyright__ = "Copyright 2015-2018, Damon Lynch" import argparse import sys @@ -37,7 +37,14 @@ from PyQt5.QtGui import (QPixmap, QImage) import zmq import zmq.log.handlers -from zmq.eventloop.ioloop import IOLoop +if zmq.pyzmq_version_info()[0] < 17: + from zmq.eventloop import ioloop +else: + try: + from tornado import ioloop + except ImportError: + from zmq.eventloop import ioloop # note: deprecated in pyzmq 17.0.0 + from zmq.eventloop.zmqstream import ZMQStream from raphodo.rpdfile import (RPDFile, FileTypeCounter, FileSizeSum, Photo, Video) @@ -153,9 +160,11 @@ class ProcessManager: self.workers = [] # type: List[int] def _get_cmd(self) -> str: - return '{} {}'.format(sys.executable, - os.path.join(os.path.abspath(os.path.dirname(__file__)), - self._process_to_run)) + return '{} {}'.format( + sys.executable, os.path.join( + os.path.abspath(os.path.dirname(__file__)), self._process_to_run + ) + ) def _get_command_line(self, worker_id: int) -> str: """ @@ -176,8 +185,9 @@ class ProcessManager: logging.critical("Failed to start process: %s", command_line) logging.critical('OSError [Errno %s]: %s', e.errno, e.strerror) if e.errno == 8: - logging.critical("Script shebang line might be malformed or missing: %s", - self._get_cmd()) + logging.critical( + "Script shebang line might be malformed or missing: %s", self._get_cmd() + ) sys.exit(1) logging.debug("Started '%s' with pid %s", command_line, proc.pid) @@ -190,14 +200,19 @@ class ProcessManager: Forcefully terminate any running child processes. """ - - zombie_processes = [p for p in self.processes.values() - if p.is_running() and p.status() == psutil.STATUS_ZOMBIE] - running_processes = [p for p in self.processes.values() - if p.is_running() and p.status() != psutil.STATUS_ZOMBIE] + zombie_processes = [ + p for p in self.processes.values() + if p.is_running() and p.status() == psutil.STATUS_ZOMBIE + ] + running_processes = [ + p for p in self.processes.values() + if p.is_running() and p.status() != psutil.STATUS_ZOMBIE + ] if hasattr(self, '_process_name'): - logging.debug("Forcefully terminating processes for %s: %s zombies, %s running.", - self._process_name, len(zombie_processes), len(running_processes)) + logging.debug( + "Forcefully terminating processes for %s: %s zombies, %s running.", + self._process_name, len(zombie_processes), len(running_processes) + ) for p in zombie_processes: # type: psutil.Process try: @@ -298,7 +313,7 @@ class PullPipelineManager(ProcessManager, QObject): if directive == b'cmd': command = content - assert command in [b"STOPPED", b"FINISHED", b"KILL"] + assert command in (b"STOPPED", b"FINISHED", b"KILL") if command == b"KILL": # Terminate immediately, without regard for any # incoming messages. This message is only sent @@ -365,37 +380,46 @@ class PullPipelineManager(ProcessManager, QObject): return [worker_id, b'cmd', b'START'] def start(self) -> None: - logging.critical("Member function start() not implemented in child class of %s", - self._process_name) + logging.critical( + "Member function start() not implemented in child class of %s", self._process_name + ) def start_worker(self, worker_id: bytes, data: bytes) -> None: - logging.critical("Member function start_worker() not implemented in child class of %s", - self._process_name) + logging.critical( + "Member function start_worker() not implemented in child class of %s", + self._process_name + ) def stop(self) -> None: - logging.critical("Member function stop() not implemented in child class of %s", - self._process_name) + logging.critical( + "Member function stop() not implemented in child class of %s", self._process_name + ) def stop_worker(self, worker_id: int) -> None: - logging.critical("Member function stop_worker() not implemented in child class of %s", - self._process_name) + logging.critical( + "Member function stop_worker() not implemented in child class of %s", + self._process_name + ) def pause(self) -> None: logging.critical("Member function pause() not implemented in child class of %s", self._process_name) def resume(self, worker_id: Optional[bytes]) -> None: - logging.critical("Member function stop_worker() not implemented in child class of %s", - self._process_name) + logging.critical( + "Member function stop_worker() not implemented in child class of %s", self._process_name + ) def send_message_to_worker(self, data: bytes, worker_id:Optional[bytes]=None) -> None: if self.terminating: - logging.debug("%s not sending message to worker because manager is terminated", - self._process_name) + logging.debug( + "%s not sending message to worker because manager is terminated", self._process_name + ) return if not self.workers: - logging.debug("%s not sending message to worker because there are no workers", - self._process_name) + logging.debug( + "%s not sending message to worker because there are no workers", self._process_name + ) return assert isinstance(data, bytes) @@ -431,11 +455,12 @@ class LoadBalancerWorkerManager(ProcessManager): cmd = self._get_cmd() return '{} --request {} --send {} --identity {} --logging {}'.format( - cmd, - self.backend_port, - self.sink_port, - worker_id, - self.logging_port) + cmd, + self.backend_port, + self.sink_port, + worker_id, + self.logging_port + ) def start_workers(self) -> None: for worker_id in range(self.no_workers): @@ -464,18 +489,21 @@ class LRUQueue: self.backend.on_recv(self.handle_backend) self.controller.on_recv(self.handle_controller) - self.loop = IOLoop.instance() + self.loop = ioloop.IOLoop.instance() def handle_controller(self, msg): self.terminating = True - # logging.debug("%s load balancer requesting %s workers to stop", self.worker_type, - # len(self.workers)) + # logging.debug( + # "%s load balancer requesting %s workers to stop", self.worker_type, len(self.workers) + # ) while len(self.workers): worker_identity = self.workers.popleft() - logging.debug("%s load balancer sending stop cmd to worker %s", self.worker_type, - worker_identity.decode()) + logging.debug( + "%s load balancer sending stop cmd to worker %s", + self.worker_type, worker_identity.decode() + ) self.backend.send_multipart([worker_identity, b'', b'cmd', b'STOP']) self.terminating_workers.add(worker_identity) @@ -501,7 +529,9 @@ class LRUQueue: if p.is_running(): pid = p.pid if p.status() != psutil.STATUS_SLEEPING: - logging.debug("Waiting on %s process %s...", p.status(), pid) + logging.debug( + "Waiting on %s process %s...", p.status(), pid + ) os.waitpid(pid, 0) logging.debug("...process %s is finished", pid) else: @@ -571,7 +601,7 @@ class LoadBalancer: queue = LRUQueue(backend, frontend, controller, worker_type, process_manager) # start reactor, which is an infinite loop - IOLoop.instance().start() + ioloop.IOLoop.instance().start() # Finished infinite loop: do some housekeeping logging.debug("Forcefully terminating load balancer child processes") @@ -627,11 +657,12 @@ class LoadBalancerManager(ProcessManager, QObject): cmd = self._get_cmd() return '{} --receive {} --send {} --controller {} --logging {}'.format( - cmd, - self.requester_port, - self.sink_port, - self.controller_port, - self.logging_port) + cmd, + self.requester_port, + self.sink_port, + self.controller_port, + self.logging_port + ) DAEMON_WORKER_ID = 0 @@ -671,7 +702,8 @@ class PushPullDaemonManager(PullPipelineManager): except zmq.Again: logging.debug( "Terminating %s sink because child process did not receive message", - self._process_name) + self._process_name + ) self.terminate_sink() else: # The process may have crashed. Stop the sink. @@ -681,10 +713,11 @@ class PushPullDaemonManager(PullPipelineManager): cmd = self._get_cmd() return '{} --receive {} --send {} --logging {}'.format( - cmd, - self.ventilator_port, - self.receiver_port, - self.logging_port) + cmd, + self.ventilator_port, + self.receiver_port, + self.logging_port + ) def _get_ventilator_start_message(self, worker_id: int) -> List[bytes]: return [b'cmd', b'START'] @@ -786,15 +819,16 @@ class PublishPullPipelineManager(PullPipelineManager): def _get_command_line(self, worker_id: int) -> str: cmd = self._get_cmd() - return '{} --receive {} --send {} --controller {} --syncclient {} ' \ - '--filter {} --logging {}'.format( - cmd, - self.ventilator_port, - self.receiver_port, - self.controller_port, - self.sync_service_port, - worker_id, - self.logging_port) + return '{} --receive {} --send {} --controller {} --syncclient {} --filter {} --logging '\ + '{}'.format( + cmd, + self.ventilator_port, + self.receiver_port, + self.controller_port, + self.sync_service_port, + worker_id, + self.logging_port + ) def __len__(self) -> int: return len(self.workers) @@ -876,9 +910,9 @@ class WorkerProcess(): if self.worker_id is not None: name = '{}-{}'.format(name, self.worker_id.decode()) - self.logger_publisher = ProcessLoggerPublisher(context=self.context, - name=name, - notification_port=notification_port) + self.logger_publisher = ProcessLoggerPublisher( + context=self.context, name=name, notification_port=notification_port + ) def send_message_to_sink(self) -> None: @@ -941,15 +975,17 @@ class DaemonProcess(WorkerProcess): assert content == b'STOP' self.cleanup_pre_stop() # signal to sink that we've terminated before finishing - self.sender.send_multipart([make_filter_from_worker_id( - DAEMON_WORKER_ID), b'cmd', b'STOPPED']) + self.sender.send_multipart( + [make_filter_from_worker_id(DAEMON_WORKER_ID), b'cmd', b'STOPPED'] + ) sys.exit(0) def send_message_to_sink(self) -> None: # Must use a dummy value for the worker id, as there is only ever one # instance. - self.sender.send_multipart([make_filter_from_worker_id( - DAEMON_WORKER_ID), b'data', self.content]) + self.sender.send_multipart( + [make_filter_from_worker_id(DAEMON_WORKER_ID), b'data', self.content] + ) class WorkerInPublishPullPipeline(WorkerProcess): @@ -1322,7 +1358,6 @@ class CopyFilesResults: self.problems = problems - class ThumbnailDaemonData: """ Pass arguments to the thumbnail daemon process. @@ -1567,8 +1602,7 @@ class RenameMoveFileManager(PushPullDaemonManager): assert data.stored_sequence_no is not None assert data.downloads_today is not None assert isinstance(data.downloads_today, list) - self.sequencesUpdate.emit(data.stored_sequence_no, - data.downloads_today) + self.sequencesUpdate.emit(data.stored_sequence_no, data.downloads_today) class ThumbnailDaemonManager(PushPullDaemonManager): @@ -1699,7 +1733,8 @@ class BackupManager(PublishPullPipelineManager): assert data.rpd_file is not None self.message.emit( data.device_id, data.backup_succeeded, data.do_backup, data.rpd_file, - data.backup_full_file_name, data.mdata_exceptions) + data.backup_full_file_name, data.mdata_exceptions + ) else: assert data.problems is not None self.backupProblems.emit(data.device_id, data.problems) @@ -1735,8 +1770,9 @@ class CopyFilesManager(PublishPullPipelineManager): elif data.copy_succeeded is not None: assert data.rpd_file is not None assert data.download_count is not None - self.message.emit(data.copy_succeeded, data.rpd_file, data.download_count, - data.mdata_exceptions) + self.message.emit( + data.copy_succeeded, data.rpd_file, data.download_count, data.mdata_exceptions + ) elif data.problems is not None: self.copyProblems.emit(data.scan_id, data.problems) -- cgit v1.2.3