diff options
Diffstat (limited to 'raphodo/devices.py')
-rw-r--r-- | raphodo/devices.py | 1451 |
1 files changed, 1451 insertions, 0 deletions
diff --git a/raphodo/devices.py b/raphodo/devices.py new file mode 100644 index 0000000..a2597fe --- /dev/null +++ b/raphodo/devices.py @@ -0,0 +1,1451 @@ +# Copyright (C) 2015-2017 Damon Lynch <damonlynch@gmail.com> + +# This file is part of Rapid Photo Downloader. +# +# Rapid Photo Downloader is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rapid Photo Downloader is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Rapid Photo Downloader. If not, +# see <http://www.gnu.org/licenses/>. + +""" +Handle Devices and Device Collections. + +In Rapid Photo Downloader, "Device" has two meanings, depending on the +context: +1. In the GUI, a Device is a camera or a volume (external drive) +2. In code, a Device is one of a camera, volume, or path +""" + +__author__ = 'Damon Lynch' +__copyright__ = "Copyright 2015-2017, Damon Lynch" + +import sys +import shutil +import os +import logging +import itertools +from collections import namedtuple, Counter, defaultdict +from typing import Tuple, List, Optional, Set, Dict, Union, DefaultDict + +from gettext import gettext as _ + +from PyQt5.QtCore import QStorageInfo, QSize +from PyQt5.QtWidgets import QFileIconProvider +from PyQt5.QtGui import QIcon, QPixmap + +import raphodo.qrc_resources as qrc_resources +from raphodo.constants import ( + DeviceType, BackupLocationType, FileType, DeviceState, DownloadStatus, ExifSource, + DownloadingFileTypes, BackupFailureType +) +from raphodo.rpdfile import FileTypeCounter, FileSizeSum +from raphodo.storage import ( + StorageSpace, udev_attributes, UdevAttr, get_path_display_name, validate_download_folder, + ValidatedFolder, CameraDetails, get_uri, fs_device_details +) +from raphodo.camera import generate_devname +from raphodo.utilities import ( + number, make_internationalized_list, stdchannel_redirected, same_device +) +import raphodo.metadataphoto as metadataphoto +from raphodo.rpdfile import Photo, Video +import raphodo.exiftool as exiftool +from raphodo.problemnotification import FsMetadataWriteProblem + +display_devices = (DeviceType.volume, DeviceType.camera) + + +class Device: + r""" + Representation of a camera, or a device, or a path. + Files will be downloaded from it. + + To run the doctests, ensure at least one camera is plugged in + but not mounted! + + >>> d = Device() + >>> d.set_download_from_volume('/media/damon/EOS_DIGITAL', 'EOS_DIGITAL') + >>> d + 'EOS_DIGITAL':'/media/damon/EOS_DIGITAL' + >>> str(d) + '/media/damon/EOS_DIGITAL (EOS_DIGITAL)' + >>> d.display_name + 'EOS_DIGITAL' + >>> d.camera_model + >>> d.camera_port + + >>> import gphoto2 as gp + >>> gp_context = gp.Context() + >>> cameras = gp_context.camera_autodetect() + >>> c = Device() + >>> for model, port in cameras: + ... c.set_download_from_camera(model, port) + ... isinstance(c.display_name, str) + True + >>> e = Device() + >>> e.set_download_from_volume('/media/damon/EOS_DIGITAL', 'EOS_DIGITAL') + >>> e == d + True + >>> e != c + True + >>> c == d + False + >>> c != d + True + """ + def __init__(self): + self.clear() + + def clear(self): + self.camera_model = None # type: str + self.camera_port = None # type: str + # Assume an MTP device is likely a smart phone or tablet + self.is_mtp_device = False + self.udev_name = None # type: str + self.storage_space = [] # type: List[StorageSpace] + # Name of storage on a camera + self.storage_descriptions = [] # type: List[str] + + self.path = None # type: str + self.display_name = None # type: str + self.have_optimal_display_name = False + self.device_type = None # type: DeviceType + self.icon_name = None # type: str + self.can_eject = None # type: bool + self.photo_cache_dir = None # type: str + self.video_cache_dir = None # type: str + self.file_size_sum = FileSizeSum() + self.file_type_counter = FileTypeCounter() + self.download_statuses = set() # type: Set[DownloadStatus] + self._uri = '' + + def __repr__(self): + if self.device_type == DeviceType.camera: + return "%r:%r" % (self.camera_model, self.camera_port) + elif self.device_type == DeviceType.volume: + return "%r:%r" % (self.display_name, self.path) + else: + return "%r" % self.path + + def __str__(self): + if self.device_type == DeviceType.camera: + return '{} on port {}. Udev: {}; Display name: {} (optimal: {}); MTP: {}'.format( + self.camera_model, self.camera_port, self.udev_name, + self.display_name, self.have_optimal_display_name, self.is_mtp_device) + elif self.device_type == DeviceType.volume: + if self.path != self.display_name: + return "%s (%s)" % (self.path, self.display_name) + else: + return "%s" % (self.path) + else: + return "%s" % (self.path) + + def __eq__(self, other): + for attr in ('device_type', 'camera_model', 'camera_port', 'path'): + if getattr(self, attr) != getattr(other, attr): + return False + return True + + def __hash__(self): + return hash((self.device_type, self.camera_model, self.camera_port, self.path)) + + def __ne__(self, other): + return not self.__eq__(other) + + def _get_valid_icon_name(self, possible_names): + if possible_names is not None: + for icon_name in possible_names: + if QIcon.hasThemeIcon(icon_name): + return icon_name + return None + + @property + def uri(self) -> str: + if self._uri: + return self._uri + + if self.device_type == DeviceType.camera: + if self.storage_descriptions: + storage_desc = self.storage_descriptions[0] + else: + storage_desc = '' + camera_details = CameraDetails( + model=self.camera_model, port=self.camera_port, display_name=self.display_name, + is_mtp=self.is_mtp_device, storage_desc=storage_desc + ) + self._uri = get_uri(camera_details=camera_details) + else: + self._uri = get_uri(path=self.path) + + return self._uri + + def set_download_from_camera(self, camera_model: str, camera_port: str) -> None: + self.clear() + self.device_type = DeviceType.camera + self.camera_model = camera_model + # Set default display name, for when all else fails. + # Try to override this value below + self.display_name = camera_model + self.camera_port = camera_port + self.icon_name = self._get_valid_icon_name(('camera-photo', 'camera')) + + # Assign default udev name if cannot determine from udev itself + self.udev_name = camera_model + + devname = generate_devname(camera_port) + if devname is not None: + udev_attr = udev_attributes(devname) + if udev_attr is not None: + self.is_mtp_device = udev_attr.is_mtp_device + self.udev_name = udev_attr.model + self.display_name = udev_attr.model + else: + logging.error("Could not determine udev values for %s %s", + self.camera_model, camera_port) + + def update_camera_attributes(self, display_name: str, + storage_space: List[StorageSpace], + storage_descriptions: List[str]) -> None: + self.display_name = display_name + self.have_optimal_display_name = True + self.storage_space = storage_space + self.storage_descriptions = storage_descriptions + + def set_download_from_volume(self, path: str, display_name: str, + icon_names=None, can_eject=None, + mount: QStorageInfo=None) -> None: + self.clear() + self.device_type = DeviceType.volume + self.path = path + self.icon_name = self._get_valid_icon_name(icon_names) + if not display_name.find(os.sep) >= 0: + self.display_name = display_name + else: + self.display_name = os.path.basename(display_name) + self.have_optimal_display_name = True + self.can_eject = can_eject + if not mount: + mount = QStorageInfo(path) + self.storage_space.append(StorageSpace( + bytes_free=mount.bytesAvailable(), + bytes_total=mount.bytesTotal(), + path=path)) + + def set_download_from_path(self, path: str) -> None: + self.clear() + self.device_type = DeviceType.path + self.path = path + if path.endswith(os.sep): + path = path[:-1] + display_name = os.path.basename(path) + if display_name: + self.display_name = display_name + self.have_optimal_display_name = True + else: + self.display_name = path + # the next value is almost certainly ("folder",), but I guess it's + # better to generate it from code + self.icon_name = ('{}'.format(QFileIconProvider().icon( + QFileIconProvider.Folder).name())) + mount = QStorageInfo(path) + self.storage_space.append(StorageSpace( + bytes_free=mount.bytesAvailable(), + bytes_total=mount.bytesTotal(), + path=path)) + + def get_storage_space(self, index: int=0) -> StorageSpace: + """ + Convenience function to retrieve information about bytes + free and bytes total (capacity of the media). Almost all + devices have only one storage media, but some cameras have + more than one + :param index: the storage media to get the values from + :return: tuple of bytes free and bytes total + """ + return self.storage_space[index] + + def name(self) -> str: + """ + Get the name of the device, suitable to be displayed to the + user. If the device is a path, return the path name + :return str containg the name + """ + if self.device_type == DeviceType.camera: + return self.display_name + elif self.device_type == DeviceType.volume: + return self.display_name + else: + return self.path + + def get_icon(self) -> QIcon: + """Return icon for the device.""" + + if self.device_type == DeviceType.volume: + return QIcon(':icons/drive-removable-media.svg') + elif self.device_type == DeviceType.path: + return QIcon(':/icons/folder.svg') + else: + assert self.device_type == DeviceType.camera + if self.is_mtp_device: + if self.camera_model.lower().find('tablet') >= 0: + #TODO use tablet icon + pass + return QIcon(':icons/smartphone.svg') + return QIcon(':/icons/camera.svg') + + def get_pixmap(self, size: QSize=QSize(30, 30)) -> QPixmap: + icon = self.get_icon() + return icon.pixmap(size) + + def _delete_cache_dir(self, cache_dir) -> None: + if cache_dir: + if os.path.isdir(cache_dir): + assert cache_dir != os.path.expanduser('~') + try: + shutil.rmtree(cache_dir, ignore_errors=True) + except: + logging.error("Unknown error deleting cache directory %s", cache_dir) + + def delete_cache_dirs(self) -> None: + self._delete_cache_dir(self.photo_cache_dir) + self._delete_cache_dir(self.video_cache_dir) + + + +class DeviceCollection: + """ + Maintain collection of devices that are being scanned, where a + device is of type Device. + + When a device is added, a scan_id is generated and returned. + + >>> d = Device() + >>> d.set_download_from_volume('/media/damon/EOS_DIGITAL', 'EOS_DIGITAL') + >>> c = Device() + >>> c.set_download_from_camera('Canon EOS 1D X', 'usb:001,002') + >>> e = Device() + >>> e.set_download_from_volume('/media/damon/EOS_DIGITAL', 'EOS_DIGITAL') + >>> dc = DeviceCollection() + >>> d_scan_id = dc.add_device(d) + >>> d_scan_id + 0 + >>> d_scan_id in dc + True + >>> dc.known_path(d.path, DeviceType.volume) + True + >>> dc.known_path(d.path) + True + >>> dc[d_scan_id] == d + True + >>> len(dc.volumes_and_cameras) + 1 + >>> len(dc.this_computer) + 0 + >>> dc.known_path('/root', DeviceType.path) + False + >>> dc.known_path('/root') + False + >>> c_scan_id = dc.add_device(c) + >>> c_scan_id + 1 + >>> len(dc) + 2 + >>> len(dc.volumes_and_cameras) + 2 + >>> len(dc.this_computer) + 0 + >>> dc[d_scan_id] == dc[c_scan_id] + False + >>> dc.known_camera('Canon EOS 1D X', 'usb:001,002') + True + >>> dc.known_camera('Canon EOS 1D X', 'usb:001,003') + False + >>> dc.delete_device(c) + True + >>> len(dc.cameras) + 0 + >>> len(dc.volumes_and_cameras) + 1 + >>> len(dc.this_computer) + 0 + >>> dc.known_camera('Canon EOS 1D X', 'usb:001,002') + False + >>> len(dc) + 1 + >>> dc.known_device(e) + True + >>> del dc[d_scan_id] + >>> len(dc) + 0 + >>> len(dc.volumes_and_cameras) + 0 + >>> len(dc.this_computer) + 0 + >>> dc.delete_device(e) + False + """ + def __init__(self, exiftool_process: Optional[exiftool.ExifTool]=None, + rapidApp=None) -> None: + + self.rapidApp = rapidApp + + self.devices = {} # type: Dict[int, Device] + # port: model + self.cameras = {} # type: Dict[str, str] + + # Used to assign scan ids + self.scan_counter = 0 # type: int + + # scan_id: DeviceState + self.device_state = {} # type: Dict[int, DeviceState] + + # Track which devices are being scanned, by scan_id + self.scanning = set() # type: Set[int] + # Track which downloads are running, by scan_id + + self.downloading = set() # type: Set[int] + # Track which devices have been downloaded from during one + # download, by display name. Must do it by display name + # because some devices could be removed before all devices + # have been downloaded from. + self.have_downloaded_from = set() # type: Set[str] + + # Track which devices are thumbnailing, by scan_id + self.thumbnailing = set() # type: Set[int] + + # Track the unmounting of unscanned cameras by port and model + # port: model + self.cameras_to_gvfs_unmount_for_scan = {} # type: Dict[str, str] + + # Which scanned cameras need to be unmounted for a download to start, by scan_id + self.cameras_to_gvfs_unmount_for_download = set() # type: Set[int] + self.cameras_to_stop_thumbnailing = set() + + # Automatically detected devices where the user has explicitly said to ignore it + # port: model + self.ignored_cameras = {} # type: Dict[str, str] + # List[path] + self.ignored_volumes = [] # type: List[str] + + # Devices that were set to autodownload while the program + # is in a paused state + self.queued_to_download = set() # type: Set[int] + + self.volumes_and_cameras = set() # type: Set[int] + self.this_computer = set() # type: Set[int] + + # List of devices that were detected at program startup + # scan_id + self.startup_devices = [] # type: List[int] + + # Sample exif bytes of photo on most recent device scanned + self._sample_photo = None # type: Optional[Photo] + self._sample_video = None # type: Optional[Video] + self._sample_videos_complete_files = [] # type: List[str] + self.exiftool_process = exiftool_process + + self._map_set = {DeviceType.path: self.this_computer, + DeviceType.camera: self.volumes_and_cameras, + DeviceType.volume: self.volumes_and_cameras} + self._map_plural_types = {DeviceType.camera: _('Cameras'), + DeviceType.volume: _('Devices')} + + def download_start_blocked(self) -> bool: + """ + Determine if a camera needs to be unmounted or thumbnailing needs to be + terminated for a camera in order for a download to proceed + :return: True if so, else False + """ + + if len(self.cameras_to_gvfs_unmount_for_download) > 0 and len( + self.cameras_to_stop_thumbnailing): + logging.debug("Download is blocked because %s camera(s) are being unmounted from GVFS " + "and %s camera(s) are having their thumbnailing terminated", + len(self.cameras_to_gvfs_unmount_for_download), + len(self.cameras_to_stop_thumbnailing)) + elif len(self.cameras_to_gvfs_unmount_for_download) > 0: + logging.debug("Download is blocked because %s camera(s) are being unmounted from GVFS", + len(self.cameras_to_gvfs_unmount_for_download)) + elif len(self.cameras_to_stop_thumbnailing) > 0: + logging.debug("Download is blocked because %s camera(s) are having their thumbnailing " + "terminated", len(self.cameras_to_stop_thumbnailing)) + + return len(self.cameras_to_gvfs_unmount_for_download) > 0 or len( + self.cameras_to_stop_thumbnailing) > 0 + + def logState(self) -> None: + logging.debug("-- Device Collection --") + logging.debug('%s devices: %s volumes/cameras (%s cameras), %s this computer', + len(self.devices), len(self.volumes_and_cameras), len(self.cameras), + len(self.this_computer)) + logging.debug("Device states: %s", ', '.join( + '%s: %s' % (self[scan_id].display_name, self.device_state[scan_id].name) + for scan_id in self.device_state)) + if len(self.scanning): + scanning = ('%s' % ', '.join(self[scan_id].display_name for scan_id in self.scanning)) + logging.debug("Scanning: %s", scanning) + else: + logging.debug("No devices scanning") + if len(self.downloading): + downloading = ('%s' % ', '.join(self[scan_id].display_name + for scan_id in self.downloading)) + logging.debug("Downloading: %s", downloading) + else: + logging.debug("No devices downloading") + if len(self.thumbnailing): + thumbnailing = ('%s' % ', '.join(self[scan_id].display_name + for scan_id in self.thumbnailing)) + logging.debug("Thumbnailing: %s", thumbnailing) + else: + logging.debug("No devices thumbnailing") + + def add_device(self, device: Device, on_startup: bool=False) -> int: + """ + Add a new device to the device collection + :param device: device to add + :param on_startup: if True, the device is being added during + the program's startup phase + :return: the scan id assigned to the device + """ + + scan_id = self.scan_counter + self.scan_counter += 1 + self.devices[scan_id] = device + self.device_state[scan_id] = DeviceState.pre_scan + if on_startup: + self.startup_devices.append(scan_id) + if device.camera_port: + port = device.camera_port + assert port not in self.cameras + self.cameras[port] = device.camera_model + if device.device_type in display_devices: + self.volumes_and_cameras.add(scan_id) + else: + self.this_computer.add(scan_id) + return scan_id + + def set_device_state(self, scan_id: int, state: DeviceState) -> None: + logging.debug("Setting device state for %s to %s", + self.devices[scan_id].display_name, state.name) + self.device_state[scan_id] = state + if state == DeviceState.scanning: + self.scanning.add(scan_id) + elif state == DeviceState.downloading: + self.downloading.add(scan_id) + self.have_downloaded_from.add(self.devices[scan_id].display_name) + elif state == DeviceState.thumbnailing: + self.thumbnailing.add(scan_id) + + if state != DeviceState.scanning and scan_id in self.scanning: + self.scanning.remove(scan_id) + if state != DeviceState.downloading and scan_id in self.downloading: + self.downloading.remove(scan_id) + if state != DeviceState.thumbnailing and scan_id in self.thumbnailing: + self.thumbnailing.remove(scan_id) + + def ignore_device(self, scan_id: int) -> None: + """ + For the remainder of this program's instantiation, don't + automatically detect this device. + + A limitation of this is that when a camera is physically removed + and plugged in again, it gets a new port. In which casae it's a + "different" device. + + :param scan_id: scan id of the device to ignore + """ + + device = self.devices[scan_id] + if device.device_type == DeviceType.camera: + logging.debug("Marking camera %s on port %s as explicitly removed. Will ignore it " + "until program exit.", device.camera_model, device.camera_port) + self.ignored_cameras[device.camera_port] = device.camera_model + elif device.device_type == DeviceType.volume: + logging.debug("Marking volume %s as explicitly removed. Will ignore it " + "until program exit.", device.path) + self.ignored_volumes.append(device.path) + else: + logging.error("Device collection unexpectedly received path to ignore: ignoring") + + def user_marked_camera_as_ignored(self, model: str, port: str) -> bool: + """ + Check if camera is in set of devices to ignore because they were explicitly + removed by the user + + :param model: camera model + :param port: camera port + :return: return True if camera is in set of devices to ignore + """ + + if port in self.ignored_cameras: + return self.ignored_cameras[port] == model + return False + + def user_marked_volume_as_ignored(self, path: str) -> bool: + """ + Check if volume's path is in list of devices to ignore because they were explicitly + removed by the user + + :param: path: the device's path + :return: return True if camera is in set of devices to ignore + """ + + return path in self.ignored_volumes + + def known_camera(self, model: str, port: str) -> bool: + """ + Check if the camera is already in the list of devices + :param model: camera model as specified by libgohoto2 + :param port: camera port as specified by libgohoto2 + :return: True if this camera is already being processed, else False + """ + if port in self.cameras: + assert self.cameras[port] == model + return True + return False + + def known_path(self, path: str, device_type: Optional[DeviceType]=None) -> bool: + """ + Check if the path is already in the list of devices + :param path: path to check + :return: True if the path is already being processed, else False + """ + for scan_id in self.devices: + device = self.devices[scan_id] # type: Device + if device.path == path: + if device_type is None or device.device_type == device_type: + return True + return False + + def known_device(self, device: Device) -> bool: + return device in list(self.devices.values()) + + def scan_id_from_path(self, path: str, device_type: Optional[DeviceType]=None) -> Optional[int]: + for scan_id, device in self.devices.items(): + if device.path == path: + if device_type is None or device.device_type == device_type: + return scan_id + return None + + def scan_id_from_camera_model_port(self, model: str, port: str) -> Optional[int]: + """ + + :param model: model name of camera being searched for + :param port: port of camera being searched for + :return: scan id of camera if known, else None + """ + + for scan_id, device in self.devices.items(): + if (device.device_type == DeviceType.camera and device.camera_model == model and + device.camera_port == port): + return scan_id + return None + + def delete_device(self, device: Device) -> bool: + """ + Delete the device from the collection. + :param device: the device to delete + :return: True if device was deleted, else return False + """ + for scan_id in self.devices: + if self.devices[scan_id] == device: + del self[scan_id] + return True + return False + + def delete_cache_dirs_and_sample_video(self) -> None: + """ + Delete all Download Caches and their contents any devices might + have, as well as any sample video. + """ + for device in self.devices.values(): + device.delete_cache_dirs() + self._delete_sample_video(at_program_close=True) + + def _delete_sample_video(self, at_program_close: bool) -> None: + """ + Delete sample video that is used for metadata extraction + to provide example for file renaming. + + Does not delete + + :param at_program_close: if True, the program is exiting + """ + + if (self._sample_video is not None and + self._sample_video.temp_sample_full_file_name is not None and + self._sample_video.from_camera): + try: + assert self._sample_video.temp_sample_full_file_name + except: + logging.error("Expected sample file name in sample video") + else: + if os.path.isfile(self._sample_video.temp_sample_full_file_name): + logging.info("Removing temporary sample video %s", + self._sample_video.temp_sample_full_file_name) + try: + os.remove(self._sample_video.temp_sample_full_file_name) + except Exception: + logging.exception("Error removing temporary sample video file %s", + self._sample_video.temp_sample_full_file_name) + + if at_program_close and self._sample_videos_complete_files: + remaining_videos = (video for video in self._sample_videos_complete_files + if os.path.isfile(video)) + for video in remaining_videos: + logging.info("Removing temporary sample video %s", video) + try: + os.remove(video) + except Exception: + logging.exception("Error removing temporary sample video file %s", video) + + def map_set(self, device: Device) -> Set: + return self._map_set[device.device_type] + + def downloading_from(self) -> str: + """ + :return: string showing which devices are being downloaded from + """ + + display_names = [self.devices[scan_id].display_name for scan_id in self.downloading] + return _('Downloading from %(device_names)s') % dict( + device_names=make_internationalized_list(display_names)) + + def reset_and_return_have_downloaded_from(self) -> str: + """ + Reset the set of devices that have been downloaded from, + and return the string that + :return: string showing which devices have been downloaded from + during this download + """ + display_names = make_internationalized_list(list(self.have_downloaded_from)) + self.have_downloaded_from = set() # type: Set[str] + return display_names + + def __delitem__(self, scan_id: int): + d = self.devices[scan_id] # type: Device + if d.device_type == DeviceType.camera: + del self.cameras[d.camera_port] + if d.camera_port in self.cameras_to_gvfs_unmount_for_scan: + del self.cameras_to_gvfs_unmount_for_scan[d.camera_port] + + self.map_set(d).remove(scan_id) + d.delete_cache_dirs() + del self.devices[scan_id] + if scan_id in self.scanning: + self.scanning.remove(scan_id) + if scan_id in self.downloading: + self.downloading.remove(scan_id) + if scan_id in self.queued_to_download: + self.queued_to_download.remove(scan_id) + if scan_id in self.thumbnailing: + self.thumbnailing.remove(scan_id) + if scan_id in self.cameras_to_gvfs_unmount_for_download: + self.cameras_to_gvfs_unmount_for_download.remove(scan_id) + if scan_id in self.cameras_to_stop_thumbnailing: + self.cameras_to_stop_thumbnailing.remove(scan_id) + if scan_id in self.this_computer: + self.this_computer.remove(scan_id) + if scan_id in self.volumes_and_cameras: + self.volumes_and_cameras.remove(scan_id) + del self.device_state[scan_id] + + def __getitem__(self, scan_id: int) -> Device: + return self.devices[scan_id] + + def __len__(self) -> int: + return len(self.devices) + + def __contains__(self, scan_id: int) -> bool: + return scan_id in self.devices + + def __iter__(self): + return iter(self.devices) + + def _mixed_devices(self, device_type_text: str) -> str: + try: + text_number = number(len(self.volumes_and_cameras)).number.capitalize() + except KeyError: + text_number = len(self.volumes_and_cameras) + # Translators: e.g. Three Devices + return _('%(no_devices)s %(device_type)s') % dict( + no_devices=text_number, device_type=device_type_text) + + def _update_sample_file(self, file_type: FileType) -> None: + + if file_type == FileType.photo: + assert self._sample_photo.file_type == FileType.photo + full_file_name = self._sample_photo.full_file_name + rpd_file = self._sample_photo + else: + assert self._sample_video.file_type == FileType.video + full_file_name = self._sample_video_full_file_name() + rpd_file = self._sample_video + + if not os.path.isfile(full_file_name): + # file no longer exists - it may have been downloaded or deleted + # attempt to find an appropriate file from the in memory sql database of displayed + # files + scan_id = rpd_file.scan_id + rpd_file = self.rapidApp.thumbnailModel.getSampleFile(scan_id=scan_id, + device_type=self[scan_id].device_type, file_type=file_type) + if rpd_file is None: + logging.debug('Failed to set new sample %s because suitable sample does not' + 'exist', file_type.name) + else: + sample_full_file_name = rpd_file.get_current_full_file_name() + if file_type == FileType.photo: + logging.debug('Updated sample photo with %s', sample_full_file_name) + self.sample_photo = rpd_file + else: + logging.debug('Updated sample video with %s', sample_full_file_name) + self.sample_video = rpd_file + + @property + def sample_photo(self) -> Optional[Photo]: + if self._sample_photo is None: + return None + + # does the photo still exist? + if self._sample_photo.exif_source == ExifSource.actual_file: + self._update_sample_file(file_type=FileType.photo) + + if self._sample_photo.metadata is None and not self._sample_photo.metadata_failure: + with stdchannel_redirected(sys.stderr, os.devnull): + if self._sample_photo.exif_source == ExifSource.raw_bytes: + self._sample_photo.load_metadata( + raw_bytes=bytearray(self._sample_photo.raw_exif_bytes)) + elif self._sample_photo.exif_source == ExifSource.app1_segment: + self._sample_photo.load_metadata( + app1_segment=bytearray(self._sample_photo.raw_exif_bytes)) + else: + assert self._sample_photo.exif_source == ExifSource.actual_file + full_file_name = self._sample_photo.get_current_full_file_name() + self._sample_photo.load_metadata(full_file_name=full_file_name, + et_process=self.exiftool_process) + return self._sample_photo + + @sample_photo.setter + def sample_photo(self, photo: Photo) -> None: + self._sample_photo = photo + + def _sample_video_full_file_name(self) -> str: + """ + Sample videos can be either excerpts of a video from a camera or + actual videos already on the file system. + + :return: the full path and filename of the sample video regardless + of which type it is + """ + + if (self._sample_video is not None and + self._sample_video.temp_sample_full_file_name is not None): + full_file_name = self._sample_video.temp_sample_full_file_name + else: + full_file_name = self._sample_video.get_current_full_file_name() + return full_file_name + + @property + def sample_video(self) -> Optional[Video]: + if self._sample_video is None: + return None + + self._update_sample_file(file_type=FileType.video) + + if self._sample_video.metadata is None and not self._sample_video.metadata_failure: + + try: + assert self._sample_video.temp_sample_full_file_name or os.path.isfile( + self._sample_video.full_file_name) + + full_file_name = self._sample_video_full_file_name() + + self._sample_video.load_metadata( + full_file_name=full_file_name, + et_process=self.exiftool_process) + if self._sample_video.metadata_failure: + logging.error("Failed to load sample video metadata") + except AssertionError: + logging.error("Expected sample file name in sample video") + except: + logging.error("Exception while attempting to load sample video metadata") + return self._sample_video + + @sample_video.setter + def sample_video(self, video: Video) -> None: + if self._sample_video is not None and self._sample_video.temp_sample_is_complete_file: + # Don't delete this fully downloaded file, as it might be downloaded by the user, + # in which case it's already been recorded as a RPDFile.cache_full_file_name + # Instead add it to a list of files to possibly expunge at program exit + logging.debug("Adding %s to list of complete sample video files to potentially delete " + "at program exit", self._sample_video.temp_sample_full_file_name) + self._sample_videos_complete_files.append(self.sample_video.temp_sample_full_file_name) + else: + self._delete_sample_video(at_program_close=False) + self._sample_video = video + + def get_main_window_display_name_and_icon(self) -> Tuple[str, QIcon]: + """ + Generate the name to display at the top left of the main + window, indicating the source of the files. + + :return: string to display and associated icon + """ + + if not len(self): + return _('Select Source'), QIcon(':/icons/computer.svg') + elif len(self) == 1: + # includes case where path is the only device + device = list(self.devices.values())[0] + return device.display_name, device.get_icon() + else: + non_pc_devices = [device for device in self.devices.values() + if device.device_type != DeviceType.path] # type: List[Device] + assert len(non_pc_devices) == len(self.volumes_and_cameras) + device_types = Counter(d.device_type for d in non_pc_devices) + if len(device_types) == 1: + device_type = list(device_types)[0] + device_type_text = self._map_plural_types[device_type] + else: + device_type = None + device_type_text = _('Devices') + + if len(self.this_computer) == 1: + assert len(self.this_computer) < 2 + assert len(self.this_computer) > 0 + + icon = QIcon(':/icons/computer.svg') + devices = list(self.volumes_and_cameras) + computer_display_name=self.devices[list(self.this_computer)[0]].display_name + + if len(self.volumes_and_cameras) == 1: + device_display_name = self.devices[devices[0]].display_name + else: + assert len(self.volumes_and_cameras) > 1 + device_display_name = self._mixed_devices(device_type_text) + + text = _('%(device1)s + %(device2)s') % {'device1': device_display_name, + 'device2': computer_display_name} + return text, icon + else: + assert len(self.this_computer) == 0 + + mtp_devices = [d for d in non_pc_devices if d.is_mtp_device] + + if len(device_types) == 1: + if len(self) == 2: + devices = non_pc_devices + text = _('%(device1)s + %(device2)s') % {'device1': devices[0].display_name, + 'device2': devices[1].display_name} + if device_type == DeviceType.camera and len(mtp_devices) != 2: + return text, QIcon(':/icons/camera.svg') + return text, devices[0].get_icon() + try: + text_number = number(len(self.volumes_and_cameras)).number.capitalize() + except KeyError: + text_number = len(self.volumes_and_cameras) + if device_type == DeviceType.camera: + # Number of cameras e.g. 3 Cameras + text = _('%(no_cameras)s Cameras') % {'no_cameras': text_number} + if len(mtp_devices) == len(self.volumes_and_cameras): + return text, non_pc_devices[0].get_icon() + return text, QIcon(':/icons/camera.svg') + elif device_type == DeviceType.volume: + text = _('%(no_devices)s Devices') % dict(no_devices=text_number) + return text, QIcon(':/icons/drive-removable-media.svg') + else: + device_display_name = self._mixed_devices(device_type_text) + icon = QIcon(':/icons/computer.svg') + return device_display_name, icon + + +# QStorageInfo, BackupLocationType +BackupDevice = namedtuple('BackupDevice', 'mount, backup_type') + +# QStorageInfo, str, str, BackupLocationType +BackupVolumeDetails = namedtuple('BackupVolumeDetails', 'mount name path backup_type ' + 'os_stat_device') + + +def nth(iterable, n, default=None): + "Returns the nth item or a default value" + + return next(itertools.islice(iterable, n, None), default) + + +class BackupDeviceCollection: + r""" + Track and manage devices (and manual paths) used for backing up. + Photos can be backed up to one location, and videos to another; or + they can be backed up to the same location. + + If a BackupDevice's mount is None, then it is assumed to be + a manually specified path. + + Backup devices are indexed by path, not id + + >>> b = BackupDeviceCollection() + >>> len(b) + 0 + >>> p = BackupDevice(mount=None, backup_type=BackupLocationType.photos) + >>> p2 = BackupDevice(mount=None, backup_type=BackupLocationType.photos) + >>> v = BackupDevice(mount=None, backup_type=BackupLocationType.videos) + >>> pv = BackupDevice(mount=None, + ... backup_type=BackupLocationType.photos_and_videos) + >>> pv2 = BackupDevice(mount=None, + ... backup_type=BackupLocationType.photos_and_videos) + >>> b['/some/photo/path'] = p + >>> b + {'/some/photo/path':None <BackupLocationType.photos: 1> 0} + >>> b.device_id('/some/photo/path') + 0 + >>> b['/some/other/photo/path'] = p2 + >>> del b['/some/other/photo/path'] + >>> b['/some/video/path'] = v + >>> len(b) + 2 + >>> b.device_id('/some/video/path') + 2 + >>> b.device_id('/unknown/path') + >>> + >>> '/some/photo/path' in b + True + >>> b['/some/photo/path'] + BackupDevice(mount=None, backup_type=<BackupLocationType.photos: 1>) + >>> len(b.photo_backup_devices) + 1 + >>> len(b.video_backup_devices) + 1 + >>> b['/some/photo/video/path'] = pv + >>> len(b.photo_backup_devices) + 2 + >>> len(b.video_backup_devices) + 2 + >>> del b['/some/photo/path'] + >>> len(b.photo_backup_devices) + 1 + >>> len(b.video_backup_devices) + 2 + >>> b['/some/video/path'] = pv2 + >>> len(b.photo_backup_devices) + 2 + >>> len(b.video_backup_devices) + 2 + >>> del b['/some/video/path'] + >>> del b['/some/photo/video/path'] + >>> len(b) + 0 + >>> len(b.photo_backup_devices) + 0 + >>> len(b.video_backup_devices) + 0 + """ + def __init__(self, rapidApp=None): + self.rapidApp = rapidApp + self.devices = dict() # type: Dict[str, BackupDevice] + # Set[path] + self.photo_backup_devices = set() # type: Set[str] + self.video_backup_devices = set() # type: Set[str] + + self._device_ids = {} + self._device_id = 0 + + def __setitem__(self, path: str, device: BackupDevice): + if path in self.devices: + del self[path] + self.devices[path] = device + backup_type = device.backup_type + if backup_type in [BackupLocationType.photos, + BackupLocationType.photos_and_videos]: + self.photo_backup_devices.add(path) + if backup_type in [BackupLocationType.videos, + BackupLocationType.photos_and_videos]: + self.video_backup_devices.add(path) + self._device_ids[path] = self._device_id + self._device_id += 1 + + + def __delitem__(self, path): + backup_type = self.devices[path].backup_type + if backup_type in (BackupLocationType.photos, BackupLocationType.photos_and_videos): + self.photo_backup_devices.remove(path) + if backup_type in (BackupLocationType.videos, BackupLocationType.photos_and_videos): + self.video_backup_devices.remove(path) + del self.devices[path] + del self._device_ids[path] + + def __repr__(self): + s = '{' + for key, value in self.devices.items(): + s += r'%r:%r %r %s, ' % (key, value.mount, value.backup_type, + self._device_ids[key]) + s = s[:-2] + '}' + return s + + def __contains__(self, key): + return key in self.devices + + def __len__(self): + return len(self.devices) + + def __getitem__(self, path): + return self.devices[path] + + def __iter__(self): + return iter(self.devices) + + def all_paths(self) -> List[str]: + return list(self.devices.keys()) + + def device_id(self, path: str) -> Optional[int]: + if path in self: + return self._device_ids[path] + return None + + def name(self, path: str, shorten: bool=False) -> str: + """ + :param path: + :param shorten: if True, and backup type is not an + automatically detected device, return the path basename + :return: device mount name, or path / path basename + """ + + if self.devices[path].mount is None: + if shorten: + return get_path_display_name(path)[0] + else: + return path + else: + mount = self.devices[path].mount # type: QStorageInfo + if not shorten: + return mount.displayName() + else: + name = mount.name() + if name: + return name + else: + return get_path_display_name(mount.rootPath())[0] + + def backup_type(self, path) -> BackupLocationType: + return self.devices[path].backup_type + + def multiple_backup_devices(self, file_type: FileType) -> bool: + """ + + :param file_type: whether the file is a photo or video + :return: True if more than one backup device is being used for + the file type + """ + return ((file_type == FileType.photo and len(self.photo_backup_devices) > 1) or + (file_type == FileType.video and len(self.video_backup_devices) > 1)) + + def get_download_backup_device_overlap(self, + photo_download_folder: str, + video_download_folder: str) -> DefaultDict[int, Set[FileType]]: + """ + Determine if the photo/video download locations and the backup locations + are going to the same partitions. + + :param photo_download_folder: where photos are downloaded + :param video_download_folder: where videos are downloaded + :return: partitions that are downloaded and backed up to, + referred to by os.stat.st_dev + """ + + try: + photo_device = os.stat(photo_download_folder).st_dev + except FileNotFoundError: + photo_device = 0 + try: + video_device = os.stat(video_download_folder).st_dev + except: + video_device = 0 + + downloading_to = defaultdict(set) # type: DefaultDict[int, Set[FileType]] + + if photo_device != video_device: + download_dests = (photo_device, video_device) + else: + download_dests = (photo_device, ) + + for path in self.devices: + try: + backup_device = os.stat(path).st_dev + except: + backup_device = 0 + if backup_device != 0: + d = self.devices[path] + backup_type = d.backup_type + for download_device in download_dests: + if backup_device == download_device: + if backup_type in (BackupLocationType.photos, + BackupLocationType.photos_and_videos): + downloading_to[backup_device].add(FileType.photo) + if backup_type in (BackupLocationType.videos, + BackupLocationType.photos_and_videos): + downloading_to[backup_device].add(FileType.video) + return downloading_to + + def get_manual_mounts(self) -> Optional[Tuple[BackupVolumeDetails, ...]]: + """ + Get QStorageInfo, display name, and path for each backup + destination for manually specified backup destinations. + + Display name is the path basename. + + Lists photo backup destination before video backup destination. + + Exceptions are not caught, however invalid destinations are accounted + for. + + :return: Tuple of one or two Tuples containing QStorageInfo, display name, + and path. If no valid backup destinations are found, returns None. + """ + + assert len(self.devices) + + paths = tuple(self.devices.keys()) + + if len(paths) == 1: + if not os.path.isdir(paths[0]): + return None + same_path = True + path = paths[0] + backup_type = BackupLocationType.photos_and_videos + else: + assert len(paths) == 2 + photo_path = tuple(self.photo_backup_devices)[0] + video_path = tuple(self.video_backup_devices)[0] + + photo_path_valid = os.path.isdir(photo_path) + video_path_valid = os.path.isdir(video_path) + + if photo_path_valid and video_path_valid: + same_path = False + elif photo_path_valid: + same_path = True + path = photo_path + backup_type = BackupLocationType.photos + elif video_path_valid: + same_path = True + path = video_path + backup_type = BackupLocationType.videos + else: + return None + + if same_path: + name = self.name(path, shorten=True) + mount = QStorageInfo(path) + os_stat_device = os.stat(path).st_dev + return (BackupVolumeDetails(mount, name, path, backup_type, os_stat_device), ) + else: + photo_name = self.name(photo_path, shorten=True) + video_name = self.name(video_path, shorten=True) + photo_mount = QStorageInfo(photo_path) + photo_os_stat_device = os.stat(photo_path).st_dev + + if same_device(photo_path, video_path): + # Translators: two folder names, separated by a plus sign + names = _('%s + %s') % (photo_name, video_name) + paths = '%s\n%s' % (photo_path, video_path) + return (BackupVolumeDetails(photo_mount, names, paths, + BackupLocationType.photos_and_videos, + photo_os_stat_device),) + else: + video_mount = QStorageInfo(video_path) + video_os_stat_device = os.stat(video_path).st_dev + return (BackupVolumeDetails(photo_mount, photo_name, photo_path, + BackupLocationType.photos, + photo_os_stat_device), + BackupVolumeDetails(video_mount, video_name, video_path, + BackupLocationType.videos, + video_os_stat_device)) + + def get_backup_volume_details(self, path: str) -> BackupVolumeDetails: + """ + For now only used in case of external mounts i.e. not auto-detected. + + :param path: backup path + :return: named tuple of details of the backup volume + """ + + name = self.name(path, shorten=True) + device = self.devices[path] + if device.mount is not None: + mount = device.mount + else: + mount = QStorageInfo(path) + backup_type = device.backup_type + os_stat_device = os.stat(path).st_dev + return BackupVolumeDetails(mount, name, path, backup_type, os_stat_device) + + def backup_possible(self, file_type: FileType) -> bool: + """ + + :param file_type: whether the file is a photo or video + :return: True if more a backup device is being used for + the file type + """ + if file_type == FileType.photo: + return len(self.photo_backup_devices) > 0 + elif file_type == FileType.video: + return len(self.video_backup_devices) > 0 + else: + logging.critical("Unrecognized file type when determining if backup is possible") + + def _add_identifier(self, path: Optional[str], file_type: FileType) -> Optional[str]: + if path is None: + return None + if file_type == FileType.photo: + return os.path.join(path, self.rapidApp.prefs.photo_backup_identifier) + else: + return os.path.join(path, self.rapidApp.prefs.video_backup_identifier) + + def sample_device_paths(self) -> List[str]: + """ + Return a sample of up to three paths on detected backup devices. + + Includes the folder identifier (specified in the user prefs) + used to identify the backup drive. + + Illustrates backup destinations for each of photo, video, such + that: + - If photos are being backed up to a device, show it. + - If videos are being backed up to a device, show it. + - If photos and videos are being backed up to the same device, + show that they are. + + :return: sorted list of the paths + """ + + # Prioritize display of drives that are backing up only one type + both_types = self.photo_backup_devices & self.video_backup_devices + photo_only = self.photo_backup_devices - both_types + video_only = self.video_backup_devices - both_types + + photo0 = nth(iter(photo_only), 0) + video0 = nth(iter(video_only), 0) + both0, both1 = tuple(itertools.chain(itertools.islice(both_types, 2), + itertools.repeat(None, 2)))[:2] + + # Add the identifier specified in the user's prefs + photo0id, photo1id, photo2id = (self._add_identifier(path, FileType.photo) + for path in (photo0, both0, both1)) + video0id, video1id, video2id = (self._add_identifier(path, FileType.video) + for path in (video0, both0, both1)) + + paths = [path for path in (photo0id, video0id, photo1id, video1id, photo2id, video2id) + if path is not None][:3] + + if len(paths) < 3: + + unused_photo = self.photo_backup_devices - {path for path in (photo0, both0, both1) + if path is not None} + unused_video = self.video_backup_devices - {path for path in (video0, both0, both1) + if path is not None} + photo1, photo2 = tuple(itertools.chain(itertools.islice(unused_photo, 2), + itertools.repeat(None, 2)))[:2] + video1, video2 = tuple(itertools.chain(itertools.islice(unused_video, 2), + itertools.repeat(None, 2)))[:2] + photo3id, photo4id = (self._add_identifier(path, FileType.photo) + for path in (photo1, photo2)) + video3id, video4id = (self._add_identifier(path, FileType.video) + for path in (video1, video2)) + + paths += [path for path in (photo3id, video3id, photo4id, video4id) + if path is not None][:3 - len(paths)] + + return sorted(paths) + + def backup_destinations_missing(self, + downloading: DownloadingFileTypes) -> Optional[BackupFailureType]: + """ + Checks if there are backup destinations matching the files + going to be downloaded + :param downloading: the types of file that will be downloaded + :return: None if no problems, or BackupFailureType + """ + prefs = self.rapidApp.prefs + if prefs.backup_files: + photos = downloading in (DownloadingFileTypes.photos, + DownloadingFileTypes.photos_and_videos) + videos = downloading in (DownloadingFileTypes.videos, + DownloadingFileTypes.photos_and_videos) + + if prefs.backup_device_autodetection: + photo_backup_problem = photos and not self.backup_possible(FileType.photo) + video_backup_problem = videos and not self.backup_possible(FileType.video) + else: + photo_backup_problem = photos and not validate_download_folder( + path=prefs.backup_photo_location, + write_on_waccesss_failure=True + ).valid + video_backup_problem = videos and not validate_download_folder( + path=prefs.backup_video_location, + write_on_waccesss_failure=True + ).valid + + if photo_backup_problem: + if video_backup_problem: + return BackupFailureType.photos_and_videos + else: + return BackupFailureType.photos + elif video_backup_problem: + return BackupFailureType.videos + else: + return None + return None + + +class FSMetadataErrors: + """ + When downloading and backing up, filesystem metadata needs to be copied. + Sometimes it's not possible. Track which devices (computer devices, + according to the OS, that is, not the same as above) have problems. + """ + + def __init__(self) -> None: + # A 'device' in this class is the st_dev value returned by os.stat + self.devices = set() # type: Set[int] + self.archived_devices = set() # type: Set[int] + # device: FsMetadataWriteProblem + self.metadata_errors = dict() # type: Dict[int, FsMetadataWriteProblem] + # scan_id / device_id: Set[device] + self.worker_id_devices = defaultdict(set) # type: DefaultDict[int, Set[int]] + + def add_problem(self, worker_id: int, + path: str, + mdata_exceptions: Tuple[Exception]) -> None: + + dev = os.stat(path).st_dev + + if dev not in self.devices: + self.devices.add(dev) + + name, uri, root_path, fstype = fs_device_details(path) + + problem = FsMetadataWriteProblem( + name=name, uri=uri, mdata_exceptions=mdata_exceptions + ) + + self.metadata_errors[dev] = problem + + if worker_id is not None: + self.worker_id_devices[worker_id].add(dev) + + def problems(self, worker_id: int) -> List[FsMetadataWriteProblem]: + problems = [] + for dev in self.worker_id_devices[worker_id]: + if dev not in self.archived_devices: + problems.append(self.metadata_errors[dev]) + self.archived_devices.add(dev) + return problems |