summaryrefslogtreecommitdiff
path: root/QMTest/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r--QMTest/TestCmd.py481
1 files changed, 324 insertions, 157 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index cd559b7..0aab9a8 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -285,7 +285,7 @@ version.
# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-from __future__ import division
+from __future__ import division, print_function
__author__ = "Steven Knight <knight at baldmt dot com>"
__revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight"
@@ -306,8 +306,15 @@ import time
import traceback
import types
+
+IS_PY3 = sys.version_info[0] == 3
+IS_WINDOWS = sys.platform == 'win32'
+
+
class null(object):
pass
+
+
_Null = null()
try:
@@ -328,12 +335,28 @@ __all__ = [
'match_re_dotall',
'python',
'_python_',
- 'TestCmd'
+ 'TestCmd',
+ 'to_bytes',
+ 'to_str',
]
+
def is_List(e):
return isinstance(e, (list, UserList))
+
+def to_bytes(s):
+ if isinstance(s, bytes) or bytes is str:
+ return s
+ return bytes(s, 'utf-8')
+
+
+def to_str(s):
+ if bytes is str or is_String(s):
+ return s
+ return str(s, 'utf-8')
+
+
try:
eval('unicode')
except NameError:
@@ -351,12 +374,13 @@ else:
re_space = re.compile('\s')
+
def _caller(tblist, skip):
string = ""
arr = []
for file, line, name, text in tblist:
if file[-10:] == "TestCmd.py":
- break
+ break
arr = [(file, line, name, text)] + arr
atfrom = "at"
for file, line, name, text in arr[skip:]:
@@ -368,7 +392,8 @@ def _caller(tblist, skip):
atfrom = "\tfrom"
return string
-def fail_test(self = None, condition = 1, function = None, skip = 0, message=None):
+
+def fail_test(self=None, condition=1, function=None, skip=0, message=None):
"""Cause the test to fail.
By default, the fail_test() method reports that the test FAILED
@@ -392,14 +417,15 @@ def fail_test(self = None, condition = 1, function = None, skip = 0, message=Non
at = _caller(traceback.extract_stack(), skip)
if message:
- msg = "\t%s\n"%message
+ msg = "\t%s\n" % message
else:
msg = ""
sys.stderr.write("FAILED test" + of + desc + sep + at + msg)
sys.exit(1)
-def no_result(self = None, condition = 1, function = None, skip = 0):
+
+def no_result(self=None, condition=1, function=None, skip=0):
"""Causes a test to exit with no valid result.
By default, the no_result() method reports NO RESULT for the test
@@ -426,7 +452,8 @@ def no_result(self = None, condition = 1, function = None, skip = 0):
sys.exit(2)
-def pass_test(self = None, condition = 1, function = None):
+
+def pass_test(self=None, condition=1, function=None):
"""Causes a test to pass.
By default, the pass_test() method reports PASSED for the test
@@ -440,13 +467,18 @@ def pass_test(self = None, condition = 1, function = None):
sys.stderr.write("PASSED\n")
sys.exit(0)
-def match_exact(lines = None, matches = None):
+
+def match_exact(lines=None, matches=None, newline=os.sep):
"""
"""
+
+ if isinstance(lines, bytes) or bytes is str:
+ newline = to_bytes(newline)
+
if not is_List(lines):
- lines = lines.split("\n")
+ lines = lines.split(newline)
if not is_List(matches):
- matches = matches.split("\n")
+ matches = matches.split(newline)
if len(lines) != len(matches):
return
for i in range(len(lines)):
@@ -454,7 +486,8 @@ def match_exact(lines = None, matches = None):
return
return 1
-def match_caseinsensitive(lines = None, matches = None):
+
+def match_caseinsensitive(lines=None, matches=None):
"""
"""
if not is_List(lines):
@@ -468,7 +501,8 @@ def match_caseinsensitive(lines = None, matches = None):
return
return 1
-def match_re(lines = None, res = None):
+
+def match_re(lines=None, res=None):
"""
"""
if not is_List(lines):
@@ -477,21 +511,23 @@ def match_re(lines = None, res = None):
if not is_List(res):
res = res.split("\n")
if len(lines) != len(res):
- print "match_re: expected %d lines, found %d"%(len(res), len(lines))
+ print("match_re: expected %d lines, found %d" % (len(res), len(lines)))
return
for i in range(len(lines)):
s = "^" + res[i] + "$"
try:
expr = re.compile(s)
- except re.error, e:
+ except re.error as e:
msg = "Regular expression error in %s: %s"
raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(lines[i]):
- print "match_re: mismatch at line %d:\n search re='%s'\n line='%s'"%(i,s,lines[i])
+ print("match_re: mismatch at line %d:\n search re='%s'\n line='%s'" % (
+ i, s, lines[i]))
return
return 1
-def match_re_dotall(lines = None, res = None):
+
+def match_re_dotall(lines=None, res=None):
"""
"""
if not isinstance(lines, str):
@@ -501,11 +537,12 @@ def match_re_dotall(lines = None, res = None):
s = "^" + res + "$"
try:
expr = re.compile(s, re.DOTALL)
- except re.error, e:
+ except re.error as e:
msg = "Regular expression error in %s: %s"
raise re.error(msg % (repr(s), e.args[0]))
return expr.match(lines)
+
def simple_diff(a, b, fromfile='', tofile='',
fromfiledate='', tofiledate='', n=3, lineterm='\n'):
"""
@@ -513,26 +550,30 @@ def simple_diff(a, b, fromfile='', tofile='',
(diff -c) and difflib.unified_diff (diff -u) but which prints
output like the simple, unadorned 'diff" command.
"""
+ a = [to_str(q) for q in a]
+ b = [to_str(q) for q in b]
sm = difflib.SequenceMatcher(None, a, b)
+
def comma(x1, x2):
- return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
+ return x1 + 1 == x2 and str(x2) or '%s,%s' % (x1 + 1, x2)
result = []
for op, a1, a2, b1, b2 in sm.get_opcodes():
if op == 'delete':
result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend([ '< ' + l for l in a[a1:a2] ])
+ result.extend(['< ' + l for l in a[a1:a2]])
elif op == 'insert':
result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend([ '> ' + l for l in b[b1:b2] ])
+ result.extend(['> ' + l for l in b[b1:b2]])
elif op == 'replace':
result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend([ '< ' + l for l in a[a1:a2] ])
+ result.extend(['< ' + l for l in a[a1:a2]])
result.append('---')
- result.extend([ '> ' + l for l in b[b1:b2] ])
+ result.extend(['> ' + l for l in b[b1:b2]])
return result
+
def diff_re(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
+ fromfiledate='', tofiledate='', n=3, lineterm='\n'):
"""
A simple "diff" of two sets of lines when the expected lines
are regular expressions. This is a really dumb thing that
@@ -543,33 +584,34 @@ def diff_re(a, b, fromfile='', tofile='',
result = []
diff = len(a) - len(b)
if diff < 0:
- a = a + ['']*(-diff)
+ a = a + [''] * (-diff)
elif diff > 0:
- b = b + ['']*diff
+ b = b + [''] * diff
i = 0
for aline, bline in zip(a, b):
s = "^" + aline + "$"
try:
expr = re.compile(s)
- except re.error, e:
+ except re.error as e:
msg = "Regular expression error in %s: %s"
raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(bline):
- result.append("%sc%s" % (i+1, i+1))
+ result.append("%sc%s" % (i + 1, i + 1))
result.append('< ' + repr(a[i]))
result.append('---')
result.append('> ' + repr(b[i]))
- i = i+1
+ i = i + 1
return result
+
if os.name == 'posix':
def escape(arg):
"escape shell special characters"
slash = '\\'
special = '"$'
- arg = arg.replace(slash, slash+slash)
+ arg = arg.replace(slash, slash + slash)
for c in special:
- arg = arg.replace(c, slash+c)
+ arg = arg.replace(c, slash + c)
if re_space.search(arg):
arg = '"' + arg + '"'
return arg
@@ -627,14 +669,13 @@ else:
st = os.stat(f)
except OSError:
continue
- if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
+ if stat.S_IMODE(st[stat.ST_MODE]) & 0o111:
return f
return None
default_sleep_seconds = 1
-
import subprocess
try:
@@ -642,6 +683,7 @@ try:
except AttributeError:
if sys.platform == 'win32':
import win32process
+
def terminate(self):
win32process.TerminateProcess(self._handle, 1)
else:
@@ -651,53 +693,64 @@ except AttributeError:
setattr(subprocess.Popen, 'terminate', method)
-
# From Josiah Carlson,
# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
PIPE = subprocess.PIPE
-if subprocess.mswindows:
+if sys.platform == 'win32': # and subprocess.mswindows:
try:
from win32file import ReadFile, WriteFile
from win32pipe import PeekNamedPipe
except ImportError:
# If PyWin32 is not available, try ctypes instead
# XXX These replicate _just_enough_ PyWin32 behaviour for our purposes
- import ctypes; from ctypes.wintypes import DWORD
+ import ctypes
+ from ctypes.wintypes import DWORD
+
def ReadFile(hFile, bufSize, ol=None):
assert ol is None
lpBuffer = ctypes.create_string_buffer(bufSize)
bytesRead = DWORD()
bErr = ctypes.windll.kernel32.ReadFile(
- hFile, lpBuffer, bufSize, ctypes.byref(bytesRead), ol)
- if not bErr: raise ctypes.WinError()
+ hFile, lpBuffer, bufSize, ctypes.byref(bytesRead), ol)
+ if not bErr:
+ raise ctypes.WinError()
return (0, ctypes.string_at(lpBuffer, bytesRead.value))
+
def WriteFile(hFile, data, ol=None):
assert ol is None
bytesWritten = DWORD()
bErr = ctypes.windll.kernel32.WriteFile(
- hFile, data, len(data), ctypes.byref(bytesWritten), ol)
- if not bErr: raise ctypes.WinError()
+ hFile, data, len(data), ctypes.byref(bytesWritten), ol)
+ if not bErr:
+ raise ctypes.WinError()
return (0, bytesWritten.value)
+
def PeekNamedPipe(hPipe, size):
assert size == 0
bytesAvail = DWORD()
bErr = ctypes.windll.kernel32.PeekNamedPipe(
- hPipe, None, size, None, ctypes.byref(bytesAvail), None)
- if not bErr: raise ctypes.WinError()
+ hPipe, None, size, None, ctypes.byref(bytesAvail), None)
+ if not bErr:
+ raise ctypes.WinError()
return ("", bytesAvail.value, None)
import msvcrt
else:
import select
import fcntl
- try: fcntl.F_GETFL
- except AttributeError: fcntl.F_GETFL = 3
+ try:
+ fcntl.F_GETFL
+ except AttributeError:
+ fcntl.F_GETFL = 3
+
+ try:
+ fcntl.F_SETFL
+ except AttributeError:
+ fcntl.F_SETFL = 4
- try: fcntl.F_SETFL
- except AttributeError: fcntl.F_SETFL = 4
class Popen(subprocess.Popen):
def recv(self, maxsize=None):
@@ -720,8 +773,9 @@ class Popen(subprocess.Popen):
getattr(self, which).close()
setattr(self, which, None)
- if subprocess.mswindows:
+ if sys.platform == 'win32': # and subprocess.mswindows:
def send(self, input):
+ input = to_bytes(input)
if not self.stdin:
return None
@@ -730,7 +784,7 @@ class Popen(subprocess.Popen):
(errCode, written) = WriteFile(x, input)
except ValueError:
return self._close('stdin')
- except (subprocess.pywintypes.error, Exception), why:
+ except (subprocess.pywintypes.error, Exception) as why:
if why.args[0] in (109, errno.ESHUTDOWN):
return self._close('stdin')
raise
@@ -751,12 +805,12 @@ class Popen(subprocess.Popen):
(errCode, read) = ReadFile(x, nAvail, None)
except ValueError:
return self._close(which)
- except (subprocess.pywintypes.error, Exception), why:
+ except (subprocess.pywintypes.error, Exception) as why:
if why.args[0] in (109, errno.ESHUTDOWN):
return self._close(which)
raise
- #if self.universal_newlines:
+ # if self.universal_newlines:
# read = self._translate_newlines(read)
return read
@@ -769,9 +823,10 @@ class Popen(subprocess.Popen):
return 0
try:
- written = os.write(self.stdin.fileno(), input)
- except OSError, why:
- if why.args[0] == errno.EPIPE: #broken pipe
+ written = os.write(self.stdin.fileno(),
+ bytearray(input, 'utf-8'))
+ except OSError as why:
+ if why.args[0] == errno.EPIPE: # broken pipe
return self._close('stdin')
raise
@@ -788,7 +843,7 @@ class Popen(subprocess.Popen):
flags = None
else:
if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
try:
if not select.select([conn], [], [], 0)[0]:
@@ -798,19 +853,21 @@ class Popen(subprocess.Popen):
if not r:
return self._close(which)
- #if self.universal_newlines:
+ # if self.universal_newlines:
# r = self._translate_newlines(r)
return r
finally:
if not conn.closed and not flags is None:
fcntl.fcntl(conn, fcntl.F_SETFL, flags)
+
disconnect_message = "Other end disconnected!"
+
def recv_some(p, t=.1, e=1, tr=5, stderr=0):
if tr < 1:
tr = 1
- x = time.time()+t
+ x = time.time() + t
y = []
r = ''
pr = p.recv
@@ -826,9 +883,10 @@ def recv_some(p, t=.1, e=1, tr=5, stderr=0):
elif r:
y.append(r)
else:
- time.sleep(max((x-time.time())/tr, 0))
+ time.sleep(max((x - time.time()) / tr, 0))
return ''.join(y)
+
def send_all(p, data):
while len(data):
sent = p.send(data)
@@ -836,16 +894,19 @@ def send_all(p, data):
raise Exception(disconnect_message)
data = memoryview(data)[sent:]
+
_Cleanup = []
+
def _clean():
global _Cleanup
- cleanlist = [ c for c in _Cleanup if c ]
+ cleanlist = [c for c in _Cleanup if c]
del _Cleanup[:]
cleanlist.reverse()
for test in cleanlist:
test.cleanup()
+
atexit.register(_clean)
@@ -853,21 +914,21 @@ class TestCmd(object):
"""Class TestCmd
"""
- def __init__(self, description = None,
- program = None,
- interpreter = None,
- workdir = None,
- subdir = None,
- verbose = None,
- match = None,
- match_stdout = None,
- match_stderr = None,
- diff = None,
- diff_stdout = None,
- diff_stderr = None,
- combine = 0,
- universal_newlines = 1,
- timeout = None):
+ def __init__(self, description=None,
+ program=None,
+ interpreter=None,
+ workdir=None,
+ subdir=None,
+ verbose=None,
+ match=None,
+ match_stdout=None,
+ match_stderr=None,
+ diff=None,
+ diff_stdout=None,
+ diff_stderr=None,
+ combine=0,
+ universal_newlines=True,
+ timeout=None):
self.external = os.environ.get('SCONS_EXTERNAL_TEST', 0)
self._cwd = os.getcwd()
self.description_set(description)
@@ -875,7 +936,7 @@ class TestCmd(object):
self.interpreter_set(interpreter)
if verbose is None:
try:
- verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
+ verbose = max(0, int(os.environ.get('TESTCMD_VERBOSE', 0)))
except ValueError:
verbose = 0
self.verbose_set(verbose)
@@ -887,7 +948,8 @@ class TestCmd(object):
self.set_diff_function(diff, diff_stdout, diff_stderr)
self._dirlist = []
self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
- if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
+ preserve_value = os.environ.get('PRESERVE', False)
+ if preserve_value not in [0, '0', 'False']:
self._preserve['pass_test'] = os.environ['PRESERVE']
self._preserve['fail_test'] = os.environ['PRESERVE']
self._preserve['no_result'] = os.environ['PRESERVE']
@@ -910,7 +972,7 @@ class TestCmd(object):
self.condition = 'no_result'
self.workdir_set(workdir)
self.subdir(subdir)
- self.script_srcdir = None
+ self.fixture_dirs = []
def __del__(self):
self.cleanup()
@@ -941,7 +1003,7 @@ class TestCmd(object):
path = self.canonicalize(path)
os.chmod(path, mode)
- def cleanup(self, condition = None):
+ def cleanup(self, condition=None):
"""Removes any temporary working directories for the specified
TestCmd environment. If the environment variable PRESERVE was
set when the TestCmd environment was created, temporary working
@@ -964,22 +1026,22 @@ class TestCmd(object):
condition = self.condition
if self._preserve[condition]:
for dir in self._dirlist:
- print unicode("Preserved directory " + dir + "\n"),
+ print(u"Preserved directory " + dir + "\n")
else:
list = self._dirlist[:]
list.reverse()
for dir in list:
self.writable(dir, 1)
- shutil.rmtree(dir, ignore_errors = 1)
+ shutil.rmtree(dir, ignore_errors=1)
self._dirlist = []
global _Cleanup
if self in _Cleanup:
_Cleanup.remove(self)
- def command_args(self, program = None,
- interpreter = None,
- arguments = None):
+ def command_args(self, program=None,
+ interpreter=None,
+ arguments=None):
if not self.external:
if program:
if isinstance(program, str) and not os.path.isabs(program):
@@ -1030,10 +1092,10 @@ class TestCmd(object):
if diff_function is None:
diff_function = self.simple_diff
if name is not None:
- print self.banner(name)
+ print(self.banner(name))
args = (a.splitlines(), b.splitlines()) + args
for line in diff_function(*args, **kw):
- print line
+ print(line)
def diff_stderr(self, a, b, *args, **kw):
"""Compare actual and expected file contents.
@@ -1061,17 +1123,17 @@ class TestCmd(object):
unified_diff = staticmethod(difflib.unified_diff)
- def fail_test(self, condition = 1, function = None, skip = 0, message = None):
+ def fail_test(self, condition=1, function=None, skip=0, message=None):
"""Cause the test to fail.
"""
if not condition:
return
self.condition = 'fail_test'
- fail_test(self = self,
- condition = condition,
- function = function,
- skip = skip,
- message = message)
+ fail_test(self=self,
+ condition=condition,
+ function=function,
+ skip=skip,
+ message=message)
def interpreter_set(self, interpreter):
"""Set the program to be used to interpret the program
@@ -1133,24 +1195,24 @@ class TestCmd(object):
match_re_dotall = staticmethod(match_re_dotall)
- def no_result(self, condition = 1, function = None, skip = 0):
+ def no_result(self, condition=1, function=None, skip=0):
"""Report that the test could not be run.
"""
if not condition:
return
self.condition = 'no_result'
- no_result(self = self,
- condition = condition,
- function = function,
- skip = skip)
+ no_result(self=self,
+ condition=condition,
+ function=function,
+ skip=skip)
- def pass_test(self, condition = 1, function = None):
+ def pass_test(self, condition=1, function=None):
"""Cause the test to pass.
"""
if not condition:
return
self.condition = 'pass_test'
- pass_test(self = self, condition = condition, function = function)
+ pass_test(self=self, condition=condition, function=function)
def preserve(self, *conditions):
"""Arrange for the temporary working directories for the
@@ -1172,7 +1234,7 @@ class TestCmd(object):
program = os.path.join(self._cwd, program)
self.program = program
- def read(self, file, mode = 'rb'):
+ def read(self, file, mode='rb', newline=None):
"""Reads and returns the contents of the specified file name.
The file name may be a list, in which case the elements are
concatenated with the os.path.join() method. The file is
@@ -1184,7 +1246,10 @@ class TestCmd(object):
file = self.canonicalize(file)
if mode[0] != 'r':
raise ValueError("mode must begin with 'r'")
- return open(file, mode).read()
+ if IS_PY3 and 'b' not in mode:
+ return open(file, mode, newline=newline).read()
+ else:
+ return open(file, mode).read()
def rmdir(self, dir):
"""Removes the specified dir name.
@@ -1234,10 +1299,15 @@ class TestCmd(object):
assumed to be under the temporary working directory, it gets
created automatically, if it does not already exist.
"""
- if srcdir and self.script_srcdir and not os.path.isabs(srcdir):
- spath = os.path.join(self.script_srcdir, srcdir)
+
+ if srcdir and self.fixture_dirs and not os.path.isabs(srcdir):
+ for dir in self.fixture_dirs:
+ spath = os.path.join(dir, srcdir)
+ if os.path.isdir(spath):
+ break
else:
spath = srcdir
+
if dstdir:
dstdir = self.canonicalize(dstdir)
else:
@@ -1248,7 +1318,7 @@ class TestCmd(object):
if len(dstlist) > 0 and dstlist[0] == ".":
dstlist = dstlist[1:]
for idx in range(len(dstlist)):
- self.subdir(dstlist[:idx+1])
+ self.subdir(dstlist[:idx + 1])
if dstdir and self.workdir:
dstdir = os.path.join(self.workdir, dstdir)
@@ -1271,13 +1341,15 @@ class TestCmd(object):
automatically, if it does not already exist.
"""
srcpath, srctail = os.path.split(srcfile)
- if srcpath:
- if self.script_srcdir and not os.path.isabs(srcpath):
- spath = os.path.join(self.script_srcdir, srcfile)
- else:
- spath = srcfile
+
+ if srcpath and (not self.fixture_dirs or os.path.isabs(srcpath)):
+ spath = srcfile
else:
- spath = os.path.join(self.script_srcdir, srcfile)
+ for dir in self.fixture_dirs:
+ spath = os.path.join(dir, srcfile)
+ if os.path.isfile(spath):
+ break
+
if not dstfile:
if srctail:
dpath = os.path.join(self.workdir, srctail)
@@ -1291,17 +1363,17 @@ class TestCmd(object):
if len(dstlist) > 0 and dstlist[0] == ".":
dstlist = dstlist[1:]
for idx in range(len(dstlist)):
- self.subdir(dstlist[:idx+1])
+ self.subdir(dstlist[:idx + 1])
dpath = os.path.join(self.workdir, dstfile)
shutil.copy(spath, dpath)
- def start(self, program = None,
- interpreter = None,
- arguments = None,
- universal_newlines = None,
- timeout = _Null,
- **kw):
+ def start(self, program=None,
+ interpreter=None,
+ arguments=None,
+ universal_newlines=None,
+ timeout=_Null,
+ **kw):
"""
Starts a program or script for the test environment.
@@ -1310,7 +1382,7 @@ class TestCmd(object):
"""
cmd = self.command_args(program, interpreter, arguments)
if self.verbose:
- cmd_string = ' '.join([ self.escape(c) for c in cmd ])
+ cmd_string = ' '.join([self.escape(c) for c in cmd])
sys.stderr.write(cmd_string + "\n")
if universal_newlines is None:
universal_newlines = self.universal_newlines
@@ -1334,14 +1406,56 @@ class TestCmd(object):
if timeout:
self.timer = threading.Timer(float(timeout), self._timeout)
self.timer.start()
+
+ if IS_PY3 and sys.platform == 'win32':
+ # Set this otherwist stdout/stderr pipes default to
+ # windows default locale cp1252 which will throw exception
+ # if using non-ascii characters.
+ # For example test/Install/non-ascii-name.py
+ os.environ['PYTHONIOENCODING'] = 'utf-8'
+
+ # It seems that all pythons up to py3.6 still set text mode if you set encoding.
+ # TODO: File enhancement request on python to propagate universal_newlines even
+ # if encoding is set.hg c
p = Popen(cmd,
stdin=stdin,
stdout=subprocess.PIPE,
stderr=stderr_value,
- universal_newlines=universal_newlines)
+ env=os.environ,
+ universal_newlines=False)
+
self.process = p
return p
+ @staticmethod
+ def fix_binary_stream(stream):
+ """
+ Handle stdout/stderr from popen when we specify universal_newlines = False.
+ This will read from the pipes in binary mode, not decode the output,
+ and not convert line endings to \n.
+ We do this because in py3 (3.5) with universal_newlines=True, it will
+ choose the default system locale to decode the output, and this breaks unicode
+ output. Specifically breaking test/option--tree.py which outputs a unicode char.
+
+ py 3.6 allows us to pass an encoding param to popen thus not requiring the decode
+ nor end of line handling, because we propagate universal_newlines as specified.
+
+ TODO: Do we need to pass universal newlines into this function?
+ """
+
+ if not stream:
+ return stream
+ # TODO: Run full tests on both platforms and see if this fixes failures
+ # It seems that py3.6 still sets text mode if you set encoding.
+ elif sys.version_info[0] == 3:# TODO and sys.version_info[1] < 6:
+ stream = stream.decode('utf-8')
+ stream = stream.replace('\r\n', '\n')
+ elif sys.version_info[0] == 2:
+ stream = stream.replace('\r\n', '\n')
+
+ return stream
+
+
def finish(self, popen=None, **kw):
"""
Finishes and waits for the process being run under control of
@@ -1351,6 +1465,10 @@ class TestCmd(object):
if popen is None:
popen = self.process
stdout, stderr = popen.communicate()
+
+ stdout = self.fix_binary_stream(stdout)
+ stderr = self.fix_binary_stream(stderr)
+
if self.timer:
self.timer.cancel()
self.timer = None
@@ -1359,13 +1477,13 @@ class TestCmd(object):
self._stdout.append(stdout or '')
self._stderr.append(stderr or '')
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None,
- universal_newlines = None,
- timeout = _Null):
+ def run(self, program=None,
+ interpreter=None,
+ arguments=None,
+ chdir=None,
+ stdin=None,
+ universal_newlines=None,
+ timeout=_Null):
"""Runs a test of the program or script for the test
environment. Standard output and error output are saved for
future retrieval via the stdout() and stderr() methods.
@@ -1379,6 +1497,9 @@ class TestCmd(object):
if not interpreter:
interpreter = self.interpreter
+ if universal_newlines is None:
+ universal_newlines = self.universal_newlines
+
if chdir:
oldcwd = os.getcwd()
if not os.path.isabs(chdir):
@@ -1386,14 +1507,18 @@ class TestCmd(object):
if self.verbose:
sys.stderr.write("chdir(" + chdir + ")\n")
os.chdir(chdir)
- p = self.start(program = program,
- interpreter = interpreter,
- arguments = arguments,
- universal_newlines = universal_newlines,
- timeout = timeout,
- stdin = stdin)
+ p = self.start(program=program,
+ interpreter=interpreter,
+ arguments=arguments,
+ universal_newlines=universal_newlines,
+ timeout=timeout,
+ stdin=stdin)
if is_List(stdin):
stdin = ''.join(stdin)
+
+ if stdin and IS_PY3:# and sys.version_info[1] < 6:
+ stdin = to_bytes(stdin)
+
# TODO(sgk): figure out how to re-use the logic in the .finish()
# method above. Just calling it from here causes problems with
# subclasses that redefine .finish(). We could abstract this
@@ -1405,6 +1530,11 @@ class TestCmd(object):
self.timer = None
self.status = p.returncode
self.process = None
+
+ stdout = self.fix_binary_stream(stdout)
+ stderr = self.fix_binary_stream(stderr)
+
+
self._stdout.append(stdout or '')
self._stderr.append(stderr or '')
@@ -1424,7 +1554,7 @@ class TestCmd(object):
write(err)
write('============ END STDERR\n')
- def sleep(self, seconds = default_sleep_seconds):
+ def sleep(self, seconds=default_sleep_seconds):
"""Sleeps at least the specified number of seconds. If no
number is specified, sleeps at least the minimum number of
seconds necessary to advance file time stamps on the current
@@ -1432,7 +1562,7 @@ class TestCmd(object):
"""
time.sleep(seconds)
- def stderr(self, run = None):
+ def stderr(self, run=None):
"""Returns the error output from the specified run number.
If there is no specified run number, then returns the error
output of the last run. If the run number is less than zero,
@@ -1446,7 +1576,7 @@ class TestCmd(object):
run = run - 1
return self._stderr[run]
- def stdout(self, run = None):
+ def stdout(self, run=None):
"""Returns the standard output from the specified run number.
If there is no specified run number, then returns the standard
output of the last run. If the run number is less than zero,
@@ -1494,6 +1624,11 @@ class TestCmd(object):
is an absolute path name. The target is *not* assumed to be
under the temporary working directory.
"""
+ if sys.platform == 'win32':
+ # Skip this on windows as we're not enabling it due to
+ # it requiring user permissions which aren't always present
+ # and we don't have a good way to detect those permissions yet.
+ return
link = self.canonicalize(link)
try:
os.symlink(target, link)
@@ -1525,7 +1660,7 @@ class TestCmd(object):
# Uppercase the drive letter since the case of drive
# letters is pretty much random on win32:
- drive,rest = os.path.splitdrive(path)
+ drive, rest = os.path.splitdrive(path)
if drive:
path = drive.upper() + rest
@@ -1605,14 +1740,22 @@ class TestCmd(object):
if read:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] | stat.S_IREAD))
else:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~stat.S_IREAD))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.
@@ -1646,25 +1789,36 @@ class TestCmd(object):
if write:
def do_chmod(fname):
- try: os.chmod(fname, stat.S_IWRITE)
- except OSError: pass
+ try:
+ os.chmod(fname, stat.S_IWRITE)
+ except OSError:
+ pass
else:
def do_chmod(fname):
- try: os.chmod(fname, stat.S_IREAD)
- except OSError: pass
+ try:
+ os.chmod(fname, stat.S_IREAD)
+ except OSError:
+ pass
else:
if write:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE] | 0o200))
else:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~0o200))
if os.path.isfile(top):
do_chmod(top)
@@ -1687,14 +1841,22 @@ class TestCmd(object):
if execute:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] | stat.S_IEXEC))
else:
def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
+ try:
+ st = os.stat(fname)
+ except OSError:
+ pass
+ else:
+ os.chmod(fname, stat.S_IMODE(
+ st[stat.ST_MODE] & ~stat.S_IEXEC))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.
@@ -1719,7 +1881,7 @@ class TestCmd(object):
do_chmod(os.path.join(dirpath, name))
do_chmod(top)
- def write(self, file, content, mode = 'wb'):
+ def write(self, file, content, mode='wb'):
"""Writes the specified content text (second argument) to the
specified file name (first argument). The file name may be
a list, in which case the elements are concatenated with the
@@ -1731,7 +1893,12 @@ class TestCmd(object):
file = self.canonicalize(file)
if mode[0] != 'w':
raise ValueError("mode must begin with 'w'")
- open(file, mode).write(content)
+ with open(file, mode) as f:
+ try:
+ f.write(content)
+ except TypeError as e:
+ # python 3 default strings are not bytes, but unicode
+ f.write(bytes(content, 'utf-8'))
# Local Variables:
# tab-width:4