From 083849161f075878e4175cd03cb7afa83d64e7f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Frings-F=C3=BCrst?= Date: Thu, 6 Jul 2017 22:55:08 +0200 Subject: New upstream version 0.9.0 --- raphodo/storage.py | 1408 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1408 insertions(+) create mode 100644 raphodo/storage.py (limited to 'raphodo/storage.py') diff --git a/raphodo/storage.py b/raphodo/storage.py new file mode 100644 index 0000000..99c4d28 --- /dev/null +++ b/raphodo/storage.py @@ -0,0 +1,1408 @@ +# Copyright (C) 2015-2017 Damon Lynch +# Copyright (C) 2008-2015 Canonical Ltd. +# Copyright (C) 2013 Bernard Baeyens + +# This file is part of Rapid Photo Downloader. +# +# Rapid Photo Downloader is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rapid Photo Downloader is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Rapid Photo Downloader. If not, +# see . + +""" +The primary task of this module is to handle addition and removal of +(1) cameras and (2) devices with file systems. + +There are two scenarios: + +1) User is running under a Gnome-like environment in which GVFS will +automatically mount cameras and devices. We can monitor mounts and +send a signal when something is mounted. The camera must be +unmounted before libgphoto2 can access it, so we must handle that too. + +2) User is running under a non Gnome-like environment (e.g. KDE) in +which GVFS may or may not be running. However we can assume GVFS will +not automatically mount cameras and devices. In this case, using GIO +to monitor mounts is useless, as the mounts may not occur. So we must +monitor when cameras and other devices are added or removed ourselves. +To do this, use udev for cameras, and udisks2 for devices with file +systems. When a device with a file system is inserted, if it is not +already mounted, attempt to mount it. + +The secondary task of this module is to provide miscellaneous services +regarding mount points and XDG related functionality. +""" + +__author__ = 'Damon Lynch' +__copyright__ = "Copyright 2011-2017, Damon Lynch. Copyright 2008-2015 Canonical Ltd. Copyright" \ + " 2013 Bernard Baeyens." + +import logging +import os +import re +import sys +import time +import subprocess +import shlex +import pwd +from collections import namedtuple +from typing import Optional, Tuple, List, Dict, Any +from urllib.request import pathname2url +from tempfile import NamedTemporaryFile + +from PyQt5.QtCore import (QStorageInfo, QObject, pyqtSignal, QFileSystemWatcher, pyqtSlot) +from xdg.DesktopEntry import DesktopEntry +from xdg import BaseDirectory +import xdg + +import gi + +gi.require_version('GUdev', '1.0') +gi.require_version('UDisks', '2.0') +gi.require_version('GExiv2', '0.10') +gi.require_version('GLib', '2.0') +from gi.repository import GUdev, UDisks, GLib + +from gettext import gettext as _ + +from raphodo.constants import Desktop, Distro +from raphodo.utilities import ( + process_running, log_os_release, remove_topmost_directory_from_path, find_mount_point +) + +logging_level = logging.DEBUG + +try: + from gi.repository import Gio + + have_gio = True +except ImportError: + have_gio = False + +StorageSpace = namedtuple('StorageSpace', 'bytes_free, bytes_total, path') +CameraDetails = namedtuple('CameraDetails', 'model, port, display_name, is_mtp, storage_desc') +UdevAttr = namedtuple('UdevAttr', 'is_mtp_device, vendor, model') + +PROGRAM_DIRECTORY = 'rapid-photo-downloader' + + +def get_distro_id(id_or_id_like: str) -> Distro: + try: + return Distro[id_or_id_like.strip()] + except KeyError: + return Distro.unknown + + +def get_distro() -> Distro: + if os.path.isfile('/etc/os-release'): + with open('/etc/os-release', 'r') as f: + for line in f: + if line.startswith('ID='): + return get_distro_id(line[3:]) + if line.startswith('ID_LIKE='): + return get_distro_id(line[8:]) + return Distro.unknown + + +def get_user_name() -> str: + """ + Gets the user name of the process owner, with no exception checking + :return: user name of the process owner + """ + + return pwd.getpwuid(os.getuid())[0] + + +def get_path_display_name(path: str) -> Tuple[str, str]: + """ + Return a name for the path (path basename), + removing a final '/' when it's not the root of the + file system. + + :param path: path to generate the display name for + :return: display name and sanitized path + """ + if path.endswith(os.sep) and path != os.sep: + path = path[:-1] + + if path == os.sep: + display_name = _('File system root') + else: + display_name = os.path.basename(path) + return display_name, path + + +def get_media_dir() -> str: + """ + Returns the media directory, i.e. where external mounts are mounted. + + Assumes mount point of /media/. + + """ + + if sys.platform.startswith('linux'): + media_dir = '/media/{}'.format(get_user_name()) + run_media_dir = '/run{}'.format(media_dir) + distro = get_distro() + if os.path.isdir(run_media_dir) and distro not in ( + Distro.ubuntu, Distro.debian, Distro.neon, Distro.galliumos): + if distro not in (Distro.fedora, Distro.manjaro, Distro.arch, Distro.opensuse, + Distro.gentoo): + logging.debug("Detected /run/media directory, but distro does not appear to " + "be Fedora, Arch, openSUSE, Gentoo or Manjaro") + log_os_release() + return run_media_dir + return media_dir + else: + raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform()) + + +class ValidMounts(): + r""" + Operations to find 'valid' mount points, i.e. the places in which + it's sensible for a user to mount a partition. Valid mount points: + include /home/ , /media/, and /run/media/ + include directories in /etc/fstab, except /, /home, and swap + However if only considering external mounts, the the mount must be + under /media/ or /run/media/ + """ + + def __init__(self, onlyExternalMounts: bool): + """ + :param onlyExternalMounts: if True, valid mounts must be under + /media/ or /run/media/ + """ + self.validMountFolders = None # type: Tuple[str] + self.onlyExternalMounts = onlyExternalMounts + self._setValidMountFolders() + assert '/' not in self.validMountFolders + if logging_level == logging.DEBUG: + self.logValidMountFolders() + + def isValidMountPoint(self, mount: QStorageInfo) -> bool: + """ + Determine if the path of the mount point starts with a valid + path + :param mount: QStorageInfo to be tested + :return:True if mount is a mount under a valid mount, else False + """ + for m in self.validMountFolders: + if mount.rootPath().startswith(m): + return True + return False + + def pathIsValidMountPoint(self, path: str) -> bool: + """ + Determine if path indicates a mount point under a valid mount + point + :param path: path to be tested + :return:True if path is a mount under a valid mount, else False + """ + for m in self.validMountFolders: + if path.startswith(m): + return True + return False + + def mountedValidMountPointPaths(self) -> Tuple[str]: + """ + Return paths of all the currently mounted partitions that are + valid + :return: tuple of currently mounted valid partition paths + """ + + return tuple(filter(self.pathIsValidMountPoint, mountPaths())) + + def mountedValidMountPoints(self) -> Tuple[QStorageInfo]: + """ + Return mount points of all the currently mounted partitions + that are valid + :return: tuple of currently mounted valid partition + """ + + return tuple(filter(self.isValidMountPoint, QStorageInfo.mountedVolumes())) + + def _setValidMountFolders(self) -> None: + """ + Determine the valid mount point folders and set them in + self.validMountFolders, e.g. /media/, etc. + """ + + if not sys.platform.startswith('linux'): + raise ("Mounts.setValidMountPoints() not implemented on %s", sys.platform()) + else: + try: + media_dir = get_media_dir() + except: + logging.critical("Unable to determine username of this process") + media_dir = '' + logging.debug("Media dir is %s", media_dir) + if self.onlyExternalMounts: + self.validMountFolders = (media_dir, ) + else: + home_dir = os.path.expanduser('~') + validPoints = [home_dir, media_dir] + for point in self.mountPointInFstab(): + validPoints.append(point) + self.validMountFolders = tuple(validPoints) + + def mountPointInFstab(self): + """ + Yields a list of mount points in /etc/fstab + The mount points will exclude /, /home, and swap + """ + + with open('/etc/fstab') as f: + l = [] + for line in f: + # As per fstab specs: white space is either Tab or space + # Ignore comments, blank lines + # Also ignore swap file (mount point none), root, and /home + m = re.match(r'^(?![\t ]*#)\S+\s+(?!(none|/[\t ]|/home))(' + r'?P\S+)', + line) + if m is not None: + yield (m.group('point')) + + def logValidMountFolders(self): + """ + Output nicely formatted debug logging message + """ + + assert len(self.validMountFolders) > 0 + if logging_level == logging.DEBUG: + msg = "To be recognized, partitions must be mounted under " + if len(self.validMountFolders) > 2: + msg += "one of " + for p in self.validMountFolders[:-2]: + msg += "{}, ".format(p) + msg += "{} or {}".format(self.validMountFolders[-2], + self.validMountFolders[-1]) + elif len(self.validMountFolders) == 2: + msg += "{} or {}".format(self.validMountFolders[0], + self.validMountFolders[1]) + else: + msg += self.validMountFolders[0] + logging.debug(msg) + + +def mountPaths(): + """ + Yield all the mount paths returned by QStorageInfo + """ + + for m in QStorageInfo.mountedVolumes(): + yield m.rootPath() + + +def has_non_empty_dcim_folder(path: str) -> bool: + """ + Checks to see if below the path there is a DCIM folder, + if the folder is readable, and if it has any contents + :param path: path to check + :return: True if has valid DCIM, False otherwise + """ + + try: + has_dcim = "DCIM" in os.listdir(path) + except (PermissionError, FileNotFoundError, OSError): + return False + except: + logging.error("Unknown error occurred while probing potential source folder %s", path) + return False + if has_dcim: + dcim_folder = os.path.join(path, 'DCIM') + if os.path.isdir(dcim_folder) and os.access(dcim_folder, os.R_OK): + return len(os.listdir(dcim_folder)) > 0 + return False + + +def get_desktop_environment() -> Optional[str]: + """ + Determine desktop environment using environment variable XDG_CURRENT_DESKTOP + + :return: str with XDG_CURRENT_DESKTOP value + """ + + return os.getenv('XDG_CURRENT_DESKTOP') + + +def get_desktop() -> Desktop: + """ + Determine desktop environment + :return: enum representing desktop environment, + Desktop.unknown if unknown. + """ + + try: + env = get_desktop_environment().lower() + except AttributeError: + # Occurs when there is no value set + return Desktop.unknown + + if env == 'unity:unity7': + env = 'unity' + elif env == 'x-cinnamon': + env = 'cinnamon' + try: + return Desktop[env] + except KeyError: + return Desktop.unknown + + +def gvfs_controls_mounts() -> bool: + """ + Determine if GVFS controls mounts on this system. + + By default, common desktop environments known to use it are assumed + to be using it or not. If not found in this list, then the list of + running processes is searched, looking for a match against 'gvfs-gphoto2', + which will match what is at the time of this code being developed called + 'gvfs-gphoto2-volume-monitor', which is what we're most interested in. + + :return: True if so, False otherwise + """ + + desktop = get_desktop() + if desktop in (Desktop.gnome, Desktop.unity, Desktop.cinnamon, Desktop.xfce, + Desktop.mate, Desktop.lxde): + return True + elif desktop == Desktop.kde: + return False + return process_running('gvfs-gphoto2') + + +def _get_xdg_special_dir(dir_type: gi.repository.GLib.UserDirectory, + home_on_failure: bool=True) -> Optional[str]: + path = GLib.get_user_special_dir(dir_type) + if path is None and home_on_failure: + return os.path.expanduser('~') + return path + +def xdg_photos_directory(home_on_failure: bool=True) -> Optional[str]: + """ + Get localized version of /home//Pictures + + :param home_on_failure: if the directory does not exist, return + the home directory instead + :return: the directory if it is specified, else the user's + home directory or None + """ + return _get_xdg_special_dir(GLib.USER_DIRECTORY_PICTURES, home_on_failure) + + +def xdg_videos_directory(home_on_failure: bool=True) -> str: + """ + Get localized version of /home//Videos + + :param home_on_failure: if the directory does not exist, return + the home directory instead + :return: the directory if it is specified, else the user's + home directory or None + """ + return _get_xdg_special_dir(GLib.USER_DIRECTORY_VIDEOS, home_on_failure) + +def xdg_desktop_directory(home_on_failure: bool=True) -> str: + """ + Get localized version of /home//Desktop + + :param home_on_failure: if the directory does not exist, return + the home directory instead + :return: the directory if it is specified, else the user's + home directory or None + """ + return _get_xdg_special_dir(GLib.UserDirectory.DIRECTORY_DESKTOP, home_on_failure) + +def xdg_photos_identifier() -> str: + """ + Get special subfoler indicated by the localized version of /home//Pictures + :return: the subfolder name if it is specified, else the localized version of 'Pictures' + """ + + path = _get_xdg_special_dir(GLib.USER_DIRECTORY_PICTURES, home_on_failure=False) + if path is None: + # translators: the name of the Pictures folder + return _('Pictures') + return os.path.basename(path) + +def xdg_videos_identifier() -> str: + """ + Get special subfoler indicated by the localized version of /home//Pictures + :return: the subfolder name if it is specified, else the localized version of 'Pictures' + """ + + path = _get_xdg_special_dir(GLib.USER_DIRECTORY_VIDEOS, home_on_failure=False) + if path is None: + # translators: the name of the Videos folder + return _('Videos') + return os.path.basename(path) + + +def make_program_directory(path: str) -> str: + """ + Creates a subfolder used by Rapid Photo Downloader. + + Does not catch errors. + + :param path: location where the subfolder should be + :return: the full path of the new directory + """ + program_dir = os.path.join(path, 'rapid-photo-downloader') + if not os.path.exists(program_dir): + os.mkdir(program_dir) + elif not os.path.isdir(program_dir): + os.remove(program_dir) + os.mkdir(program_dir) + return program_dir + + +def get_program_cache_directory(create_if_not_exist: bool = False) -> Optional[str]: + """ + Get Rapid Photo Downloader cache directory. + + Is assumed to be under $XDG_CACHE_HOME or if that doesn't exist, + ~/.cache. + :param create_if_not_exist: creates directory if it does not exist. + :return: the full path of the cache directory, or None on error + """ + try: + cache_directory = BaseDirectory.xdg_cache_home + if not create_if_not_exist: + return os.path.join(cache_directory, PROGRAM_DIRECTORY) + else: + return make_program_directory(cache_directory) + except OSError: + logging.error("An error occurred while creating the cache directory") + return None + + +def get_program_logging_directory(create_if_not_exist: bool = False) -> Optional[str]: + """ + Get directory in which to store program log files. + + Log files are kept in the cache dirctory. + + :param create_if_not_exist: + :return: the full path of the logging directory, or None on error + """ + cache_directory = get_program_cache_directory(create_if_not_exist=create_if_not_exist) + log_dir = os.path.join(cache_directory, 'log') + if os.path.isdir(log_dir): + return log_dir + if create_if_not_exist: + try: + if os.path.isfile(log_dir): + os.remove(log_dir) + os.mkdir(log_dir, 0o700) + return log_dir + except OSError: + logging.error("An error occurred while creating the log directory") + return None + + +def get_program_data_directory(create_if_not_exist=False) -> Optional[str]: + """ + Get Rapid Photo Downloader data directory, which is assumed to be + under $XDG_DATA_HOME or if that doesn't exist, ~/.local/share + :param create_if_not_exist: creates directory if it does not exist. + :return: the full path of the data directory, or None on error + """ + try: + data_directory = BaseDirectory.xdg_data_dirs[0] + if not create_if_not_exist: + return os.path.join(data_directory, PROGRAM_DIRECTORY) + else: + return make_program_directory(data_directory) + except OSError: + logging.error("An error occurred while creating the data directory") + return None + + +def get_fdo_cache_thumb_base_directory() -> str: + """ + Get the Freedesktop.org thumbnail directory location + :return: location + """ + + # LXDE is a special case: handle it + if get_desktop() == Desktop.lxde: + return os.path.join(os.path.expanduser('~'), '.thumbnails') + + return os.path.join(BaseDirectory.xdg_cache_home, 'thumbnails') + + +def get_default_file_manager(remove_args: bool = True) -> Optional[str]: + """ + Attempt to determine the default file manager for the system + :param remove_args: if True, remove any arguments such as %U from + the returned command + :return: command (without path) if found, else None + """ + assert sys.platform.startswith('linux') + cmd = shlex.split('xdg-mime query default inode/directory') + try: + desktop_file = subprocess.check_output(cmd, universal_newlines=True) + except: + return None + # Remove new line character from output + desktop_file = desktop_file[:-1] + if desktop_file.endswith(';'): + desktop_file = desktop_file[:-1] + for desktop_path in ('/usr/local/share/applications/', '/usr/share/applications/'): + path = os.path.join(desktop_path, desktop_file) + if os.path.exists(path): + try: + desktop_entry = DesktopEntry(path) + except xdg.Exceptions.ParsingError: + return None + try: + desktop_entry.parse(path) + except: + return None + fm = desktop_entry.getExec() + if remove_args: + return fm.split()[0] + else: + return fm + + +def get_uri(full_file_name: Optional[str]=None, + path: Optional[str]=None, + camera_details: Optional[CameraDetails]=None, + desktop_environment: Optional[bool]=True) -> str: + """ + Generate and return the URI for the file, which varies depending on + which device it is + + :param full_file_name: full filename and path + :param path: straight path when not passing a full_file_name + :param camera_details: see named tuple CameraDetails for parameters + :param desktop_environment: if True, will to generate a URI accepted + by Gnome and KDE desktops, which means adjusting the URI if it appears to be an + MTP mount. Includes the port too. + :return: the URI + """ + + if camera_details is None: + prefix = 'file://' + if desktop_environment: + desktop = get_desktop() + if full_file_name and desktop in (Desktop.mate, Desktop.kde): + full_file_name = os.path.dirname(full_file_name) + else: + if not desktop_environment: + if full_file_name or path: + prefix = 'gphoto2://' + else: + prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port)) + else: + # Attempt to generate a URI accepted by desktop environments + if camera_details.is_mtp: + if full_file_name: + full_file_name = remove_topmost_directory_from_path(full_file_name) + elif path: + path = remove_topmost_directory_from_path(path) + + desktop = get_desktop() + if gvfs_controls_mounts(): + prefix = 'mtp://' + pathname2url('[{}]/{}'.format( + camera_details.port, camera_details.storage_desc)) + elif desktop == Desktop.kde: + prefix = 'mtp:/' + pathname2url('{}/{}'.format( + camera_details.display_name, camera_details.storage_desc)) + # Dolphin doesn't highlight the file if it's passed. + # Instead it tries to open it, but fails. + # So don't pass the file, just the directory it's in. + if full_file_name: + full_file_name = os.path.dirname(full_file_name) + else: + logging.error("Don't know how to generate MTP prefix for %s", desktop.name) + else: + prefix = 'gphoto2://' + pathname2url('[{}]'.format(camera_details.port)) + if full_file_name or path: + uri = '{}{}'.format(prefix, pathname2url(full_file_name or path)) + else: + uri = prefix + return uri + + +ValidatedFolder = namedtuple('ValidatedFolder', 'valid, absolute_path') + + +def validate_download_folder(path: Optional[str], + write_on_waccesss_failure: bool=False) -> ValidatedFolder: + r""" + Check if folder exists and is writeable. + + Accepts None as a folder, which will always be invalid. + + :param path: path to analyze + :param write_on_waccesss_failure: if os.access reports path is not writable, test + nonetheless to see if it's writable by writing and deleting a test file + :return: Tuple indicating validity and path made absolute + + >>> validate_download_folder('/some/bogus/and/ridiculous/path') + ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path') + >>> validate_download_folder(None) + ValidatedFolder(valid=False, absolute_path='') + >>> validate_download_folder('') + ValidatedFolder(valid=False, absolute_path='') + """ + + if not path: + return ValidatedFolder(False, '') + absolute_path = os.path.abspath(path) + valid = os.path.isdir(path) and os.access(path, os.W_OK) + if not valid and write_on_waccesss_failure and os.path.isdir(path): + try: + with NamedTemporaryFile(dir=path): + # the path is in fact writeable -- can happen with NFS + valid = True + except Exception: + logging.warning('While validating download / backup folder, failed to write a ' + 'temporary file to %s', path) + + return ValidatedFolder(valid, absolute_path) + + +def validate_source_folder(path: Optional[str]) -> ValidatedFolder: + r""" + Check if folder exists and is readable. + + Accepts None as a folder, which will always be invalid. + + :param path: path to analyze + :return: Tuple indicating validity and path made absolute + + >>> validate_source_folder('/some/bogus/and/ridiculous/path') + ValidatedFolder(valid=False, absolute_path='/some/bogus/and/ridiculous/path') + >>> validate_source_folder(None) + ValidatedFolder(valid=False, absolute_path='') + >>> validate_source_folder('') + ValidatedFolder(valid=False, absolute_path='') + """ + + if not path: + return ValidatedFolder(False, '') + absolute_path = os.path.abspath(path) + valid = os.path.isdir(path) and os.access(path, os.R_OK) + return ValidatedFolder(valid, absolute_path) + + +def udev_attributes(devname: str) -> Optional[UdevAttr]: + """ + Query udev to see if device is an MTP device. + + :param devname: udev DEVNAME e.g. '/dev/bus/usb/001/003' + :return True if udev property ID_MTP_DEVICE == '1', else False + """ + + client = GUdev.Client(subsystems=['usb', 'block']) + enumerator = GUdev.Enumerator.new(client) + enumerator.add_match_property('DEVNAME', devname) + for device in enumerator.execute(): + model = device.get_property('ID_MODEL') # type: str + if model is not None: + is_mtp = device.get_property('ID_MTP_DEVICE') == '1' + vendor = device.get_property('ID_VENDOR') # type: str + model = model.replace('_', ' ').strip() + vendor = vendor.replace('_', ' ').strip() + return UdevAttr(is_mtp, vendor, model) + return None + + +def fs_device_details(path: str) -> Tuple: + """ + :return: device (volume) name, uri, root path and filesystem type + of the mount the path is on + """ + qsInfo = QStorageInfo(path) + name = qsInfo.displayName() + root_path = qsInfo.rootPath() + uri = 'file://{}'.format(pathname2url(root_path)) + fstype = qsInfo.fileSystemType() + if isinstance(fstype, bytes): + fstype = fstype.decode() + return name, uri, root_path, fstype + + +class WatchDownloadDirs(QFileSystemWatcher): + """ + Create a file system watch to monitor if there are changes to the + download directories + """ + + def updateWatchPathsFromPrefs(self, prefs) -> None: + """ + Update the watched directories using values from the program preferences + :param prefs: program preferences + :type prefs: raphodo.preferences.Preferences + """ + + logging.debug("Updating watched paths") + + paths = (os.path.dirname(path) for path in (prefs.photo_download_folder, + prefs.video_download_folder)) + watch = {path for path in paths if path} + + existing_watches = set(self.directories()) + + if watch == existing_watches: + return + + new = watch - existing_watches + if new: + new = list(new) + logging.debug("Adding to watched paths: %s", ', '.join(new)) + failures = self.addPaths(new) + if failures: + logging.debug("Failed to add watched paths: %s", failures) + + old = existing_watches - watch + if old: + old = list(old) + logging.debug("Removing from watched paths: %s", ', '.join(old)) + failures = self.removePaths(old) + if failures: + logging.debug("Failed to remove watched paths: %s", failures) + + def closeWatch(self) -> None: + """ + End all watches. + """ + dirs = self.directories() + if dirs: + self.removePaths(dirs) + + +class CameraHotplug(QObject): + cameraAdded = pyqtSignal() + cameraRemoved = pyqtSignal() + + def __init__(self): + super().__init__() + self.cameras = {} + + @pyqtSlot() + def startMonitor(self): + self.client = GUdev.Client(subsystems=['usb', 'block']) + self.client.connect('uevent', self.ueventCallback) + logging.debug("... camera hotplug monitor started") + self.enumerateCameras() + if self.cameras: + logging.debug("Camera Hotplug found %d cameras:", len(self.cameras)) + for port, model in self.cameras.items(): + logging.debug("%s at %s", model, port) + + def enumerateCameras(self): + """ + Query udev to get the list of cameras store their path and + model in our internal dict, which is useful when responding to + camera removal. + """ + enumerator = GUdev.Enumerator.new(self.client) + enumerator.add_match_property('ID_GPHOTO2', '1') + for device in enumerator.execute(): + model = device.get_property('ID_MODEL') + if model is not None: + path = device.get_sysfs_path() + self.cameras[path] = model + + def ueventCallback(self, client: GUdev.Client, action: str, device: GUdev.Device) -> None: + + # for key in device.get_property_keys(): + # print(key, device.get_property(key)) + if device.get_property('ID_GPHOTO2') == '1': + self.camera(action, device) + + def camera(self, action: str, device: GUdev.Device) -> None: + # For some reason, the add and remove camera event is triggered twice. + # The second time the device information is a variation on information + # from the first time. + path = device.get_sysfs_path() + parent_device = device.get_parent() + parent_path = parent_device.get_sysfs_path() + logging.debug("Device change: %s. Path: %s Parent Device: %s Parent path: %s", + action, path, parent_device, parent_path) + + if action == 'add': + if parent_path not in self.cameras: + model = device.get_property('ID_MODEL') + logging.debug("Hotplug: new camera: %s", model) + self.cameras[path] = model + self.cameraAdded.emit() + else: + logging.debug("Hotplug: already know about %s", self.cameras[ + parent_path]) + + elif action == 'remove': + emit_remove = False + name = '' + if path in self.cameras: + name = self.cameras[path] + del self.cameras[path] + emit_remove = True + elif device.get_property('ID_GPHOTO2') == '1': + # This should not need to be called. However, + # self.enumerateCameras may not have been called earlier + name = device.get_property('ID_MODEL') + if name is not None: + emit_remove = True + if emit_remove: + logging.debug("Hotplug: %s has been removed", name) + self.cameraRemoved.emit() + + +class UDisks2Monitor(QObject): + # Most of this class is Copyright 2008-2015 Canonical + + partitionMounted = pyqtSignal(str, list, bool) + partitionUnmounted = pyqtSignal(str) + + loop_prefix = '/org/freedesktop/UDisks2/block_devices/loop' + not_interesting = ( + '/org/freedesktop/UDisks2/block_devices/dm_', + '/org/freedesktop/UDisks2/block_devices/ram', + '/org/freedesktop/UDisks2/block_devices/zram', + ) + + def __init__(self, validMounts: ValidMounts) -> None: + super().__init__() + self.validMounts = validMounts + + @pyqtSlot() + def startMonitor(self) -> None: + self.udisks = UDisks.Client.new_sync(None) + self.manager = self.udisks.get_object_manager() + self.manager.connect('object-added', + lambda man, obj: self._udisks_obj_added(obj)) + self.manager.connect('object-removed', + lambda man, obj: self._device_removed(obj)) + + # Track the paths of the mount points, which is useful when unmounting + # objects. + self.known_mounts = {} # type: Dict[str, str] + for obj in self.manager.get_objects(): + path = obj.get_object_path() + fs = obj.get_filesystem() + if fs: + mount_points = fs.get_cached_property('MountPoints').get_bytestring_array() + if mount_points: + self.known_mounts[path] = mount_points[0] + logging.debug("... UDisks2 monitor started") + + def _udisks_obj_added(self, obj) -> None: + path = obj.get_object_path() + for boring in self.not_interesting: + if path.startswith(boring): + return + block = obj.get_block() + if not block: + return + + drive = self._get_drive(block) + + part = obj.get_partition() + is_system = block.get_cached_property('HintSystem').get_boolean() + is_loop = path.startswith(self.loop_prefix) and not \ + block.get_cached_property('ReadOnly').get_boolean() + if not is_system or is_loop: + if part: + self._udisks_partition_added(obj, block, drive, path) + + def _get_drive(self, block) -> Optional[UDisks.Drive]: + drive_name = block.get_cached_property('Drive').get_string() + if drive_name != '/': + return self.udisks.get_object(drive_name).get_drive() + else: + return None + + def _udisks_partition_added(self, obj, block, drive, path) -> None: + logging.debug('UDisks: partition added: %s' % path) + fstype = block.get_cached_property('IdType').get_string() + logging.debug('Udisks: id-type: %s' % fstype) + + fs = obj.get_filesystem() + + if fs: + icon_names = self.get_icon_names(obj) + + if drive is not None: + ejectable = drive.get_property('ejectable') + else: + ejectable = False + mount_point = '' + mount_points = fs.get_cached_property('MountPoints').get_bytestring_array() + if len(mount_points) == 0: + try: + logging.debug("UDisks: attempting to mount %s", path) + mount_point = self.retry_mount(fs, fstype) + if not mount_point: + raise + else: + logging.debug("UDisks: successfully mounted at %s", mount_point) + except: + logging.error('UDisks: could not mount the device: %s', path) + return + else: + mount_point = mount_points[0] + logging.debug("UDisks: already mounted at %s", mount_point) + + self.known_mounts[path] = mount_point + if self.validMounts.pathIsValidMountPoint(mount_point): + self.partitionMounted.emit(mount_point, icon_names, ejectable) + + else: + logging.debug("Udisks: partition has no file system %s", path) + + def retry_mount(self, fs, fstype) -> str: + # Variant parameter construction Copyright Bernard Baeyens, and is + # licensed under GNU General Public License Version 2 or higher. + # https://github.com/berbae/udisksvm + list_options = '' + if fstype == 'vfat': + list_options = 'flush' + elif fstype == 'ext2': + list_options = 'sync' + G_VARIANT_TYPE_VARDICT = GLib.VariantType.new('a{sv}') + param_builder = GLib.VariantBuilder.new(G_VARIANT_TYPE_VARDICT) + optname = GLib.Variant.new_string('fstype') # s + value = GLib.Variant.new_string(fstype) + vvalue = GLib.Variant.new_variant(value) # v + newsv = GLib.Variant.new_dict_entry(optname, vvalue) # {sv} + param_builder.add_value(newsv) + optname = GLib.Variant.new_string('options') + value = GLib.Variant.new_string(list_options) + vvalue = GLib.Variant.new_variant(value) + newsv = GLib.Variant.new_dict_entry(optname, vvalue) + param_builder.add_value(newsv) + vparam = param_builder.end() # a{sv} + + # Try to mount until it does not fail with "Busy" + timeout = 10 + while timeout >= 0: + try: + return fs.call_mount_sync(vparam, None) + except GLib.GError as e: + if not 'UDisks2.Error.DeviceBusy' in e.message: + raise + logging.debug('Udisks: Device busy.') + time.sleep(0.3) + timeout -= 1 + return '' + + def get_icon_names(self, obj: UDisks.Object) -> List[str]: + # Get icon information, if possible + icon_names = [] + if have_gio: + info = self.udisks.get_object_info(obj) + icon = info.get_icon() + if isinstance(icon, Gio.ThemedIcon): + icon_names = icon.get_names() + return icon_names + + # Next four class member functions from Damon Lynch, not Canonical + def _device_removed(self, obj: UDisks.Object) -> None: + # path here refers to the udev / udisks path, not the mount point + path = obj.get_object_path() + if path in self.known_mounts: + mount_point = self.known_mounts[path] + del self.known_mounts[path] + self.partitionUnmounted.emit(mount_point) + + def get_can_eject(self, obj: UDisks.Object) -> bool: + block = obj.get_block() + drive = self._get_drive(block) + if drive is not None: + return drive.get_property('ejectable') + return False + + def get_device_props(self, device_path: str) -> Tuple[List[str], bool]: + """ + Given a device, get the icon names suggested by udev, and + determine whether the mount is ejectable or not. + :param device_path: system path of the device to check, + e.g. /dev/sdc1 + :return: icon names and eject boolean + """ + + object_path = '/org/freedesktop/UDisks2/block_devices/{}'.format( + os.path.split(device_path)[1]) + obj = self.udisks.get_object(object_path) + icon_names = self.get_icon_names(obj) + can_eject = self.get_can_eject(obj) + return (icon_names, can_eject) + + @pyqtSlot(str) + def unmount_volume(self, mount_point: str) -> None: + + G_VARIANT_TYPE_VARDICT = GLib.VariantType.new('a{sv}') + param_builder = GLib.VariantBuilder.new(G_VARIANT_TYPE_VARDICT) + + # Variant parameter construction Copyright Bernard Baeyens, and is + # licensed under GNU General Public License Version 2 or higher. + # https://github.com/berbae/udisksvm + + optname = GLib.Variant.new_string('force') + value = GLib.Variant.new_boolean(False) + vvalue = GLib.Variant.new_variant(value) + newsv = GLib.Variant.new_dict_entry(optname, vvalue) + param_builder.add_value(newsv) + + vparam = param_builder.end() # a{sv} + + path = None + # Get the path from the dict we keep of known mounts + for key, value in self.known_mounts.items(): + if value == mount_point: + path = key + break + if path is None: + logging.error("Could not find UDisks2 path used to be able to unmount %s", mount_point) + + fs = None + for obj in self.manager.get_objects(): + opath = obj.get_object_path() + if path == opath: + fs = obj.get_filesystem() + if fs is None: + logging.error("Could not find UDisks2 filesystem used to be able to unmount %s", + mount_point) + + logging.debug("Unmounting %s...", mount_point) + try: + fs.call_unmount(vparam, None, self.umount_volume_callback, (mount_point, fs)) + except GLib.GError: + value = sys.exc_info()[1] + logging.error('Unmounting failed with error:') + logging.error("%s", value) + + def umount_volume_callback(self, source_object: UDisks.FilesystemProxy, + result: Gio.AsyncResult, + user_data: Tuple[str, UDisks.Filesystem]) -> None: + """ + Callback for asynchronous unmount operation. + + :param source_object: the FilesystemProxy object + :param result: result of the unmount + :param user_data: mount_point and the file system + """ + + mount_point, fs = user_data + + try: + if fs.call_unmount_finish(result): + logging.debug("...successfully unmounted %s", mount_point) + else: + # this is the result even when the unmount was unsuccessful + logging.debug("...possibly failed to unmount %s", mount_point) + except GLib.GError as e: + logging.error('Exception occurred unmounting %s', mount_point) + logging.exception('Traceback:') + except: + logging.error('Exception occurred unmounting %s', mount_point) + logging.exception('Traceback:') + + self.partitionUnmounted.emit(mount_point) + + +if have_gio: + class GVolumeMonitor(QObject): + r""" + Monitor the mounting or unmounting of cameras or partitions + using Gnome's GIO/GVFS. Unmount cameras automatically mounted + by GVFS. + + Raises a signal if a volume has been inserted, but will not be + automatically mounted. This is important because this class + is monitoring mounts, and if the volume is not mounted, it will + go unnoticed. + """ + + cameraUnmounted = pyqtSignal(bool, str, str, bool, bool) + cameraMounted = pyqtSignal() + partitionMounted = pyqtSignal(str, list, bool) + partitionUnmounted = pyqtSignal(str) + volumeAddedNoAutomount = pyqtSignal() + cameraPossiblyRemoved = pyqtSignal() + + def __init__(self, validMounts: ValidMounts) -> None: + super().__init__() + self.vm = Gio.VolumeMonitor.get() + self.vm.connect('mount-added', self.mountAdded) + self.vm.connect('volume-added', self.volumeAdded) + self.vm.connect('mount-removed', self.mountRemoved) + self.vm.connect('volume-removed', self.volumeRemoved) + self.portSearch = re.compile(r'usb:([\d]+),([\d]+)') + self.scsiPortSearch = re.compile(r'usbscsi:(.+)') + self.validMounts = validMounts + + def ptpCameraMountPoint(self, model: str, port: str) -> Optional[Gio.Mount]: + """ + :return: the mount point of the PTP / MTP camera, if it is mounted, + else None. If camera is not mounted with PTP / MTP, None is + returned. + """ + + p = self.portSearch.match(port) + if p is not None: + p1 = p.group(1) + p2 = p.group(2) + pattern = re.compile(r'%\S\Susb%\S\S{}%\S\S{}%\S\S'.format(p1, p2)) + else: + p = self.scsiPortSearch.match(port) + if p is None: + logging.error("Unknown camera mount method %s %s", model, port) + return None + + to_unmount = None + + for mount in self.vm.get_mounts(): + folder_extract = self.mountIsCamera(mount) + if folder_extract is not None: + if pattern.match(folder_extract): + to_unmount = mount + break + return to_unmount + + def unmountCamera(self, model: str, + port: str, + download_starting: bool=False, + on_startup: bool=False, + mount_point: Optional[Gio.Mount]=None) -> bool: + """ + Unmount camera mounted on gvfs mount point, if it is + mounted. If not mounted, ignore. + :param model: model as returned by libgphoto2 + :param port: port as returned by libgphoto2, in format like + usb:001,004 + :param download_starting: if True, the unmount is occurring + because a download has been initiated. + :param on_startup: if True, the unmount is occurring during + the program's startup phase + :param mount_point: if not None, try umounting from this + mount point without scanning for it first + :return: True if an unmount operation has been initiated, + else returns False. + """ + + if mount_point is None: + to_unmount = self.ptpCameraMountPoint(model, port) + else: + to_unmount = mount_point + + if to_unmount is not None: + logging.debug("GIO: Attempting to unmount %s...", model) + to_unmount.unmount_with_operation(0, None, None, self.unmountCameraCallback, + (model, port, download_starting, on_startup)) + return True + + return False + + def unmountCameraCallback(self, mount: Gio.Mount, + result: Gio.AsyncResult, + user_data: Tuple[str, str, bool, bool]) -> None: + """ + Called by the asynchronous unmount operation. + When complete, emits a signal indicating operation + success, and the camera model and port + :param mount: camera mount + :param result: result of the unmount process + :param user_data: model and port of the camera being + unmounted, in the format of libgphoto2 + """ + + model, port, download_starting, on_startup = user_data + try: + if mount.unmount_with_operation_finish(result): + logging.debug("...successfully unmounted {}".format(model)) + self.cameraUnmounted.emit(True, model, port, download_starting, on_startup) + else: + logging.debug("...failed to unmount {}".format(model)) + self.cameraUnmounted.emit(False, model, port, download_starting, on_startup) + except GLib.GError as e: + logging.error('Exception occurred unmounting {}'.format(model)) + logging.exception('Traceback:') + self.cameraUnmounted.emit(False, model, port, download_starting, on_startup) + + def unmountVolume(self, path: str) -> None: + """ + Unmounts the volume represented by the path. If no volume is found + representing that path, nothing happens. + + :param path: path of the volume. It should not end with os.sep. + """ + + for mount in self.vm.get_mounts(): + root = mount.get_root() + if root is not None: + mpath = root.get_path() + if path == mpath: + logging.info("Attempting to unmount %s...", path) + mount.unmount_with_operation(0, None, None, self.unmountVolumeCallback, + path) + break + + def unmountVolumeCallback(self, mount: Gio.Mount, + result: Gio.AsyncResult, + user_data: str) -> None: + + """ + Called by the asynchronous unmount operation. + + :param mount: volume mount + :param result: result of the unmount process + :param user_data: the path of the device unmounted + """ + path = user_data + + try: + if mount.unmount_with_operation_finish(result): + logging.info("...successfully unmounted %s", path) + else: + logging.info("...failed to unmount %s", path) + except GLib.GError as e: + logging.error('Exception occurred unmounting %s', path) + logging.exception('Traceback:') + + + def mountIsCamera(self, mount: Gio.Mount) -> Optional[str]: + """ + Determine if the mount point is that of a camera + :param mount: the mount to examine + :return: None if not a camera, or the component of the + folder name that indicates on which port it is mounted + """ + root = mount.get_root() + if root is None: + logging.warning('Unable to get mount root') + else: + path = root.get_path() + if path: + logging.debug("GIO: Looking for camera at mount {}".format(path)) + folder_name = os.path.split(path)[1] + for s in ('gphoto2:host=', 'mtp:host='): + if folder_name.startswith(s): + return folder_name[len(s):] + if path is not None: + logging.debug("GIO: camera not found at {}".format(path)) + return None + + def mountIsPartition(self, mount: Gio.Mount) -> bool: + """ + Determine if the mount point is that of a valid partition, + i.e. is mounted in a valid location, which is under one of + self.validMountDirs + :param mount: the mount to examine + :return: True if the mount is a valid partiion + """ + root = mount.get_root() + if root is None: + logging.warning('Unable to get mount root') + else: + path = root.get_path() + if path: + logging.debug("GIO: Looking for valid partition at mount {}".format(path)) + if self.validMounts.pathIsValidMountPoint(path): + logging.debug("GIO: partition found at {}".format(path)) + return True + if path is not None: + logging.debug("GIO: partition is not valid mount: {}".format(path)) + return False + + def mountAdded(self, volumeMonitor, mount: Gio.Mount) -> None: + if self.mountIsCamera(mount): + self.cameraMounted.emit() + elif self.mountIsPartition(mount): + icon_names = self.getIconNames(mount) + self.partitionMounted.emit(mount.get_root().get_path(), + icon_names, + mount.can_eject()) + + def mountRemoved(self, volumeMonitor, mount: Gio.Mount) -> None: + if not self.mountIsCamera(mount): + if self.mountIsPartition(mount): + logging.debug("GIO: %s has been unmounted", mount.get_name()) + self.partitionUnmounted.emit(mount.get_root().get_path()) + + def volumeAdded(self, volumeMonitor, volume: Gio.Volume) -> None: + logging.debug("GIO: Volume added %s. Automount: %s", + volume.get_name(), + volume.should_automount()) + if not volume.should_automount(): + # TODO is it possible to determine the device type? + self.volumeAddedNoAutomount.emit() + + def volumeRemoved(self, volumeMonitor, volume: Gio.Volume) -> None: + logging.debug("GIO: %s volume removed", volume.get_name()) + if volume.get_activation_root() is not None: + logging.debug("GIO: %s might be a camera", volume.get_name()) + self.cameraPossiblyRemoved.emit() + + def getIconNames(self, mount: Gio.Mount) -> List[str]: + icon_names = [] + icon = mount.get_icon() + if isinstance(icon, Gio.ThemedIcon): + icon_names = icon.get_names() + + return icon_names + + def getProps(self, path: str) -> Tuple[Optional[List[str]], Optional[bool]]: + """ + Given a mount's path, get the icon names suggested by the + volume monitor, and determine whether the mount is + ejectable or not. + :param path: the path of mount to check + :return: icon names and eject boolean + """ + + for mount in self.vm.get_mounts(): + root = mount.get_root() + if root is not None: + p = root.get_path() + if path == p: + icon_names = self.getIconNames(mount) + return (icon_names, mount.can_eject()) + return (None, None) + + +def _get_info_size_value(info: Gio.FileInfo, attr: str) -> int: + if info.get_attribute_data(attr).type == Gio.FileAttributeType.UINT64: + return info.get_attribute_uint64(attr) + else: + return info.get_attribute_uint32(attr) + + +def get_mount_size(mount: QStorageInfo) -> Tuple[int, int]: + """ + Uses GIO to get bytes total and bytes free (available) for the mount that a + path is in. + + :param path: path located anywhere in the mount + :return: bytes_total, bytes_free + """ + + bytes_free = mount.bytesAvailable() + bytes_total = mount.bytesTotal() + + if bytes_total or not have_gio: + return bytes_total, bytes_free + + path = mount.rootPath() + + logging.debug("Using GIO to query file system attributes for %s...", path) + p = Gio.File.new_for_path(os.path.abspath(path)) + info = p.query_filesystem_info(','.join((Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE, + Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE))) + logging.debug("...query of file system attributes for %s completed", path) + bytes_total = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_SIZE) + bytes_free = _get_info_size_value(info, Gio.FILE_ATTRIBUTE_FILESYSTEM_FREE) + return bytes_total, bytes_free -- cgit v1.2.3