summaryrefslogtreecommitdiff
path: root/testing/framework/TestCmd.py
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff-webhosting.net>2019-07-24 09:57:09 +0200
committerJörg Frings-Fürst <debian@jff-webhosting.net>2019-07-24 09:57:09 +0200
commitc7665433b2004d2b404d6fb9d6fd064998486f63 (patch)
tree8525ef6d24f7c6ceb238945ebb2cc997c7afc905 /testing/framework/TestCmd.py
parente48d2727885efda8369c7edbc2e3929a59532adc (diff)
parent6e228c305122f0564eda1e67d56651f8386d24d7 (diff)
Merge branch 'release/debian/3.1.0+repack-1'debian/3.1.0+repack-1
Diffstat (limited to 'testing/framework/TestCmd.py')
-rw-r--r--testing/framework/TestCmd.py1999
1 files changed, 1999 insertions, 0 deletions
diff --git a/testing/framework/TestCmd.py b/testing/framework/TestCmd.py
new file mode 100644
index 0000000..81e03f3
--- /dev/null
+++ b/testing/framework/TestCmd.py
@@ -0,0 +1,1999 @@
+"""
+TestCmd.py: a testing framework for commands and scripts.
+
+The TestCmd module provides a framework for portable automated testing
+of executable commands and scripts (in any language, not just Python),
+especially commands and scripts that require file system interaction.
+
+In addition to running tests and evaluating conditions, the TestCmd
+module manages and cleans up one or more temporary workspace
+directories, and provides methods for creating files and directories in
+those workspace directories from in-line data, here-documents), allowing
+tests to be completely self-contained.
+
+A TestCmd environment object is created via the usual invocation:
+
+ import TestCmd
+ test = TestCmd.TestCmd()
+
+There are a bunch of keyword arguments available at instantiation:
+
+ test = TestCmd.TestCmd(description = 'string',
+ program = 'program_or_script_to_test',
+ interpreter = 'script_interpreter',
+ workdir = 'prefix',
+ subdir = 'subdir',
+ verbose = Boolean,
+ match = default_match_function,
+ match_stdout = default_match_stdout_function,
+ match_stderr = default_match_stderr_function,
+ diff = default_diff_stderr_function,
+ diff_stdout = default_diff_stdout_function,
+ diff_stderr = default_diff_stderr_function,
+ combine = Boolean)
+
+There are a bunch of methods that let you do different things:
+
+ test.verbose_set(1)
+
+ test.description_set('string')
+
+ test.program_set('program_or_script_to_test')
+
+ test.interpreter_set('script_interpreter')
+ test.interpreter_set(['script_interpreter', 'arg'])
+
+ test.workdir_set('prefix')
+ test.workdir_set('')
+
+ test.workpath('file')
+ test.workpath('subdir', 'file')
+
+ test.subdir('subdir', ...)
+
+ test.rmdir('subdir', ...)
+
+ test.write('file', "contents\n")
+ test.write(['subdir', 'file'], "contents\n")
+
+ test.read('file')
+ test.read(['subdir', 'file'])
+ test.read('file', mode)
+ test.read(['subdir', 'file'], mode)
+
+ test.writable('dir', 1)
+ test.writable('dir', None)
+
+ test.preserve(condition, ...)
+
+ test.cleanup(condition)
+
+ test.command_args(program = 'program_or_script_to_run',
+ interpreter = 'script_interpreter',
+ arguments = 'arguments to pass to program')
+
+ test.run(program = 'program_or_script_to_run',
+ interpreter = 'script_interpreter',
+ arguments = 'arguments to pass to program',
+ chdir = 'directory_to_chdir_to',
+ stdin = 'input to feed to the program\n')
+ universal_newlines = True)
+
+ p = test.start(program = 'program_or_script_to_run',
+ interpreter = 'script_interpreter',
+ arguments = 'arguments to pass to program',
+ universal_newlines = None)
+
+ test.finish(self, p)
+
+ test.pass_test()
+ test.pass_test(condition)
+ test.pass_test(condition, function)
+
+ test.fail_test()
+ test.fail_test(condition)
+ test.fail_test(condition, function)
+ test.fail_test(condition, function, skip)
+ test.fail_test(condition, function, skip, message)
+
+ test.no_result()
+ test.no_result(condition)
+ test.no_result(condition, function)
+ test.no_result(condition, function, skip)
+
+ test.stdout()
+ test.stdout(run)
+
+ test.stderr()
+ test.stderr(run)
+
+ test.symlink(target, link)
+
+ test.banner(string)
+ test.banner(string, width)
+
+ test.diff(actual, expected)
+
+ test.diff_stderr(actual, expected)
+
+ test.diff_stdout(actual, expected)
+
+ test.match(actual, expected)
+
+ test.match_stderr(actual, expected)
+
+ test.match_stdout(actual, expected)
+
+ test.set_match_function(match, stdout, stderr)
+
+ test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
+ test.match_exact(["actual 1\n", "actual 2\n"],
+ ["expected 1\n", "expected 2\n"])
+ test.match_caseinsensitive("Actual 1\nACTUAL 2\n", "expected 1\nEXPECTED 2\n")
+
+ test.match_re("actual 1\nactual 2\n", regex_string)
+ test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
+
+ test.match_re_dotall("actual 1\nactual 2\n", regex_string)
+ test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
+
+ test.tempdir()
+ test.tempdir('temporary-directory')
+
+ test.sleep()
+ test.sleep(seconds)
+
+ test.where_is('foo')
+ test.where_is('foo', 'PATH1:PATH2')
+ test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
+
+ test.unlink('file')
+ test.unlink('subdir', 'file')
+
+The TestCmd module provides pass_test(), fail_test(), and no_result()
+unbound functions that report test results for use with the Aegis change
+management system. These methods terminate the test immediately,
+reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
+status 0 (success), 1 or 2 respectively. This allows for a distinction
+between an actual failed test and a test that could not be properly
+evaluated because of an external condition (such as a full file system
+or incorrect permissions).
+
+ import TestCmd
+
+ TestCmd.pass_test()
+ TestCmd.pass_test(condition)
+ TestCmd.pass_test(condition, function)
+
+ TestCmd.fail_test()
+ TestCmd.fail_test(condition)
+ TestCmd.fail_test(condition, function)
+ TestCmd.fail_test(condition, function, skip)
+ TestCmd.fail_test(condition, function, skip, message)
+
+ TestCmd.no_result()
+ TestCmd.no_result(condition)
+ TestCmd.no_result(condition, function)
+ TestCmd.no_result(condition, function, skip)
+
+The TestCmd module also provides unbound global functions that handle
+matching in the same way as the match_*() methods described above.
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.match_exact)
+
+ test = TestCmd.TestCmd(match = TestCmd.match_caseinsensitive)
+
+ test = TestCmd.TestCmd(match = TestCmd.match_re)
+
+ test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
+
+These functions are also available as static methods:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_exact)
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_caseinsensitive)
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re)
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re_dotall)
+
+These static methods can be accessed by a string naming the method:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = 'match_exact')
+
+ test = TestCmd.TestCmd(match = 'match_caseinsensitive')
+
+ test = TestCmd.TestCmd(match = 'match_re')
+
+ test = TestCmd.TestCmd(match = 'match_re_dotall')
+
+The TestCmd module provides unbound global functions that can be used
+for the "diff" argument to TestCmd.TestCmd instantiation:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.match_re,
+ diff = TestCmd.diff_re)
+
+ test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.context_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.unified_diff)
+
+These functions are also available as static methods:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re,
+ diff = TestCmd.TestCmd.diff_re)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.simple_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.context_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.unified_diff)
+
+These static methods can be accessed by a string naming the method:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = 'match_re', diff = 'diff_re')
+
+ test = TestCmd.TestCmd(diff = 'simple_diff')
+
+ test = TestCmd.TestCmd(diff = 'context_diff')
+
+ test = TestCmd.TestCmd(diff = 'unified_diff')
+
+The "diff" argument can also be used with standard difflib functions:
+
+ import difflib
+
+ test = TestCmd.TestCmd(diff = difflib.context_diff)
+
+ test = TestCmd.TestCmd(diff = difflib.unified_diff)
+
+Lastly, the where_is() method also exists in an unbound function
+version.
+
+ import TestCmd
+
+ TestCmd.where_is('foo')
+ TestCmd.where_is('foo', 'PATH1:PATH2')
+ TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
+"""
+
+# Copyright 2000-2010 Steven Knight
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+#
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+from __future__ import division, print_function
+
+__author__ = "Steven Knight <knight at baldmt dot com>"
+__revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight"
+__version__ = "1.3"
+
+import atexit
+import difflib
+import errno
+import os
+import re
+import shutil
+import signal
+import stat
+import sys
+import tempfile
+import threading
+import time
+import traceback
+import types
+
+
+IS_PY3 = sys.version_info[0] == 3
+IS_WINDOWS = sys.platform == 'win32'
+IS_64_BIT = sys.maxsize > 2**32
+
+class null(object):
+ pass
+
+
+_Null = null()
+
+try:
+ from collections import UserList, UserString
+except ImportError:
+ # no 'collections' module or no UserFoo in collections
+ exec('from UserList import UserList')
+ exec('from UserString import UserString')
+
+__all__ = [
+ 'diff_re',
+ 'fail_test',
+ 'no_result',
+ 'pass_test',
+ 'match_exact',
+ 'match_caseinsensitive',
+ 'match_re',
+ 'match_re_dotall',
+ 'python',
+ '_python_',
+ 'TestCmd',
+ 'to_bytes',
+ 'to_str',
+]
+
+
+def is_List(e):
+ return isinstance(e, (list, UserList))
+
+
+def to_bytes(s):
+ if isinstance(s, bytes) or bytes is str:
+ return s
+ return bytes(s, 'utf-8')
+
+
+def to_str(s):
+ if bytes is str or is_String(s):
+ return s
+ return str(s, 'utf-8')
+
+
+try:
+ eval('unicode')
+except NameError:
+ def is_String(e):
+ return isinstance(e, (str, UserString))
+else:
+ def is_String(e):
+ return isinstance(e, (str, unicode, UserString))
+
+testprefix = 'testcmd.'
+if os.name in ('posix', 'nt'):
+ testprefix += "%s." % str(os.getpid())
+
+re_space = re.compile(r'\s')
+
+
+def _caller(tblist, skip):
+ string = ""
+ arr = []
+ for file, line, name, text in tblist:
+ if file[-10:] == "TestCmd.py":
+ break
+ arr = [(file, line, name, text)] + arr
+ atfrom = "at"
+ for file, line, name, text in arr[skip:]:
+ if name in ("?", "<module>"):
+ name = ""
+ else:
+ name = " (" + name + ")"
+ string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
+ atfrom = "\tfrom"
+ return string
+
+
+def fail_test(self=None, condition=1, function=None, skip=0, message=None):
+ """Cause the test to fail.
+
+ By default, the fail_test() method reports that the test FAILED
+ and exits with a status of 1. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = _caller(traceback.extract_stack(), skip)
+ if message:
+ msg = "\t%s\n" % message
+ else:
+ msg = ""
+ sys.stderr.write("FAILED test" + of + desc + sep + at + msg)
+
+ sys.exit(1)
+
+
+def no_result(self=None, condition=1, function=None, skip=0):
+ """Causes a test to exit with no valid result.
+
+ By default, the no_result() method reports NO RESULT for the test
+ and exits with a status of 2. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = _caller(traceback.extract_stack(), skip)
+ sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
+
+ sys.exit(2)
+
+
+def pass_test(self=None, condition=1, function=None):
+ """Causes a test to pass.
+
+ By default, the pass_test() method reports PASSED for the test
+ and exits with a status of 0. If a condition argument is supplied,
+ the test passes only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ sys.stderr.write("PASSED\n")
+ sys.exit(0)
+
+
+def match_exact(lines=None, matches=None, newline=os.sep):
+ """
+ Match function using exact match.
+
+ :param lines: data lines
+ :type lines: str or list[str]
+ :param matches: expected lines to match
+ :type matches: str or list[str]
+ :param newline: line separator
+ :returns: an object (1) on match, else None, like re.match
+ """
+
+ if isinstance(lines, bytes) or bytes is str:
+ newline = to_bytes(newline)
+
+ if not is_List(lines):
+ lines = lines.split(newline)
+ if not is_List(matches):
+ matches = matches.split(newline)
+ if len(lines) != len(matches):
+ return None
+ for line, match in zip(lines, matches):
+ if line != match:
+ return None
+ return 1
+
+
+def match_caseinsensitive(lines=None, matches=None):
+ """
+ Match function using case-insensitive matching.
+
+ Only a simplistic comparison is done, based on lowercasing the
+ strings. This has plenty of holes for unicode data using
+ non-English languages.
+
+ TODO: casefold() is better than lower() if we don't need Py2 support.
+
+ :param lines: data lines
+ :type lines: str or list[str]
+ :param matches: expected lines to match
+ :type matches: str or list[str]
+ :returns: True or False
+ :returns: an object (1) on match, else None, like re.match
+ """
+ if not is_List(lines):
+ lines = lines.split("\n")
+ if not is_List(matches):
+ matches = matches.split("\n")
+ if len(lines) != len(matches):
+ return None
+ for line, match in zip(lines, matches):
+ if line.lower() != match.lower():
+ return None
+ return 1
+
+
+def match_re(lines=None, res=None):
+ """
+ Match function using line-by-line regular expression match.
+
+ :param lines: data lines
+ :type lines: str or list[str]
+ :param res: regular expression(s) for matching
+ :type res: str or list[str]
+ :returns: an object (1) on match, else None, like re.match
+ """
+ if not is_List(lines):
+ # CRs mess up matching (Windows) so split carefully
+ lines = re.split('\r?\n', lines)
+ if not is_List(res):
+ res = res.split("\n")
+ if len(lines) != len(res):
+ print("match_re: expected %d lines, found %d" % (len(res), len(lines)))
+ return None
+ for i, (line, regex) in enumerate(zip(lines, res)):
+ s = r"^{}$".format(regex)
+ try:
+ expr = re.compile(s)
+ except re.error as e:
+ msg = "Regular expression error in %s: %s"
+ raise re.error(msg % (repr(s), e.args[0]))
+ if not expr.search(line):
+ miss_tmpl = "match_re: mismatch at line {}:\n search re='{}'\n line='{}'"
+ print(miss_tmpl.format(i, s, line))
+ return None
+ return 1
+
+
+def match_re_dotall(lines=None, res=None):
+ """
+ Match function using regular expression match.
+
+ Unlike match_re, the arguments are converted to strings (if necessary)
+ and must match exactly.
+
+ :param lines: data lines
+ :type lines: str or list[str]
+ :param res: regular expression(s) for matching
+ :type res: str or list[str]
+ :returns: a match object, or None as for re.match
+ """
+ if not isinstance(lines, str):
+ lines = "\n".join(lines)
+ if not isinstance(res, str):
+ res = "\n".join(res)
+ s = r"^{}$".format(res)
+ try:
+ expr = re.compile(s, re.DOTALL)
+ except re.error as e:
+ msg = "Regular expression error in %s: %s"
+ raise re.error(msg % (repr(s), e.args[0]))
+ return expr.match(lines)
+
+
+def simple_diff(a, b, fromfile='', tofile='',
+ fromfiledate='', tofiledate='', n=0, lineterm=''):
+ r"""
+ Compare two sequences of lines; generate the delta as a simple diff.
+
+ Similar to difflib.context_diff and difflib.unified_diff but
+ output is like from the 'diff" command without arguments. The function
+ keeps the same signature as the difflib ones so they will be
+ interchangeable, but except for lineterm, the arguments beyond the
+ two sequences are ignored in this version. By default, the
+ diff is not created with trailing newlines, set the lineterm
+ argument to '\n' to do so.
+
+ :raises re.error: if a regex fails to compile
+
+ Example:
+
+ >>> print(''.join(simple_diff('one\ntwo\nthree\nfour\n'.splitlines(True),
+ ... 'zero\none\ntree\nfour\n'.splitlines(True), lineterm='\n')))
+ 0a1
+ > zero
+ 2,3c3
+ < two
+ < three
+ ---
+ > tree
+
+ """
+ a = [to_str(q) for q in a]
+ b = [to_str(q) for q in b]
+ sm = difflib.SequenceMatcher(None, a, b)
+
+ def comma(x1, x2):
+ return x1 + 1 == x2 and str(x2) or '%s,%s' % (x1 + 1, x2)
+
+ for op, a1, a2, b1, b2 in sm.get_opcodes():
+ if op == 'delete':
+ yield "{}d{}{}".format(comma(a1, a2), b1, lineterm)
+ for l in a[a1:a2]:
+ yield '< ' + l
+ elif op == 'insert':
+ yield "{}a{}{}".format(a1, comma(b1, b2), lineterm)
+ for l in b[b1:b2]:
+ yield '> ' + l
+ elif op == 'replace':
+ yield "{}c{}{}".format(comma(a1, a2), comma(b1, b2), lineterm)
+ for l in a[a1:a2]:
+ yield '< ' + l
+ yield '---{}'.format(lineterm)
+ for l in b[b1:b2]:
+ yield '> ' + l
+
+
+def diff_re(a, b, fromfile='', tofile='',
+ fromfiledate='', tofiledate='', n=3, lineterm='\n'):
+ """
+ Compare a and b (lists of strings) where a are regexes.
+
+ A simple "diff" of two sets of lines when the expected lines
+ are regular expressions. This is a really dumb thing that
+ just compares each line in turn, so it doesn't look for
+ chunks of matching lines and the like--but at least it lets
+ you know exactly which line first didn't compare correctl...
+ """
+ result = []
+ diff = len(a) - len(b)
+ if diff < 0:
+ a = a + [''] * (-diff)
+ elif diff > 0:
+ b = b + [''] * diff
+ for i, (aline, bline) in enumerate(zip(a, b)):
+ s = r"^{}$".format(aline)
+ try:
+ expr = re.compile(s)
+ except re.error as e:
+ msg = "Regular expression error in %s: %s"
+ raise re.error(msg % (repr(s), e.args[0]))
+ if not expr.search(bline):
+ result.append("%sc%s" % (i + 1, i + 1))
+ result.append('< ' + repr(a[i]))
+ result.append('---')
+ result.append('> ' + repr(b[i]))
+ return result
+
+
+if os.name == 'posix':
+ def escape(arg):
+ "escape shell special characters"
+ slash = '\\'
+ special = '"$'
+ arg = arg.replace(slash, slash + slash)
+ for c in special:
+ arg = arg.replace(c, slash + c)
+ if re_space.search(arg):
+ arg = '"' + arg + '"'
+ return arg
+else:
+ # Windows does not allow special characters in file names
+ # anyway, so no need for an escape function, we will just quote
+ # the arg.
+ def escape(arg):
+ if re_space.search(arg):
+ arg = '"' + arg + '"'
+ return arg
+
+if os.name == 'java':
+ python = os.path.join(sys.prefix, 'jython')
+else:
+ python = os.environ.get('python_executable', sys.executable)
+_python_ = escape(python)
+
+if sys.platform == 'win32':
+
+ default_sleep_seconds = 2
+
+ def where_is(file, path=None, pathext=None):
+ if path is None:
+ path = os.environ['PATH']
+ if is_String(path):
+ path = path.split(os.pathsep)
+ if pathext is None:
+ pathext = os.environ['PATHEXT']
+ if is_String(pathext):
+ pathext = pathext.split(os.pathsep)
+ for ext in pathext:
+ if ext.lower() == file[-len(ext):].lower():
+ pathext = ['']
+ break
+ for dir in path:
+ f = os.path.join(dir, file)
+ for ext in pathext:
+ fext = f + ext
+ if os.path.isfile(fext):
+ return fext
+ return None
+
+else:
+
+ def where_is(file, path=None, pathext=None):
+ if path is None:
+ path = os.environ['PATH']
+ if is_String(path):
+ path = path.split(os.pathsep)
+ for dir in path:
+ f = os.path.join(dir, file)
+ if os.path.isfile(f):
+ try:
+ st = os.stat(f)
+ except OSError:
+ continue
+ if stat.S_IMODE(st[stat.ST_MODE]) & 0o111:
+ return f
+ return None
+
+ default_sleep_seconds = 1
+
+
+import subprocess
+
+try:
+ subprocess.Popen.terminate
+except AttributeError:
+ if sys.platform == 'win32':
+ import win32process
+
+ def terminate(self):
+ win32process.TerminateProcess(self._handle, 1)
+ else:
+ def terminate(self):
+ os.kill(self.pid, signal.SIGTERM)
+ method = types.MethodType(terminate, None, subprocess.Popen)
+ setattr(subprocess.Popen, 'terminate', method)
+
+
+# From Josiah Carlson,
+# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
+# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
+
+PIPE = subprocess.PIPE
+
+if sys.platform == 'win32': # and subprocess.mswindows:
+ try:
+ from win32file import ReadFile, WriteFile
+ from win32pipe import PeekNamedPipe
+ except ImportError:
+ # If PyWin32 is not available, try ctypes instead
+ # XXX These replicate _just_enough_ PyWin32 behaviour for our purposes
+ import ctypes
+ from ctypes.wintypes import DWORD
+
+ def ReadFile(hFile, bufSize, ol=None):
+ assert ol is None
+ lpBuffer = ctypes.create_string_buffer(bufSize)
+ bytesRead = DWORD()
+ bErr = ctypes.windll.kernel32.ReadFile(
+ hFile, lpBuffer, bufSize, ctypes.byref(bytesRead), ol)
+ if not bErr:
+ raise ctypes.WinError()
+ return (0, ctypes.string_at(lpBuffer, bytesRead.value))
+
+ def WriteFile(hFile, data, ol=None):
+ assert ol is None
+ bytesWritten = DWORD()
+ bErr = ctypes.windll.kernel32.WriteFile(
+ hFile, data, len(data), ctypes.byref(bytesWritten), ol)
+ if not bErr:
+ raise ctypes.WinError()
+ return (0, bytesWritten.value)
+
+ def PeekNamedPipe(hPipe, size):
+ assert size == 0
+ bytesAvail = DWORD()
+ bErr = ctypes.windll.kernel32.PeekNamedPipe(
+ hPipe, None, size, None, ctypes.byref(bytesAvail), None)
+ if not bErr:
+ raise ctypes.WinError()
+ return ("", bytesAvail.value, None)
+ import msvcrt
+else:
+ import select
+ import fcntl
+
+ try:
+ fcntl.F_GETFL
+ except AttributeError:
+ fcntl.F_GETFL = 3
+
+ try:
+ fcntl.F_SETFL
+ except AttributeError:
+ fcntl.F_SETFL = 4
+
+
+class Popen(subprocess.Popen):
+ def recv(self, maxsize=None):
+ return self._recv('stdout', maxsize)
+
+ def recv_err(self, maxsize=None):
+ return self._recv('stderr', maxsize)
+
+ def send_recv(self, input='', maxsize=None):
+ return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
+
+ def get_conn_maxsize(self, which, maxsize):
+ if maxsize is None:
+ maxsize = 1024
+ elif maxsize < 1:
+ maxsize = 1
+ return getattr(self, which), maxsize
+
+ def _close(self, which):
+ getattr(self, which).close()
+ setattr(self, which, None)
+
+ if sys.platform == 'win32': # and subprocess.mswindows:
+ def send(self, input):
+ input = to_bytes(input)
+ if not self.stdin:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(self.stdin.fileno())
+ (errCode, written) = WriteFile(x, input)
+ except ValueError:
+ return self._close('stdin')
+ except (subprocess.pywintypes.error, Exception) as why:
+ if why.args[0] in (109, errno.ESHUTDOWN):
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(conn.fileno())
+ (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
+ if maxsize < nAvail:
+ nAvail = maxsize
+ if nAvail > 0:
+ (errCode, read) = ReadFile(x, nAvail, None)
+ except ValueError:
+ return self._close(which)
+ except (subprocess.pywintypes.error, Exception) as why:
+ if why.args[0] in (109, errno.ESHUTDOWN):
+ return self._close(which)
+ raise
+
+ # if self.universal_newlines:
+ # read = self._translate_newlines(read)
+ return read
+
+ else:
+ def send(self, input):
+ if not self.stdin:
+ return None
+
+ if not select.select([], [self.stdin], [], 0)[1]:
+ return 0
+
+ try:
+ written = os.write(self.stdin.fileno(),
+ bytearray(input, 'utf-8'))
+ except OSError as why:
+ if why.args[0] == errno.EPIPE: # broken pipe
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ flags = fcntl.fcntl(conn, fcntl.F_GETFL)
+ except TypeError:
+ flags = None
+ else:
+ if not conn.closed:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ try:
+ if not select.select([conn], [], [], 0)[0]:
+ return ''
+
+ r = conn.read(maxsize)
+ if not r:
+ return self._close(which)
+
+ # if self.universal_newlines:
+ # r = self._translate_newlines(r)
+ return r
+ finally:
+ if not conn.closed and not flags is None:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags)
+
+
+disconnect_message = "Other end disconnected!"
+
+
+def recv_some(p, t=.1, e=1, tr=5, stderr=0):
+ if tr < 1:
+ tr = 1
+ x = time.time() + t
+ y = []
+ r = ''
+ pr = p.recv
+ if stderr:
+ pr = p.recv_err
+ while time.time() < x or r:
+ r = pr()
+ if r is None:
+ if e:
+ raise Exception(disconnect_message)
+ else:
+ break
+ elif r:
+ y.append(r)
+ else:
+ time.sleep(max((x - time.time()) / tr, 0))
+ return ''.join(y)
+
+
+def send_all(p, data):
+ while len(data):
+ sent = p.send(data)
+ if sent is None:
+ raise Exception(disconnect_message)
+ data = memoryview(data)[sent:]
+
+
+_Cleanup = []
+
+
+def _clean():
+ global _Cleanup
+ cleanlist = [c for c in _Cleanup if c]
+ del _Cleanup[:]
+ cleanlist.reverse()
+ for test in cleanlist:
+ test.cleanup()
+
+
+atexit.register(_clean)
+
+
+class TestCmd(object):
+ """Class TestCmd
+ """
+
+ def __init__(self, description=None,
+ program=None,
+ interpreter=None,
+ workdir=None,
+ subdir=None,
+ verbose=None,
+ match=None,
+ match_stdout=None,
+ match_stderr=None,
+ diff=None,
+ diff_stdout=None,
+ diff_stderr=None,
+ combine=0,
+ universal_newlines=True,
+ timeout=None):
+ self.external = os.environ.get('SCONS_EXTERNAL_TEST', 0)
+ self._cwd = os.getcwd()
+ self.description_set(description)
+ self.program_set(program)
+ self.interpreter_set(interpreter)
+ if verbose is None:
+ try:
+ verbose = max(0, int(os.environ.get('TESTCMD_VERBOSE', 0)))
+ except ValueError:
+ verbose = 0
+ self.verbose_set(verbose)
+ self.combine = combine
+ self.universal_newlines = universal_newlines
+ self.process = None
+ self.set_timeout(timeout)
+ self.set_match_function(match, match_stdout, match_stderr)
+ self.set_diff_function(diff, diff_stdout, diff_stderr)
+ self._dirlist = []
+ self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
+ preserve_value = os.environ.get('PRESERVE', False)
+ if preserve_value not in [0, '0', 'False']:
+ self._preserve['pass_test'] = os.environ['PRESERVE']
+ self._preserve['fail_test'] = os.environ['PRESERVE']
+ self._preserve['no_result'] = os.environ['PRESERVE']
+ else:
+ try:
+ self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
+ except KeyError:
+ pass
+ try:
+ self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
+ except KeyError:
+ pass
+ try:
+ self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
+ except KeyError:
+ pass
+ self._stdout = []
+ self._stderr = []
+ self.status = None
+ self.condition = 'no_result'
+ self.workdir_set(workdir)
+ self.subdir(subdir)
+ self.fixture_dirs = []
+
+ try:
+ self.fixture_dirs = (os.environ['FIXTURE_DIRS']).split(os.pathsep)
+ except KeyError:
+ pass
+
+
+ def __del__(self):
+ self.cleanup()
+
+ def __repr__(self):
+ return "%x" % id(self)
+
+ banner_char = '='
+ banner_width = 80
+
+ def banner(self, s, width=None):
+ if width is None:
+ width = self.banner_width
+ return s + self.banner_char * (width - len(s))
+
+ escape = staticmethod(escape)
+
+ def canonicalize(self, path):
+ if is_List(path):
+ path = os.path.join(*tuple(path))
+ if not os.path.isabs(path):
+ path = os.path.join(self.workdir, path)
+ return path
+
+ def chmod(self, path, mode):
+ """Changes permissions on the specified file or directory
+ path name."""
+ path = self.canonicalize(path)
+ os.chmod(path, mode)
+
+ def cleanup(self, condition=None):
+ """Removes any temporary working directories for the specified
+ TestCmd environment. If the environment variable PRESERVE was
+ set when the TestCmd environment was created, temporary working
+ directories are not removed. If any of the environment variables
+ PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
+ when the TestCmd environment was created, then temporary working
+ directories are not removed if the test passed, failed, or had
+ no result, respectively. Temporary working directories are also
+ preserved for conditions specified via the preserve method.
+
+ Typically, this method is not called directly, but is used when
+ the script exits to clean up temporary working directories as
+ appropriate for the exit status.
+ """
+ if not self._dirlist:
+ return
+ os.chdir(self._cwd)
+ self.workdir = None
+ if condition is None:
+ condition = self.condition
+ if self._preserve[condition]:
+ for dir in self._dirlist:
+ print(u"Preserved directory " + dir)
+ else:
+ list = self._dirlist[:]
+ list.reverse()
+ for dir in list:
+ self.writable(dir, 1)
+ shutil.rmtree(dir, ignore_errors=1)
+ self._dirlist = []
+
+ global _Cleanup
+ if self in _Cleanup:
+ _Cleanup.remove(self)
+
+ def command_args(self, program=None,
+ interpreter=None,
+ arguments=None):
+ if not self.external:
+ if program:
+ if isinstance(program, str) and not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ else:
+ program = self.program
+ if not interpreter:
+ interpreter = self.interpreter
+ else:
+ if not program:
+ program = self.program
+ if not interpreter:
+ interpreter = self.interpreter
+ if not isinstance(program, (list, tuple)):
+ program = [program]
+ cmd = list(program)
+ if interpreter:
+ if not isinstance(interpreter, (list, tuple)):
+ interpreter = [interpreter]
+ cmd = list(interpreter) + cmd
+ if arguments:
+ if isinstance(arguments, str):
+ arguments = arguments.split()
+ cmd.extend(arguments)
+ return cmd
+
+ def description_set(self, description):
+ """Set the description of the functionality being tested.
+ """
+ self.description = description
+
+ def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null):
+ """Sets the specified diff functions.
+ """
+ if diff is not _Null:
+ self._diff_function = diff
+ if stdout is not _Null:
+ self._diff_stdout_function = stdout
+ if stderr is not _Null:
+ self._diff_stderr_function = stderr
+
+ def diff(self, a, b, name=None, diff_function=None, *args, **kw):
+ if diff_function is None:
+ try:
+ diff_function = getattr(self, self._diff_function)
+ except TypeError:
+ diff_function = self._diff_function
+ if diff_function is None:
+ diff_function = self.simple_diff
+ if name is not None:
+ print(self.banner(name))
+
+ if not is_List(a):
+ a=a.splitlines()
+ if not is_List(b):
+ b=b.splitlines()
+
+ args = (a, b) + args
+ for line in diff_function(*args, **kw):
+ print(line)
+
+ def diff_stderr(self, a, b, *args, **kw):
+ """Compare actual and expected file contents.
+ """
+ try:
+ diff_stderr_function = getattr(self, self._diff_stderr_function)
+ except TypeError:
+ diff_stderr_function = self._diff_stderr_function
+ return self.diff(a, b, diff_function=diff_stderr_function, *args, **kw)
+
+ def diff_stdout(self, a, b, *args, **kw):
+ """Compare actual and expected file contents.
+ """
+ try:
+ diff_stdout_function = getattr(self, self._diff_stdout_function)
+ except TypeError:
+ diff_stdout_function = self._diff_stdout_function
+ return self.diff(a, b, diff_function=diff_stdout_function, *args, **kw)
+
+ simple_diff = staticmethod(simple_diff)
+
+ diff_re = staticmethod(diff_re)
+
+ context_diff = staticmethod(difflib.context_diff)
+
+ unified_diff = staticmethod(difflib.unified_diff)
+
+ def fail_test(self, condition=1, function=None, skip=0, message=None):
+ """Cause the test to fail.
+ """
+ if not condition:
+ return
+ self.condition = 'fail_test'
+ fail_test(self=self,
+ condition=condition,
+ function=function,
+ skip=skip,
+ message=message)
+
+ def interpreter_set(self, interpreter):
+ """Set the program to be used to interpret the program
+ under test as a script.
+ """
+ self.interpreter = interpreter
+
+ def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null):
+ """Sets the specified match functions.
+ """
+ if match is not _Null:
+ self._match_function = match
+ if stdout is not _Null:
+ self._match_stdout_function = stdout
+ if stderr is not _Null:
+ self._match_stderr_function = stderr
+
+ def match(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ try:
+ match_function = getattr(self, self._match_function)
+ except TypeError:
+ match_function = self._match_function
+ if match_function is None:
+ # Default is regular expression matches.
+ match_function = self.match_re
+ return match_function(lines, matches)
+
+ def match_stderr(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ try:
+ match_stderr_function = getattr(self, self._match_stderr_function)
+ except TypeError:
+ match_stderr_function = self._match_stderr_function
+ if match_stderr_function is None:
+ # Default is to use whatever match= is set to.
+ match_stderr_function = self.match
+ return match_stderr_function(lines, matches)
+
+ def match_stdout(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ try:
+ match_stdout_function = getattr(self, self._match_stdout_function)
+ except TypeError:
+ match_stdout_function = self._match_stdout_function
+ if match_stdout_function is None:
+ # Default is to use whatever match= is set to.
+ match_stdout_function = self.match
+ return match_stdout_function(lines, matches)
+
+ match_exact = staticmethod(match_exact)
+
+ match_caseinsensitive = staticmethod(match_caseinsensitive)
+
+ match_re = staticmethod(match_re)
+
+ match_re_dotall = staticmethod(match_re_dotall)
+
+ def no_result(self, condition=1, function=None, skip=0):
+ """Report that the test could not be run.
+ """
+ if not condition:
+ return
+ self.condition = 'no_result'
+ no_result(self=self,
+ condition=condition,
+ function=function,
+ skip=skip)
+
+ def pass_test(self, condition=1, function=None):
+ """Cause the test to pass.
+ """
+ if not condition:
+ return
+ self.condition = 'pass_test'
+ pass_test(self=self, condition=condition, function=function)
+
+ def preserve(self, *conditions):
+ """Arrange for the temporary working directories for the
+ specified TestCmd environment to be preserved for one or more
+ conditions. If no conditions are specified, arranges for
+ the temporary working directories to be preserved for all
+ conditions.
+ """
+ if not conditions:
+ conditions = ('pass_test', 'fail_test', 'no_result')
+ for cond in conditions:
+ self._preserve[cond] = 1
+
+ def program_set(self, program):
+ """Set the executable program or script to be tested.
+ """
+ if not self.external:
+ if program and not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ self.program = program
+
+ def read(self, file, mode='rb', newline=None):
+ """Reads and returns the contents of the specified file name.
+
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name. The I/O mode for the file may
+ be specified; it must begin with an 'r'. The default is
+ 'rb' (binary read).
+ """
+ file = self.canonicalize(file)
+ if mode[0] != 'r':
+ raise ValueError("mode must begin with 'r'")
+ if IS_PY3 and 'b' not in mode:
+ with open(file, mode, newline=newline) as f:
+ return f.read()
+ else:
+ with open(file, mode) as f:
+ return f.read()
+
+ def rmdir(self, dir):
+ """Removes the specified dir name.
+
+ The dir name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The dir is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name.
+ The dir must be empty.
+ """
+ dir = self.canonicalize(dir)
+ os.rmdir(dir)
+
+ def _timeout(self):
+ self.process.terminate()
+ self.timer.cancel()
+ self.timer = None
+
+ def set_timeout(self, timeout):
+ self.timeout = timeout
+ self.timer = None
+
+ def parse_path(self, path, suppress_current=False):
+ """Return a list with the single path components of path.
+ """
+ head, tail = os.path.split(path)
+ result = []
+ if not tail:
+ if head == path:
+ return [head]
+ else:
+ result.append(tail)
+ head, tail = os.path.split(head)
+ while head and tail:
+ result.append(tail)
+ head, tail = os.path.split(head)
+ result.append(head or tail)
+ result.reverse()
+
+ return result
+
+ def dir_fixture(self, srcdir, dstdir=None):
+ """Copies the contents of the specified folder srcdir from
+ the directory of the called script, to the current
+ working directory.
+
+ The srcdir name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The dstdir is
+ assumed to be under the temporary working directory, it gets
+ created automatically, if it does not already exist.
+ """
+
+ if srcdir and self.fixture_dirs and not os.path.isabs(srcdir):
+ for dir in self.fixture_dirs:
+ spath = os.path.join(dir, srcdir)
+ if os.path.isdir(spath):
+ break
+ else:
+ spath = srcdir
+
+ if dstdir:
+ dstdir = self.canonicalize(dstdir)
+ else:
+ dstdir = '.'
+
+ if dstdir != '.' and not os.path.exists(dstdir):
+ dstlist = self.parse_path(dstdir)
+ if len(dstlist) > 0 and dstlist[0] == ".":
+ dstlist = dstlist[1:]
+ for idx in range(len(dstlist)):
+ self.subdir(dstlist[:idx + 1])
+
+ if dstdir and self.workdir:
+ dstdir = os.path.join(self.workdir, dstdir)
+
+ for entry in os.listdir(spath):
+ epath = os.path.join(spath, entry)
+ dpath = os.path.join(dstdir, entry)
+ if os.path.isdir(epath):
+ # Copy the subfolder
+ shutil.copytree(epath, dpath)
+ else:
+ shutil.copy(epath, dpath)
+
+ def file_fixture(self, srcfile, dstfile=None):
+ """Copies the file srcfile from the directory of
+ the called script, to the current working directory.
+
+ The dstfile is assumed to be under the temporary working
+ directory unless it is an absolute path name.
+ If dstfile is specified its target directory gets created
+ automatically, if it does not already exist.
+ """
+ srcpath, srctail = os.path.split(srcfile)
+
+ if srcpath and (not self.fixture_dirs or os.path.isabs(srcpath)):
+ spath = srcfile
+ else:
+ for dir in self.fixture_dirs:
+ spath = os.path.join(dir, srcfile)
+ if os.path.isfile(spath):
+ break
+
+ if not dstfile:
+ if srctail:
+ dpath = os.path.join(self.workdir, srctail)
+ else:
+ return
+ else:
+ dstpath, dsttail = os.path.split(dstfile)
+ if dstpath:
+ if not os.path.exists(os.path.join(self.workdir, dstpath)):
+ dstlist = self.parse_path(dstpath)
+ if len(dstlist) > 0 and dstlist[0] == ".":
+ dstlist = dstlist[1:]
+ for idx in range(len(dstlist)):
+ self.subdir(dstlist[:idx + 1])
+
+ dpath = os.path.join(self.workdir, dstfile)
+ shutil.copy(spath, dpath)
+
+ def start(self, program=None,
+ interpreter=None,
+ arguments=None,
+ universal_newlines=None,
+ timeout=_Null,
+ **kw):
+ """
+ Starts a program or script for the test environment.
+
+ The specified program will have the original directory
+ prepended unless it is enclosed in a [list].
+ """
+ cmd = self.command_args(program, interpreter, arguments)
+ if self.verbose:
+ cmd_string = ' '.join([self.escape(c) for c in cmd])
+ sys.stderr.write(cmd_string + "\n")
+ if universal_newlines is None:
+ universal_newlines = self.universal_newlines
+
+ # On Windows, if we make stdin a pipe when we plan to send
+ # no input, and the test program exits before
+ # Popen calls msvcrt.open_osfhandle, that call will fail.
+ # So don't use a pipe for stdin if we don't need one.
+ stdin = kw.get('stdin', None)
+ if stdin is not None:
+ stdin = subprocess.PIPE
+
+ combine = kw.get('combine', self.combine)
+ if combine:
+ stderr_value = subprocess.STDOUT
+ else:
+ stderr_value = subprocess.PIPE
+
+ if timeout is _Null:
+ timeout = self.timeout
+ if timeout:
+ self.timer = threading.Timer(float(timeout), self._timeout)
+ self.timer.start()
+
+ if IS_PY3 and sys.platform == 'win32':
+ # Set this otherwist stdout/stderr pipes default to
+ # windows default locale cp1252 which will throw exception
+ # if using non-ascii characters.
+ # For example test/Install/non-ascii-name.py
+ os.environ['PYTHONIOENCODING'] = 'utf-8'
+
+ # It seems that all pythons up to py3.6 still set text mode if you set encoding.
+ # TODO: File enhancement request on python to propagate universal_newlines even
+ # if encoding is set.hg c
+ p = Popen(cmd,
+ stdin=stdin,
+ stdout=subprocess.PIPE,
+ stderr=stderr_value,
+ env=os.environ,
+ universal_newlines=False)
+
+ self.process = p
+ return p
+
+ @staticmethod
+ def fix_binary_stream(stream):
+ """
+ Handle stdout/stderr from popen when we specify universal_newlines = False.
+
+ This will read from the pipes in binary mode, not decode the output,
+ and not convert line endings to \n.
+ We do this because in py3 (3.5) with universal_newlines=True, it will
+ choose the default system locale to decode the output, and this breaks unicode
+ output. Specifically breaking test/option--tree.py which outputs a unicode char.
+
+ py 3.6 allows us to pass an encoding param to popen thus not requiring the decode
+ nor end of line handling, because we propagate universal_newlines as specified.
+
+ TODO: Do we need to pass universal newlines into this function?
+ """
+
+ if not stream:
+ return stream
+ # TODO: Run full tests on both platforms and see if this fixes failures
+ # It seems that py3.6 still sets text mode if you set encoding.
+ elif sys.version_info[0] == 3: # TODO and sys.version_info[1] < 6:
+ stream = stream.decode('utf-8')
+ stream = stream.replace('\r\n', '\n')
+ elif sys.version_info[0] == 2:
+ stream = stream.replace('\r\n', '\n')
+
+ return stream
+
+
+ def finish(self, popen=None, **kw):
+ """
+ Finishes and waits for the process being run under control of
+ the specified popen argument, recording the exit status,
+ standard output and error output.
+ """
+ if popen is None:
+ popen = self.process
+ stdout, stderr = popen.communicate()
+
+ stdout = self.fix_binary_stream(stdout)
+ stderr = self.fix_binary_stream(stderr)
+
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
+ self.status = popen.returncode
+ self.process = None
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
+
+ def run(self, program=None,
+ interpreter=None,
+ arguments=None,
+ chdir=None,
+ stdin=None,
+ universal_newlines=None,
+ timeout=_Null):
+ """Runs a test of the program or script for the test
+ environment. Standard output and error output are saved for
+ future retrieval via the stdout() and stderr() methods.
+
+ The specified program will have the original directory
+ prepended unless it is enclosed in a [list].
+ """
+ if self.external:
+ if not program:
+ program = self.program
+ if not interpreter:
+ interpreter = self.interpreter
+
+ if universal_newlines is None:
+ universal_newlines = self.universal_newlines
+
+ if chdir:
+ oldcwd = os.getcwd()
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ os.chdir(chdir)
+ p = self.start(program=program,
+ interpreter=interpreter,
+ arguments=arguments,
+ universal_newlines=universal_newlines,
+ timeout=timeout,
+ stdin=stdin)
+ if is_List(stdin):
+ stdin = ''.join(stdin)
+
+ if stdin and IS_PY3:# and sys.version_info[1] < 6:
+ stdin = to_bytes(stdin)
+
+ # TODO(sgk): figure out how to re-use the logic in the .finish()
+ # method above. Just calling it from here causes problems with
+ # subclasses that redefine .finish(). We could abstract this
+ # into Yet Another common method called both here and by .finish(),
+ # but that seems ill-thought-out.
+ stdout, stderr = p.communicate(input=stdin)
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
+ self.status = p.returncode
+ self.process = None
+
+ stdout = self.fix_binary_stream(stdout)
+ stderr = self.fix_binary_stream(stderr)
+
+
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
+
+ if chdir:
+ os.chdir(oldcwd)
+ if self.verbose >= 2:
+ write = sys.stdout.write
+ write('============ STATUS: %d\n' % self.status)
+ out = self.stdout()
+ if out or self.verbose >= 3:
+ write('============ BEGIN STDOUT (len=%d):\n' % len(out))
+ write(out)
+ write('============ END STDOUT\n')
+ err = self.stderr()
+ if err or self.verbose >= 3:
+ write('============ BEGIN STDERR (len=%d)\n' % len(err))
+ write(err)
+ write('============ END STDERR\n')
+
+ def sleep(self, seconds=default_sleep_seconds):
+ """Sleeps at least the specified number of seconds. If no
+ number is specified, sleeps at least the minimum number of
+ seconds necessary to advance file time stamps on the current
+ system. Sleeping more seconds is all right.
+ """
+ time.sleep(seconds)
+
+ def stderr(self, run=None):
+ """Returns the error output from the specified run number.
+ If there is no specified run number, then returns the error
+ output of the last run. If the run number is less than zero,
+ then returns the error output from that many runs back from the
+ current run.
+ """
+ if not run:
+ run = len(self._stderr)
+ elif run < 0:
+ run = len(self._stderr) + run
+ run = run - 1
+ return self._stderr[run]
+
+ def stdout(self, run=None):
+ """
+ Returns the stored standard output from a given run.
+
+ Args:
+ run: run number to select. If run number is omitted,
+ return the standard output of the most recent run.
+ If negative, use as a relative offset, so that -2
+ means the run two prior to the most recent.
+
+ Returns:
+ selected stdout string or None if there are no
+ stored runs.
+ """
+ if not run:
+ run = len(self._stdout)
+ elif run < 0:
+ run = len(self._stdout) + run
+ run = run - 1
+ try:
+ return self._stdout[run]
+ except IndexError:
+ return None
+
+ def subdir(self, *subdirs):
+ """Create new subdirectories under the temporary working
+ directory, one for each argument. An argument may be a list,
+ in which case the list elements are concatenated using the
+ os.path.join() method. Subdirectories multiple levels deep
+ must be created using a separate argument for each level:
+
+ test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
+
+ Returns the number of subdirectories actually created.
+ """
+ count = 0
+ for sub in subdirs:
+ if sub is None:
+ continue
+ if is_List(sub):
+ sub = os.path.join(*tuple(sub))
+ new = os.path.join(self.workdir, sub)
+ try:
+ os.mkdir(new)
+ except OSError as e:
+ print("Got error creating dir: %s :%s" % (sub, e))
+ pass
+ else:
+ count = count + 1
+ return count
+
+ def symlink(self, target, link):
+ """Creates a symlink to the specified target.
+ The link name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The link is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name. The target is *not* assumed to be
+ under the temporary working directory.
+ """
+ if sys.platform == 'win32':
+ # Skip this on windows as we're not enabling it due to
+ # it requiring user permissions which aren't always present
+ # and we don't have a good way to detect those permissions yet.
+ return
+ link = self.canonicalize(link)
+ try:
+ os.symlink(target, link)
+ except AttributeError:
+ pass # Windows has no symlink
+
+ def tempdir(self, path=None):
+ """Creates a temporary directory.
+ A unique directory name is generated if no path name is specified.
+ The directory is created, and will be removed when the TestCmd
+ object is destroyed.
+ """
+ if path is None:
+ try:
+ path = tempfile.mkdtemp(prefix=testprefix)
+ except TypeError:
+ path = tempfile.mkdtemp()
+ else:
+ os.mkdir(path)
+
+ # Symlinks in the path will report things
+ # differently from os.getcwd(), so chdir there
+ # and back to fetch the canonical path.
+ cwd = os.getcwd()
+ try:
+ os.chdir(path)
+ path = os.getcwd()
+ finally:
+ os.chdir(cwd)
+
+ # Uppercase the drive letter since the case of drive
+ # letters is pretty much random on win32:
+ drive, rest = os.path.splitdrive(path)
+ if drive:
+ path = drive.upper() + rest
+
+ #
+ self._dirlist.append(path)
+
+ global _Cleanup
+ if self not in _Cleanup:
+ _Cleanup.append(self)
+
+ return path
+
+ def touch(self, path, mtime=None):
+ """Updates the modification time on the specified file or
+ directory path name. The default is to update to the
+ current time if no explicit modification time is specified.
+ """
+ path = self.canonicalize(path)
+ atime = os.path.getatime(path)
+ if mtime is None:
+ mtime = time.time()
+ os.utime(path, (atime, mtime))
+
+ def unlink(self, file):
+ """Unlinks the specified file name.
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name.
+ """
+ file = self.canonicalize(file)
+ os.unlink(file)
+
+ def verbose_set(self, verbose):
+ """Set the verbose level.
+ """
+ self.verbose = verbose
+
+ def where_is(self, file, path=None, pathext=None):
+ """Find an executable file.
+ """
+ if is_List(file):
+ file = os.path.join(*tuple(file))
+ if not os.path.isabs(file):
+ file = where_is(file, path, pathext)
+ return file
+
+ def workdir_set(self, path):
+ """Creates a temporary working directory with the specified
+ path name. If the path is a null string (''), a unique
+ directory name is created.
+ """
+ if (path != None):
+ if path == '':
+ path = None
+ path = self.tempdir(path)
+ self.workdir = path
+
+ def workpath(self, *args):
+ """Returns the absolute path name to a subdirectory or file
+ within the current temporary working directory. Concatenates
+ the temporary working directory name with the specified
+ arguments using the os.path.join() method.
+ """
+ return os.path.join(self.workdir, *tuple(args))
+
+ def readable(self, top, read=1):
+ """Make the specified directory tree readable (read == 1)
+ or not (read == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file readability.
+ """
+
+ if sys.platform == 'win32':
+ return
+
+ if read:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] | stat.S_IREAD))
+ else:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~stat.S_IREAD))
+
+ if os.path.isfile(top):
+ # If it's a file, that's easy, just chmod it.
+ do_chmod(top)
+ elif read:
+ # It's a directory and we're trying to turn on read
+ # permission, so it's also pretty easy, just chmod the
+ # directory and then chmod every entry on our walk down the
+ # tree.
+ do_chmod(top)
+ for dirpath, dirnames, filenames in os.walk(top):
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ else:
+ # It's a directory and we're trying to turn off read
+ # permission, which means we have to chmod the directories
+ # in the tree bottom-up, lest disabling read permission from
+ # the top down get in the way of being able to get at lower
+ # parts of the tree.
+ for dirpath, dirnames, filenames in os.walk(top, topdown=0):
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
+
+ def writable(self, top, write=1):
+ """Make the specified directory tree writable (write == 1)
+ or not (write == None).
+ """
+
+ if sys.platform == 'win32':
+
+ if write:
+ def do_chmod(fname):
+ try:
+ os.chmod(fname, stat.S_IWRITE)
+ except OSError:
+ pass
+ else:
+ def do_chmod(fname):
+ try:
+ os.chmod(fname, stat.S_IREAD)
+ except OSError:
+ pass
+
+ else:
+
+ if write:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE] | 0o200))
+ else:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~0o200))
+
+ if os.path.isfile(top):
+ do_chmod(top)
+ else:
+ do_chmod(top)
+ for dirpath, dirnames, filenames in os.walk(top, topdown=0):
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+
+ def executable(self, top, execute=1):
+ """Make the specified directory tree executable (execute == 1)
+ or not (execute == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file executability.
+ """
+
+ if sys.platform == 'win32':
+ return
+
+ if execute:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] | stat.S_IEXEC))
+ else:
+ def do_chmod(fname):
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~stat.S_IEXEC))
+
+ if os.path.isfile(top):
+ # If it's a file, that's easy, just chmod it.
+ do_chmod(top)
+ elif execute:
+ # It's a directory and we're trying to turn on execute
+ # permission, so it's also pretty easy, just chmod the
+ # directory and then chmod every entry on our walk down the
+ # tree.
+ do_chmod(top)
+ for dirpath, dirnames, filenames in os.walk(top):
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ else:
+ # It's a directory and we're trying to turn off execute
+ # permission, which means we have to chmod the directories
+ # in the tree bottom-up, lest disabling execute permission from
+ # the top down get in the way of being able to get at lower
+ # parts of the tree.
+ for dirpath, dirnames, filenames in os.walk(top, topdown=0):
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
+
+ def write(self, file, content, mode='wb'):
+ """Writes the specified content text (second argument) to the
+ specified file name (first argument). The file name may be
+ a list, in which case the elements are concatenated with the
+ os.path.join() method. The file is created under the temporary
+ working directory. Any subdirectories in the path must already
+ exist. The I/O mode for the file may be specified; it must
+ begin with a 'w'. The default is 'wb' (binary write).
+ """
+ file = self.canonicalize(file)
+ if mode[0] != 'w':
+ raise ValueError("mode must begin with 'w'")
+ with open(file, mode) as f:
+ try:
+ f.write(content)
+ except TypeError as e:
+ # python 3 default strings are not bytes, but unicode
+ f.write(bytes(content, 'utf-8'))
+
+# Local Variables:
+# tab-width:4
+# indent-tabs-mode:nil
+# End:
+# vim: set expandtab tabstop=4 shiftwidth=4: