Diffstat (limited to 'raphodo/utilities.py')
-rw-r--r-- | raphodo/utilities.py | 806
1 file changed, 806 insertions, 0 deletions
diff --git a/raphodo/utilities.py b/raphodo/utilities.py new file mode 100644 index 0000000..c98f46c --- /dev/null +++ b/raphodo/utilities.py @@ -0,0 +1,806 @@ +# Copyright (C) 2007-2017 Damon Lynch <damonlynch@gmail.com> + +# This file is part of Rapid Photo Downloader. +# +# Rapid Photo Downloader is free software: you can redistribute it and/or +# modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rapid Photo Downloader is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Rapid Photo Downloader. If not, +# see <http://www.gnu.org/licenses/>. + +__author__ = 'Damon Lynch' +__copyright__ = "Copyright 2007-2017, Damon Lynch" + +import contextlib +import locale +import logging +import os +import random +import re +import string +import sys +import tempfile +import time +import tarfile +from collections import namedtuple, defaultdict +from datetime import datetime +from gettext import gettext as _ +from itertools import groupby, zip_longest +from typing import Optional, List, Union, Any +import struct +import ctypes +import signal +import pkg_resources + + +import arrow +import psutil + +import raphodo.__about__ as __about__ + + +# Linux specific code to ensure child processes exit when parent dies +# See http://stackoverflow.com/questions/19447603/ +# how-to-kill-a-python-child-process-created-with-subprocess-check-output-when-t/ +libc = ctypes.CDLL("libc.so.6") +def set_pdeathsig(sig = signal.SIGTERM): + def callable(): + return libc.prctl(1, sig) + return callable + + +def available_cpu_count(physical_only=False) -> int: + """ + Determine the number of CPUs available. + + A CPU is "available" if cpuset has not restricted the number of + cpus. Portions of this code from + http://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of- + cpus-using-python + + :return available CPU count, or 1 if cannot be determined. + Value guaranteed to be >= 1. + """ + + # cpuset may restrict the number of *available* processors + available = None + if sys.platform.startswith('linux'): + try: + m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', + open('/proc/self/status').read()) + if m: + available = bin(int(m.group(1).replace(',', ''), 16)).count('1') + if available > 0 and not physical_only: + return available + except IOError: + pass + + if physical_only: + physical = psutil.cpu_count(logical=False) + if physical is not None: + if available is not None: + return min(available, physical) + return physical + + c = os.cpu_count() + if c is not None: + return max(c, 1) + c = psutil.cpu_count() + if c is not None: + return max(c, 1) + else: + return 1 + +def confirm(prompt: Optional[str]=None, resp: Optional[bool]=False) -> bool: + r""" + Prompts for yes or no response from the user. + + :param prompt: prompt displayed to user + :param resp: the default value assumed by the caller when user + simply types ENTER. + :return: True for yes and False for no. + """ + + # >>> confirm(prompt='Create Directory?', resp=True) + # Create Directory? [y]|n: + # True + # >>> confirm(prompt='Create Directory?', resp=False) + # Create Directory? 
[n]|y: + # False + # >>> confirm(prompt='Create Directory?', resp=False) + # Create Directory? [n]|y: y + # True + + if prompt is None: + prompt = 'Confirm' + + if resp: + prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n') + else: + prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y') + + while True: + ans = input(prompt) + if not ans: + return resp + if ans not in ['y', 'Y', 'n', 'N']: + print('please enter y or n.') + continue + return ans in ['y', 'Y'] + + +@contextlib.contextmanager +def stdchannel_redirected(stdchannel, dest_filename): + """ + A context manager to temporarily redirect stdout or stderr + + Usage: + with stdchannel_redirected(sys.stderr, os.devnull): + do_work() + + Source: + http://marc-abramowitz.com/archives/2013/07/19/python-context-manager-for-redirected-stdout-and-stderr/ + """ + oldstdchannel = dest_file = None + try: + oldstdchannel = os.dup(stdchannel.fileno()) + dest_file = open(dest_filename, 'w') + os.dup2(dest_file.fileno(), stdchannel.fileno()) + yield + finally: + if oldstdchannel is not None: + os.dup2(oldstdchannel, stdchannel.fileno()) + if dest_file is not None: + dest_file.close() + +@contextlib.contextmanager +def show_errors(): + yield + +suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'] + +def format_size_for_user(size_in_bytes: int, + zero_string: str='', + no_decimals: int=2) -> str: + r""" + Humanize display of bytes. + + Uses Microsoft style i.e. 1000 Bytes = 1 KB + + :param size: size in bytes + :param zero_string: string to use if size == 0 + + >>> format_size_for_user(0) + '' + >>> format_size_for_user(1) + '1 B' + >>> format_size_for_user(123) + '123 B' + >>> format_size_for_user(1000) + '1 KB' + >>> format_size_for_user(1024) + '1.02 KB' + >>> format_size_for_user(1024, no_decimals=0) + '1 KB' + >>> format_size_for_user(1100, no_decimals=2) + '1.1 KB' + >>> format_size_for_user(1000000, no_decimals=2) + '1 MB' + >>> format_size_for_user(1000001, no_decimals=2) + '1 MB' + >>> format_size_for_user(1020001, no_decimals=2) + '1.02 MB' + """ + + if size_in_bytes == 0: return zero_string + i = 0 + while size_in_bytes >= 1000 and i < len(suffixes)-1: + size_in_bytes /= 1000 + i += 1 + + if no_decimals: + s = '{:.{prec}f}'.format(size_in_bytes, prec=no_decimals).rstrip('0').rstrip('.') + else: + s = '{:.0f}'.format(size_in_bytes) + return s + ' ' + suffixes[i] + +def divide_list(source: list, no_pieces: int) -> list: + r""" + Returns a list containing no_pieces lists, with the items + of the original list evenly distributed + :param source: the list to divide + :param no_pieces: the nubmer of pieces the lists + :return: the new list + + >>> divide_list(list(range(12)), 4) + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]] + >>> divide_list(list(range(11)), 4) + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]] + """ + source_size = len(source) + slice_size = source_size // no_pieces + remainder = source_size % no_pieces + result = [] + + extra = 0 + for i in range(no_pieces): + start = i * slice_size + extra + source_slice = source[start:start + slice_size] + if remainder: + source_slice += [source[start + slice_size]] + remainder -= 1 + extra += 1 + result.append(source_slice) + return result + +def divide_list_on_length(source: List, length: int) -> List: + + r""" + Break a list into lists no longer than length. 
+ + >>> l=list(range(11)) + >>> divide_list_on_length(l, 3) + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]] + >>> l=list(range(12)) + >>> divide_list_on_length(l, 3) + [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]] + """ + + return [source[i:i+length] for i in range(0, len(source), length)] + +def addPushButtonLabelSpacer(s: str) -> str: + return ' ' + s + + +class GenerateRandomFileName: + def __init__(self): + # the characters used to generate temporary filenames + self.filename_characters = list(string.ascii_letters + string.digits) + + def name(self, extension: str=None) -> str: + """ + + :return: filename 5 characters long without any extension + """ + if extension is not None: + return '{}.{}'.format(''.join( + random.sample(self.filename_characters, 5)), + extension) + else: + return ''.join(random.sample(self.filename_characters, 5)) + + +TempDirs = namedtuple('TempDirs', 'photo_temp_dir, video_temp_dir') +CacheDirs = namedtuple('CacheDirs', 'photo_cache_dir, video_cache_dir') + + +def create_temp_dir(folder: Optional[str]=None, + prefix: Optional[str]=None, + force_no_prefix: bool=False) -> str: + """ + Creates a temporary director and logs errors + :param folder: the folder in which the temporary directory should + be created. If not specified, uses the tempfile.mkstemp default. + :param prefix: any name the directory should start with. If None, + default rpd-tmp will be used as prefix, unless force_no_prefix + is True + :param force_no_prefix: if True, a directory prefix will never + be used + :return: full path of the temporary directory + """ + if prefix is None and not force_no_prefix: + prefix = "rpd-tmp-" + try: + temp_dir = tempfile.mkdtemp(prefix=prefix, dir=folder) + except OSError as inst: + msg = "Failed to create temporary directory in %s: %s %s" % ( + folder, + inst.errno, + inst.strerror) + logging.critical(msg) + temp_dir = None + return temp_dir + + +def create_temp_dirs(photo_download_folder: str, + video_download_folder: str) -> TempDirs: + """ + Create pair of temporary directories for photo and video download + :param photo_download_folder: where photos will be downloaded to + :param video_download_folder: where videos will be downloaded to + :return: the directories + """ + photo_temp_dir = video_temp_dir = None + if photo_download_folder is not None: + photo_temp_dir = create_temp_dir(photo_download_folder) + logging.debug("Photo temporary directory: %s", photo_temp_dir) + if video_download_folder is not None: + video_temp_dir = create_temp_dir(video_download_folder) + logging.debug("Video temporary directory: %s", video_temp_dir) + return TempDirs(photo_temp_dir, video_temp_dir) + + +def same_device(file1: str, file2: str) -> bool: + """ + Returns True if the files / directories are on the same device (partition). + + No error checking. 
+ + :param file1: first file / directory to check + :param file2: second file / directory to check + :return: True if the same file system, else false + """ + + dev1 = os.stat(file1).st_dev + dev2 = os.stat(file2).st_dev + return dev1 == dev2 + + +def find_mount_point(path: str) -> str: + """ + Find the mount point of a path + See: + http://stackoverflow.com/questions/4453602/how-to-find-the-mountpoint-a-file-resides-on + + >>> print(find_mount_point('/crazy/path')) + / + + :param path: + :return: + """ + path = os.path.realpath(path) + while not os.path.ismount(path): + path = os.path.dirname(path) + return path + + +def make_internationalized_list(items: List[str]) -> str: + r""" + Makes a string of items conforming to i18n + + >>> print(make_internationalized_list([])) + <BLANKLINE> + >>> print(make_internationalized_list(['one'])) + one + >>> print(make_internationalized_list(['one', 'two'])) + one and two + >>> print(make_internationalized_list(['one', 'two', 'three'])) + one, two and three + >>> print(make_internationalized_list(['one', 'two', 'three', 'four'])) + one, two, three and four + + Loosely follows the guideline here: + http://cldr.unicode.org/translation/lists + + :param items: the list of items to make a string out of + :return: internationalized string + """ + if len(items) == 1: + return items[0] + if len(items) == 2: + # two things in a list e.g. "device1 and device2" + return _('%(first_item)s and %(last_item)s') % dict( + first_item=items[0], last_item=items[1]) + if len(items) > 2: + s = items[0] + for item in items[1:-1]: + # the middle of a list of things + s = '%(first_items)s, %(last_items)s'% dict(first_items=s, + last_items=item) + # the end of a list of things + s = '%(start_items)s and %(last_item)s' % dict(start_items=s, + last_item=items[-1]) + return s + return '' + + +def thousands(i: int) -> str: + """ + Add a thousands seperator (or its locale equivalent) to an + integer. Assumes the module level locale setting has already been + set. + :param i: the integer e.g. 1000 + :return: string with seperators e.g. 
'1,000' + """ + try: + return locale.format("%d", i, grouping=True) + except TypeError: + return i + + +# Source of class AdjacentKey, first_and_last and runs: +# http://stupidpythonideas.blogspot.com/2014/01/grouping-into-runs-of-adjacent-values.html +class AdjacentKey: + r""" + >>> [list(g) for k, g in groupby([0, 1, 2, 3, 5, 6, 7, 10, 11, 13, 16], AdjacentKey)] + [[0, 1, 2, 3], [5, 6, 7], [10, 11], [13], [16]] + """ + __slots__ = ['obj'] + + def __init__(self, obj) -> None: + self.obj = obj + + def __eq__(self, other) -> bool: + ret = self.obj - 1 <= other.obj <= self.obj + 1 + if ret: + self.obj = other.obj + return ret + + +def first_and_last(iterable): + start = end = next(iterable) + for end in iterable: pass + return start, end + +def runs(iterable): + r""" + identify adjacent elements in pre-sorted data + + :param iterable: sorted data + + >>> list(runs([0, 1, 2, 3, 5, 6, 7, 10, 11, 13, 16])) + [(0, 3), (5, 7), (10, 11), (13, 13), (16, 16)] + >>> list(runs([0])) + [(0, 0)] + >>> list(runs([0, 1, 10, 100, 101])) + [(0, 1), (10, 10), (100, 101)] + """ + + for k, g in groupby(iterable, AdjacentKey): + yield first_and_last(g) + +numbers = namedtuple('numbers', 'number, plural') + +long_numbers = { + 1: _('one'), + 2: _('two'), + 3: _('three'), + 4: _('four'), + 5: _('five'), + 6: _('six'), + 7: _('seven'), + 8: _('eight'), + 9: _('nine'), + 10: _('ten'), + 11: _('eleven'), + 12: _('twelve'), + 13: _('thirteen'), + 14: _('fourteen'), + 15: _('fifteen'), + 16: _('sixteen'), + 17: _('seventeen'), + 18: _('eighteen'), + 19: _('ninenteen'), + 20: _('twenty') +} + + +def number(value: int) -> numbers: + r""" + Convert integer to written form, e.g. one, two, etc. + + Will propagate TypeError or KeyError on + failure. + + >>> number(1) + numbers(number='one', plural=False) + >>> number(2) + numbers(number='two', plural=True) + >>> number(10) + numbers(number='ten', plural=True) + >>> number(20) + numbers(number='twenty', plural=True) + >>> + + :param value: int between 1 and 20 + :return: tuple of str and whether it is plural + """ + + plural = value > 1 + text = long_numbers[value] + return numbers(text, plural) + + +def datetime_roughly_equal(dt1: Union[datetime, float], dt2: Union[datetime, float], + seconds: int=120) -> bool: + r""" + Check to see if date times are equal, give or take n seconds + :param dt1: python datetime, or timestamp, to check + :param dt2:python datetime, or timestamp to check + :param seconds: number of seconds leeway + :return: True if "equal", False otherwise + + >>> dt1 = datetime.now() + >>> time.sleep(.1) + >>> dt2 = datetime.now() + >>> datetime_roughly_equal(dt1, dt2, 1) + True + >>> dt1 = 1458561776.0 + >>> dt2 = 1458561776.0 + >>> datetime_roughly_equal(dt1, dt2, 120) + True + >>> dt2 += 450 + >>> datetime_roughly_equal(dt1, dt2, 120) + False + >>> datetime_roughly_equal(dt1, dt2, 500) + True + """ + + at1 = arrow.get(dt1) + at2 = arrow.get(dt2) + return at1.replace(seconds=-seconds) < at2 < at1.replace(seconds=+seconds) + + +def process_running(process_name: str, partial_name: bool=True) -> bool: + """ + Search the list of the system's running processes to see if a process with this + name is running + + :param process_name: the name of the process to search for + :param partial_name: if True, the process_name argument can be a + partial match + :return: True if found, else False + """ + + for proc in psutil.process_iter(): + try: + name = proc.name() + except psutil.NoSuchProcess: + pass + else: + if partial_name: + if name.find(process_name) >= 0: + 
return True + else: + if name == process_name: + return True + return False + +def make_html_path_non_breaking(path: str) -> str: + """ + When /some/path is displayed in rich text, it will be word-wrapped on the + slashes. Inhibit that using a special unicode character. + + :param path: the path + :return: the path containing the special characters + """ + + return path.replace(os.sep, '{}⁠'.format(os.sep)) + + +def prefs_list_from_gconftool2_string(value: str) -> List[str]: + r""" + Take a raw string preference value as returned by gconftool-2 + and convert it to a list of strings. + + Handles escaped characters + + :param value: the raw value as returned by gconftool-2 + :return: the list of strings + + >>> prefs_list_from_gconftool2_string( # doctest: +ELLIPSIS + ... '[Text,IMG_,,Sequences,Stored number,Four digits,Filename,Extension,UPPERCASE]') + ... # doctest: +NORMALIZE_WHITESPACE + ['Text', 'IMG_', '', 'Sequences', 'Stored number', 'Four digits', 'Filename', 'Extension', + 'UPPERCASE'] + >>> prefs_list_from_gconftool2_string('[Text,IMG_\,\\;+=|!@\,#^&*()$%/",,]') + ['Text', 'IMG_,\\;+=|!@,#^&*()$%/"', '', ''] + >>> prefs_list_from_gconftool2_string('[Manila,Dubai,London]') + ['Manila', 'Dubai', 'London'] + """ + # Trim the left and right square brackets + value = value[1:-1] + + # Split on the comma, but not commas that were escaped. + # Use a regex with a negative lookbehind assertion + splits = re.split(r'(?<!\\),', value) + # Replace the escaped commas with just plain commas + return [s.replace('\\,', ',') for s in splits] + + +def pref_bool_from_gconftool2_string(value: str) -> bool: + if value == 'true': + return True + elif value == 'false': + return False + raise ValueError + + +def remove_last_char_from_list_str(items: List[str]) -> List[str]: + r""" + Remove the last character from a list of strings, modifying the list in place, + such that the last item is never empty + + :param items: the list to modify + :return: in place copy + + >>> remove_last_char_from_list_str([' abc', 'def', 'ghi']) + [' abc', 'def', 'gh'] + >>> remove_last_char_from_list_str([' abc', 'def', 'gh'] ) + [' abc', 'def', 'g'] + >>> remove_last_char_from_list_str([' abc', 'def', 'g'] ) + [' abc', 'def'] + >>> remove_last_char_from_list_str([' a']) + [' '] + >>> remove_last_char_from_list_str([' ']) + [] + >>> remove_last_char_from_list_str([]) + [] + """ + if items: + if not items[-1]: + items = items[:-1] + else: + items[-1] = items[-1][:-1] + if items and not items[-1]: + items = items[:-1] + return items + +def platform_c_maxint() -> int: + """ + See http://stackoverflow.com/questions/13795758/what-is-sys-maxint-in-python-3 + + :return: the maximum size of an int in C when compiled the same way Python was + """ + return 2 ** (struct.Struct('i').size * 8 - 1) - 1 + +def commonprefix(*paths) -> str: + """ + Python 3.4 compatible. + + Remove when Python 3.5 becomes the minimum. 
+ """ + + return os.path.dirname(os.path.commonprefix(paths)) + +def _recursive_identify_depth(*paths, depth) -> int: + basenames = [os.path.basename(path) for path in paths] + if len(basenames) != len(set(basenames)): + duplicates = _collect_duplicates(basenames, paths) + + for basename in duplicates: + chop = len(basename) + 1 + chopped = (path[:-chop] for path in duplicates[basename]) + depth = max(depth, _recursive_identify_depth(*chopped, depth=depth + 1)) + return depth + +def _collect_duplicates(basenames, paths): + duplicates = defaultdict(list) + for basename, path in zip(basenames, paths): + duplicates[basename].append(path) + return {basename: paths for basename, paths in duplicates.items() if len(paths) > 1} + +def make_path_end_snippets_unique(*paths) -> List[str]: + r""" + Make list of path ends unique given possible common path endings. + + A snippet starts from the end of the path, in extreme cases possibly up the path start. + + :param paths: sequence of paths to generate unique end snippets for + :return: list of unique snippets + + >>> p0 = '/home/damon/photos' + >>> p1 = '/media/damon/backup1/photos' + >>> p2 = '/media/damon/backup2/photos' + >>> p3 = '/home/damon/videos' + >>> p4 = '/media/damon/backup1/videos' + >>> p5 = '/media/damon/backup2/videos' + >>> p6 = '/media/damon/drive1/home/damon/photos' + >>> s0 = make_path_end_snippets_unique(p0, p3) + >>> print(s0) + ['photos', 'videos'] + >>> s1 = make_path_end_snippets_unique(p0, p1, p2) + >>> print(s1) + ['damon/photos', 'backup1/photos', 'backup2/photos'] + >>> s2 = make_path_end_snippets_unique(p0, p1, p2, p3) + >>> print(s2) + ['damon/photos', 'backup1/photos', 'backup2/photos', 'videos'] + >>> s3 = make_path_end_snippets_unique(p3, p4, p5) + >>> print(s3) + ['damon/videos', 'backup1/videos', 'backup2/videos'] + >>> s4 = make_path_end_snippets_unique(p0, p1, p2, p3, p6) + >>> print(s4) #doctest: +NORMALIZE_WHITESPACE + ['/home/damon/photos', '/media/damon/backup1/photos', '/media/damon/backup2/photos', 'videos', + 'drive1/home/damon/photos'] + >>> s5 = make_path_end_snippets_unique(p1, p2, p3, p6) + >>> print(s5) + ['backup1/photos', 'backup2/photos', 'videos', 'damon/photos'] + """ + + basenames = [os.path.basename(path) for path in paths] + + if len(basenames) != len(set(basenames)): + names = [] + depths = defaultdict(int) + duplicates = _collect_duplicates(basenames, paths) + + for basename, path in zip(basenames, paths): + if basename in duplicates: + depths[basename] = _recursive_identify_depth(*duplicates[basename], depth=0) + + for basename, path in zip(basenames, paths): + depth = depths[basename] + if depth: + dirs = path.split(os.sep) + index = len(dirs) - depth - 1 + name = (os.sep.join(dirs[max(index, 0): ])) + if index > 1: + pass + # name = '...' + name + elif index == 1: + name = os.sep + name + else: + name = basename + names.append(name) + return names + else: + return basenames + +have_logged_os_release = False + +def log_os_release() -> None: + """ + Log the entired contents of /etc/os-release, but only if + we didn't do so already. 
+ """ + + global have_logged_os_release + + if not have_logged_os_release: + try: + with open('/etc/os-release', 'r') as f: + for line in f: + logging.debug(line.rstrip('\n')) + except: + pass + have_logged_os_release = True + + +def extract_file_from_tar(full_tar_path, member_filename) -> bool: + """ + Extracts a file from a tar.gz and places it beside the tar file + :param full_tar_path: path and filename of the tar.gz file + :param member_filename: file wanted + :return: True if successful, False otherwise + """ + + tar_dir, tar_name = os.path.split(full_tar_path) + tar_name = tar_name[:len('.tar.gz') * -1] + member = os.path.join(tar_name, member_filename) + try: + with tarfile.open(full_tar_path) as tar: + tar.extractall(members=(tar.getmember(member),), path=tar_dir) + except Exception: + logging.error('Unable to extract %s from tarfile', member_filename) + return False + else: + try: + src = os.path.join(tar_dir, tar_name, member_filename) + dst = os.path.join(tar_dir, member_filename) + os.rename(src, dst) + os.rmdir(os.path.join(tar_dir, tar_name)) + return True + except OSError: + logging.error('Unable to move %s to new location', member_filename) + return False + + +def current_version_is_dev_version(current_version=None) -> bool: + if current_version is None: + current_version = pkg_resources.parse_version(__about__.__version__) + return current_version.is_prerelease + + +def remove_topmost_directory_from_path(path: str) -> str: + if os.sep not in path: + return path + return path[path[1:].find(os.sep) + 1:] |