diff options
author | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2019-07-24 09:57:09 +0200 |
---|---|---|
committer | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2019-07-24 09:57:09 +0200 |
commit | c7665433b2004d2b404d6fb9d6fd064998486f63 (patch) | |
tree | 8525ef6d24f7c6ceb238945ebb2cc997c7afc905 /testing/framework/TestCmd.py | |
parent | e48d2727885efda8369c7edbc2e3929a59532adc (diff) | |
parent | 6e228c305122f0564eda1e67d56651f8386d24d7 (diff) |
Merge branch 'release/debian/3.1.0+repack-1'debian/3.1.0+repack-1
Diffstat (limited to 'testing/framework/TestCmd.py')
-rw-r--r-- | testing/framework/TestCmd.py | 1999 |
1 files changed, 1999 insertions, 0 deletions
diff --git a/testing/framework/TestCmd.py b/testing/framework/TestCmd.py new file mode 100644 index 0000000..81e03f3 --- /dev/null +++ b/testing/framework/TestCmd.py @@ -0,0 +1,1999 @@ +""" +TestCmd.py: a testing framework for commands and scripts. + +The TestCmd module provides a framework for portable automated testing +of executable commands and scripts (in any language, not just Python), +especially commands and scripts that require file system interaction. + +In addition to running tests and evaluating conditions, the TestCmd +module manages and cleans up one or more temporary workspace +directories, and provides methods for creating files and directories in +those workspace directories from in-line data, here-documents), allowing +tests to be completely self-contained. + +A TestCmd environment object is created via the usual invocation: + + import TestCmd + test = TestCmd.TestCmd() + +There are a bunch of keyword arguments available at instantiation: + + test = TestCmd.TestCmd(description = 'string', + program = 'program_or_script_to_test', + interpreter = 'script_interpreter', + workdir = 'prefix', + subdir = 'subdir', + verbose = Boolean, + match = default_match_function, + match_stdout = default_match_stdout_function, + match_stderr = default_match_stderr_function, + diff = default_diff_stderr_function, + diff_stdout = default_diff_stdout_function, + diff_stderr = default_diff_stderr_function, + combine = Boolean) + +There are a bunch of methods that let you do different things: + + test.verbose_set(1) + + test.description_set('string') + + test.program_set('program_or_script_to_test') + + test.interpreter_set('script_interpreter') + test.interpreter_set(['script_interpreter', 'arg']) + + test.workdir_set('prefix') + test.workdir_set('') + + test.workpath('file') + test.workpath('subdir', 'file') + + test.subdir('subdir', ...) + + test.rmdir('subdir', ...) + + test.write('file', "contents\n") + test.write(['subdir', 'file'], "contents\n") + + test.read('file') + test.read(['subdir', 'file']) + test.read('file', mode) + test.read(['subdir', 'file'], mode) + + test.writable('dir', 1) + test.writable('dir', None) + + test.preserve(condition, ...) + + test.cleanup(condition) + + test.command_args(program = 'program_or_script_to_run', + interpreter = 'script_interpreter', + arguments = 'arguments to pass to program') + + test.run(program = 'program_or_script_to_run', + interpreter = 'script_interpreter', + arguments = 'arguments to pass to program', + chdir = 'directory_to_chdir_to', + stdin = 'input to feed to the program\n') + universal_newlines = True) + + p = test.start(program = 'program_or_script_to_run', + interpreter = 'script_interpreter', + arguments = 'arguments to pass to program', + universal_newlines = None) + + test.finish(self, p) + + test.pass_test() + test.pass_test(condition) + test.pass_test(condition, function) + + test.fail_test() + test.fail_test(condition) + test.fail_test(condition, function) + test.fail_test(condition, function, skip) + test.fail_test(condition, function, skip, message) + + test.no_result() + test.no_result(condition) + test.no_result(condition, function) + test.no_result(condition, function, skip) + + test.stdout() + test.stdout(run) + + test.stderr() + test.stderr(run) + + test.symlink(target, link) + + test.banner(string) + test.banner(string, width) + + test.diff(actual, expected) + + test.diff_stderr(actual, expected) + + test.diff_stdout(actual, expected) + + test.match(actual, expected) + + test.match_stderr(actual, expected) + + test.match_stdout(actual, expected) + + test.set_match_function(match, stdout, stderr) + + test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n") + test.match_exact(["actual 1\n", "actual 2\n"], + ["expected 1\n", "expected 2\n"]) + test.match_caseinsensitive("Actual 1\nACTUAL 2\n", "expected 1\nEXPECTED 2\n") + + test.match_re("actual 1\nactual 2\n", regex_string) + test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes) + + test.match_re_dotall("actual 1\nactual 2\n", regex_string) + test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes) + + test.tempdir() + test.tempdir('temporary-directory') + + test.sleep() + test.sleep(seconds) + + test.where_is('foo') + test.where_is('foo', 'PATH1:PATH2') + test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') + + test.unlink('file') + test.unlink('subdir', 'file') + +The TestCmd module provides pass_test(), fail_test(), and no_result() +unbound functions that report test results for use with the Aegis change +management system. These methods terminate the test immediately, +reporting PASSED, FAILED, or NO RESULT respectively, and exiting with +status 0 (success), 1 or 2 respectively. This allows for a distinction +between an actual failed test and a test that could not be properly +evaluated because of an external condition (such as a full file system +or incorrect permissions). + + import TestCmd + + TestCmd.pass_test() + TestCmd.pass_test(condition) + TestCmd.pass_test(condition, function) + + TestCmd.fail_test() + TestCmd.fail_test(condition) + TestCmd.fail_test(condition, function) + TestCmd.fail_test(condition, function, skip) + TestCmd.fail_test(condition, function, skip, message) + + TestCmd.no_result() + TestCmd.no_result(condition) + TestCmd.no_result(condition, function) + TestCmd.no_result(condition, function, skip) + +The TestCmd module also provides unbound global functions that handle +matching in the same way as the match_*() methods described above. + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.match_exact) + + test = TestCmd.TestCmd(match = TestCmd.match_caseinsensitive) + + test = TestCmd.TestCmd(match = TestCmd.match_re) + + test = TestCmd.TestCmd(match = TestCmd.match_re_dotall) + +These functions are also available as static methods: + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_exact) + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_caseinsensitive) + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re) + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re_dotall) + +These static methods can be accessed by a string naming the method: + + import TestCmd + + test = TestCmd.TestCmd(match = 'match_exact') + + test = TestCmd.TestCmd(match = 'match_caseinsensitive') + + test = TestCmd.TestCmd(match = 'match_re') + + test = TestCmd.TestCmd(match = 'match_re_dotall') + +The TestCmd module provides unbound global functions that can be used +for the "diff" argument to TestCmd.TestCmd instantiation: + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.match_re, + diff = TestCmd.diff_re) + + test = TestCmd.TestCmd(diff = TestCmd.simple_diff) + + test = TestCmd.TestCmd(diff = TestCmd.context_diff) + + test = TestCmd.TestCmd(diff = TestCmd.unified_diff) + +These functions are also available as static methods: + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re, + diff = TestCmd.TestCmd.diff_re) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.simple_diff) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.context_diff) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.unified_diff) + +These static methods can be accessed by a string naming the method: + + import TestCmd + + test = TestCmd.TestCmd(match = 'match_re', diff = 'diff_re') + + test = TestCmd.TestCmd(diff = 'simple_diff') + + test = TestCmd.TestCmd(diff = 'context_diff') + + test = TestCmd.TestCmd(diff = 'unified_diff') + +The "diff" argument can also be used with standard difflib functions: + + import difflib + + test = TestCmd.TestCmd(diff = difflib.context_diff) + + test = TestCmd.TestCmd(diff = difflib.unified_diff) + +Lastly, the where_is() method also exists in an unbound function +version. + + import TestCmd + + TestCmd.where_is('foo') + TestCmd.where_is('foo', 'PATH1:PATH2') + TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') +""" + +# Copyright 2000-2010 Steven Knight +# This module is free software, and you may redistribute it and/or modify +# it under the same terms as Python itself, so long as this copyright message +# and disclaimer are retained in their original form. +# +# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, +# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF +# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. +# +# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, +# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, +# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. +from __future__ import division, print_function + +__author__ = "Steven Knight <knight at baldmt dot com>" +__revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight" +__version__ = "1.3" + +import atexit +import difflib +import errno +import os +import re +import shutil +import signal +import stat +import sys +import tempfile +import threading +import time +import traceback +import types + + +IS_PY3 = sys.version_info[0] == 3 +IS_WINDOWS = sys.platform == 'win32' +IS_64_BIT = sys.maxsize > 2**32 + +class null(object): + pass + + +_Null = null() + +try: + from collections import UserList, UserString +except ImportError: + # no 'collections' module or no UserFoo in collections + exec('from UserList import UserList') + exec('from UserString import UserString') + +__all__ = [ + 'diff_re', + 'fail_test', + 'no_result', + 'pass_test', + 'match_exact', + 'match_caseinsensitive', + 'match_re', + 'match_re_dotall', + 'python', + '_python_', + 'TestCmd', + 'to_bytes', + 'to_str', +] + + +def is_List(e): + return isinstance(e, (list, UserList)) + + +def to_bytes(s): + if isinstance(s, bytes) or bytes is str: + return s + return bytes(s, 'utf-8') + + +def to_str(s): + if bytes is str or is_String(s): + return s + return str(s, 'utf-8') + + +try: + eval('unicode') +except NameError: + def is_String(e): + return isinstance(e, (str, UserString)) +else: + def is_String(e): + return isinstance(e, (str, unicode, UserString)) + +testprefix = 'testcmd.' +if os.name in ('posix', 'nt'): + testprefix += "%s." % str(os.getpid()) + +re_space = re.compile(r'\s') + + +def _caller(tblist, skip): + string = "" + arr = [] + for file, line, name, text in tblist: + if file[-10:] == "TestCmd.py": + break + arr = [(file, line, name, text)] + arr + atfrom = "at" + for file, line, name, text in arr[skip:]: + if name in ("?", "<module>"): + name = "" + else: + name = " (" + name + ")" + string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) + atfrom = "\tfrom" + return string + + +def fail_test(self=None, condition=1, function=None, skip=0, message=None): + """Cause the test to fail. + + By default, the fail_test() method reports that the test FAILED + and exits with a status of 1. If a condition argument is supplied, + the test fails only if the condition is true. + """ + if not condition: + return + if not function is None: + function() + of = "" + desc = "" + sep = " " + if not self is None: + if self.program: + of = " of " + self.program + sep = "\n\t" + if self.description: + desc = " [" + self.description + "]" + sep = "\n\t" + + at = _caller(traceback.extract_stack(), skip) + if message: + msg = "\t%s\n" % message + else: + msg = "" + sys.stderr.write("FAILED test" + of + desc + sep + at + msg) + + sys.exit(1) + + +def no_result(self=None, condition=1, function=None, skip=0): + """Causes a test to exit with no valid result. + + By default, the no_result() method reports NO RESULT for the test + and exits with a status of 2. If a condition argument is supplied, + the test fails only if the condition is true. + """ + if not condition: + return + if not function is None: + function() + of = "" + desc = "" + sep = " " + if not self is None: + if self.program: + of = " of " + self.program + sep = "\n\t" + if self.description: + desc = " [" + self.description + "]" + sep = "\n\t" + + at = _caller(traceback.extract_stack(), skip) + sys.stderr.write("NO RESULT for test" + of + desc + sep + at) + + sys.exit(2) + + +def pass_test(self=None, condition=1, function=None): + """Causes a test to pass. + + By default, the pass_test() method reports PASSED for the test + and exits with a status of 0. If a condition argument is supplied, + the test passes only if the condition is true. + """ + if not condition: + return + if not function is None: + function() + sys.stderr.write("PASSED\n") + sys.exit(0) + + +def match_exact(lines=None, matches=None, newline=os.sep): + """ + Match function using exact match. + + :param lines: data lines + :type lines: str or list[str] + :param matches: expected lines to match + :type matches: str or list[str] + :param newline: line separator + :returns: an object (1) on match, else None, like re.match + """ + + if isinstance(lines, bytes) or bytes is str: + newline = to_bytes(newline) + + if not is_List(lines): + lines = lines.split(newline) + if not is_List(matches): + matches = matches.split(newline) + if len(lines) != len(matches): + return None + for line, match in zip(lines, matches): + if line != match: + return None + return 1 + + +def match_caseinsensitive(lines=None, matches=None): + """ + Match function using case-insensitive matching. + + Only a simplistic comparison is done, based on lowercasing the + strings. This has plenty of holes for unicode data using + non-English languages. + + TODO: casefold() is better than lower() if we don't need Py2 support. + + :param lines: data lines + :type lines: str or list[str] + :param matches: expected lines to match + :type matches: str or list[str] + :returns: True or False + :returns: an object (1) on match, else None, like re.match + """ + if not is_List(lines): + lines = lines.split("\n") + if not is_List(matches): + matches = matches.split("\n") + if len(lines) != len(matches): + return None + for line, match in zip(lines, matches): + if line.lower() != match.lower(): + return None + return 1 + + +def match_re(lines=None, res=None): + """ + Match function using line-by-line regular expression match. + + :param lines: data lines + :type lines: str or list[str] + :param res: regular expression(s) for matching + :type res: str or list[str] + :returns: an object (1) on match, else None, like re.match + """ + if not is_List(lines): + # CRs mess up matching (Windows) so split carefully + lines = re.split('\r?\n', lines) + if not is_List(res): + res = res.split("\n") + if len(lines) != len(res): + print("match_re: expected %d lines, found %d" % (len(res), len(lines))) + return None + for i, (line, regex) in enumerate(zip(lines, res)): + s = r"^{}$".format(regex) + try: + expr = re.compile(s) + except re.error as e: + msg = "Regular expression error in %s: %s" + raise re.error(msg % (repr(s), e.args[0])) + if not expr.search(line): + miss_tmpl = "match_re: mismatch at line {}:\n search re='{}'\n line='{}'" + print(miss_tmpl.format(i, s, line)) + return None + return 1 + + +def match_re_dotall(lines=None, res=None): + """ + Match function using regular expression match. + + Unlike match_re, the arguments are converted to strings (if necessary) + and must match exactly. + + :param lines: data lines + :type lines: str or list[str] + :param res: regular expression(s) for matching + :type res: str or list[str] + :returns: a match object, or None as for re.match + """ + if not isinstance(lines, str): + lines = "\n".join(lines) + if not isinstance(res, str): + res = "\n".join(res) + s = r"^{}$".format(res) + try: + expr = re.compile(s, re.DOTALL) + except re.error as e: + msg = "Regular expression error in %s: %s" + raise re.error(msg % (repr(s), e.args[0])) + return expr.match(lines) + + +def simple_diff(a, b, fromfile='', tofile='', + fromfiledate='', tofiledate='', n=0, lineterm=''): + r""" + Compare two sequences of lines; generate the delta as a simple diff. + + Similar to difflib.context_diff and difflib.unified_diff but + output is like from the 'diff" command without arguments. The function + keeps the same signature as the difflib ones so they will be + interchangeable, but except for lineterm, the arguments beyond the + two sequences are ignored in this version. By default, the + diff is not created with trailing newlines, set the lineterm + argument to '\n' to do so. + + :raises re.error: if a regex fails to compile + + Example: + + >>> print(''.join(simple_diff('one\ntwo\nthree\nfour\n'.splitlines(True), + ... 'zero\none\ntree\nfour\n'.splitlines(True), lineterm='\n'))) + 0a1 + > zero + 2,3c3 + < two + < three + --- + > tree + + """ + a = [to_str(q) for q in a] + b = [to_str(q) for q in b] + sm = difflib.SequenceMatcher(None, a, b) + + def comma(x1, x2): + return x1 + 1 == x2 and str(x2) or '%s,%s' % (x1 + 1, x2) + + for op, a1, a2, b1, b2 in sm.get_opcodes(): + if op == 'delete': + yield "{}d{}{}".format(comma(a1, a2), b1, lineterm) + for l in a[a1:a2]: + yield '< ' + l + elif op == 'insert': + yield "{}a{}{}".format(a1, comma(b1, b2), lineterm) + for l in b[b1:b2]: + yield '> ' + l + elif op == 'replace': + yield "{}c{}{}".format(comma(a1, a2), comma(b1, b2), lineterm) + for l in a[a1:a2]: + yield '< ' + l + yield '---{}'.format(lineterm) + for l in b[b1:b2]: + yield '> ' + l + + +def diff_re(a, b, fromfile='', tofile='', + fromfiledate='', tofiledate='', n=3, lineterm='\n'): + """ + Compare a and b (lists of strings) where a are regexes. + + A simple "diff" of two sets of lines when the expected lines + are regular expressions. This is a really dumb thing that + just compares each line in turn, so it doesn't look for + chunks of matching lines and the like--but at least it lets + you know exactly which line first didn't compare correctl... + """ + result = [] + diff = len(a) - len(b) + if diff < 0: + a = a + [''] * (-diff) + elif diff > 0: + b = b + [''] * diff + for i, (aline, bline) in enumerate(zip(a, b)): + s = r"^{}$".format(aline) + try: + expr = re.compile(s) + except re.error as e: + msg = "Regular expression error in %s: %s" + raise re.error(msg % (repr(s), e.args[0])) + if not expr.search(bline): + result.append("%sc%s" % (i + 1, i + 1)) + result.append('< ' + repr(a[i])) + result.append('---') + result.append('> ' + repr(b[i])) + return result + + +if os.name == 'posix': + def escape(arg): + "escape shell special characters" + slash = '\\' + special = '"$' + arg = arg.replace(slash, slash + slash) + for c in special: + arg = arg.replace(c, slash + c) + if re_space.search(arg): + arg = '"' + arg + '"' + return arg +else: + # Windows does not allow special characters in file names + # anyway, so no need for an escape function, we will just quote + # the arg. + def escape(arg): + if re_space.search(arg): + arg = '"' + arg + '"' + return arg + +if os.name == 'java': + python = os.path.join(sys.prefix, 'jython') +else: + python = os.environ.get('python_executable', sys.executable) +_python_ = escape(python) + +if sys.platform == 'win32': + + default_sleep_seconds = 2 + + def where_is(file, path=None, pathext=None): + if path is None: + path = os.environ['PATH'] + if is_String(path): + path = path.split(os.pathsep) + if pathext is None: + pathext = os.environ['PATHEXT'] + if is_String(pathext): + pathext = pathext.split(os.pathsep) + for ext in pathext: + if ext.lower() == file[-len(ext):].lower(): + pathext = [''] + break + for dir in path: + f = os.path.join(dir, file) + for ext in pathext: + fext = f + ext + if os.path.isfile(fext): + return fext + return None + +else: + + def where_is(file, path=None, pathext=None): + if path is None: + path = os.environ['PATH'] + if is_String(path): + path = path.split(os.pathsep) + for dir in path: + f = os.path.join(dir, file) + if os.path.isfile(f): + try: + st = os.stat(f) + except OSError: + continue + if stat.S_IMODE(st[stat.ST_MODE]) & 0o111: + return f + return None + + default_sleep_seconds = 1 + + +import subprocess + +try: + subprocess.Popen.terminate +except AttributeError: + if sys.platform == 'win32': + import win32process + + def terminate(self): + win32process.TerminateProcess(self._handle, 1) + else: + def terminate(self): + os.kill(self.pid, signal.SIGTERM) + method = types.MethodType(terminate, None, subprocess.Popen) + setattr(subprocess.Popen, 'terminate', method) + + +# From Josiah Carlson, +# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms +# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 + +PIPE = subprocess.PIPE + +if sys.platform == 'win32': # and subprocess.mswindows: + try: + from win32file import ReadFile, WriteFile + from win32pipe import PeekNamedPipe + except ImportError: + # If PyWin32 is not available, try ctypes instead + # XXX These replicate _just_enough_ PyWin32 behaviour for our purposes + import ctypes + from ctypes.wintypes import DWORD + + def ReadFile(hFile, bufSize, ol=None): + assert ol is None + lpBuffer = ctypes.create_string_buffer(bufSize) + bytesRead = DWORD() + bErr = ctypes.windll.kernel32.ReadFile( + hFile, lpBuffer, bufSize, ctypes.byref(bytesRead), ol) + if not bErr: + raise ctypes.WinError() + return (0, ctypes.string_at(lpBuffer, bytesRead.value)) + + def WriteFile(hFile, data, ol=None): + assert ol is None + bytesWritten = DWORD() + bErr = ctypes.windll.kernel32.WriteFile( + hFile, data, len(data), ctypes.byref(bytesWritten), ol) + if not bErr: + raise ctypes.WinError() + return (0, bytesWritten.value) + + def PeekNamedPipe(hPipe, size): + assert size == 0 + bytesAvail = DWORD() + bErr = ctypes.windll.kernel32.PeekNamedPipe( + hPipe, None, size, None, ctypes.byref(bytesAvail), None) + if not bErr: + raise ctypes.WinError() + return ("", bytesAvail.value, None) + import msvcrt +else: + import select + import fcntl + + try: + fcntl.F_GETFL + except AttributeError: + fcntl.F_GETFL = 3 + + try: + fcntl.F_SETFL + except AttributeError: + fcntl.F_SETFL = 4 + + +class Popen(subprocess.Popen): + def recv(self, maxsize=None): + return self._recv('stdout', maxsize) + + def recv_err(self, maxsize=None): + return self._recv('stderr', maxsize) + + def send_recv(self, input='', maxsize=None): + return self.send(input), self.recv(maxsize), self.recv_err(maxsize) + + def get_conn_maxsize(self, which, maxsize): + if maxsize is None: + maxsize = 1024 + elif maxsize < 1: + maxsize = 1 + return getattr(self, which), maxsize + + def _close(self, which): + getattr(self, which).close() + setattr(self, which, None) + + if sys.platform == 'win32': # and subprocess.mswindows: + def send(self, input): + input = to_bytes(input) + if not self.stdin: + return None + + try: + x = msvcrt.get_osfhandle(self.stdin.fileno()) + (errCode, written) = WriteFile(x, input) + except ValueError: + return self._close('stdin') + except (subprocess.pywintypes.error, Exception) as why: + if why.args[0] in (109, errno.ESHUTDOWN): + return self._close('stdin') + raise + + return written + + def _recv(self, which, maxsize): + conn, maxsize = self.get_conn_maxsize(which, maxsize) + if conn is None: + return None + + try: + x = msvcrt.get_osfhandle(conn.fileno()) + (read, nAvail, nMessage) = PeekNamedPipe(x, 0) + if maxsize < nAvail: + nAvail = maxsize + if nAvail > 0: + (errCode, read) = ReadFile(x, nAvail, None) + except ValueError: + return self._close(which) + except (subprocess.pywintypes.error, Exception) as why: + if why.args[0] in (109, errno.ESHUTDOWN): + return self._close(which) + raise + + # if self.universal_newlines: + # read = self._translate_newlines(read) + return read + + else: + def send(self, input): + if not self.stdin: + return None + + if not select.select([], [self.stdin], [], 0)[1]: + return 0 + + try: + written = os.write(self.stdin.fileno(), + bytearray(input, 'utf-8')) + except OSError as why: + if why.args[0] == errno.EPIPE: # broken pipe + return self._close('stdin') + raise + + return written + + def _recv(self, which, maxsize): + conn, maxsize = self.get_conn_maxsize(which, maxsize) + if conn is None: + return None + + try: + flags = fcntl.fcntl(conn, fcntl.F_GETFL) + except TypeError: + flags = None + else: + if not conn.closed: + fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + try: + if not select.select([conn], [], [], 0)[0]: + return '' + + r = conn.read(maxsize) + if not r: + return self._close(which) + + # if self.universal_newlines: + # r = self._translate_newlines(r) + return r + finally: + if not conn.closed and not flags is None: + fcntl.fcntl(conn, fcntl.F_SETFL, flags) + + +disconnect_message = "Other end disconnected!" + + +def recv_some(p, t=.1, e=1, tr=5, stderr=0): + if tr < 1: + tr = 1 + x = time.time() + t + y = [] + r = '' + pr = p.recv + if stderr: + pr = p.recv_err + while time.time() < x or r: + r = pr() + if r is None: + if e: + raise Exception(disconnect_message) + else: + break + elif r: + y.append(r) + else: + time.sleep(max((x - time.time()) / tr, 0)) + return ''.join(y) + + +def send_all(p, data): + while len(data): + sent = p.send(data) + if sent is None: + raise Exception(disconnect_message) + data = memoryview(data)[sent:] + + +_Cleanup = [] + + +def _clean(): + global _Cleanup + cleanlist = [c for c in _Cleanup if c] + del _Cleanup[:] + cleanlist.reverse() + for test in cleanlist: + test.cleanup() + + +atexit.register(_clean) + + +class TestCmd(object): + """Class TestCmd + """ + + def __init__(self, description=None, + program=None, + interpreter=None, + workdir=None, + subdir=None, + verbose=None, + match=None, + match_stdout=None, + match_stderr=None, + diff=None, + diff_stdout=None, + diff_stderr=None, + combine=0, + universal_newlines=True, + timeout=None): + self.external = os.environ.get('SCONS_EXTERNAL_TEST', 0) + self._cwd = os.getcwd() + self.description_set(description) + self.program_set(program) + self.interpreter_set(interpreter) + if verbose is None: + try: + verbose = max(0, int(os.environ.get('TESTCMD_VERBOSE', 0))) + except ValueError: + verbose = 0 + self.verbose_set(verbose) + self.combine = combine + self.universal_newlines = universal_newlines + self.process = None + self.set_timeout(timeout) + self.set_match_function(match, match_stdout, match_stderr) + self.set_diff_function(diff, diff_stdout, diff_stderr) + self._dirlist = [] + self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} + preserve_value = os.environ.get('PRESERVE', False) + if preserve_value not in [0, '0', 'False']: + self._preserve['pass_test'] = os.environ['PRESERVE'] + self._preserve['fail_test'] = os.environ['PRESERVE'] + self._preserve['no_result'] = os.environ['PRESERVE'] + else: + try: + self._preserve['pass_test'] = os.environ['PRESERVE_PASS'] + except KeyError: + pass + try: + self._preserve['fail_test'] = os.environ['PRESERVE_FAIL'] + except KeyError: + pass + try: + self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT'] + except KeyError: + pass + self._stdout = [] + self._stderr = [] + self.status = None + self.condition = 'no_result' + self.workdir_set(workdir) + self.subdir(subdir) + self.fixture_dirs = [] + + try: + self.fixture_dirs = (os.environ['FIXTURE_DIRS']).split(os.pathsep) + except KeyError: + pass + + + def __del__(self): + self.cleanup() + + def __repr__(self): + return "%x" % id(self) + + banner_char = '=' + banner_width = 80 + + def banner(self, s, width=None): + if width is None: + width = self.banner_width + return s + self.banner_char * (width - len(s)) + + escape = staticmethod(escape) + + def canonicalize(self, path): + if is_List(path): + path = os.path.join(*tuple(path)) + if not os.path.isabs(path): + path = os.path.join(self.workdir, path) + return path + + def chmod(self, path, mode): + """Changes permissions on the specified file or directory + path name.""" + path = self.canonicalize(path) + os.chmod(path, mode) + + def cleanup(self, condition=None): + """Removes any temporary working directories for the specified + TestCmd environment. If the environment variable PRESERVE was + set when the TestCmd environment was created, temporary working + directories are not removed. If any of the environment variables + PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set + when the TestCmd environment was created, then temporary working + directories are not removed if the test passed, failed, or had + no result, respectively. Temporary working directories are also + preserved for conditions specified via the preserve method. + + Typically, this method is not called directly, but is used when + the script exits to clean up temporary working directories as + appropriate for the exit status. + """ + if not self._dirlist: + return + os.chdir(self._cwd) + self.workdir = None + if condition is None: + condition = self.condition + if self._preserve[condition]: + for dir in self._dirlist: + print(u"Preserved directory " + dir) + else: + list = self._dirlist[:] + list.reverse() + for dir in list: + self.writable(dir, 1) + shutil.rmtree(dir, ignore_errors=1) + self._dirlist = [] + + global _Cleanup + if self in _Cleanup: + _Cleanup.remove(self) + + def command_args(self, program=None, + interpreter=None, + arguments=None): + if not self.external: + if program: + if isinstance(program, str) and not os.path.isabs(program): + program = os.path.join(self._cwd, program) + else: + program = self.program + if not interpreter: + interpreter = self.interpreter + else: + if not program: + program = self.program + if not interpreter: + interpreter = self.interpreter + if not isinstance(program, (list, tuple)): + program = [program] + cmd = list(program) + if interpreter: + if not isinstance(interpreter, (list, tuple)): + interpreter = [interpreter] + cmd = list(interpreter) + cmd + if arguments: + if isinstance(arguments, str): + arguments = arguments.split() + cmd.extend(arguments) + return cmd + + def description_set(self, description): + """Set the description of the functionality being tested. + """ + self.description = description + + def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null): + """Sets the specified diff functions. + """ + if diff is not _Null: + self._diff_function = diff + if stdout is not _Null: + self._diff_stdout_function = stdout + if stderr is not _Null: + self._diff_stderr_function = stderr + + def diff(self, a, b, name=None, diff_function=None, *args, **kw): + if diff_function is None: + try: + diff_function = getattr(self, self._diff_function) + except TypeError: + diff_function = self._diff_function + if diff_function is None: + diff_function = self.simple_diff + if name is not None: + print(self.banner(name)) + + if not is_List(a): + a=a.splitlines() + if not is_List(b): + b=b.splitlines() + + args = (a, b) + args + for line in diff_function(*args, **kw): + print(line) + + def diff_stderr(self, a, b, *args, **kw): + """Compare actual and expected file contents. + """ + try: + diff_stderr_function = getattr(self, self._diff_stderr_function) + except TypeError: + diff_stderr_function = self._diff_stderr_function + return self.diff(a, b, diff_function=diff_stderr_function, *args, **kw) + + def diff_stdout(self, a, b, *args, **kw): + """Compare actual and expected file contents. + """ + try: + diff_stdout_function = getattr(self, self._diff_stdout_function) + except TypeError: + diff_stdout_function = self._diff_stdout_function + return self.diff(a, b, diff_function=diff_stdout_function, *args, **kw) + + simple_diff = staticmethod(simple_diff) + + diff_re = staticmethod(diff_re) + + context_diff = staticmethod(difflib.context_diff) + + unified_diff = staticmethod(difflib.unified_diff) + + def fail_test(self, condition=1, function=None, skip=0, message=None): + """Cause the test to fail. + """ + if not condition: + return + self.condition = 'fail_test' + fail_test(self=self, + condition=condition, + function=function, + skip=skip, + message=message) + + def interpreter_set(self, interpreter): + """Set the program to be used to interpret the program + under test as a script. + """ + self.interpreter = interpreter + + def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null): + """Sets the specified match functions. + """ + if match is not _Null: + self._match_function = match + if stdout is not _Null: + self._match_stdout_function = stdout + if stderr is not _Null: + self._match_stderr_function = stderr + + def match(self, lines, matches): + """Compare actual and expected file contents. + """ + try: + match_function = getattr(self, self._match_function) + except TypeError: + match_function = self._match_function + if match_function is None: + # Default is regular expression matches. + match_function = self.match_re + return match_function(lines, matches) + + def match_stderr(self, lines, matches): + """Compare actual and expected file contents. + """ + try: + match_stderr_function = getattr(self, self._match_stderr_function) + except TypeError: + match_stderr_function = self._match_stderr_function + if match_stderr_function is None: + # Default is to use whatever match= is set to. + match_stderr_function = self.match + return match_stderr_function(lines, matches) + + def match_stdout(self, lines, matches): + """Compare actual and expected file contents. + """ + try: + match_stdout_function = getattr(self, self._match_stdout_function) + except TypeError: + match_stdout_function = self._match_stdout_function + if match_stdout_function is None: + # Default is to use whatever match= is set to. + match_stdout_function = self.match + return match_stdout_function(lines, matches) + + match_exact = staticmethod(match_exact) + + match_caseinsensitive = staticmethod(match_caseinsensitive) + + match_re = staticmethod(match_re) + + match_re_dotall = staticmethod(match_re_dotall) + + def no_result(self, condition=1, function=None, skip=0): + """Report that the test could not be run. + """ + if not condition: + return + self.condition = 'no_result' + no_result(self=self, + condition=condition, + function=function, + skip=skip) + + def pass_test(self, condition=1, function=None): + """Cause the test to pass. + """ + if not condition: + return + self.condition = 'pass_test' + pass_test(self=self, condition=condition, function=function) + + def preserve(self, *conditions): + """Arrange for the temporary working directories for the + specified TestCmd environment to be preserved for one or more + conditions. If no conditions are specified, arranges for + the temporary working directories to be preserved for all + conditions. + """ + if not conditions: + conditions = ('pass_test', 'fail_test', 'no_result') + for cond in conditions: + self._preserve[cond] = 1 + + def program_set(self, program): + """Set the executable program or script to be tested. + """ + if not self.external: + if program and not os.path.isabs(program): + program = os.path.join(self._cwd, program) + self.program = program + + def read(self, file, mode='rb', newline=None): + """Reads and returns the contents of the specified file name. + + The file name may be a list, in which case the elements are + concatenated with the os.path.join() method. The file is + assumed to be under the temporary working directory unless it + is an absolute path name. The I/O mode for the file may + be specified; it must begin with an 'r'. The default is + 'rb' (binary read). + """ + file = self.canonicalize(file) + if mode[0] != 'r': + raise ValueError("mode must begin with 'r'") + if IS_PY3 and 'b' not in mode: + with open(file, mode, newline=newline) as f: + return f.read() + else: + with open(file, mode) as f: + return f.read() + + def rmdir(self, dir): + """Removes the specified dir name. + + The dir name may be a list, in which case the elements are + concatenated with the os.path.join() method. The dir is + assumed to be under the temporary working directory unless it + is an absolute path name. + The dir must be empty. + """ + dir = self.canonicalize(dir) + os.rmdir(dir) + + def _timeout(self): + self.process.terminate() + self.timer.cancel() + self.timer = None + + def set_timeout(self, timeout): + self.timeout = timeout + self.timer = None + + def parse_path(self, path, suppress_current=False): + """Return a list with the single path components of path. + """ + head, tail = os.path.split(path) + result = [] + if not tail: + if head == path: + return [head] + else: + result.append(tail) + head, tail = os.path.split(head) + while head and tail: + result.append(tail) + head, tail = os.path.split(head) + result.append(head or tail) + result.reverse() + + return result + + def dir_fixture(self, srcdir, dstdir=None): + """Copies the contents of the specified folder srcdir from + the directory of the called script, to the current + working directory. + + The srcdir name may be a list, in which case the elements are + concatenated with the os.path.join() method. The dstdir is + assumed to be under the temporary working directory, it gets + created automatically, if it does not already exist. + """ + + if srcdir and self.fixture_dirs and not os.path.isabs(srcdir): + for dir in self.fixture_dirs: + spath = os.path.join(dir, srcdir) + if os.path.isdir(spath): + break + else: + spath = srcdir + + if dstdir: + dstdir = self.canonicalize(dstdir) + else: + dstdir = '.' + + if dstdir != '.' and not os.path.exists(dstdir): + dstlist = self.parse_path(dstdir) + if len(dstlist) > 0 and dstlist[0] == ".": + dstlist = dstlist[1:] + for idx in range(len(dstlist)): + self.subdir(dstlist[:idx + 1]) + + if dstdir and self.workdir: + dstdir = os.path.join(self.workdir, dstdir) + + for entry in os.listdir(spath): + epath = os.path.join(spath, entry) + dpath = os.path.join(dstdir, entry) + if os.path.isdir(epath): + # Copy the subfolder + shutil.copytree(epath, dpath) + else: + shutil.copy(epath, dpath) + + def file_fixture(self, srcfile, dstfile=None): + """Copies the file srcfile from the directory of + the called script, to the current working directory. + + The dstfile is assumed to be under the temporary working + directory unless it is an absolute path name. + If dstfile is specified its target directory gets created + automatically, if it does not already exist. + """ + srcpath, srctail = os.path.split(srcfile) + + if srcpath and (not self.fixture_dirs or os.path.isabs(srcpath)): + spath = srcfile + else: + for dir in self.fixture_dirs: + spath = os.path.join(dir, srcfile) + if os.path.isfile(spath): + break + + if not dstfile: + if srctail: + dpath = os.path.join(self.workdir, srctail) + else: + return + else: + dstpath, dsttail = os.path.split(dstfile) + if dstpath: + if not os.path.exists(os.path.join(self.workdir, dstpath)): + dstlist = self.parse_path(dstpath) + if len(dstlist) > 0 and dstlist[0] == ".": + dstlist = dstlist[1:] + for idx in range(len(dstlist)): + self.subdir(dstlist[:idx + 1]) + + dpath = os.path.join(self.workdir, dstfile) + shutil.copy(spath, dpath) + + def start(self, program=None, + interpreter=None, + arguments=None, + universal_newlines=None, + timeout=_Null, + **kw): + """ + Starts a program or script for the test environment. + + The specified program will have the original directory + prepended unless it is enclosed in a [list]. + """ + cmd = self.command_args(program, interpreter, arguments) + if self.verbose: + cmd_string = ' '.join([self.escape(c) for c in cmd]) + sys.stderr.write(cmd_string + "\n") + if universal_newlines is None: + universal_newlines = self.universal_newlines + + # On Windows, if we make stdin a pipe when we plan to send + # no input, and the test program exits before + # Popen calls msvcrt.open_osfhandle, that call will fail. + # So don't use a pipe for stdin if we don't need one. + stdin = kw.get('stdin', None) + if stdin is not None: + stdin = subprocess.PIPE + + combine = kw.get('combine', self.combine) + if combine: + stderr_value = subprocess.STDOUT + else: + stderr_value = subprocess.PIPE + + if timeout is _Null: + timeout = self.timeout + if timeout: + self.timer = threading.Timer(float(timeout), self._timeout) + self.timer.start() + + if IS_PY3 and sys.platform == 'win32': + # Set this otherwist stdout/stderr pipes default to + # windows default locale cp1252 which will throw exception + # if using non-ascii characters. + # For example test/Install/non-ascii-name.py + os.environ['PYTHONIOENCODING'] = 'utf-8' + + # It seems that all pythons up to py3.6 still set text mode if you set encoding. + # TODO: File enhancement request on python to propagate universal_newlines even + # if encoding is set.hg c + p = Popen(cmd, + stdin=stdin, + stdout=subprocess.PIPE, + stderr=stderr_value, + env=os.environ, + universal_newlines=False) + + self.process = p + return p + + @staticmethod + def fix_binary_stream(stream): + """ + Handle stdout/stderr from popen when we specify universal_newlines = False. + + This will read from the pipes in binary mode, not decode the output, + and not convert line endings to \n. + We do this because in py3 (3.5) with universal_newlines=True, it will + choose the default system locale to decode the output, and this breaks unicode + output. Specifically breaking test/option--tree.py which outputs a unicode char. + + py 3.6 allows us to pass an encoding param to popen thus not requiring the decode + nor end of line handling, because we propagate universal_newlines as specified. + + TODO: Do we need to pass universal newlines into this function? + """ + + if not stream: + return stream + # TODO: Run full tests on both platforms and see if this fixes failures + # It seems that py3.6 still sets text mode if you set encoding. + elif sys.version_info[0] == 3: # TODO and sys.version_info[1] < 6: + stream = stream.decode('utf-8') + stream = stream.replace('\r\n', '\n') + elif sys.version_info[0] == 2: + stream = stream.replace('\r\n', '\n') + + return stream + + + def finish(self, popen=None, **kw): + """ + Finishes and waits for the process being run under control of + the specified popen argument, recording the exit status, + standard output and error output. + """ + if popen is None: + popen = self.process + stdout, stderr = popen.communicate() + + stdout = self.fix_binary_stream(stdout) + stderr = self.fix_binary_stream(stderr) + + if self.timer: + self.timer.cancel() + self.timer = None + self.status = popen.returncode + self.process = None + self._stdout.append(stdout or '') + self._stderr.append(stderr or '') + + def run(self, program=None, + interpreter=None, + arguments=None, + chdir=None, + stdin=None, + universal_newlines=None, + timeout=_Null): + """Runs a test of the program or script for the test + environment. Standard output and error output are saved for + future retrieval via the stdout() and stderr() methods. + + The specified program will have the original directory + prepended unless it is enclosed in a [list]. + """ + if self.external: + if not program: + program = self.program + if not interpreter: + interpreter = self.interpreter + + if universal_newlines is None: + universal_newlines = self.universal_newlines + + if chdir: + oldcwd = os.getcwd() + if not os.path.isabs(chdir): + chdir = os.path.join(self.workpath(chdir)) + if self.verbose: + sys.stderr.write("chdir(" + chdir + ")\n") + os.chdir(chdir) + p = self.start(program=program, + interpreter=interpreter, + arguments=arguments, + universal_newlines=universal_newlines, + timeout=timeout, + stdin=stdin) + if is_List(stdin): + stdin = ''.join(stdin) + + if stdin and IS_PY3:# and sys.version_info[1] < 6: + stdin = to_bytes(stdin) + + # TODO(sgk): figure out how to re-use the logic in the .finish() + # method above. Just calling it from here causes problems with + # subclasses that redefine .finish(). We could abstract this + # into Yet Another common method called both here and by .finish(), + # but that seems ill-thought-out. + stdout, stderr = p.communicate(input=stdin) + if self.timer: + self.timer.cancel() + self.timer = None + self.status = p.returncode + self.process = None + + stdout = self.fix_binary_stream(stdout) + stderr = self.fix_binary_stream(stderr) + + + self._stdout.append(stdout or '') + self._stderr.append(stderr or '') + + if chdir: + os.chdir(oldcwd) + if self.verbose >= 2: + write = sys.stdout.write + write('============ STATUS: %d\n' % self.status) + out = self.stdout() + if out or self.verbose >= 3: + write('============ BEGIN STDOUT (len=%d):\n' % len(out)) + write(out) + write('============ END STDOUT\n') + err = self.stderr() + if err or self.verbose >= 3: + write('============ BEGIN STDERR (len=%d)\n' % len(err)) + write(err) + write('============ END STDERR\n') + + def sleep(self, seconds=default_sleep_seconds): + """Sleeps at least the specified number of seconds. If no + number is specified, sleeps at least the minimum number of + seconds necessary to advance file time stamps on the current + system. Sleeping more seconds is all right. + """ + time.sleep(seconds) + + def stderr(self, run=None): + """Returns the error output from the specified run number. + If there is no specified run number, then returns the error + output of the last run. If the run number is less than zero, + then returns the error output from that many runs back from the + current run. + """ + if not run: + run = len(self._stderr) + elif run < 0: + run = len(self._stderr) + run + run = run - 1 + return self._stderr[run] + + def stdout(self, run=None): + """ + Returns the stored standard output from a given run. + + Args: + run: run number to select. If run number is omitted, + return the standard output of the most recent run. + If negative, use as a relative offset, so that -2 + means the run two prior to the most recent. + + Returns: + selected stdout string or None if there are no + stored runs. + """ + if not run: + run = len(self._stdout) + elif run < 0: + run = len(self._stdout) + run + run = run - 1 + try: + return self._stdout[run] + except IndexError: + return None + + def subdir(self, *subdirs): + """Create new subdirectories under the temporary working + directory, one for each argument. An argument may be a list, + in which case the list elements are concatenated using the + os.path.join() method. Subdirectories multiple levels deep + must be created using a separate argument for each level: + + test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) + + Returns the number of subdirectories actually created. + """ + count = 0 + for sub in subdirs: + if sub is None: + continue + if is_List(sub): + sub = os.path.join(*tuple(sub)) + new = os.path.join(self.workdir, sub) + try: + os.mkdir(new) + except OSError as e: + print("Got error creating dir: %s :%s" % (sub, e)) + pass + else: + count = count + 1 + return count + + def symlink(self, target, link): + """Creates a symlink to the specified target. + The link name may be a list, in which case the elements are + concatenated with the os.path.join() method. The link is + assumed to be under the temporary working directory unless it + is an absolute path name. The target is *not* assumed to be + under the temporary working directory. + """ + if sys.platform == 'win32': + # Skip this on windows as we're not enabling it due to + # it requiring user permissions which aren't always present + # and we don't have a good way to detect those permissions yet. + return + link = self.canonicalize(link) + try: + os.symlink(target, link) + except AttributeError: + pass # Windows has no symlink + + def tempdir(self, path=None): + """Creates a temporary directory. + A unique directory name is generated if no path name is specified. + The directory is created, and will be removed when the TestCmd + object is destroyed. + """ + if path is None: + try: + path = tempfile.mkdtemp(prefix=testprefix) + except TypeError: + path = tempfile.mkdtemp() + else: + os.mkdir(path) + + # Symlinks in the path will report things + # differently from os.getcwd(), so chdir there + # and back to fetch the canonical path. + cwd = os.getcwd() + try: + os.chdir(path) + path = os.getcwd() + finally: + os.chdir(cwd) + + # Uppercase the drive letter since the case of drive + # letters is pretty much random on win32: + drive, rest = os.path.splitdrive(path) + if drive: + path = drive.upper() + rest + + # + self._dirlist.append(path) + + global _Cleanup + if self not in _Cleanup: + _Cleanup.append(self) + + return path + + def touch(self, path, mtime=None): + """Updates the modification time on the specified file or + directory path name. The default is to update to the + current time if no explicit modification time is specified. + """ + path = self.canonicalize(path) + atime = os.path.getatime(path) + if mtime is None: + mtime = time.time() + os.utime(path, (atime, mtime)) + + def unlink(self, file): + """Unlinks the specified file name. + The file name may be a list, in which case the elements are + concatenated with the os.path.join() method. The file is + assumed to be under the temporary working directory unless it + is an absolute path name. + """ + file = self.canonicalize(file) + os.unlink(file) + + def verbose_set(self, verbose): + """Set the verbose level. + """ + self.verbose = verbose + + def where_is(self, file, path=None, pathext=None): + """Find an executable file. + """ + if is_List(file): + file = os.path.join(*tuple(file)) + if not os.path.isabs(file): + file = where_is(file, path, pathext) + return file + + def workdir_set(self, path): + """Creates a temporary working directory with the specified + path name. If the path is a null string (''), a unique + directory name is created. + """ + if (path != None): + if path == '': + path = None + path = self.tempdir(path) + self.workdir = path + + def workpath(self, *args): + """Returns the absolute path name to a subdirectory or file + within the current temporary working directory. Concatenates + the temporary working directory name with the specified + arguments using the os.path.join() method. + """ + return os.path.join(self.workdir, *tuple(args)) + + def readable(self, top, read=1): + """Make the specified directory tree readable (read == 1) + or not (read == None). + + This method has no effect on Windows systems, which use a + completely different mechanism to control file readability. + """ + + if sys.platform == 'win32': + return + + if read: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE( + st[stat.ST_MODE] | stat.S_IREAD)) + else: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE( + st[stat.ST_MODE] & ~stat.S_IREAD)) + + if os.path.isfile(top): + # If it's a file, that's easy, just chmod it. + do_chmod(top) + elif read: + # It's a directory and we're trying to turn on read + # permission, so it's also pretty easy, just chmod the + # directory and then chmod every entry on our walk down the + # tree. + do_chmod(top) + for dirpath, dirnames, filenames in os.walk(top): + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + else: + # It's a directory and we're trying to turn off read + # permission, which means we have to chmod the directories + # in the tree bottom-up, lest disabling read permission from + # the top down get in the way of being able to get at lower + # parts of the tree. + for dirpath, dirnames, filenames in os.walk(top, topdown=0): + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + do_chmod(top) + + def writable(self, top, write=1): + """Make the specified directory tree writable (write == 1) + or not (write == None). + """ + + if sys.platform == 'win32': + + if write: + def do_chmod(fname): + try: + os.chmod(fname, stat.S_IWRITE) + except OSError: + pass + else: + def do_chmod(fname): + try: + os.chmod(fname, stat.S_IREAD) + except OSError: + pass + + else: + + if write: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE] | 0o200)) + else: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE( + st[stat.ST_MODE] & ~0o200)) + + if os.path.isfile(top): + do_chmod(top) + else: + do_chmod(top) + for dirpath, dirnames, filenames in os.walk(top, topdown=0): + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + + def executable(self, top, execute=1): + """Make the specified directory tree executable (execute == 1) + or not (execute == None). + + This method has no effect on Windows systems, which use a + completely different mechanism to control file executability. + """ + + if sys.platform == 'win32': + return + + if execute: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE( + st[stat.ST_MODE] | stat.S_IEXEC)) + else: + def do_chmod(fname): + try: + st = os.stat(fname) + except OSError: + pass + else: + os.chmod(fname, stat.S_IMODE( + st[stat.ST_MODE] & ~stat.S_IEXEC)) + + if os.path.isfile(top): + # If it's a file, that's easy, just chmod it. + do_chmod(top) + elif execute: + # It's a directory and we're trying to turn on execute + # permission, so it's also pretty easy, just chmod the + # directory and then chmod every entry on our walk down the + # tree. + do_chmod(top) + for dirpath, dirnames, filenames in os.walk(top): + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + else: + # It's a directory and we're trying to turn off execute + # permission, which means we have to chmod the directories + # in the tree bottom-up, lest disabling execute permission from + # the top down get in the way of being able to get at lower + # parts of the tree. + for dirpath, dirnames, filenames in os.walk(top, topdown=0): + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + do_chmod(top) + + def write(self, file, content, mode='wb'): + """Writes the specified content text (second argument) to the + specified file name (first argument). The file name may be + a list, in which case the elements are concatenated with the + os.path.join() method. The file is created under the temporary + working directory. Any subdirectories in the path must already + exist. The I/O mode for the file may be specified; it must + begin with a 'w'. The default is 'wb' (binary write). + """ + file = self.canonicalize(file) + if mode[0] != 'w': + raise ValueError("mode must begin with 'w'") + with open(file, mode) as f: + try: + f.write(content) + except TypeError as e: + # python 3 default strings are not bytes, but unicode + f.write(bytes(content, 'utf-8')) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: |