summaryrefslogtreecommitdiff
path: root/raphodo/interprocess.py
diff options
context:
space:
mode:
authorAntoine Beaupré <anarcat@debian.org>2018-03-14 12:24:17 -0400
committerAntoine Beaupré <anarcat@debian.org>2018-03-14 12:24:17 -0400
commit0a297829eaf3912c939e1b43a3ef6ddeb7607b38 (patch)
tree51733e0d6ffb00f0f7dfe01dccd48b2a598e5153 /raphodo/interprocess.py
parent88c8bd4cd2ee4707f8a43be4d89c4e040dcced2f (diff)
New upstream version 0.9.9upstream/0.9.9
Diffstat (limited to 'raphodo/interprocess.py')
-rw-r--r--raphodo/interprocess.py184
1 files changed, 110 insertions, 74 deletions
diff --git a/raphodo/interprocess.py b/raphodo/interprocess.py
index 2b877ef..76d9aa4 100644
--- a/raphodo/interprocess.py
+++ b/raphodo/interprocess.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 Damon Lynch <damonlynch@gmail.com>
+# Copyright (C) 2015-2018 Damon Lynch <damonlynch@gmail.com>
# This file is part of Rapid Photo Downloader.
#
@@ -17,7 +17,7 @@
# see <http://www.gnu.org/licenses/>.
__author__ = 'Damon Lynch'
-__copyright__ = "Copyright 2015-2016, Damon Lynch"
+__copyright__ = "Copyright 2015-2018, Damon Lynch"
import argparse
import sys
@@ -37,7 +37,14 @@ from PyQt5.QtGui import (QPixmap, QImage)
import zmq
import zmq.log.handlers
-from zmq.eventloop.ioloop import IOLoop
+if zmq.pyzmq_version_info()[0] < 17:
+ from zmq.eventloop import ioloop
+else:
+ try:
+ from tornado import ioloop
+ except ImportError:
+ from zmq.eventloop import ioloop # note: deprecated in pyzmq 17.0.0
+
from zmq.eventloop.zmqstream import ZMQStream
from raphodo.rpdfile import (RPDFile, FileTypeCounter, FileSizeSum, Photo, Video)
@@ -153,9 +160,11 @@ class ProcessManager:
self.workers = [] # type: List[int]
def _get_cmd(self) -> str:
- return '{} {}'.format(sys.executable,
- os.path.join(os.path.abspath(os.path.dirname(__file__)),
- self._process_to_run))
+ return '{} {}'.format(
+ sys.executable, os.path.join(
+ os.path.abspath(os.path.dirname(__file__)), self._process_to_run
+ )
+ )
def _get_command_line(self, worker_id: int) -> str:
"""
@@ -176,8 +185,9 @@ class ProcessManager:
logging.critical("Failed to start process: %s", command_line)
logging.critical('OSError [Errno %s]: %s', e.errno, e.strerror)
if e.errno == 8:
- logging.critical("Script shebang line might be malformed or missing: %s",
- self._get_cmd())
+ logging.critical(
+ "Script shebang line might be malformed or missing: %s", self._get_cmd()
+ )
sys.exit(1)
logging.debug("Started '%s' with pid %s", command_line, proc.pid)
@@ -190,14 +200,19 @@ class ProcessManager:
Forcefully terminate any running child processes.
"""
-
- zombie_processes = [p for p in self.processes.values()
- if p.is_running() and p.status() == psutil.STATUS_ZOMBIE]
- running_processes = [p for p in self.processes.values()
- if p.is_running() and p.status() != psutil.STATUS_ZOMBIE]
+ zombie_processes = [
+ p for p in self.processes.values()
+ if p.is_running() and p.status() == psutil.STATUS_ZOMBIE
+ ]
+ running_processes = [
+ p for p in self.processes.values()
+ if p.is_running() and p.status() != psutil.STATUS_ZOMBIE
+ ]
if hasattr(self, '_process_name'):
- logging.debug("Forcefully terminating processes for %s: %s zombies, %s running.",
- self._process_name, len(zombie_processes), len(running_processes))
+ logging.debug(
+ "Forcefully terminating processes for %s: %s zombies, %s running.",
+ self._process_name, len(zombie_processes), len(running_processes)
+ )
for p in zombie_processes: # type: psutil.Process
try:
@@ -298,7 +313,7 @@ class PullPipelineManager(ProcessManager, QObject):
if directive == b'cmd':
command = content
- assert command in [b"STOPPED", b"FINISHED", b"KILL"]
+ assert command in (b"STOPPED", b"FINISHED", b"KILL")
if command == b"KILL":
# Terminate immediately, without regard for any
# incoming messages. This message is only sent
@@ -365,37 +380,46 @@ class PullPipelineManager(ProcessManager, QObject):
return [worker_id, b'cmd', b'START']
def start(self) -> None:
- logging.critical("Member function start() not implemented in child class of %s",
- self._process_name)
+ logging.critical(
+ "Member function start() not implemented in child class of %s", self._process_name
+ )
def start_worker(self, worker_id: bytes, data: bytes) -> None:
- logging.critical("Member function start_worker() not implemented in child class of %s",
- self._process_name)
+ logging.critical(
+ "Member function start_worker() not implemented in child class of %s",
+ self._process_name
+ )
def stop(self) -> None:
- logging.critical("Member function stop() not implemented in child class of %s",
- self._process_name)
+ logging.critical(
+ "Member function stop() not implemented in child class of %s", self._process_name
+ )
def stop_worker(self, worker_id: int) -> None:
- logging.critical("Member function stop_worker() not implemented in child class of %s",
- self._process_name)
+ logging.critical(
+ "Member function stop_worker() not implemented in child class of %s",
+ self._process_name
+ )
def pause(self) -> None:
logging.critical("Member function pause() not implemented in child class of %s",
self._process_name)
def resume(self, worker_id: Optional[bytes]) -> None:
- logging.critical("Member function stop_worker() not implemented in child class of %s",
- self._process_name)
+ logging.critical(
+ "Member function stop_worker() not implemented in child class of %s", self._process_name
+ )
def send_message_to_worker(self, data: bytes, worker_id:Optional[bytes]=None) -> None:
if self.terminating:
- logging.debug("%s not sending message to worker because manager is terminated",
- self._process_name)
+ logging.debug(
+ "%s not sending message to worker because manager is terminated", self._process_name
+ )
return
if not self.workers:
- logging.debug("%s not sending message to worker because there are no workers",
- self._process_name)
+ logging.debug(
+ "%s not sending message to worker because there are no workers", self._process_name
+ )
return
assert isinstance(data, bytes)
@@ -431,11 +455,12 @@ class LoadBalancerWorkerManager(ProcessManager):
cmd = self._get_cmd()
return '{} --request {} --send {} --identity {} --logging {}'.format(
- cmd,
- self.backend_port,
- self.sink_port,
- worker_id,
- self.logging_port)
+ cmd,
+ self.backend_port,
+ self.sink_port,
+ worker_id,
+ self.logging_port
+ )
def start_workers(self) -> None:
for worker_id in range(self.no_workers):
@@ -464,18 +489,21 @@ class LRUQueue:
self.backend.on_recv(self.handle_backend)
self.controller.on_recv(self.handle_controller)
- self.loop = IOLoop.instance()
+ self.loop = ioloop.IOLoop.instance()
def handle_controller(self, msg):
self.terminating = True
- # logging.debug("%s load balancer requesting %s workers to stop", self.worker_type,
- # len(self.workers))
+ # logging.debug(
+ # "%s load balancer requesting %s workers to stop", self.worker_type, len(self.workers)
+ # )
while len(self.workers):
worker_identity = self.workers.popleft()
- logging.debug("%s load balancer sending stop cmd to worker %s", self.worker_type,
- worker_identity.decode())
+ logging.debug(
+ "%s load balancer sending stop cmd to worker %s",
+ self.worker_type, worker_identity.decode()
+ )
self.backend.send_multipart([worker_identity, b'', b'cmd', b'STOP'])
self.terminating_workers.add(worker_identity)
@@ -501,7 +529,9 @@ class LRUQueue:
if p.is_running():
pid = p.pid
if p.status() != psutil.STATUS_SLEEPING:
- logging.debug("Waiting on %s process %s...", p.status(), pid)
+ logging.debug(
+ "Waiting on %s process %s...", p.status(), pid
+ )
os.waitpid(pid, 0)
logging.debug("...process %s is finished", pid)
else:
@@ -571,7 +601,7 @@ class LoadBalancer:
queue = LRUQueue(backend, frontend, controller, worker_type, process_manager)
# start reactor, which is an infinite loop
- IOLoop.instance().start()
+ ioloop.IOLoop.instance().start()
# Finished infinite loop: do some housekeeping
logging.debug("Forcefully terminating load balancer child processes")
@@ -627,11 +657,12 @@ class LoadBalancerManager(ProcessManager, QObject):
cmd = self._get_cmd()
return '{} --receive {} --send {} --controller {} --logging {}'.format(
- cmd,
- self.requester_port,
- self.sink_port,
- self.controller_port,
- self.logging_port)
+ cmd,
+ self.requester_port,
+ self.sink_port,
+ self.controller_port,
+ self.logging_port
+ )
DAEMON_WORKER_ID = 0
@@ -671,7 +702,8 @@ class PushPullDaemonManager(PullPipelineManager):
except zmq.Again:
logging.debug(
"Terminating %s sink because child process did not receive message",
- self._process_name)
+ self._process_name
+ )
self.terminate_sink()
else:
# The process may have crashed. Stop the sink.
@@ -681,10 +713,11 @@ class PushPullDaemonManager(PullPipelineManager):
cmd = self._get_cmd()
return '{} --receive {} --send {} --logging {}'.format(
- cmd,
- self.ventilator_port,
- self.receiver_port,
- self.logging_port)
+ cmd,
+ self.ventilator_port,
+ self.receiver_port,
+ self.logging_port
+ )
def _get_ventilator_start_message(self, worker_id: int) -> List[bytes]:
return [b'cmd', b'START']
@@ -786,15 +819,16 @@ class PublishPullPipelineManager(PullPipelineManager):
def _get_command_line(self, worker_id: int) -> str:
cmd = self._get_cmd()
- return '{} --receive {} --send {} --controller {} --syncclient {} ' \
- '--filter {} --logging {}'.format(
- cmd,
- self.ventilator_port,
- self.receiver_port,
- self.controller_port,
- self.sync_service_port,
- worker_id,
- self.logging_port)
+ return '{} --receive {} --send {} --controller {} --syncclient {} --filter {} --logging '\
+ '{}'.format(
+ cmd,
+ self.ventilator_port,
+ self.receiver_port,
+ self.controller_port,
+ self.sync_service_port,
+ worker_id,
+ self.logging_port
+ )
def __len__(self) -> int:
return len(self.workers)
@@ -876,9 +910,9 @@ class WorkerProcess():
if self.worker_id is not None:
name = '{}-{}'.format(name, self.worker_id.decode())
- self.logger_publisher = ProcessLoggerPublisher(context=self.context,
- name=name,
- notification_port=notification_port)
+ self.logger_publisher = ProcessLoggerPublisher(
+ context=self.context, name=name, notification_port=notification_port
+ )
def send_message_to_sink(self) -> None:
@@ -941,15 +975,17 @@ class DaemonProcess(WorkerProcess):
assert content == b'STOP'
self.cleanup_pre_stop()
# signal to sink that we've terminated before finishing
- self.sender.send_multipart([make_filter_from_worker_id(
- DAEMON_WORKER_ID), b'cmd', b'STOPPED'])
+ self.sender.send_multipart(
+ [make_filter_from_worker_id(DAEMON_WORKER_ID), b'cmd', b'STOPPED']
+ )
sys.exit(0)
def send_message_to_sink(self) -> None:
# Must use a dummy value for the worker id, as there is only ever one
# instance.
- self.sender.send_multipart([make_filter_from_worker_id(
- DAEMON_WORKER_ID), b'data', self.content])
+ self.sender.send_multipart(
+ [make_filter_from_worker_id(DAEMON_WORKER_ID), b'data', self.content]
+ )
class WorkerInPublishPullPipeline(WorkerProcess):
@@ -1322,7 +1358,6 @@ class CopyFilesResults:
self.problems = problems
-
class ThumbnailDaemonData:
"""
Pass arguments to the thumbnail daemon process.
@@ -1567,8 +1602,7 @@ class RenameMoveFileManager(PushPullDaemonManager):
assert data.stored_sequence_no is not None
assert data.downloads_today is not None
assert isinstance(data.downloads_today, list)
- self.sequencesUpdate.emit(data.stored_sequence_no,
- data.downloads_today)
+ self.sequencesUpdate.emit(data.stored_sequence_no, data.downloads_today)
class ThumbnailDaemonManager(PushPullDaemonManager):
@@ -1699,7 +1733,8 @@ class BackupManager(PublishPullPipelineManager):
assert data.rpd_file is not None
self.message.emit(
data.device_id, data.backup_succeeded, data.do_backup, data.rpd_file,
- data.backup_full_file_name, data.mdata_exceptions)
+ data.backup_full_file_name, data.mdata_exceptions
+ )
else:
assert data.problems is not None
self.backupProblems.emit(data.device_id, data.problems)
@@ -1735,8 +1770,9 @@ class CopyFilesManager(PublishPullPipelineManager):
elif data.copy_succeeded is not None:
assert data.rpd_file is not None
assert data.download_count is not None
- self.message.emit(data.copy_succeeded, data.rpd_file, data.download_count,
- data.mdata_exceptions)
+ self.message.emit(
+ data.copy_succeeded, data.rpd_file, data.download_count, data.mdata_exceptions
+ )
elif data.problems is not None:
self.copyProblems.emit(data.scan_id, data.problems)