diff options
Diffstat (limited to 'src/engine/SCons/Node')
-rw-r--r-- | src/engine/SCons/Node/Alias.py | 153 | ||||
-rw-r--r-- | src/engine/SCons/Node/AliasTests.py | 130 | ||||
-rw-r--r-- | src/engine/SCons/Node/FS.py | 3220 | ||||
-rw-r--r-- | src/engine/SCons/Node/FSTests.py | 3520 | ||||
-rw-r--r-- | src/engine/SCons/Node/NodeTests.py | 1317 | ||||
-rw-r--r-- | src/engine/SCons/Node/Python.py | 128 | ||||
-rw-r--r-- | src/engine/SCons/Node/PythonTests.py | 130 | ||||
-rw-r--r-- | src/engine/SCons/Node/__init__.py | 1341 |
8 files changed, 9939 insertions, 0 deletions
diff --git a/src/engine/SCons/Node/Alias.py b/src/engine/SCons/Node/Alias.py new file mode 100644 index 0000000..2aa5821 --- /dev/null +++ b/src/engine/SCons/Node/Alias.py @@ -0,0 +1,153 @@ + +"""scons.Node.Alias + +Alias nodes. + +This creates a hash of global Aliases (dummy targets). + +""" + +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/Alias.py 4577 2009/12/27 19:44:43 scons" + +import string +import UserDict + +import SCons.Errors +import SCons.Node +import SCons.Util + +class AliasNameSpace(UserDict.UserDict): + def Alias(self, name, **kw): + if isinstance(name, SCons.Node.Alias.Alias): + return name + try: + a = self[name] + except KeyError: + a = apply(SCons.Node.Alias.Alias, (name,), kw) + self[name] = a + return a + + def lookup(self, name, **kw): + try: + return self[name] + except KeyError: + return None + +class AliasNodeInfo(SCons.Node.NodeInfoBase): + current_version_id = 1 + field_list = ['csig'] + def str_to_node(self, s): + return default_ans.Alias(s) + +class AliasBuildInfo(SCons.Node.BuildInfoBase): + current_version_id = 1 + +class Alias(SCons.Node.Node): + + NodeInfo = AliasNodeInfo + BuildInfo = AliasBuildInfo + + def __init__(self, name): + SCons.Node.Node.__init__(self) + self.name = name + + def str_for_display(self): + return '"' + self.__str__() + '"' + + def __str__(self): + return self.name + + def make_ready(self): + self.get_csig() + + really_build = SCons.Node.Node.build + is_up_to_date = SCons.Node.Node.children_are_up_to_date + + def is_under(self, dir): + # Make Alias nodes get built regardless of + # what directory scons was run from. Alias nodes + # are outside the filesystem: + return 1 + + def get_contents(self): + """The contents of an alias is the concatenation + of the content signatures of all its sources.""" + childsigs = map(lambda n: n.get_csig(), self.children()) + return string.join(childsigs, '') + + def sconsign(self): + """An Alias is not recorded in .sconsign files""" + pass + + # + # + # + + def changed_since_last_build(self, target, prev_ni): + cur_csig = self.get_csig() + try: + return cur_csig != prev_ni.csig + except AttributeError: + return 1 + + def build(self): + """A "builder" for aliases.""" + pass + + def convert(self): + try: del self.builder + except AttributeError: pass + self.reset_executor() + self.build = self.really_build + + def get_csig(self): + """ + Generate a node's content signature, the digested signature + of its content. + + node - the node + cache - alternate node to use for the signature cache + returns - the content signature + """ + try: + return self.ninfo.csig + except AttributeError: + pass + + contents = self.get_contents() + csig = SCons.Util.MD5signature(contents) + self.get_ninfo().csig = csig + return csig + +default_ans = AliasNameSpace() + +SCons.Node.arg2nodes_lookups.append(default_ans.lookup) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/AliasTests.py b/src/engine/SCons/Node/AliasTests.py new file mode 100644 index 0000000..8828c19 --- /dev/null +++ b/src/engine/SCons/Node/AliasTests.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/AliasTests.py 4577 2009/12/27 19:44:43 scons" + +import sys +import unittest + +import SCons.Errors +import SCons.Node.Alias + +class AliasTestCase(unittest.TestCase): + + def test_AliasNameSpace(self): + """Test creating an Alias name space + """ + ans = SCons.Node.Alias.AliasNameSpace() + assert ans is not None, ans + + def test_ANS_Alias(self): + """Test the Alias() factory + """ + ans = SCons.Node.Alias.AliasNameSpace() + + a1 = ans.Alias('a1') + assert a1.name == 'a1', a1.name + + a2 = ans.Alias('a1') + assert a1 is a2, (a1, a2) + + def test_get_contents(self): + """Test the get_contents() method + """ + class DummyNode: + def __init__(self, contents): + self.contents = contents + def get_csig(self): + return self.contents + def get_contents(self): + return self.contents + + ans = SCons.Node.Alias.AliasNameSpace() + + ans.Alias('a1') + a = ans.lookup('a1') + + a.sources = [ DummyNode('one'), DummyNode('two'), DummyNode('three') ] + + c = a.get_contents() + assert c == 'onetwothree', c + + def test_lookup(self): + """Test the lookup() method + """ + ans = SCons.Node.Alias.AliasNameSpace() + + ans.Alias('a1') + a = ans.lookup('a1') + assert a.name == 'a1', a.name + + a1 = ans.lookup('a1') + assert a is a1, a1 + + a = ans.lookup('a2') + assert a is None, a + + def test_Alias(self): + """Test creating an Alias() object + """ + a1 = SCons.Node.Alias.Alias('a') + assert a1.name == 'a', a1.name + + a2 = SCons.Node.Alias.Alias('a') + assert a2.name == 'a', a2.name + + assert not a1 is a2 + assert a1.name == a2.name + +class AliasNodeInfoTestCase(unittest.TestCase): + def test___init__(self): + """Test AliasNodeInfo initialization""" + ans = SCons.Node.Alias.AliasNameSpace() + aaa = ans.Alias('aaa') + ni = SCons.Node.Alias.AliasNodeInfo(aaa) + +class AliasBuildInfoTestCase(unittest.TestCase): + def test___init__(self): + """Test AliasBuildInfo initialization""" + ans = SCons.Node.Alias.AliasNameSpace() + aaa = ans.Alias('aaa') + bi = SCons.Node.Alias.AliasBuildInfo(aaa) + +if __name__ == "__main__": + suite = unittest.TestSuite() + tclasses = [ + AliasTestCase, + AliasBuildInfoTestCase, + AliasNodeInfoTestCase, + ] + for tclass in tclasses: + names = unittest.getTestCaseNames(tclass, 'test_') + suite.addTests(map(tclass, names)) + if not unittest.TextTestRunner().run(suite).wasSuccessful(): + sys.exit(1) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/FS.py b/src/engine/SCons/Node/FS.py new file mode 100644 index 0000000..0e9f14f --- /dev/null +++ b/src/engine/SCons/Node/FS.py @@ -0,0 +1,3220 @@ +"""scons.Node.FS + +File system nodes. + +These Nodes represent the canonical external objects that people think +of when they think of building software: files and directories. + +This holds a "default_fs" variable that should be initialized with an FS +that can be used by scripts or modules looking for the canonical default. + +""" + +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/FS.py 4577 2009/12/27 19:44:43 scons" + +from itertools import izip +import cStringIO +import fnmatch +import os +import os.path +import re +import shutil +import stat +import string +import sys +import time + +try: + import codecs +except ImportError: + pass +else: + # TODO(2.2): Remove when 2.3 becomes the minimal supported version. + try: + codecs.BOM_UTF8 + except AttributeError: + codecs.BOM_UTF8 = '\xef\xbb\xbf' + try: + codecs.BOM_UTF16_LE + codecs.BOM_UTF16_BE + except AttributeError: + codecs.BOM_UTF16_LE = '\xff\xfe' + codecs.BOM_UTF16_BE = '\xfe\xff' + + # Provide a wrapper function to handle decoding differences in + # different versions of Python. Normally, we'd try to do this in the + # compat layer (and maybe it still makes sense to move there?) but + # that doesn't provide a way to supply the string class used in + # pre-2.3 Python versions with a .decode() method that all strings + # naturally have. Plus, the 2.[01] encodings behave differently + # enough that we have to settle for a lowest-common-denominator + # wrapper approach. + # + # Note that the 2.[012] implementations below may be inefficient + # because they perform an explicit look up of the encoding for every + # decode, but they're old enough (and we want to stop supporting + # them soon enough) that it's not worth complicating the interface. + # Think of it as additional incentive for people to upgrade... + try: + ''.decode + except AttributeError: + # 2.0 through 2.2: strings have no .decode() method + try: + codecs.lookup('ascii').decode + except AttributeError: + # 2.0 and 2.1: encodings are a tuple of functions, and the + # decode() function returns a (result, length) tuple. + def my_decode(contents, encoding): + return codecs.lookup(encoding)[1](contents)[0] + else: + # 2.2: encodings are an object with methods, and the + # .decode() method returns just the decoded bytes. + def my_decode(contents, encoding): + return codecs.lookup(encoding).decode(contents) + else: + # 2.3 or later: use the .decode() string method + def my_decode(contents, encoding): + return contents.decode(encoding) + +import SCons.Action +from SCons.Debug import logInstanceCreation +import SCons.Errors +import SCons.Memoize +import SCons.Node +import SCons.Node.Alias +import SCons.Subst +import SCons.Util +import SCons.Warnings + +from SCons.Debug import Trace + +do_store_info = True + + +class EntryProxyAttributeError(AttributeError): + """ + An AttributeError subclass for recording and displaying the name + of the underlying Entry involved in an AttributeError exception. + """ + def __init__(self, entry_proxy, attribute): + AttributeError.__init__(self) + self.entry_proxy = entry_proxy + self.attribute = attribute + def __str__(self): + entry = self.entry_proxy.get() + fmt = "%s instance %s has no attribute %s" + return fmt % (entry.__class__.__name__, + repr(entry.name), + repr(self.attribute)) + +# The max_drift value: by default, use a cached signature value for +# any file that's been untouched for more than two days. +default_max_drift = 2*24*60*60 + +# +# We stringify these file system Nodes a lot. Turning a file system Node +# into a string is non-trivial, because the final string representation +# can depend on a lot of factors: whether it's a derived target or not, +# whether it's linked to a repository or source directory, and whether +# there's duplication going on. The normal technique for optimizing +# calculations like this is to memoize (cache) the string value, so you +# only have to do the calculation once. +# +# A number of the above factors, however, can be set after we've already +# been asked to return a string for a Node, because a Repository() or +# VariantDir() call or the like may not occur until later in SConscript +# files. So this variable controls whether we bother trying to save +# string values for Nodes. The wrapper interface can set this whenever +# they're done mucking with Repository and VariantDir and the other stuff, +# to let this module know it can start returning saved string values +# for Nodes. +# +Save_Strings = None + +def save_strings(val): + global Save_Strings + Save_Strings = val + +# +# Avoid unnecessary function calls by recording a Boolean value that +# tells us whether or not os.path.splitdrive() actually does anything +# on this system, and therefore whether we need to bother calling it +# when looking up path names in various methods below. +# + +do_splitdrive = None + +def initialize_do_splitdrive(): + global do_splitdrive + drive, path = os.path.splitdrive('X:/foo') + do_splitdrive = not not drive + +initialize_do_splitdrive() + +# + +needs_normpath_check = None + +def initialize_normpath_check(): + """ + Initialize the normpath_check regular expression. + + This function is used by the unit tests to re-initialize the pattern + when testing for behavior with different values of os.sep. + """ + global needs_normpath_check + if os.sep == '/': + pattern = r'.*/|\.$|\.\.$' + else: + pattern = r'.*[/%s]|\.$|\.\.$' % re.escape(os.sep) + needs_normpath_check = re.compile(pattern) + +initialize_normpath_check() + +# +# SCons.Action objects for interacting with the outside world. +# +# The Node.FS methods in this module should use these actions to +# create and/or remove files and directories; they should *not* use +# os.{link,symlink,unlink,mkdir}(), etc., directly. +# +# Using these SCons.Action objects ensures that descriptions of these +# external activities are properly displayed, that the displays are +# suppressed when the -s (silent) option is used, and (most importantly) +# the actions are disabled when the the -n option is used, in which case +# there should be *no* changes to the external file system(s)... +# + +if hasattr(os, 'link'): + def _hardlink_func(fs, src, dst): + # If the source is a symlink, we can't just hard-link to it + # because a relative symlink may point somewhere completely + # different. We must disambiguate the symlink and then + # hard-link the final destination file. + while fs.islink(src): + link = fs.readlink(src) + if not os.path.isabs(link): + src = link + else: + src = os.path.join(os.path.dirname(src), link) + fs.link(src, dst) +else: + _hardlink_func = None + +if hasattr(os, 'symlink'): + def _softlink_func(fs, src, dst): + fs.symlink(src, dst) +else: + _softlink_func = None + +def _copy_func(fs, src, dest): + shutil.copy2(src, dest) + st = fs.stat(src) + fs.chmod(dest, stat.S_IMODE(st[stat.ST_MODE]) | stat.S_IWRITE) + + +Valid_Duplicates = ['hard-soft-copy', 'soft-hard-copy', + 'hard-copy', 'soft-copy', 'copy'] + +Link_Funcs = [] # contains the callables of the specified duplication style + +def set_duplicate(duplicate): + # Fill in the Link_Funcs list according to the argument + # (discarding those not available on the platform). + + # Set up the dictionary that maps the argument names to the + # underlying implementations. We do this inside this function, + # not in the top-level module code, so that we can remap os.link + # and os.symlink for testing purposes. + link_dict = { + 'hard' : _hardlink_func, + 'soft' : _softlink_func, + 'copy' : _copy_func + } + + if not duplicate in Valid_Duplicates: + raise SCons.Errors.InternalError, ("The argument of set_duplicate " + "should be in Valid_Duplicates") + global Link_Funcs + Link_Funcs = [] + for func in string.split(duplicate,'-'): + if link_dict[func]: + Link_Funcs.append(link_dict[func]) + +def LinkFunc(target, source, env): + # Relative paths cause problems with symbolic links, so + # we use absolute paths, which may be a problem for people + # who want to move their soft-linked src-trees around. Those + # people should use the 'hard-copy' mode, softlinks cannot be + # used for that; at least I have no idea how ... + src = source[0].abspath + dest = target[0].abspath + dir, file = os.path.split(dest) + if dir and not target[0].fs.isdir(dir): + os.makedirs(dir) + if not Link_Funcs: + # Set a default order of link functions. + set_duplicate('hard-soft-copy') + fs = source[0].fs + # Now link the files with the previously specified order. + for func in Link_Funcs: + try: + func(fs, src, dest) + break + except (IOError, OSError): + # An OSError indicates something happened like a permissions + # problem or an attempt to symlink across file-system + # boundaries. An IOError indicates something like the file + # not existing. In either case, keeping trying additional + # functions in the list and only raise an error if the last + # one failed. + if func == Link_Funcs[-1]: + # exception of the last link method (copy) are fatal + raise + return 0 + +Link = SCons.Action.Action(LinkFunc, None) +def LocalString(target, source, env): + return 'Local copy of %s from %s' % (target[0], source[0]) + +LocalCopy = SCons.Action.Action(LinkFunc, LocalString) + +def UnlinkFunc(target, source, env): + t = target[0] + t.fs.unlink(t.abspath) + return 0 + +Unlink = SCons.Action.Action(UnlinkFunc, None) + +def MkdirFunc(target, source, env): + t = target[0] + if not t.exists(): + t.fs.mkdir(t.abspath) + return 0 + +Mkdir = SCons.Action.Action(MkdirFunc, None, presub=None) + +MkdirBuilder = None + +def get_MkdirBuilder(): + global MkdirBuilder + if MkdirBuilder is None: + import SCons.Builder + import SCons.Defaults + # "env" will get filled in by Executor.get_build_env() + # calling SCons.Defaults.DefaultEnvironment() when necessary. + MkdirBuilder = SCons.Builder.Builder(action = Mkdir, + env = None, + explain = None, + is_explicit = None, + target_scanner = SCons.Defaults.DirEntryScanner, + name = "MkdirBuilder") + return MkdirBuilder + +class _Null: + pass + +_null = _Null() + +DefaultSCCSBuilder = None +DefaultRCSBuilder = None + +def get_DefaultSCCSBuilder(): + global DefaultSCCSBuilder + if DefaultSCCSBuilder is None: + import SCons.Builder + # "env" will get filled in by Executor.get_build_env() + # calling SCons.Defaults.DefaultEnvironment() when necessary. + act = SCons.Action.Action('$SCCSCOM', '$SCCSCOMSTR') + DefaultSCCSBuilder = SCons.Builder.Builder(action = act, + env = None, + name = "DefaultSCCSBuilder") + return DefaultSCCSBuilder + +def get_DefaultRCSBuilder(): + global DefaultRCSBuilder + if DefaultRCSBuilder is None: + import SCons.Builder + # "env" will get filled in by Executor.get_build_env() + # calling SCons.Defaults.DefaultEnvironment() when necessary. + act = SCons.Action.Action('$RCS_COCOM', '$RCS_COCOMSTR') + DefaultRCSBuilder = SCons.Builder.Builder(action = act, + env = None, + name = "DefaultRCSBuilder") + return DefaultRCSBuilder + +# Cygwin's os.path.normcase pretends it's on a case-sensitive filesystem. +_is_cygwin = sys.platform == "cygwin" +if os.path.normcase("TeSt") == os.path.normpath("TeSt") and not _is_cygwin: + def _my_normcase(x): + return x +else: + def _my_normcase(x): + return string.upper(x) + + + +class DiskChecker: + def __init__(self, type, do, ignore): + self.type = type + self.do = do + self.ignore = ignore + self.set_do() + def set_do(self): + self.__call__ = self.do + def set_ignore(self): + self.__call__ = self.ignore + def set(self, list): + if self.type in list: + self.set_do() + else: + self.set_ignore() + +def do_diskcheck_match(node, predicate, errorfmt): + result = predicate() + try: + # If calling the predicate() cached a None value from stat(), + # remove it so it doesn't interfere with later attempts to + # build this Node as we walk the DAG. (This isn't a great way + # to do this, we're reaching into an interface that doesn't + # really belong to us, but it's all about performance, so + # for now we'll just document the dependency...) + if node._memo['stat'] is None: + del node._memo['stat'] + except (AttributeError, KeyError): + pass + if result: + raise TypeError, errorfmt % node.abspath + +def ignore_diskcheck_match(node, predicate, errorfmt): + pass + +def do_diskcheck_rcs(node, name): + try: + rcs_dir = node.rcs_dir + except AttributeError: + if node.entry_exists_on_disk('RCS'): + rcs_dir = node.Dir('RCS') + else: + rcs_dir = None + node.rcs_dir = rcs_dir + if rcs_dir: + return rcs_dir.entry_exists_on_disk(name+',v') + return None + +def ignore_diskcheck_rcs(node, name): + return None + +def do_diskcheck_sccs(node, name): + try: + sccs_dir = node.sccs_dir + except AttributeError: + if node.entry_exists_on_disk('SCCS'): + sccs_dir = node.Dir('SCCS') + else: + sccs_dir = None + node.sccs_dir = sccs_dir + if sccs_dir: + return sccs_dir.entry_exists_on_disk('s.'+name) + return None + +def ignore_diskcheck_sccs(node, name): + return None + +diskcheck_match = DiskChecker('match', do_diskcheck_match, ignore_diskcheck_match) +diskcheck_rcs = DiskChecker('rcs', do_diskcheck_rcs, ignore_diskcheck_rcs) +diskcheck_sccs = DiskChecker('sccs', do_diskcheck_sccs, ignore_diskcheck_sccs) + +diskcheckers = [ + diskcheck_match, + diskcheck_rcs, + diskcheck_sccs, +] + +def set_diskcheck(list): + for dc in diskcheckers: + dc.set(list) + +def diskcheck_types(): + return map(lambda dc: dc.type, diskcheckers) + + + +class EntryProxy(SCons.Util.Proxy): + def __get_abspath(self): + entry = self.get() + return SCons.Subst.SpecialAttrWrapper(entry.get_abspath(), + entry.name + "_abspath") + + def __get_filebase(self): + name = self.get().name + return SCons.Subst.SpecialAttrWrapper(SCons.Util.splitext(name)[0], + name + "_filebase") + + def __get_suffix(self): + name = self.get().name + return SCons.Subst.SpecialAttrWrapper(SCons.Util.splitext(name)[1], + name + "_suffix") + + def __get_file(self): + name = self.get().name + return SCons.Subst.SpecialAttrWrapper(name, name + "_file") + + def __get_base_path(self): + """Return the file's directory and file name, with the + suffix stripped.""" + entry = self.get() + return SCons.Subst.SpecialAttrWrapper(SCons.Util.splitext(entry.get_path())[0], + entry.name + "_base") + + def __get_posix_path(self): + """Return the path with / as the path separator, + regardless of platform.""" + if os.sep == '/': + return self + else: + entry = self.get() + r = string.replace(entry.get_path(), os.sep, '/') + return SCons.Subst.SpecialAttrWrapper(r, entry.name + "_posix") + + def __get_windows_path(self): + """Return the path with \ as the path separator, + regardless of platform.""" + if os.sep == '\\': + return self + else: + entry = self.get() + r = string.replace(entry.get_path(), os.sep, '\\') + return SCons.Subst.SpecialAttrWrapper(r, entry.name + "_windows") + + def __get_srcnode(self): + return EntryProxy(self.get().srcnode()) + + def __get_srcdir(self): + """Returns the directory containing the source node linked to this + node via VariantDir(), or the directory of this node if not linked.""" + return EntryProxy(self.get().srcnode().dir) + + def __get_rsrcnode(self): + return EntryProxy(self.get().srcnode().rfile()) + + def __get_rsrcdir(self): + """Returns the directory containing the source node linked to this + node via VariantDir(), or the directory of this node if not linked.""" + return EntryProxy(self.get().srcnode().rfile().dir) + + def __get_dir(self): + return EntryProxy(self.get().dir) + + dictSpecialAttrs = { "base" : __get_base_path, + "posix" : __get_posix_path, + "windows" : __get_windows_path, + "win32" : __get_windows_path, + "srcpath" : __get_srcnode, + "srcdir" : __get_srcdir, + "dir" : __get_dir, + "abspath" : __get_abspath, + "filebase" : __get_filebase, + "suffix" : __get_suffix, + "file" : __get_file, + "rsrcpath" : __get_rsrcnode, + "rsrcdir" : __get_rsrcdir, + } + + def __getattr__(self, name): + # This is how we implement the "special" attributes + # such as base, posix, srcdir, etc. + try: + attr_function = self.dictSpecialAttrs[name] + except KeyError: + try: + attr = SCons.Util.Proxy.__getattr__(self, name) + except AttributeError, e: + # Raise our own AttributeError subclass with an + # overridden __str__() method that identifies the + # name of the entry that caused the exception. + raise EntryProxyAttributeError(self, name) + return attr + else: + return attr_function(self) + +class Base(SCons.Node.Node): + """A generic class for file system entries. This class is for + when we don't know yet whether the entry being looked up is a file + or a directory. Instances of this class can morph into either + Dir or File objects by a later, more precise lookup. + + Note: this class does not define __cmp__ and __hash__ for + efficiency reasons. SCons does a lot of comparing of + Node.FS.{Base,Entry,File,Dir} objects, so those operations must be + as fast as possible, which means we want to use Python's built-in + object identity comparisons. + """ + + memoizer_counters = [] + + def __init__(self, name, directory, fs): + """Initialize a generic Node.FS.Base object. + + Call the superclass initialization, take care of setting up + our relative and absolute paths, identify our parent + directory, and indicate that this node should use + signatures.""" + if __debug__: logInstanceCreation(self, 'Node.FS.Base') + SCons.Node.Node.__init__(self) + + # Filenames and paths are probably reused and are intern'ed to + # save some memory. + self.name = SCons.Util.silent_intern(name) + self.suffix = SCons.Util.silent_intern(SCons.Util.splitext(name)[1]) + self.fs = fs + + assert directory, "A directory must be provided" + + self.abspath = SCons.Util.silent_intern(directory.entry_abspath(name)) + self.labspath = SCons.Util.silent_intern(directory.entry_labspath(name)) + if directory.path == '.': + self.path = SCons.Util.silent_intern(name) + else: + self.path = SCons.Util.silent_intern(directory.entry_path(name)) + if directory.tpath == '.': + self.tpath = SCons.Util.silent_intern(name) + else: + self.tpath = SCons.Util.silent_intern(directory.entry_tpath(name)) + self.path_elements = directory.path_elements + [self] + + self.dir = directory + self.cwd = None # will hold the SConscript directory for target nodes + self.duplicate = directory.duplicate + + def str_for_display(self): + return '"' + self.__str__() + '"' + + def must_be_same(self, klass): + """ + This node, which already existed, is being looked up as the + specified klass. Raise an exception if it isn't. + """ + if isinstance(self, klass) or klass is Entry: + return + raise TypeError, "Tried to lookup %s '%s' as a %s." %\ + (self.__class__.__name__, self.path, klass.__name__) + + def get_dir(self): + return self.dir + + def get_suffix(self): + return self.suffix + + def rfile(self): + return self + + def __str__(self): + """A Node.FS.Base object's string representation is its path + name.""" + global Save_Strings + if Save_Strings: + return self._save_str() + return self._get_str() + + memoizer_counters.append(SCons.Memoize.CountValue('_save_str')) + + def _save_str(self): + try: + return self._memo['_save_str'] + except KeyError: + pass + result = intern(self._get_str()) + self._memo['_save_str'] = result + return result + + def _get_str(self): + global Save_Strings + if self.duplicate or self.is_derived(): + return self.get_path() + srcnode = self.srcnode() + if srcnode.stat() is None and self.stat() is not None: + result = self.get_path() + else: + result = srcnode.get_path() + if not Save_Strings: + # We're not at the point where we're saving the string string + # representations of FS Nodes (because we haven't finished + # reading the SConscript files and need to have str() return + # things relative to them). That also means we can't yet + # cache values returned (or not returned) by stat(), since + # Python code in the SConscript files might still create + # or otherwise affect the on-disk file. So get rid of the + # values that the underlying stat() method saved. + try: del self._memo['stat'] + except KeyError: pass + if self is not srcnode: + try: del srcnode._memo['stat'] + except KeyError: pass + return result + + rstr = __str__ + + memoizer_counters.append(SCons.Memoize.CountValue('stat')) + + def stat(self): + try: return self._memo['stat'] + except KeyError: pass + try: result = self.fs.stat(self.abspath) + except os.error: result = None + self._memo['stat'] = result + return result + + def exists(self): + return self.stat() is not None + + def rexists(self): + return self.rfile().exists() + + def getmtime(self): + st = self.stat() + if st: return st[stat.ST_MTIME] + else: return None + + def getsize(self): + st = self.stat() + if st: return st[stat.ST_SIZE] + else: return None + + def isdir(self): + st = self.stat() + return st is not None and stat.S_ISDIR(st[stat.ST_MODE]) + + def isfile(self): + st = self.stat() + return st is not None and stat.S_ISREG(st[stat.ST_MODE]) + + if hasattr(os, 'symlink'): + def islink(self): + try: st = self.fs.lstat(self.abspath) + except os.error: return 0 + return stat.S_ISLNK(st[stat.ST_MODE]) + else: + def islink(self): + return 0 # no symlinks + + def is_under(self, dir): + if self is dir: + return 1 + else: + return self.dir.is_under(dir) + + def set_local(self): + self._local = 1 + + def srcnode(self): + """If this node is in a build path, return the node + corresponding to its source file. Otherwise, return + ourself. + """ + srcdir_list = self.dir.srcdir_list() + if srcdir_list: + srcnode = srcdir_list[0].Entry(self.name) + srcnode.must_be_same(self.__class__) + return srcnode + return self + + def get_path(self, dir=None): + """Return path relative to the current working directory of the + Node.FS.Base object that owns us.""" + if not dir: + dir = self.fs.getcwd() + if self == dir: + return '.' + path_elems = self.path_elements + try: i = path_elems.index(dir) + except ValueError: pass + else: path_elems = path_elems[i+1:] + path_elems = map(lambda n: n.name, path_elems) + return string.join(path_elems, os.sep) + + def set_src_builder(self, builder): + """Set the source code builder for this node.""" + self.sbuilder = builder + if not self.has_builder(): + self.builder_set(builder) + + def src_builder(self): + """Fetch the source code builder for this node. + + If there isn't one, we cache the source code builder specified + for the directory (which in turn will cache the value from its + parent directory, and so on up to the file system root). + """ + try: + scb = self.sbuilder + except AttributeError: + scb = self.dir.src_builder() + self.sbuilder = scb + return scb + + def get_abspath(self): + """Get the absolute path of the file.""" + return self.abspath + + def for_signature(self): + # Return just our name. Even an absolute path would not work, + # because that can change thanks to symlinks or remapped network + # paths. + return self.name + + def get_subst_proxy(self): + try: + return self._proxy + except AttributeError: + ret = EntryProxy(self) + self._proxy = ret + return ret + + def target_from_source(self, prefix, suffix, splitext=SCons.Util.splitext): + """ + + Generates a target entry that corresponds to this entry (usually + a source file) with the specified prefix and suffix. + + Note that this method can be overridden dynamically for generated + files that need different behavior. See Tool/swig.py for + an example. + """ + return self.dir.Entry(prefix + splitext(self.name)[0] + suffix) + + def _Rfindalldirs_key(self, pathlist): + return pathlist + + memoizer_counters.append(SCons.Memoize.CountDict('Rfindalldirs', _Rfindalldirs_key)) + + def Rfindalldirs(self, pathlist): + """ + Return all of the directories for a given path list, including + corresponding "backing" directories in any repositories. + + The Node lookups are relative to this Node (typically a + directory), so memoizing result saves cycles from looking + up the same path for each target in a given directory. + """ + try: + memo_dict = self._memo['Rfindalldirs'] + except KeyError: + memo_dict = {} + self._memo['Rfindalldirs'] = memo_dict + else: + try: + return memo_dict[pathlist] + except KeyError: + pass + + create_dir_relative_to_self = self.Dir + result = [] + for path in pathlist: + if isinstance(path, SCons.Node.Node): + result.append(path) + else: + dir = create_dir_relative_to_self(path) + result.extend(dir.get_all_rdirs()) + + memo_dict[pathlist] = result + + return result + + def RDirs(self, pathlist): + """Search for a list of directories in the Repository list.""" + cwd = self.cwd or self.fs._cwd + return cwd.Rfindalldirs(pathlist) + + memoizer_counters.append(SCons.Memoize.CountValue('rentry')) + + def rentry(self): + try: + return self._memo['rentry'] + except KeyError: + pass + result = self + if not self.exists(): + norm_name = _my_normcase(self.name) + for dir in self.dir.get_all_rdirs(): + try: + node = dir.entries[norm_name] + except KeyError: + if dir.entry_exists_on_disk(self.name): + result = dir.Entry(self.name) + break + self._memo['rentry'] = result + return result + + def _glob1(self, pattern, ondisk=True, source=False, strings=False): + return [] + +class Entry(Base): + """This is the class for generic Node.FS entries--that is, things + that could be a File or a Dir, but we're just not sure yet. + Consequently, the methods in this class really exist just to + transform their associated object into the right class when the + time comes, and then call the same-named method in the transformed + class.""" + + def diskcheck_match(self): + pass + + def disambiguate(self, must_exist=None): + """ + """ + if self.isdir(): + self.__class__ = Dir + self._morph() + elif self.isfile(): + self.__class__ = File + self._morph() + self.clear() + else: + # There was nothing on-disk at this location, so look in + # the src directory. + # + # We can't just use self.srcnode() straight away because + # that would create an actual Node for this file in the src + # directory, and there might not be one. Instead, use the + # dir_on_disk() method to see if there's something on-disk + # with that name, in which case we can go ahead and call + # self.srcnode() to create the right type of entry. + srcdir = self.dir.srcnode() + if srcdir != self.dir and \ + srcdir.entry_exists_on_disk(self.name) and \ + self.srcnode().isdir(): + self.__class__ = Dir + self._morph() + elif must_exist: + msg = "No such file or directory: '%s'" % self.abspath + raise SCons.Errors.UserError, msg + else: + self.__class__ = File + self._morph() + self.clear() + return self + + def rfile(self): + """We're a generic Entry, but the caller is actually looking for + a File at this point, so morph into one.""" + self.__class__ = File + self._morph() + self.clear() + return File.rfile(self) + + def scanner_key(self): + return self.get_suffix() + + def get_contents(self): + """Fetch the contents of the entry. Returns the exact binary + contents of the file.""" + try: + self = self.disambiguate(must_exist=1) + except SCons.Errors.UserError: + # There was nothing on disk with which to disambiguate + # this entry. Leave it as an Entry, but return a null + # string so calls to get_contents() in emitters and the + # like (e.g. in qt.py) don't have to disambiguate by hand + # or catch the exception. + return '' + else: + return self.get_contents() + + def get_text_contents(self): + """Fetch the decoded text contents of a Unicode encoded Entry. + + Since this should return the text contents from the file + system, we check to see into what sort of subclass we should + morph this Entry.""" + try: + self = self.disambiguate(must_exist=1) + except SCons.Errors.UserError: + # There was nothing on disk with which to disambiguate + # this entry. Leave it as an Entry, but return a null + # string so calls to get_text_contents() in emitters and + # the like (e.g. in qt.py) don't have to disambiguate by + # hand or catch the exception. + return '' + else: + return self.get_text_contents() + + def must_be_same(self, klass): + """Called to make sure a Node is a Dir. Since we're an + Entry, we can morph into one.""" + if self.__class__ is not klass: + self.__class__ = klass + self._morph() + self.clear() + + # The following methods can get called before the Taskmaster has + # had a chance to call disambiguate() directly to see if this Entry + # should really be a Dir or a File. We therefore use these to call + # disambiguate() transparently (from our caller's point of view). + # + # Right now, this minimal set of methods has been derived by just + # looking at some of the methods that will obviously be called early + # in any of the various Taskmasters' calling sequences, and then + # empirically figuring out which additional methods are necessary + # to make various tests pass. + + def exists(self): + """Return if the Entry exists. Check the file system to see + what we should turn into first. Assume a file if there's no + directory.""" + return self.disambiguate().exists() + + def rel_path(self, other): + d = self.disambiguate() + if d.__class__ is Entry: + raise "rel_path() could not disambiguate File/Dir" + return d.rel_path(other) + + def new_ninfo(self): + return self.disambiguate().new_ninfo() + + def changed_since_last_build(self, target, prev_ni): + return self.disambiguate().changed_since_last_build(target, prev_ni) + + def _glob1(self, pattern, ondisk=True, source=False, strings=False): + return self.disambiguate()._glob1(pattern, ondisk, source, strings) + + def get_subst_proxy(self): + return self.disambiguate().get_subst_proxy() + +# This is for later so we can differentiate between Entry the class and Entry +# the method of the FS class. +_classEntry = Entry + + +class LocalFS: + + if SCons.Memoize.use_memoizer: + __metaclass__ = SCons.Memoize.Memoized_Metaclass + + # This class implements an abstraction layer for operations involving + # a local file system. Essentially, this wraps any function in + # the os, os.path or shutil modules that we use to actually go do + # anything with or to the local file system. + # + # Note that there's a very good chance we'll refactor this part of + # the architecture in some way as we really implement the interface(s) + # for remote file system Nodes. For example, the right architecture + # might be to have this be a subclass instead of a base class. + # Nevertheless, we're using this as a first step in that direction. + # + # We're not using chdir() yet because the calling subclass method + # needs to use os.chdir() directly to avoid recursion. Will we + # really need this one? + #def chdir(self, path): + # return os.chdir(path) + def chmod(self, path, mode): + return os.chmod(path, mode) + def copy(self, src, dst): + return shutil.copy(src, dst) + def copy2(self, src, dst): + return shutil.copy2(src, dst) + def exists(self, path): + return os.path.exists(path) + def getmtime(self, path): + return os.path.getmtime(path) + def getsize(self, path): + return os.path.getsize(path) + def isdir(self, path): + return os.path.isdir(path) + def isfile(self, path): + return os.path.isfile(path) + def link(self, src, dst): + return os.link(src, dst) + def lstat(self, path): + return os.lstat(path) + def listdir(self, path): + return os.listdir(path) + def makedirs(self, path): + return os.makedirs(path) + def mkdir(self, path): + return os.mkdir(path) + def rename(self, old, new): + return os.rename(old, new) + def stat(self, path): + return os.stat(path) + def symlink(self, src, dst): + return os.symlink(src, dst) + def open(self, path): + return open(path) + def unlink(self, path): + return os.unlink(path) + + if hasattr(os, 'symlink'): + def islink(self, path): + return os.path.islink(path) + else: + def islink(self, path): + return 0 # no symlinks + + if hasattr(os, 'readlink'): + def readlink(self, file): + return os.readlink(file) + else: + def readlink(self, file): + return '' + + +#class RemoteFS: +# # Skeleton for the obvious methods we might need from the +# # abstraction layer for a remote filesystem. +# def upload(self, local_src, remote_dst): +# pass +# def download(self, remote_src, local_dst): +# pass + + +class FS(LocalFS): + + memoizer_counters = [] + + def __init__(self, path = None): + """Initialize the Node.FS subsystem. + + The supplied path is the top of the source tree, where we + expect to find the top-level build file. If no path is + supplied, the current directory is the default. + + The path argument must be a valid absolute path. + """ + if __debug__: logInstanceCreation(self, 'Node.FS') + + self._memo = {} + + self.Root = {} + self.SConstruct_dir = None + self.max_drift = default_max_drift + + self.Top = None + if path is None: + self.pathTop = os.getcwd() + else: + self.pathTop = path + self.defaultDrive = _my_normcase(os.path.splitdrive(self.pathTop)[0]) + + self.Top = self.Dir(self.pathTop) + self.Top.path = '.' + self.Top.tpath = '.' + self._cwd = self.Top + + DirNodeInfo.fs = self + FileNodeInfo.fs = self + + def set_SConstruct_dir(self, dir): + self.SConstruct_dir = dir + + def get_max_drift(self): + return self.max_drift + + def set_max_drift(self, max_drift): + self.max_drift = max_drift + + def getcwd(self): + return self._cwd + + def chdir(self, dir, change_os_dir=0): + """Change the current working directory for lookups. + If change_os_dir is true, we will also change the "real" cwd + to match. + """ + curr=self._cwd + try: + if dir is not None: + self._cwd = dir + if change_os_dir: + os.chdir(dir.abspath) + except OSError: + self._cwd = curr + raise + + def get_root(self, drive): + """ + Returns the root directory for the specified drive, creating + it if necessary. + """ + drive = _my_normcase(drive) + try: + return self.Root[drive] + except KeyError: + root = RootDir(drive, self) + self.Root[drive] = root + if not drive: + self.Root[self.defaultDrive] = root + elif drive == self.defaultDrive: + self.Root[''] = root + return root + + def _lookup(self, p, directory, fsclass, create=1): + """ + The generic entry point for Node lookup with user-supplied data. + + This translates arbitrary input into a canonical Node.FS object + of the specified fsclass. The general approach for strings is + to turn it into a fully normalized absolute path and then call + the root directory's lookup_abs() method for the heavy lifting. + + If the path name begins with '#', it is unconditionally + interpreted relative to the top-level directory of this FS. '#' + is treated as a synonym for the top-level SConstruct directory, + much like '~' is treated as a synonym for the user's home + directory in a UNIX shell. So both '#foo' and '#/foo' refer + to the 'foo' subdirectory underneath the top-level SConstruct + directory. + + If the path name is relative, then the path is looked up relative + to the specified directory, or the current directory (self._cwd, + typically the SConscript directory) if the specified directory + is None. + """ + if isinstance(p, Base): + # It's already a Node.FS object. Make sure it's the right + # class and return. + p.must_be_same(fsclass) + return p + # str(p) in case it's something like a proxy object + p = str(p) + + initial_hash = (p[0:1] == '#') + if initial_hash: + # There was an initial '#', so we strip it and override + # whatever directory they may have specified with the + # top-level SConstruct directory. + p = p[1:] + directory = self.Top + + if directory and not isinstance(directory, Dir): + directory = self.Dir(directory) + + if do_splitdrive: + drive, p = os.path.splitdrive(p) + else: + drive = '' + if drive and not p: + # This causes a naked drive letter to be treated as a synonym + # for the root directory on that drive. + p = os.sep + absolute = os.path.isabs(p) + + needs_normpath = needs_normpath_check.match(p) + + if initial_hash or not absolute: + # This is a relative lookup, either to the top-level + # SConstruct directory (because of the initial '#') or to + # the current directory (the path name is not absolute). + # Add the string to the appropriate directory lookup path, + # after which the whole thing gets normalized. + if not directory: + directory = self._cwd + if p: + p = directory.labspath + '/' + p + else: + p = directory.labspath + + if needs_normpath: + p = os.path.normpath(p) + + if drive or absolute: + root = self.get_root(drive) + else: + if not directory: + directory = self._cwd + root = directory.root + + if os.sep != '/': + p = string.replace(p, os.sep, '/') + return root._lookup_abs(p, fsclass, create) + + def Entry(self, name, directory = None, create = 1): + """Look up or create a generic Entry node with the specified name. + If the name is a relative path (begins with ./, ../, or a file + name), then it is looked up relative to the supplied directory + node, or to the top level directory of the FS (supplied at + construction time) if no directory is supplied. + """ + return self._lookup(name, directory, Entry, create) + + def File(self, name, directory = None, create = 1): + """Look up or create a File node with the specified name. If + the name is a relative path (begins with ./, ../, or a file name), + then it is looked up relative to the supplied directory node, + or to the top level directory of the FS (supplied at construction + time) if no directory is supplied. + + This method will raise TypeError if a directory is found at the + specified path. + """ + return self._lookup(name, directory, File, create) + + def Dir(self, name, directory = None, create = True): + """Look up or create a Dir node with the specified name. If + the name is a relative path (begins with ./, ../, or a file name), + then it is looked up relative to the supplied directory node, + or to the top level directory of the FS (supplied at construction + time) if no directory is supplied. + + This method will raise TypeError if a normal file is found at the + specified path. + """ + return self._lookup(name, directory, Dir, create) + + def VariantDir(self, variant_dir, src_dir, duplicate=1): + """Link the supplied variant directory to the source directory + for purposes of building files.""" + + if not isinstance(src_dir, SCons.Node.Node): + src_dir = self.Dir(src_dir) + if not isinstance(variant_dir, SCons.Node.Node): + variant_dir = self.Dir(variant_dir) + if src_dir.is_under(variant_dir): + raise SCons.Errors.UserError, "Source directory cannot be under variant directory." + if variant_dir.srcdir: + if variant_dir.srcdir == src_dir: + return # We already did this. + raise SCons.Errors.UserError, "'%s' already has a source directory: '%s'."%(variant_dir, variant_dir.srcdir) + variant_dir.link(src_dir, duplicate) + + def Repository(self, *dirs): + """Specify Repository directories to search.""" + for d in dirs: + if not isinstance(d, SCons.Node.Node): + d = self.Dir(d) + self.Top.addRepository(d) + + def variant_dir_target_climb(self, orig, dir, tail): + """Create targets in corresponding variant directories + + Climb the directory tree, and look up path names + relative to any linked variant directories we find. + + Even though this loops and walks up the tree, we don't memoize + the return value because this is really only used to process + the command-line targets. + """ + targets = [] + message = None + fmt = "building associated VariantDir targets: %s" + start_dir = dir + while dir: + for bd in dir.variant_dirs: + if start_dir.is_under(bd): + # If already in the build-dir location, don't reflect + return [orig], fmt % str(orig) + p = apply(os.path.join, [bd.path] + tail) + targets.append(self.Entry(p)) + tail = [dir.name] + tail + dir = dir.up() + if targets: + message = fmt % string.join(map(str, targets)) + return targets, message + + def Glob(self, pathname, ondisk=True, source=True, strings=False, cwd=None): + """ + Globs + + This is mainly a shim layer + """ + if cwd is None: + cwd = self.getcwd() + return cwd.glob(pathname, ondisk, source, strings) + +class DirNodeInfo(SCons.Node.NodeInfoBase): + # This should get reset by the FS initialization. + current_version_id = 1 + + fs = None + + def str_to_node(self, s): + top = self.fs.Top + root = top.root + if do_splitdrive: + drive, s = os.path.splitdrive(s) + if drive: + root = self.fs.get_root(drive) + if not os.path.isabs(s): + s = top.labspath + '/' + s + return root._lookup_abs(s, Entry) + +class DirBuildInfo(SCons.Node.BuildInfoBase): + current_version_id = 1 + +glob_magic_check = re.compile('[*?[]') + +def has_glob_magic(s): + return glob_magic_check.search(s) is not None + +class Dir(Base): + """A class for directories in a file system. + """ + + memoizer_counters = [] + + NodeInfo = DirNodeInfo + BuildInfo = DirBuildInfo + + def __init__(self, name, directory, fs): + if __debug__: logInstanceCreation(self, 'Node.FS.Dir') + Base.__init__(self, name, directory, fs) + self._morph() + + def _morph(self): + """Turn a file system Node (either a freshly initialized directory + object or a separate Entry object) into a proper directory object. + + Set up this directory's entries and hook it into the file + system tree. Specify that directories (this Node) don't use + signatures for calculating whether they're current. + """ + + self.repositories = [] + self.srcdir = None + + self.entries = {} + self.entries['.'] = self + self.entries['..'] = self.dir + self.cwd = self + self.searched = 0 + self._sconsign = None + self.variant_dirs = [] + self.root = self.dir.root + + # Don't just reset the executor, replace its action list, + # because it might have some pre-or post-actions that need to + # be preserved. + self.builder = get_MkdirBuilder() + self.get_executor().set_action_list(self.builder.action) + + def diskcheck_match(self): + diskcheck_match(self, self.isfile, + "File %s found where directory expected.") + + def __clearRepositoryCache(self, duplicate=None): + """Called when we change the repository(ies) for a directory. + This clears any cached information that is invalidated by changing + the repository.""" + + for node in self.entries.values(): + if node != self.dir: + if node != self and isinstance(node, Dir): + node.__clearRepositoryCache(duplicate) + else: + node.clear() + try: + del node._srcreps + except AttributeError: + pass + if duplicate is not None: + node.duplicate=duplicate + + def __resetDuplicate(self, node): + if node != self: + node.duplicate = node.get_dir().duplicate + + def Entry(self, name): + """ + Looks up or creates an entry node named 'name' relative to + this directory. + """ + return self.fs.Entry(name, self) + + def Dir(self, name, create=True): + """ + Looks up or creates a directory node named 'name' relative to + this directory. + """ + return self.fs.Dir(name, self, create) + + def File(self, name): + """ + Looks up or creates a file node named 'name' relative to + this directory. + """ + return self.fs.File(name, self) + + def _lookup_rel(self, name, klass, create=1): + """ + Looks up a *normalized* relative path name, relative to this + directory. + + This method is intended for use by internal lookups with + already-normalized path data. For general-purpose lookups, + use the Entry(), Dir() and File() methods above. + + This method does *no* input checking and will die or give + incorrect results if it's passed a non-normalized path name (e.g., + a path containing '..'), an absolute path name, a top-relative + ('#foo') path name, or any kind of object. + """ + name = self.entry_labspath(name) + return self.root._lookup_abs(name, klass, create) + + def link(self, srcdir, duplicate): + """Set this directory as the variant directory for the + supplied source directory.""" + self.srcdir = srcdir + self.duplicate = duplicate + self.__clearRepositoryCache(duplicate) + srcdir.variant_dirs.append(self) + + def getRepositories(self): + """Returns a list of repositories for this directory. + """ + if self.srcdir and not self.duplicate: + return self.srcdir.get_all_rdirs() + self.repositories + return self.repositories + + memoizer_counters.append(SCons.Memoize.CountValue('get_all_rdirs')) + + def get_all_rdirs(self): + try: + return list(self._memo['get_all_rdirs']) + except KeyError: + pass + + result = [self] + fname = '.' + dir = self + while dir: + for rep in dir.getRepositories(): + result.append(rep.Dir(fname)) + if fname == '.': + fname = dir.name + else: + fname = dir.name + os.sep + fname + dir = dir.up() + + self._memo['get_all_rdirs'] = list(result) + + return result + + def addRepository(self, dir): + if dir != self and not dir in self.repositories: + self.repositories.append(dir) + dir.tpath = '.' + self.__clearRepositoryCache() + + def up(self): + return self.entries['..'] + + def _rel_path_key(self, other): + return str(other) + + memoizer_counters.append(SCons.Memoize.CountDict('rel_path', _rel_path_key)) + + def rel_path(self, other): + """Return a path to "other" relative to this directory. + """ + + # This complicated and expensive method, which constructs relative + # paths between arbitrary Node.FS objects, is no longer used + # by SCons itself. It was introduced to store dependency paths + # in .sconsign files relative to the target, but that ended up + # being significantly inefficient. + # + # We're continuing to support the method because some SConstruct + # files out there started using it when it was available, and + # we're all about backwards compatibility.. + + try: + memo_dict = self._memo['rel_path'] + except KeyError: + memo_dict = {} + self._memo['rel_path'] = memo_dict + else: + try: + return memo_dict[other] + except KeyError: + pass + + if self is other: + result = '.' + + elif not other in self.path_elements: + try: + other_dir = other.get_dir() + except AttributeError: + result = str(other) + else: + if other_dir is None: + result = other.name + else: + dir_rel_path = self.rel_path(other_dir) + if dir_rel_path == '.': + result = other.name + else: + result = dir_rel_path + os.sep + other.name + else: + i = self.path_elements.index(other) + 1 + + path_elems = ['..'] * (len(self.path_elements) - i) \ + + map(lambda n: n.name, other.path_elements[i:]) + + result = string.join(path_elems, os.sep) + + memo_dict[other] = result + + return result + + def get_env_scanner(self, env, kw={}): + import SCons.Defaults + return SCons.Defaults.DirEntryScanner + + def get_target_scanner(self): + import SCons.Defaults + return SCons.Defaults.DirEntryScanner + + def get_found_includes(self, env, scanner, path): + """Return this directory's implicit dependencies. + + We don't bother caching the results because the scan typically + shouldn't be requested more than once (as opposed to scanning + .h file contents, which can be requested as many times as the + files is #included by other files). + """ + if not scanner: + return [] + # Clear cached info for this Dir. If we already visited this + # directory on our walk down the tree (because we didn't know at + # that point it was being used as the source for another Node) + # then we may have calculated build signature before realizing + # we had to scan the disk. Now that we have to, though, we need + # to invalidate the old calculated signature so that any node + # dependent on our directory structure gets one that includes + # info about everything on disk. + self.clear() + return scanner(self, env, path) + + # + # Taskmaster interface subsystem + # + + def prepare(self): + pass + + def build(self, **kw): + """A null "builder" for directories.""" + global MkdirBuilder + if self.builder is not MkdirBuilder: + apply(SCons.Node.Node.build, [self,], kw) + + # + # + # + + def _create(self): + """Create this directory, silently and without worrying about + whether the builder is the default or not.""" + listDirs = [] + parent = self + while parent: + if parent.exists(): + break + listDirs.append(parent) + p = parent.up() + if p is None: + # Don't use while: - else: for this condition because + # if so, then parent is None and has no .path attribute. + raise SCons.Errors.StopError, parent.path + parent = p + listDirs.reverse() + for dirnode in listDirs: + try: + # Don't call dirnode.build(), call the base Node method + # directly because we definitely *must* create this + # directory. The dirnode.build() method will suppress + # the build if it's the default builder. + SCons.Node.Node.build(dirnode) + dirnode.get_executor().nullify() + # The build() action may or may not have actually + # created the directory, depending on whether the -n + # option was used or not. Delete the _exists and + # _rexists attributes so they can be reevaluated. + dirnode.clear() + except OSError: + pass + + def multiple_side_effect_has_builder(self): + global MkdirBuilder + return self.builder is not MkdirBuilder and self.has_builder() + + def alter_targets(self): + """Return any corresponding targets in a variant directory. + """ + return self.fs.variant_dir_target_climb(self, self, []) + + def scanner_key(self): + """A directory does not get scanned.""" + return None + + def get_text_contents(self): + """We already emit things in text, so just return the binary + version.""" + return self.get_contents() + + def get_contents(self): + """Return content signatures and names of all our children + separated by new-lines. Ensure that the nodes are sorted.""" + contents = [] + name_cmp = lambda a, b: cmp(a.name, b.name) + sorted_children = self.children()[:] + sorted_children.sort(name_cmp) + for node in sorted_children: + contents.append('%s %s\n' % (node.get_csig(), node.name)) + return string.join(contents, '') + + def get_csig(self): + """Compute the content signature for Directory nodes. In + general, this is not needed and the content signature is not + stored in the DirNodeInfo. However, if get_contents on a Dir + node is called which has a child directory, the child + directory should return the hash of its contents.""" + contents = self.get_contents() + return SCons.Util.MD5signature(contents) + + def do_duplicate(self, src): + pass + + changed_since_last_build = SCons.Node.Node.state_has_changed + + def is_up_to_date(self): + """If any child is not up-to-date, then this directory isn't, + either.""" + if self.builder is not MkdirBuilder and not self.exists(): + return 0 + up_to_date = SCons.Node.up_to_date + for kid in self.children(): + if kid.get_state() > up_to_date: + return 0 + return 1 + + def rdir(self): + if not self.exists(): + norm_name = _my_normcase(self.name) + for dir in self.dir.get_all_rdirs(): + try: node = dir.entries[norm_name] + except KeyError: node = dir.dir_on_disk(self.name) + if node and node.exists() and \ + (isinstance(dir, Dir) or isinstance(dir, Entry)): + return node + return self + + def sconsign(self): + """Return the .sconsign file info for this directory, + creating it first if necessary.""" + if not self._sconsign: + import SCons.SConsign + self._sconsign = SCons.SConsign.ForDirectory(self) + return self._sconsign + + def srcnode(self): + """Dir has a special need for srcnode()...if we + have a srcdir attribute set, then that *is* our srcnode.""" + if self.srcdir: + return self.srcdir + return Base.srcnode(self) + + def get_timestamp(self): + """Return the latest timestamp from among our children""" + stamp = 0 + for kid in self.children(): + if kid.get_timestamp() > stamp: + stamp = kid.get_timestamp() + return stamp + + def entry_abspath(self, name): + return self.abspath + os.sep + name + + def entry_labspath(self, name): + return self.labspath + '/' + name + + def entry_path(self, name): + return self.path + os.sep + name + + def entry_tpath(self, name): + return self.tpath + os.sep + name + + def entry_exists_on_disk(self, name): + try: + d = self.on_disk_entries + except AttributeError: + d = {} + try: + entries = os.listdir(self.abspath) + except OSError: + pass + else: + for entry in map(_my_normcase, entries): + d[entry] = True + self.on_disk_entries = d + if sys.platform == 'win32': + name = _my_normcase(name) + result = d.get(name) + if result is None: + # Belt-and-suspenders for Windows: check directly for + # 8.3 file names that don't show up in os.listdir(). + result = os.path.exists(self.abspath + os.sep + name) + d[name] = result + return result + else: + return d.has_key(name) + + memoizer_counters.append(SCons.Memoize.CountValue('srcdir_list')) + + def srcdir_list(self): + try: + return self._memo['srcdir_list'] + except KeyError: + pass + + result = [] + + dirname = '.' + dir = self + while dir: + if dir.srcdir: + result.append(dir.srcdir.Dir(dirname)) + dirname = dir.name + os.sep + dirname + dir = dir.up() + + self._memo['srcdir_list'] = result + + return result + + def srcdir_duplicate(self, name): + for dir in self.srcdir_list(): + if self.is_under(dir): + # We shouldn't source from something in the build path; + # variant_dir is probably under src_dir, in which case + # we are reflecting. + break + if dir.entry_exists_on_disk(name): + srcnode = dir.Entry(name).disambiguate() + if self.duplicate: + node = self.Entry(name).disambiguate() + node.do_duplicate(srcnode) + return node + else: + return srcnode + return None + + def _srcdir_find_file_key(self, filename): + return filename + + memoizer_counters.append(SCons.Memoize.CountDict('srcdir_find_file', _srcdir_find_file_key)) + + def srcdir_find_file(self, filename): + try: + memo_dict = self._memo['srcdir_find_file'] + except KeyError: + memo_dict = {} + self._memo['srcdir_find_file'] = memo_dict + else: + try: + return memo_dict[filename] + except KeyError: + pass + + def func(node): + if (isinstance(node, File) or isinstance(node, Entry)) and \ + (node.is_derived() or node.exists()): + return node + return None + + norm_name = _my_normcase(filename) + + for rdir in self.get_all_rdirs(): + try: node = rdir.entries[norm_name] + except KeyError: node = rdir.file_on_disk(filename) + else: node = func(node) + if node: + result = (node, self) + memo_dict[filename] = result + return result + + for srcdir in self.srcdir_list(): + for rdir in srcdir.get_all_rdirs(): + try: node = rdir.entries[norm_name] + except KeyError: node = rdir.file_on_disk(filename) + else: node = func(node) + if node: + result = (File(filename, self, self.fs), srcdir) + memo_dict[filename] = result + return result + + result = (None, None) + memo_dict[filename] = result + return result + + def dir_on_disk(self, name): + if self.entry_exists_on_disk(name): + try: return self.Dir(name) + except TypeError: pass + node = self.srcdir_duplicate(name) + if isinstance(node, File): + return None + return node + + def file_on_disk(self, name): + if self.entry_exists_on_disk(name) or \ + diskcheck_rcs(self, name) or \ + diskcheck_sccs(self, name): + try: return self.File(name) + except TypeError: pass + node = self.srcdir_duplicate(name) + if isinstance(node, Dir): + return None + return node + + def walk(self, func, arg): + """ + Walk this directory tree by calling the specified function + for each directory in the tree. + + This behaves like the os.path.walk() function, but for in-memory + Node.FS.Dir objects. The function takes the same arguments as + the functions passed to os.path.walk(): + + func(arg, dirname, fnames) + + Except that "dirname" will actually be the directory *Node*, + not the string. The '.' and '..' entries are excluded from + fnames. The fnames list may be modified in-place to filter the + subdirectories visited or otherwise impose a specific order. + The "arg" argument is always passed to func() and may be used + in any way (or ignored, passing None is common). + """ + entries = self.entries + names = entries.keys() + names.remove('.') + names.remove('..') + func(arg, self, names) + select_dirs = lambda n, e=entries: isinstance(e[n], Dir) + for dirname in filter(select_dirs, names): + entries[dirname].walk(func, arg) + + def glob(self, pathname, ondisk=True, source=False, strings=False): + """ + Returns a list of Nodes (or strings) matching a specified + pathname pattern. + + Pathname patterns follow UNIX shell semantics: * matches + any-length strings of any characters, ? matches any character, + and [] can enclose lists or ranges of characters. Matches do + not span directory separators. + + The matches take into account Repositories, returning local + Nodes if a corresponding entry exists in a Repository (either + an in-memory Node or something on disk). + + By defafult, the glob() function matches entries that exist + on-disk, in addition to in-memory Nodes. Setting the "ondisk" + argument to False (or some other non-true value) causes the glob() + function to only match in-memory Nodes. The default behavior is + to return both the on-disk and in-memory Nodes. + + The "source" argument, when true, specifies that corresponding + source Nodes must be returned if you're globbing in a build + directory (initialized with VariantDir()). The default behavior + is to return Nodes local to the VariantDir(). + + The "strings" argument, when true, returns the matches as strings, + not Nodes. The strings are path names relative to this directory. + + The underlying algorithm is adapted from the glob.glob() function + in the Python library (but heavily modified), and uses fnmatch() + under the covers. + """ + dirname, basename = os.path.split(pathname) + if not dirname: + result = self._glob1(basename, ondisk, source, strings) + result.sort(lambda a, b: cmp(str(a), str(b))) + return result + if has_glob_magic(dirname): + list = self.glob(dirname, ondisk, source, strings=False) + else: + list = [self.Dir(dirname, create=True)] + result = [] + for dir in list: + r = dir._glob1(basename, ondisk, source, strings) + if strings: + r = map(lambda x, d=str(dir): os.path.join(d, x), r) + result.extend(r) + result.sort(lambda a, b: cmp(str(a), str(b))) + return result + + def _glob1(self, pattern, ondisk=True, source=False, strings=False): + """ + Globs for and returns a list of entry names matching a single + pattern in this directory. + + This searches any repositories and source directories for + corresponding entries and returns a Node (or string) relative + to the current directory if an entry is found anywhere. + + TODO: handle pattern with no wildcard + """ + search_dir_list = self.get_all_rdirs() + for srcdir in self.srcdir_list(): + search_dir_list.extend(srcdir.get_all_rdirs()) + + selfEntry = self.Entry + names = [] + for dir in search_dir_list: + # We use the .name attribute from the Node because the keys of + # the dir.entries dictionary are normalized (that is, all upper + # case) on case-insensitive systems like Windows. + #node_names = [ v.name for k, v in dir.entries.items() if k not in ('.', '..') ] + entry_names = filter(lambda n: n not in ('.', '..'), dir.entries.keys()) + node_names = map(lambda n, e=dir.entries: e[n].name, entry_names) + names.extend(node_names) + if not strings: + # Make sure the working directory (self) actually has + # entries for all Nodes in repositories or variant dirs. + for name in node_names: selfEntry(name) + if ondisk: + try: + disk_names = os.listdir(dir.abspath) + except os.error: + continue + names.extend(disk_names) + if not strings: + # We're going to return corresponding Nodes in + # the local directory, so we need to make sure + # those Nodes exist. We only want to create + # Nodes for the entries that will match the + # specified pattern, though, which means we + # need to filter the list here, even though + # the overall list will also be filtered later, + # after we exit this loop. + if pattern[0] != '.': + #disk_names = [ d for d in disk_names if d[0] != '.' ] + disk_names = filter(lambda x: x[0] != '.', disk_names) + disk_names = fnmatch.filter(disk_names, pattern) + dirEntry = dir.Entry + for name in disk_names: + # Add './' before disk filename so that '#' at + # beginning of filename isn't interpreted. + name = './' + name + node = dirEntry(name).disambiguate() + n = selfEntry(name) + if n.__class__ != node.__class__: + n.__class__ = node.__class__ + n._morph() + + names = set(names) + if pattern[0] != '.': + #names = [ n for n in names if n[0] != '.' ] + names = filter(lambda x: x[0] != '.', names) + names = fnmatch.filter(names, pattern) + + if strings: + return names + + #return [ self.entries[_my_normcase(n)] for n in names ] + return map(lambda n, e=self.entries: e[_my_normcase(n)], names) + +class RootDir(Dir): + """A class for the root directory of a file system. + + This is the same as a Dir class, except that the path separator + ('/' or '\\') is actually part of the name, so we don't need to + add a separator when creating the path names of entries within + this directory. + """ + def __init__(self, name, fs): + if __debug__: logInstanceCreation(self, 'Node.FS.RootDir') + # We're going to be our own parent directory (".." entry and .dir + # attribute) so we have to set up some values so Base.__init__() + # won't gag won't it calls some of our methods. + self.abspath = '' + self.labspath = '' + self.path = '' + self.tpath = '' + self.path_elements = [] + self.duplicate = 0 + self.root = self + Base.__init__(self, name, self, fs) + + # Now set our paths to what we really want them to be: the + # initial drive letter (the name) plus the directory separator, + # except for the "lookup abspath," which does not have the + # drive letter. + self.abspath = name + os.sep + self.labspath = '' + self.path = name + os.sep + self.tpath = name + os.sep + self._morph() + + self._lookupDict = {} + + # The // and os.sep + os.sep entries are necessary because + # os.path.normpath() seems to preserve double slashes at the + # beginning of a path (presumably for UNC path names), but + # collapses triple slashes to a single slash. + self._lookupDict[''] = self + self._lookupDict['/'] = self + self._lookupDict['//'] = self + self._lookupDict[os.sep] = self + self._lookupDict[os.sep + os.sep] = self + + def must_be_same(self, klass): + if klass is Dir: + return + Base.must_be_same(self, klass) + + def _lookup_abs(self, p, klass, create=1): + """ + Fast (?) lookup of a *normalized* absolute path. + + This method is intended for use by internal lookups with + already-normalized path data. For general-purpose lookups, + use the FS.Entry(), FS.Dir() or FS.File() methods. + + The caller is responsible for making sure we're passed a + normalized absolute path; we merely let Python's dictionary look + up and return the One True Node.FS object for the path. + + If no Node for the specified "p" doesn't already exist, and + "create" is specified, the Node may be created after recursive + invocation to find or create the parent directory or directories. + """ + k = _my_normcase(p) + try: + result = self._lookupDict[k] + except KeyError: + if not create: + msg = "No such file or directory: '%s' in '%s' (and create is False)" % (p, str(self)) + raise SCons.Errors.UserError, msg + # There is no Node for this path name, and we're allowed + # to create it. + dir_name, file_name = os.path.split(p) + dir_node = self._lookup_abs(dir_name, Dir) + result = klass(file_name, dir_node, self.fs) + + # Double-check on disk (as configured) that the Node we + # created matches whatever is out there in the real world. + result.diskcheck_match() + + self._lookupDict[k] = result + dir_node.entries[_my_normcase(file_name)] = result + dir_node.implicit = None + else: + # There is already a Node for this path name. Allow it to + # complain if we were looking for an inappropriate type. + result.must_be_same(klass) + return result + + def __str__(self): + return self.abspath + + def entry_abspath(self, name): + return self.abspath + name + + def entry_labspath(self, name): + return '/' + name + + def entry_path(self, name): + return self.path + name + + def entry_tpath(self, name): + return self.tpath + name + + def is_under(self, dir): + if self is dir: + return 1 + else: + return 0 + + def up(self): + return None + + def get_dir(self): + return None + + def src_builder(self): + return _null + +class FileNodeInfo(SCons.Node.NodeInfoBase): + current_version_id = 1 + + field_list = ['csig', 'timestamp', 'size'] + + # This should get reset by the FS initialization. + fs = None + + def str_to_node(self, s): + top = self.fs.Top + root = top.root + if do_splitdrive: + drive, s = os.path.splitdrive(s) + if drive: + root = self.fs.get_root(drive) + if not os.path.isabs(s): + s = top.labspath + '/' + s + return root._lookup_abs(s, Entry) + +class FileBuildInfo(SCons.Node.BuildInfoBase): + current_version_id = 1 + + def convert_to_sconsign(self): + """ + Converts this FileBuildInfo object for writing to a .sconsign file + + This replaces each Node in our various dependency lists with its + usual string representation: relative to the top-level SConstruct + directory, or an absolute path if it's outside. + """ + if os.sep == '/': + node_to_str = str + else: + def node_to_str(n): + try: + s = n.path + except AttributeError: + s = str(n) + else: + s = string.replace(s, os.sep, '/') + return s + for attr in ['bsources', 'bdepends', 'bimplicit']: + try: + val = getattr(self, attr) + except AttributeError: + pass + else: + setattr(self, attr, map(node_to_str, val)) + def convert_from_sconsign(self, dir, name): + """ + Converts a newly-read FileBuildInfo object for in-SCons use + + For normal up-to-date checking, we don't have any conversion to + perform--but we're leaving this method here to make that clear. + """ + pass + def prepare_dependencies(self): + """ + Prepares a FileBuildInfo object for explaining what changed + + The bsources, bdepends and bimplicit lists have all been + stored on disk as paths relative to the top-level SConstruct + directory. Convert the strings to actual Nodes (for use by the + --debug=explain code and --implicit-cache). + """ + attrs = [ + ('bsources', 'bsourcesigs'), + ('bdepends', 'bdependsigs'), + ('bimplicit', 'bimplicitsigs'), + ] + for (nattr, sattr) in attrs: + try: + strings = getattr(self, nattr) + nodeinfos = getattr(self, sattr) + except AttributeError: + continue + nodes = [] + for s, ni in izip(strings, nodeinfos): + if not isinstance(s, SCons.Node.Node): + s = ni.str_to_node(s) + nodes.append(s) + setattr(self, nattr, nodes) + def format(self, names=0): + result = [] + bkids = self.bsources + self.bdepends + self.bimplicit + bkidsigs = self.bsourcesigs + self.bdependsigs + self.bimplicitsigs + for bkid, bkidsig in izip(bkids, bkidsigs): + result.append(str(bkid) + ': ' + + string.join(bkidsig.format(names=names), ' ')) + result.append('%s [%s]' % (self.bactsig, self.bact)) + return string.join(result, '\n') + +class File(Base): + """A class for files in a file system. + """ + + memoizer_counters = [] + + NodeInfo = FileNodeInfo + BuildInfo = FileBuildInfo + + md5_chunksize = 64 + + def diskcheck_match(self): + diskcheck_match(self, self.isdir, + "Directory %s found where file expected.") + + def __init__(self, name, directory, fs): + if __debug__: logInstanceCreation(self, 'Node.FS.File') + Base.__init__(self, name, directory, fs) + self._morph() + + def Entry(self, name): + """Create an entry node named 'name' relative to + the directory of this file.""" + return self.dir.Entry(name) + + def Dir(self, name, create=True): + """Create a directory node named 'name' relative to + the directory of this file.""" + return self.dir.Dir(name, create=create) + + def Dirs(self, pathlist): + """Create a list of directories relative to the SConscript + directory of this file.""" + # TODO(1.5) + # return [self.Dir(p) for p in pathlist] + return map(lambda p, s=self: s.Dir(p), pathlist) + + def File(self, name): + """Create a file node named 'name' relative to + the directory of this file.""" + return self.dir.File(name) + + #def generate_build_dict(self): + # """Return an appropriate dictionary of values for building + # this File.""" + # return {'Dir' : self.Dir, + # 'File' : self.File, + # 'RDirs' : self.RDirs} + + def _morph(self): + """Turn a file system node into a File object.""" + self.scanner_paths = {} + if not hasattr(self, '_local'): + self._local = 0 + + # If there was already a Builder set on this entry, then + # we need to make sure we call the target-decider function, + # not the source-decider. Reaching in and doing this by hand + # is a little bogus. We'd prefer to handle this by adding + # an Entry.builder_set() method that disambiguates like the + # other methods, but that starts running into problems with the + # fragile way we initialize Dir Nodes with their Mkdir builders, + # yet still allow them to be overridden by the user. Since it's + # not clear right now how to fix that, stick with what works + # until it becomes clear... + if self.has_builder(): + self.changed_since_last_build = self.decide_target + + def scanner_key(self): + return self.get_suffix() + + def get_contents(self): + if not self.rexists(): + return '' + fname = self.rfile().abspath + try: + contents = open(fname, "rb").read() + except EnvironmentError, e: + if not e.filename: + e.filename = fname + raise + return contents + + try: + import codecs + except ImportError: + get_text_contents = get_contents + else: + # This attempts to figure out what the encoding of the text is + # based upon the BOM bytes, and then decodes the contents so that + # it's a valid python string. + def get_text_contents(self): + contents = self.get_contents() + # The behavior of various decode() methods and functions + # w.r.t. the initial BOM bytes is different for different + # encodings and/or Python versions. ('utf-8' does not strip + # them, but has a 'utf-8-sig' which does; 'utf-16' seems to + # strip them; etc.) Just side step all the complication by + # explicitly stripping the BOM before we decode(). + if contents.startswith(codecs.BOM_UTF8): + contents = contents[len(codecs.BOM_UTF8):] + # TODO(2.2): Remove when 2.3 becomes floor. + #contents = contents.decode('utf-8') + contents = my_decode(contents, 'utf-8') + elif contents.startswith(codecs.BOM_UTF16_LE): + contents = contents[len(codecs.BOM_UTF16_LE):] + # TODO(2.2): Remove when 2.3 becomes floor. + #contents = contents.decode('utf-16-le') + contents = my_decode(contents, 'utf-16-le') + elif contents.startswith(codecs.BOM_UTF16_BE): + contents = contents[len(codecs.BOM_UTF16_BE):] + # TODO(2.2): Remove when 2.3 becomes floor. + #contents = contents.decode('utf-16-be') + contents = my_decode(contents, 'utf-16-be') + return contents + + def get_content_hash(self): + """ + Compute and return the MD5 hash for this file. + """ + if not self.rexists(): + return SCons.Util.MD5signature('') + fname = self.rfile().abspath + try: + cs = SCons.Util.MD5filesignature(fname, + chunksize=SCons.Node.FS.File.md5_chunksize*1024) + except EnvironmentError, e: + if not e.filename: + e.filename = fname + raise + return cs + + + memoizer_counters.append(SCons.Memoize.CountValue('get_size')) + + def get_size(self): + try: + return self._memo['get_size'] + except KeyError: + pass + + if self.rexists(): + size = self.rfile().getsize() + else: + size = 0 + + self._memo['get_size'] = size + + return size + + memoizer_counters.append(SCons.Memoize.CountValue('get_timestamp')) + + def get_timestamp(self): + try: + return self._memo['get_timestamp'] + except KeyError: + pass + + if self.rexists(): + timestamp = self.rfile().getmtime() + else: + timestamp = 0 + + self._memo['get_timestamp'] = timestamp + + return timestamp + + def store_info(self): + # Merge our build information into the already-stored entry. + # This accomodates "chained builds" where a file that's a target + # in one build (SConstruct file) is a source in a different build. + # See test/chained-build.py for the use case. + if do_store_info: + self.dir.sconsign().store_info(self.name, self) + + convert_copy_attrs = [ + 'bsources', + 'bimplicit', + 'bdepends', + 'bact', + 'bactsig', + 'ninfo', + ] + + + convert_sig_attrs = [ + 'bsourcesigs', + 'bimplicitsigs', + 'bdependsigs', + ] + + def convert_old_entry(self, old_entry): + # Convert a .sconsign entry from before the Big Signature + # Refactoring, doing what we can to convert its information + # to the new .sconsign entry format. + # + # The old format looked essentially like this: + # + # BuildInfo + # .ninfo (NodeInfo) + # .bsig + # .csig + # .timestamp + # .size + # .bsources + # .bsourcesigs ("signature" list) + # .bdepends + # .bdependsigs ("signature" list) + # .bimplicit + # .bimplicitsigs ("signature" list) + # .bact + # .bactsig + # + # The new format looks like this: + # + # .ninfo (NodeInfo) + # .bsig + # .csig + # .timestamp + # .size + # .binfo (BuildInfo) + # .bsources + # .bsourcesigs (NodeInfo list) + # .bsig + # .csig + # .timestamp + # .size + # .bdepends + # .bdependsigs (NodeInfo list) + # .bsig + # .csig + # .timestamp + # .size + # .bimplicit + # .bimplicitsigs (NodeInfo list) + # .bsig + # .csig + # .timestamp + # .size + # .bact + # .bactsig + # + # The basic idea of the new structure is that a NodeInfo always + # holds all available information about the state of a given Node + # at a certain point in time. The various .b*sigs lists can just + # be a list of pointers to the .ninfo attributes of the different + # dependent nodes, without any copying of information until it's + # time to pickle it for writing out to a .sconsign file. + # + # The complicating issue is that the *old* format only stored one + # "signature" per dependency, based on however the *last* build + # was configured. We don't know from just looking at it whether + # it was a build signature, a content signature, or a timestamp + # "signature". Since we no longer use build signatures, the + # best we can do is look at the length and if it's thirty two, + # assume that it was (or might have been) a content signature. + # If it was actually a build signature, then it will cause a + # rebuild anyway when it doesn't match the new content signature, + # but that's probably the best we can do. + import SCons.SConsign + new_entry = SCons.SConsign.SConsignEntry() + new_entry.binfo = self.new_binfo() + binfo = new_entry.binfo + for attr in self.convert_copy_attrs: + try: + value = getattr(old_entry, attr) + except AttributeError: + continue + setattr(binfo, attr, value) + delattr(old_entry, attr) + for attr in self.convert_sig_attrs: + try: + sig_list = getattr(old_entry, attr) + except AttributeError: + continue + value = [] + for sig in sig_list: + ninfo = self.new_ninfo() + if len(sig) == 32: + ninfo.csig = sig + else: + ninfo.timestamp = sig + value.append(ninfo) + setattr(binfo, attr, value) + delattr(old_entry, attr) + return new_entry + + memoizer_counters.append(SCons.Memoize.CountValue('get_stored_info')) + + def get_stored_info(self): + try: + return self._memo['get_stored_info'] + except KeyError: + pass + + try: + sconsign_entry = self.dir.sconsign().get_entry(self.name) + except (KeyError, EnvironmentError): + import SCons.SConsign + sconsign_entry = SCons.SConsign.SConsignEntry() + sconsign_entry.binfo = self.new_binfo() + sconsign_entry.ninfo = self.new_ninfo() + else: + if isinstance(sconsign_entry, FileBuildInfo): + # This is a .sconsign file from before the Big Signature + # Refactoring; convert it as best we can. + sconsign_entry = self.convert_old_entry(sconsign_entry) + try: + delattr(sconsign_entry.ninfo, 'bsig') + except AttributeError: + pass + + self._memo['get_stored_info'] = sconsign_entry + + return sconsign_entry + + def get_stored_implicit(self): + binfo = self.get_stored_info().binfo + binfo.prepare_dependencies() + try: return binfo.bimplicit + except AttributeError: return None + + def rel_path(self, other): + return self.dir.rel_path(other) + + def _get_found_includes_key(self, env, scanner, path): + return (id(env), id(scanner), path) + + memoizer_counters.append(SCons.Memoize.CountDict('get_found_includes', _get_found_includes_key)) + + def get_found_includes(self, env, scanner, path): + """Return the included implicit dependencies in this file. + Cache results so we only scan the file once per path + regardless of how many times this information is requested. + """ + memo_key = (id(env), id(scanner), path) + try: + memo_dict = self._memo['get_found_includes'] + except KeyError: + memo_dict = {} + self._memo['get_found_includes'] = memo_dict + else: + try: + return memo_dict[memo_key] + except KeyError: + pass + + if scanner: + # result = [n.disambiguate() for n in scanner(self, env, path)] + result = scanner(self, env, path) + result = map(lambda N: N.disambiguate(), result) + else: + result = [] + + memo_dict[memo_key] = result + + return result + + def _createDir(self): + # ensure that the directories for this node are + # created. + self.dir._create() + + def push_to_cache(self): + """Try to push the node into a cache + """ + # This should get called before the Nodes' .built() method is + # called, which would clear the build signature if the file has + # a source scanner. + # + # We have to clear the local memoized values *before* we push + # the node to cache so that the memoization of the self.exists() + # return value doesn't interfere. + if self.nocache: + return + self.clear_memoized_values() + if self.exists(): + self.get_build_env().get_CacheDir().push(self) + + def retrieve_from_cache(self): + """Try to retrieve the node's content from a cache + + This method is called from multiple threads in a parallel build, + so only do thread safe stuff here. Do thread unsafe stuff in + built(). + + Returns true iff the node was successfully retrieved. + """ + if self.nocache: + return None + if not self.is_derived(): + return None + return self.get_build_env().get_CacheDir().retrieve(self) + + def visited(self): + if self.exists(): + self.get_build_env().get_CacheDir().push_if_forced(self) + + ninfo = self.get_ninfo() + + csig = self.get_max_drift_csig() + if csig: + ninfo.csig = csig + + ninfo.timestamp = self.get_timestamp() + ninfo.size = self.get_size() + + if not self.has_builder(): + # This is a source file, but it might have been a target file + # in another build that included more of the DAG. Copy + # any build information that's stored in the .sconsign file + # into our binfo object so it doesn't get lost. + old = self.get_stored_info() + self.get_binfo().__dict__.update(old.binfo.__dict__) + + self.store_info() + + def find_src_builder(self): + if self.rexists(): + return None + scb = self.dir.src_builder() + if scb is _null: + if diskcheck_sccs(self.dir, self.name): + scb = get_DefaultSCCSBuilder() + elif diskcheck_rcs(self.dir, self.name): + scb = get_DefaultRCSBuilder() + else: + scb = None + if scb is not None: + try: + b = self.builder + except AttributeError: + b = None + if b is None: + self.builder_set(scb) + return scb + + def has_src_builder(self): + """Return whether this Node has a source builder or not. + + If this Node doesn't have an explicit source code builder, this + is where we figure out, on the fly, if there's a transparent + source code builder for it. + + Note that if we found a source builder, we also set the + self.builder attribute, so that all of the methods that actually + *build* this file don't have to do anything different. + """ + try: + scb = self.sbuilder + except AttributeError: + scb = self.sbuilder = self.find_src_builder() + return scb is not None + + def alter_targets(self): + """Return any corresponding targets in a variant directory. + """ + if self.is_derived(): + return [], None + return self.fs.variant_dir_target_climb(self, self.dir, [self.name]) + + def _rmv_existing(self): + self.clear_memoized_values() + e = Unlink(self, [], None) + if isinstance(e, SCons.Errors.BuildError): + raise e + + # + # Taskmaster interface subsystem + # + + def make_ready(self): + self.has_src_builder() + self.get_binfo() + + def prepare(self): + """Prepare for this file to be created.""" + SCons.Node.Node.prepare(self) + + if self.get_state() != SCons.Node.up_to_date: + if self.exists(): + if self.is_derived() and not self.precious: + self._rmv_existing() + else: + try: + self._createDir() + except SCons.Errors.StopError, drive: + desc = "No drive `%s' for target `%s'." % (drive, self) + raise SCons.Errors.StopError, desc + + # + # + # + + def remove(self): + """Remove this file.""" + if self.exists() or self.islink(): + self.fs.unlink(self.path) + return 1 + return None + + def do_duplicate(self, src): + self._createDir() + Unlink(self, None, None) + e = Link(self, src, None) + if isinstance(e, SCons.Errors.BuildError): + desc = "Cannot duplicate `%s' in `%s': %s." % (src.path, self.dir.path, e.errstr) + raise SCons.Errors.StopError, desc + self.linked = 1 + # The Link() action may or may not have actually + # created the file, depending on whether the -n + # option was used or not. Delete the _exists and + # _rexists attributes so they can be reevaluated. + self.clear() + + memoizer_counters.append(SCons.Memoize.CountValue('exists')) + + def exists(self): + try: + return self._memo['exists'] + except KeyError: + pass + # Duplicate from source path if we are set up to do this. + if self.duplicate and not self.is_derived() and not self.linked: + src = self.srcnode() + if src is not self: + # At this point, src is meant to be copied in a variant directory. + src = src.rfile() + if src.abspath != self.abspath: + if src.exists(): + self.do_duplicate(src) + # Can't return 1 here because the duplication might + # not actually occur if the -n option is being used. + else: + # The source file does not exist. Make sure no old + # copy remains in the variant directory. + if Base.exists(self) or self.islink(): + self.fs.unlink(self.path) + # Return None explicitly because the Base.exists() call + # above will have cached its value if the file existed. + self._memo['exists'] = None + return None + result = Base.exists(self) + self._memo['exists'] = result + return result + + # + # SIGNATURE SUBSYSTEM + # + + def get_max_drift_csig(self): + """ + Returns the content signature currently stored for this node + if it's been unmodified longer than the max_drift value, or the + max_drift value is 0. Returns None otherwise. + """ + old = self.get_stored_info() + mtime = self.get_timestamp() + + max_drift = self.fs.max_drift + if max_drift > 0: + if (time.time() - mtime) > max_drift: + try: + n = old.ninfo + if n.timestamp and n.csig and n.timestamp == mtime: + return n.csig + except AttributeError: + pass + elif max_drift == 0: + try: + return old.ninfo.csig + except AttributeError: + pass + + return None + + def get_csig(self): + """ + Generate a node's content signature, the digested signature + of its content. + + node - the node + cache - alternate node to use for the signature cache + returns - the content signature + """ + ninfo = self.get_ninfo() + try: + return ninfo.csig + except AttributeError: + pass + + csig = self.get_max_drift_csig() + if csig is None: + + try: + if self.get_size() < SCons.Node.FS.File.md5_chunksize: + contents = self.get_contents() + else: + csig = self.get_content_hash() + except IOError: + # This can happen if there's actually a directory on-disk, + # which can be the case if they've disabled disk checks, + # or if an action with a File target actually happens to + # create a same-named directory by mistake. + csig = '' + else: + if not csig: + csig = SCons.Util.MD5signature(contents) + + ninfo.csig = csig + + return csig + + # + # DECISION SUBSYSTEM + # + + def builder_set(self, builder): + SCons.Node.Node.builder_set(self, builder) + self.changed_since_last_build = self.decide_target + + def changed_content(self, target, prev_ni): + cur_csig = self.get_csig() + try: + return cur_csig != prev_ni.csig + except AttributeError: + return 1 + + def changed_state(self, target, prev_ni): + return self.state != SCons.Node.up_to_date + + def changed_timestamp_then_content(self, target, prev_ni): + if not self.changed_timestamp_match(target, prev_ni): + try: + self.get_ninfo().csig = prev_ni.csig + except AttributeError: + pass + return False + return self.changed_content(target, prev_ni) + + def changed_timestamp_newer(self, target, prev_ni): + try: + return self.get_timestamp() > target.get_timestamp() + except AttributeError: + return 1 + + def changed_timestamp_match(self, target, prev_ni): + try: + return self.get_timestamp() != prev_ni.timestamp + except AttributeError: + return 1 + + def decide_source(self, target, prev_ni): + return target.get_build_env().decide_source(self, target, prev_ni) + + def decide_target(self, target, prev_ni): + return target.get_build_env().decide_target(self, target, prev_ni) + + # Initialize this Node's decider function to decide_source() because + # every file is a source file until it has a Builder attached... + changed_since_last_build = decide_source + + def is_up_to_date(self): + T = 0 + if T: Trace('is_up_to_date(%s):' % self) + if not self.exists(): + if T: Trace(' not self.exists():') + # The file doesn't exist locally... + r = self.rfile() + if r != self: + # ...but there is one in a Repository... + if not self.changed(r): + if T: Trace(' changed(%s):' % r) + # ...and it's even up-to-date... + if self._local: + # ...and they'd like a local copy. + e = LocalCopy(self, r, None) + if isinstance(e, SCons.Errors.BuildError): + raise + self.store_info() + if T: Trace(' 1\n') + return 1 + self.changed() + if T: Trace(' None\n') + return None + else: + r = self.changed() + if T: Trace(' self.exists(): %s\n' % r) + return not r + + memoizer_counters.append(SCons.Memoize.CountValue('rfile')) + + def rfile(self): + try: + return self._memo['rfile'] + except KeyError: + pass + result = self + if not self.exists(): + norm_name = _my_normcase(self.name) + for dir in self.dir.get_all_rdirs(): + try: node = dir.entries[norm_name] + except KeyError: node = dir.file_on_disk(self.name) + if node and node.exists() and \ + (isinstance(node, File) or isinstance(node, Entry) \ + or not node.is_derived()): + result = node + # Copy over our local attributes to the repository + # Node so we identify shared object files in the + # repository and don't assume they're static. + # + # This isn't perfect; the attribute would ideally + # be attached to the object in the repository in + # case it was built statically in the repository + # and we changed it to shared locally, but that's + # rarely the case and would only occur if you + # intentionally used the same suffix for both + # shared and static objects anyway. So this + # should work well in practice. + result.attributes = self.attributes + break + self._memo['rfile'] = result + return result + + def rstr(self): + return str(self.rfile()) + + def get_cachedir_csig(self): + """ + Fetch a Node's content signature for purposes of computing + another Node's cachesig. + + This is a wrapper around the normal get_csig() method that handles + the somewhat obscure case of using CacheDir with the -n option. + Any files that don't exist would normally be "built" by fetching + them from the cache, but the normal get_csig() method will try + to open up the local file, which doesn't exist because the -n + option meant we didn't actually pull the file from cachedir. + But since the file *does* actually exist in the cachedir, we + can use its contents for the csig. + """ + try: + return self.cachedir_csig + except AttributeError: + pass + + cachedir, cachefile = self.get_build_env().get_CacheDir().cachepath(self) + if not self.exists() and cachefile and os.path.exists(cachefile): + self.cachedir_csig = SCons.Util.MD5filesignature(cachefile, \ + SCons.Node.FS.File.md5_chunksize * 1024) + else: + self.cachedir_csig = self.get_csig() + return self.cachedir_csig + + def get_cachedir_bsig(self): + try: + return self.cachesig + except AttributeError: + pass + + # Add the path to the cache signature, because multiple + # targets built by the same action will all have the same + # build signature, and we have to differentiate them somehow. + children = self.children() + executor = self.get_executor() + # sigs = [n.get_cachedir_csig() for n in children] + sigs = map(lambda n: n.get_cachedir_csig(), children) + sigs.append(SCons.Util.MD5signature(executor.get_contents())) + sigs.append(self.path) + result = self.cachesig = SCons.Util.MD5collect(sigs) + return result + + +default_fs = None + +def get_default_fs(): + global default_fs + if not default_fs: + default_fs = FS() + return default_fs + +class FileFinder: + """ + """ + if SCons.Memoize.use_memoizer: + __metaclass__ = SCons.Memoize.Memoized_Metaclass + + memoizer_counters = [] + + def __init__(self): + self._memo = {} + + def filedir_lookup(self, p, fd=None): + """ + A helper method for find_file() that looks up a directory for + a file we're trying to find. This only creates the Dir Node if + it exists on-disk, since if the directory doesn't exist we know + we won't find any files in it... :-) + + It would be more compact to just use this as a nested function + with a default keyword argument (see the commented-out version + below), but that doesn't work unless you have nested scopes, + so we define it here just so this work under Python 1.5.2. + """ + if fd is None: + fd = self.default_filedir + dir, name = os.path.split(fd) + drive, d = os.path.splitdrive(dir) + if not name and d[:1] in ('/', os.sep): + #return p.fs.get_root(drive).dir_on_disk(name) + return p.fs.get_root(drive) + if dir: + p = self.filedir_lookup(p, dir) + if not p: + return None + norm_name = _my_normcase(name) + try: + node = p.entries[norm_name] + except KeyError: + return p.dir_on_disk(name) + if isinstance(node, Dir): + return node + if isinstance(node, Entry): + node.must_be_same(Dir) + return node + return None + + def _find_file_key(self, filename, paths, verbose=None): + return (filename, paths) + + memoizer_counters.append(SCons.Memoize.CountDict('find_file', _find_file_key)) + + def find_file(self, filename, paths, verbose=None): + """ + find_file(str, [Dir()]) -> [nodes] + + filename - a filename to find + paths - a list of directory path *nodes* to search in. Can be + represented as a list, a tuple, or a callable that is + called with no arguments and returns the list or tuple. + + returns - the node created from the found file. + + Find a node corresponding to either a derived file or a file + that exists already. + + Only the first file found is returned, and none is returned + if no file is found. + """ + memo_key = self._find_file_key(filename, paths) + try: + memo_dict = self._memo['find_file'] + except KeyError: + memo_dict = {} + self._memo['find_file'] = memo_dict + else: + try: + return memo_dict[memo_key] + except KeyError: + pass + + if verbose and not callable(verbose): + if not SCons.Util.is_String(verbose): + verbose = "find_file" + verbose = ' %s: ' % verbose + verbose = lambda s, v=verbose: sys.stdout.write(v + s) + + filedir, filename = os.path.split(filename) + if filedir: + # More compact code that we can't use until we drop + # support for Python 1.5.2: + # + #def filedir_lookup(p, fd=filedir): + # """ + # A helper function that looks up a directory for a file + # we're trying to find. This only creates the Dir Node + # if it exists on-disk, since if the directory doesn't + # exist we know we won't find any files in it... :-) + # """ + # dir, name = os.path.split(fd) + # if dir: + # p = filedir_lookup(p, dir) + # if not p: + # return None + # norm_name = _my_normcase(name) + # try: + # node = p.entries[norm_name] + # except KeyError: + # return p.dir_on_disk(name) + # if isinstance(node, Dir): + # return node + # if isinstance(node, Entry): + # node.must_be_same(Dir) + # return node + # if isinstance(node, Dir) or isinstance(node, Entry): + # return node + # return None + #paths = filter(None, map(filedir_lookup, paths)) + + self.default_filedir = filedir + paths = filter(None, map(self.filedir_lookup, paths)) + + result = None + for dir in paths: + if verbose: + verbose("looking for '%s' in '%s' ...\n" % (filename, dir)) + node, d = dir.srcdir_find_file(filename) + if node: + if verbose: + verbose("... FOUND '%s' in '%s'\n" % (filename, d)) + result = node + break + + memo_dict[memo_key] = result + + return result + +find_file = FileFinder().find_file + + +def invalidate_node_memos(targets): + """ + Invalidate the memoized values of all Nodes (files or directories) + that are associated with the given entries. Has been added to + clear the cache of nodes affected by a direct execution of an + action (e.g. Delete/Copy/Chmod). Existing Node caches become + inconsistent if the action is run through Execute(). The argument + `targets` can be a single Node object or filename, or a sequence + of Nodes/filenames. + """ + from traceback import extract_stack + + # First check if the cache really needs to be flushed. Only + # actions run in the SConscript with Execute() seem to be + # affected. XXX The way to check if Execute() is in the stacktrace + # is a very dirty hack and should be replaced by a more sensible + # solution. + for f in extract_stack(): + if f[2] == 'Execute' and f[0][-14:] == 'Environment.py': + break + else: + # Dont have to invalidate, so return + return + + if not SCons.Util.is_List(targets): + targets = [targets] + + for entry in targets: + # If the target is a Node object, clear the cache. If it is a + # filename, look up potentially existing Node object first. + try: + entry.clear_memoized_values() + except AttributeError: + # Not a Node object, try to look up Node by filename. XXX + # This creates Node objects even for those filenames which + # do not correspond to an existing Node object. + node = get_default_fs().Entry(entry) + if node: + node.clear_memoized_values() + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/FSTests.py b/src/engine/SCons/Node/FSTests.py new file mode 100644 index 0000000..bf64ab7 --- /dev/null +++ b/src/engine/SCons/Node/FSTests.py @@ -0,0 +1,3520 @@ +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/FSTests.py 4577 2009/12/27 19:44:43 scons" + +import os +import os.path +import string +import sys +import time +import unittest +from TestCmd import TestCmd +import shutil +import stat + +import SCons.Errors +import SCons.Node.FS +import SCons.Util +import SCons.Warnings + +built_it = None + +# This will be built-in in 2.3. For now fake it. +try : + True , False +except NameError : + True = 1 ; False = 0 + + +scanner_count = 0 + +class Scanner: + def __init__(self, node=None): + global scanner_count + scanner_count = scanner_count + 1 + self.hash = scanner_count + self.node = node + def path(self, env, dir, target=None, source=None): + return () + def __call__(self, node, env, path): + return [self.node] + def __hash__(self): + return self.hash + def select(self, node): + return self + def recurse_nodes(self, nodes): + return nodes + +class Environment: + def __init__(self): + self.scanner = Scanner() + def Dictionary(self, *args): + return {} + def autogenerate(self, **kw): + return {} + def get_scanner(self, skey): + return self.scanner + def Override(self, overrides): + return self + def _update(self, dict): + pass + +class Action: + def __call__(self, targets, sources, env, **kw): + global built_it + if kw.get('execute', 1): + built_it = 1 + return 0 + def show(self, string): + pass + def get_contents(self, target, source, env): + return "" + def genstring(self, target, source, env): + return "" + def strfunction(self, targets, sources, env): + return "" + def get_implicit_deps(self, target, source, env): + return [] + +class Builder: + def __init__(self, factory, action=Action()): + self.factory = factory + self.env = Environment() + self.overrides = {} + self.action = action + self.target_scanner = None + self.source_scanner = None + + def targets(self, t): + return [t] + + def source_factory(self, name): + return self.factory(name) + +class _tempdirTestCase(unittest.TestCase): + def setUp(self): + self.save_cwd = os.getcwd() + self.test = TestCmd(workdir='') + # FS doesn't like the cwd to be something other than its root. + os.chdir(self.test.workpath("")) + self.fs = SCons.Node.FS.FS() + + def tearDown(self): + os.chdir(self.save_cwd) + +class VariantDirTestCase(unittest.TestCase): + def runTest(self): + """Test variant dir functionality""" + test=TestCmd(workdir='') + + fs = SCons.Node.FS.FS() + f1 = fs.File('build/test1') + fs.VariantDir('build', 'src') + f2 = fs.File('build/test2') + d1 = fs.Dir('build') + assert f1.srcnode().path == os.path.normpath('src/test1'), f1.srcnode().path + assert f2.srcnode().path == os.path.normpath('src/test2'), f2.srcnode().path + assert d1.srcnode().path == 'src', d1.srcnode().path + + fs = SCons.Node.FS.FS() + f1 = fs.File('build/test1') + fs.VariantDir('build', '.') + f2 = fs.File('build/test2') + d1 = fs.Dir('build') + assert f1.srcnode().path == 'test1', f1.srcnode().path + assert f2.srcnode().path == 'test2', f2.srcnode().path + assert d1.srcnode().path == '.', d1.srcnode().path + + fs = SCons.Node.FS.FS() + fs.VariantDir('build/var1', 'src') + fs.VariantDir('build/var2', 'src') + f1 = fs.File('build/var1/test1') + f2 = fs.File('build/var2/test1') + assert f1.srcnode().path == os.path.normpath('src/test1'), f1.srcnode().path + assert f2.srcnode().path == os.path.normpath('src/test1'), f2.srcnode().path + + fs = SCons.Node.FS.FS() + fs.VariantDir('../var1', 'src') + fs.VariantDir('../var2', 'src') + f1 = fs.File('../var1/test1') + f2 = fs.File('../var2/test1') + assert f1.srcnode().path == os.path.normpath('src/test1'), f1.srcnode().path + assert f2.srcnode().path == os.path.normpath('src/test1'), f2.srcnode().path + + # Set up some files + test.subdir('work', ['work', 'src']) + test.subdir(['work', 'build'], ['work', 'build', 'var1']) + test.subdir(['work', 'build', 'var2']) + test.subdir('rep1', ['rep1', 'src']) + test.subdir(['rep1', 'build'], ['rep1', 'build', 'var1']) + test.subdir(['rep1', 'build', 'var2']) + + # A source file in the source directory + test.write([ 'work', 'src', 'test.in' ], 'test.in') + + # A source file in a subdir of the source directory + test.subdir([ 'work', 'src', 'new_dir' ]) + test.write([ 'work', 'src', 'new_dir', 'test9.out' ], 'test9.out\n') + + # A source file in the repository + test.write([ 'rep1', 'src', 'test2.in' ], 'test2.in') + + # Some source files in the variant directory + test.write([ 'work', 'build', 'var2', 'test.in' ], 'test.old') + test.write([ 'work', 'build', 'var2', 'test2.in' ], 'test2.old') + + # An old derived file in the variant directories + test.write([ 'work', 'build', 'var1', 'test.out' ], 'test.old') + test.write([ 'work', 'build', 'var2', 'test.out' ], 'test.old') + + # And just in case we are weird, a derived file in the source + # dir. + test.write([ 'work', 'src', 'test.out' ], 'test.out.src') + + # A derived file in the repository + test.write([ 'rep1', 'build', 'var1', 'test2.out' ], 'test2.out_rep') + test.write([ 'rep1', 'build', 'var2', 'test2.out' ], 'test2.out_rep') + + os.chdir(test.workpath('work')) + + fs = SCons.Node.FS.FS(test.workpath('work')) + fs.VariantDir('build/var1', 'src', duplicate=0) + fs.VariantDir('build/var2', 'src') + f1 = fs.File('build/var1/test.in') + f1out = fs.File('build/var1/test.out') + f1out.builder = 1 + f1out_2 = fs.File('build/var1/test2.out') + f1out_2.builder = 1 + f2 = fs.File('build/var2/test.in') + f2out = fs.File('build/var2/test.out') + f2out.builder = 1 + f2out_2 = fs.File('build/var2/test2.out') + f2out_2.builder = 1 + fs.Repository(test.workpath('rep1')) + + assert f1.srcnode().path == os.path.normpath('src/test.in'),\ + f1.srcnode().path + # str(node) returns source path for duplicate = 0 + assert str(f1) == os.path.normpath('src/test.in'), str(f1) + # Build path does not exist + assert not f1.exists() + # ...but the actual file is not there... + assert not os.path.exists(f1.get_abspath()) + # And duplicate=0 should also work just like a Repository + assert f1.rexists() + # rfile() should point to the source path + assert f1.rfile().path == os.path.normpath('src/test.in'),\ + f1.rfile().path + + assert f2.srcnode().path == os.path.normpath('src/test.in'),\ + f2.srcnode().path + # str(node) returns build path for duplicate = 1 + assert str(f2) == os.path.normpath('build/var2/test.in'), str(f2) + # Build path exists + assert f2.exists() + # ...and exists() should copy the file from src to build path + assert test.read(['work', 'build', 'var2', 'test.in']) == 'test.in',\ + test.read(['work', 'build', 'var2', 'test.in']) + # Since exists() is true, so should rexists() be + assert f2.rexists() + + f3 = fs.File('build/var1/test2.in') + f4 = fs.File('build/var2/test2.in') + + assert f3.srcnode().path == os.path.normpath('src/test2.in'),\ + f3.srcnode().path + # str(node) returns source path for duplicate = 0 + assert str(f3) == os.path.normpath('src/test2.in'), str(f3) + # Build path does not exist + assert not f3.exists() + # Source path does not either + assert not f3.srcnode().exists() + # But we do have a file in the Repository + assert f3.rexists() + # rfile() should point to the source path + assert f3.rfile().path == os.path.normpath(test.workpath('rep1/src/test2.in')),\ + f3.rfile().path + + assert f4.srcnode().path == os.path.normpath('src/test2.in'),\ + f4.srcnode().path + # str(node) returns build path for duplicate = 1 + assert str(f4) == os.path.normpath('build/var2/test2.in'), str(f4) + # Build path should exist + assert f4.exists() + # ...and copy over the file into the local build path + assert test.read(['work', 'build', 'var2', 'test2.in']) == 'test2.in' + # should exist in repository, since exists() is true + assert f4.rexists() + # rfile() should point to ourselves + assert f4.rfile().path == os.path.normpath('build/var2/test2.in'),\ + f4.rfile().path + + f5 = fs.File('build/var1/test.out') + f6 = fs.File('build/var2/test.out') + + assert f5.exists() + # We should not copy the file from the source dir, since this is + # a derived file. + assert test.read(['work', 'build', 'var1', 'test.out']) == 'test.old' + + assert f6.exists() + # We should not copy the file from the source dir, since this is + # a derived file. + assert test.read(['work', 'build', 'var2', 'test.out']) == 'test.old' + + f7 = fs.File('build/var1/test2.out') + f8 = fs.File('build/var2/test2.out') + + assert not f7.exists() + assert f7.rexists() + r = f7.rfile().path + expect = os.path.normpath(test.workpath('rep1/build/var1/test2.out')) + assert r == expect, (repr(r), repr(expect)) + + assert not f8.exists() + assert f8.rexists() + assert f8.rfile().path == os.path.normpath(test.workpath('rep1/build/var2/test2.out')),\ + f8.rfile().path + + # Verify the Mkdir and Link actions are called + d9 = fs.Dir('build/var2/new_dir') + f9 = fs.File('build/var2/new_dir/test9.out') + + class MkdirAction(Action): + def __init__(self, dir_made): + self.dir_made = dir_made + def __call__(self, target, source, env, executor=None): + if executor: + target = executor.get_all_targets() + source = executor.get_all_sources() + self.dir_made.extend(target) + + save_Link = SCons.Node.FS.Link + link_made = [] + def link_func(target, source, env, link_made=link_made): + link_made.append(target) + SCons.Node.FS.Link = link_func + + try: + dir_made = [] + d9.builder = Builder(fs.Dir, action=MkdirAction(dir_made)) + d9.reset_executor() + f9.exists() + expect = os.path.join('build', 'var2', 'new_dir') + assert dir_made[0].path == expect, dir_made[0].path + expect = os.path.join('build', 'var2', 'new_dir', 'test9.out') + assert link_made[0].path == expect, link_made[0].path + assert f9.linked + finally: + SCons.Node.FS.Link = save_Link + + # Test for an interesting pathological case...we have a source + # file in a build path, but not in a source path. This can + # happen if you switch from duplicate=1 to duplicate=0, then + # delete a source file. At one time, this would cause exists() + # to return a 1 but get_contents() to throw. + test.write([ 'work', 'build', 'var1', 'asourcefile' ], 'stuff') + f10 = fs.File('build/var1/asourcefile') + assert f10.exists() + assert f10.get_contents() == 'stuff', f10.get_contents() + + f11 = fs.File('src/file11') + t, m = f11.alter_targets() + bdt = map(lambda n: n.path, t) + var1_file11 = os.path.normpath('build/var1/file11') + var2_file11 = os.path.normpath('build/var2/file11') + assert bdt == [var1_file11, var2_file11], bdt + + f12 = fs.File('src/file12') + f12.builder = 1 + bdt, m = f12.alter_targets() + assert bdt == [], map(lambda n: n.path, bdt) + + d13 = fs.Dir('src/new_dir') + t, m = d13.alter_targets() + bdt = map(lambda n: n.path, t) + var1_new_dir = os.path.normpath('build/var1/new_dir') + var2_new_dir = os.path.normpath('build/var2/new_dir') + assert bdt == [var1_new_dir, var2_new_dir], bdt + + # Test that an IOError trying to Link a src file + # into a VariantDir ends up throwing a StopError. + fIO = fs.File("build/var2/IOError") + + save_Link = SCons.Node.FS.Link + def Link_IOError(target, source, env): + raise IOError, (17, "Link_IOError") + SCons.Node.FS.Link = SCons.Action.Action(Link_IOError, None) + + test.write(['work', 'src', 'IOError'], "work/src/IOError\n") + + try: + exc_caught = 0 + try: + fIO.exists() + except SCons.Errors.StopError: + exc_caught = 1 + assert exc_caught, "Should have caught a StopError" + + finally: + SCons.Node.FS.Link = save_Link + + # Test to see if Link() works... + test.subdir('src','build') + test.write('src/foo', 'src/foo\n') + os.chmod(test.workpath('src/foo'), stat.S_IRUSR) + SCons.Node.FS.Link(fs.File(test.workpath('build/foo')), + fs.File(test.workpath('src/foo')), + None) + os.chmod(test.workpath('src/foo'), stat.S_IRUSR | stat.S_IWRITE) + st=os.stat(test.workpath('build/foo')) + assert (stat.S_IMODE(st[stat.ST_MODE]) & stat.S_IWRITE), \ + stat.S_IMODE(st[stat.ST_MODE]) + + # This used to generate a UserError when we forbid the source + # directory from being outside the top-level SConstruct dir. + fs = SCons.Node.FS.FS() + fs.VariantDir('build', '/test/foo') + + exc_caught = 0 + try: + try: + fs = SCons.Node.FS.FS() + fs.VariantDir('build', 'build/src') + except SCons.Errors.UserError: + exc_caught = 1 + assert exc_caught, "Should have caught a UserError." + finally: + test.unlink( "src/foo" ) + test.unlink( "build/foo" ) + + fs = SCons.Node.FS.FS() + fs.VariantDir('build', 'src1') + + # Calling the same VariantDir twice should work fine. + fs.VariantDir('build', 'src1') + + # Trying to move a variant dir to a second source dir + # should blow up + try: + fs.VariantDir('build', 'src2') + except SCons.Errors.UserError: + pass + else: + assert 0, "Should have caught a UserError." + + # Test against a former bug. Make sure we can get a repository + # path for the variant directory itself! + fs=SCons.Node.FS.FS(test.workpath('work')) + test.subdir('work') + fs.VariantDir('build/var3', 'src', duplicate=0) + d1 = fs.Dir('build/var3') + r = d1.rdir() + assert r == d1, "%s != %s" % (r, d1) + + # verify the link creation attempts in file_link() + class LinkSimulator : + """A class to intercept os.[sym]link() calls and track them.""" + + def __init__( self, duplicate, link, symlink, copy ) : + self.duplicate = duplicate + self.have = {} + self.have['hard'] = link + self.have['soft'] = symlink + self.have['copy'] = copy + + self.links_to_be_called = [] + for link in string.split(self.duplicate, '-'): + if self.have[link]: + self.links_to_be_called.append(link) + + def link_fail( self , src , dest ) : + next_link = self.links_to_be_called.pop(0) + assert next_link == "hard", \ + "Wrong link order: expected %s to be called "\ + "instead of hard" % next_link + raise OSError( "Simulating hard link creation error." ) + + def symlink_fail( self , src , dest ) : + next_link = self.links_to_be_called.pop(0) + assert next_link == "soft", \ + "Wrong link order: expected %s to be called "\ + "instead of soft" % next_link + raise OSError( "Simulating symlink creation error." ) + + def copy( self , src , dest ) : + next_link = self.links_to_be_called.pop(0) + assert next_link == "copy", \ + "Wrong link order: expected %s to be called "\ + "instead of copy" % next_link + # copy succeeds, but use the real copy + self.have['copy'](src, dest) + # end class LinkSimulator + + try: + SCons.Node.FS.set_duplicate("no-link-order") + assert 0, "Expected exception when passing an invalid duplicate to set_duplicate" + except SCons.Errors.InternalError: + pass + + for duplicate in SCons.Node.FS.Valid_Duplicates: + # save the real functions for later restoration + try: + real_link = os.link + except AttributeError: + real_link = None + try: + real_symlink = os.symlink + except AttributeError: + real_symlink = None + real_copy = shutil.copy2 + + simulator = LinkSimulator(duplicate, real_link, real_symlink, real_copy) + + # override the real functions with our simulation + os.link = simulator.link_fail + os.symlink = simulator.symlink_fail + shutil.copy2 = simulator.copy + + try: + + SCons.Node.FS.set_duplicate(duplicate) + + src_foo = test.workpath('src', 'foo') + build_foo = test.workpath('build', 'foo') + + test.write(src_foo, 'src/foo\n') + os.chmod(src_foo, stat.S_IRUSR) + try: + SCons.Node.FS.Link(fs.File(build_foo), + fs.File(src_foo), + None) + finally: + os.chmod(src_foo, stat.S_IRUSR | stat.S_IWRITE) + test.unlink(src_foo) + test.unlink(build_foo) + + finally: + # restore the real functions + if real_link: + os.link = real_link + else: + delattr(os, 'link') + if real_symlink: + os.symlink = real_symlink + else: + delattr(os, 'symlink') + shutil.copy2 = real_copy + + # Test VariantDir "reflection," where a same-named subdirectory + # exists underneath a variant_dir. + fs = SCons.Node.FS.FS() + fs.VariantDir('work/src/b1/b2', 'work/src') + + dir_list = [ + 'work/src', + 'work/src/b1', + 'work/src/b1/b2', + 'work/src/b1/b2/b1', + 'work/src/b1/b2/b1/b2', + 'work/src/b1/b2/b1/b2/b1', + 'work/src/b1/b2/b1/b2/b1/b2', + ] + + srcnode_map = { + 'work/src/b1/b2' : 'work/src', + 'work/src/b1/b2/f' : 'work/src/f', + 'work/src/b1/b2/b1' : 'work/src/b1/', + 'work/src/b1/b2/b1/f' : 'work/src/b1/f', + 'work/src/b1/b2/b1/b2' : 'work/src/b1/b2', + 'work/src/b1/b2/b1/b2/f' : 'work/src/b1/b2/f', + 'work/src/b1/b2/b1/b2/b1' : 'work/src/b1/b2/b1', + 'work/src/b1/b2/b1/b2/b1/f' : 'work/src/b1/b2/b1/f', + 'work/src/b1/b2/b1/b2/b1/b2' : 'work/src/b1/b2/b1/b2', + 'work/src/b1/b2/b1/b2/b1/b2/f' : 'work/src/b1/b2/b1/b2/f', + } + + alter_map = { + 'work/src' : 'work/src/b1/b2', + 'work/src/f' : 'work/src/b1/b2/f', + 'work/src/b1' : 'work/src/b1/b2/b1', + 'work/src/b1/f' : 'work/src/b1/b2/b1/f', + } + + errors = 0 + + for dir in dir_list: + dnode = fs.Dir(dir) + f = dir + '/f' + fnode = fs.File(dir + '/f') + + dp = dnode.srcnode().path + expect = os.path.normpath(srcnode_map.get(dir, dir)) + if dp != expect: + print "Dir `%s' srcnode() `%s' != expected `%s'" % (dir, dp, expect) + errors = errors + 1 + + fp = fnode.srcnode().path + expect = os.path.normpath(srcnode_map.get(f, f)) + if fp != expect: + print "File `%s' srcnode() `%s' != expected `%s'" % (f, fp, expect) + errors = errors + 1 + + for dir in dir_list: + dnode = fs.Dir(dir) + f = dir + '/f' + fnode = fs.File(dir + '/f') + + t, m = dnode.alter_targets() + tp = t[0].path + expect = os.path.normpath(alter_map.get(dir, dir)) + if tp != expect: + print "Dir `%s' alter_targets() `%s' != expected `%s'" % (dir, tp, expect) + errors = errors + 1 + + t, m = fnode.alter_targets() + tp = t[0].path + expect = os.path.normpath(alter_map.get(f, f)) + if tp != expect: + print "File `%s' alter_targets() `%s' != expected `%s'" % (f, tp, expect) + errors = errors + 1 + + self.failIf(errors) + +class BaseTestCase(_tempdirTestCase): + def test_stat(self): + """Test the Base.stat() method""" + test = self.test + test.write("e1", "e1\n") + fs = SCons.Node.FS.FS() + + e1 = fs.Entry('e1') + s = e1.stat() + assert s is not None, s + + e2 = fs.Entry('e2') + s = e2.stat() + assert s is None, s + + def test_getmtime(self): + """Test the Base.getmtime() method""" + test = self.test + test.write("file", "file\n") + fs = SCons.Node.FS.FS() + + file = fs.Entry('file') + assert file.getmtime() + + file = fs.Entry('nonexistent') + mtime = file.getmtime() + assert mtime is None, mtime + + def test_getsize(self): + """Test the Base.getsize() method""" + test = self.test + test.write("file", "file\n") + fs = SCons.Node.FS.FS() + + file = fs.Entry('file') + size = file.getsize() + assert size == 5, size + + file = fs.Entry('nonexistent') + size = file.getsize() + assert size is None, size + + def test_isdir(self): + """Test the Base.isdir() method""" + test = self.test + test.subdir('dir') + test.write("file", "file\n") + fs = SCons.Node.FS.FS() + + dir = fs.Entry('dir') + assert dir.isdir() + + file = fs.Entry('file') + assert not file.isdir() + + nonexistent = fs.Entry('nonexistent') + assert not nonexistent.isdir() + + def test_isfile(self): + """Test the Base.isfile() method""" + test = self.test + test.subdir('dir') + test.write("file", "file\n") + fs = SCons.Node.FS.FS() + + dir = fs.Entry('dir') + assert not dir.isfile() + + file = fs.Entry('file') + assert file.isfile() + + nonexistent = fs.Entry('nonexistent') + assert not nonexistent.isfile() + + if hasattr(os, 'symlink'): + def test_islink(self): + """Test the Base.islink() method""" + test = self.test + test.subdir('dir') + test.write("file", "file\n") + test.symlink("symlink", "symlink") + fs = SCons.Node.FS.FS() + + dir = fs.Entry('dir') + assert not dir.islink() + + file = fs.Entry('file') + assert not file.islink() + + symlink = fs.Entry('symlink') + assert symlink.islink() + + nonexistent = fs.Entry('nonexistent') + assert not nonexistent.islink() + +class DirNodeInfoTestCase(_tempdirTestCase): + def test___init__(self): + """Test DirNodeInfo initialization""" + ddd = self.fs.Dir('ddd') + ni = SCons.Node.FS.DirNodeInfo(ddd) + +class DirBuildInfoTestCase(_tempdirTestCase): + def test___init__(self): + """Test DirBuildInfo initialization""" + ddd = self.fs.Dir('ddd') + bi = SCons.Node.FS.DirBuildInfo(ddd) + +class FileNodeInfoTestCase(_tempdirTestCase): + def test___init__(self): + """Test FileNodeInfo initialization""" + fff = self.fs.File('fff') + ni = SCons.Node.FS.FileNodeInfo(fff) + assert isinstance(ni, SCons.Node.FS.FileNodeInfo) + + def test_update(self): + """Test updating a File.NodeInfo with on-disk information""" + test = self.test + fff = self.fs.File('fff') + + ni = SCons.Node.FS.FileNodeInfo(fff) + + test.write('fff', "fff\n") + + st = os.stat('fff') + + ni.update(fff) + + assert hasattr(ni, 'timestamp') + assert hasattr(ni, 'size') + + ni.timestamp = 0 + ni.size = 0 + + ni.update(fff) + + mtime = st[stat.ST_MTIME] + assert ni.timestamp == mtime, (ni.timestamp, mtime) + size = st[stat.ST_SIZE] + assert ni.size == size, (ni.size, size) + + import time + time.sleep(2) + + test.write('fff', "fff longer size, different time stamp\n") + + st = os.stat('fff') + + mtime = st[stat.ST_MTIME] + assert ni.timestamp != mtime, (ni.timestamp, mtime) + size = st[stat.ST_SIZE] + assert ni.size != size, (ni.size, size) + + #fff.clear() + #ni.update(fff) + + #st = os.stat('fff') + + #mtime = st[stat.ST_MTIME] + #assert ni.timestamp == mtime, (ni.timestamp, mtime) + #size = st[stat.ST_SIZE] + #assert ni.size == size, (ni.size, size) + +class FileBuildInfoTestCase(_tempdirTestCase): + def test___init__(self): + """Test File.BuildInfo initialization""" + fff = self.fs.File('fff') + bi = SCons.Node.FS.FileBuildInfo(fff) + assert bi, bi + + def test_convert_to_sconsign(self): + """Test converting to .sconsign file format""" + fff = self.fs.File('fff') + bi = SCons.Node.FS.FileBuildInfo(fff) + assert hasattr(bi, 'convert_to_sconsign') + + def test_convert_from_sconsign(self): + """Test converting from .sconsign file format""" + fff = self.fs.File('fff') + bi = SCons.Node.FS.FileBuildInfo(fff) + assert hasattr(bi, 'convert_from_sconsign') + + def test_prepare_dependencies(self): + """Test that we have a prepare_dependencies() method""" + fff = self.fs.File('fff') + bi = SCons.Node.FS.FileBuildInfo(fff) + bi.prepare_dependencies() + + def test_format(self): + """Test the format() method""" + f1 = self.fs.File('f1') + bi1 = SCons.Node.FS.FileBuildInfo(f1) + + s1sig = SCons.Node.FS.FileNodeInfo(self.fs.File('n1')) + s1sig.csig = 1 + d1sig = SCons.Node.FS.FileNodeInfo(self.fs.File('n2')) + d1sig.timestamp = 2 + i1sig = SCons.Node.FS.FileNodeInfo(self.fs.File('n3')) + i1sig.size = 3 + + bi1.bsources = [self.fs.File('s1')] + bi1.bdepends = [self.fs.File('d1')] + bi1.bimplicit = [self.fs.File('i1')] + bi1.bsourcesigs = [s1sig] + bi1.bdependsigs = [d1sig] + bi1.bimplicitsigs = [i1sig] + bi1.bact = 'action' + bi1.bactsig = 'actionsig' + + expect_lines = [ + 's1: 1 None None', + 'd1: None 2 None', + 'i1: None None 3', + 'actionsig [action]', + ] + + expect = string.join(expect_lines, '\n') + format = bi1.format() + assert format == expect, (repr(expect), repr(format)) + +class FSTestCase(_tempdirTestCase): + def test_runTest(self): + """Test FS (file system) Node operations + + This test case handles all of the file system node + tests in one environment, so we don't have to set up a + complicated directory structure for each test individually. + """ + test = self.test + + test.subdir('sub', ['sub', 'dir']) + + wp = test.workpath('') + sub = test.workpath('sub', '') + sub_dir = test.workpath('sub', 'dir', '') + sub_dir_foo = test.workpath('sub', 'dir', 'foo', '') + sub_dir_foo_bar = test.workpath('sub', 'dir', 'foo', 'bar', '') + sub_foo = test.workpath('sub', 'foo', '') + + os.chdir(sub_dir) + + fs = SCons.Node.FS.FS() + + e1 = fs.Entry('e1') + assert isinstance(e1, SCons.Node.FS.Entry) + + d1 = fs.Dir('d1') + assert isinstance(d1, SCons.Node.FS.Dir) + assert d1.cwd is d1, d1 + + f1 = fs.File('f1', directory = d1) + assert isinstance(f1, SCons.Node.FS.File) + + d1_f1 = os.path.join('d1', 'f1') + assert f1.path == d1_f1, "f1.path %s != %s" % (f1.path, d1_f1) + assert str(f1) == d1_f1, "str(f1) %s != %s" % (str(f1), d1_f1) + + x1 = d1.File('x1') + assert isinstance(x1, SCons.Node.FS.File) + assert str(x1) == os.path.join('d1', 'x1') + + x2 = d1.Dir('x2') + assert isinstance(x2, SCons.Node.FS.Dir) + assert str(x2) == os.path.join('d1', 'x2') + + x3 = d1.Entry('x3') + assert isinstance(x3, SCons.Node.FS.Entry) + assert str(x3) == os.path.join('d1', 'x3') + + assert d1.File(x1) == x1 + assert d1.Dir(x2) == x2 + assert d1.Entry(x3) == x3 + + x1.cwd = d1 + + x4 = x1.File('x4') + assert str(x4) == os.path.join('d1', 'x4') + + x5 = x1.Dir('x5') + assert str(x5) == os.path.join('d1', 'x5') + + x6 = x1.Entry('x6') + assert str(x6) == os.path.join('d1', 'x6') + x7 = x1.Entry('x7') + assert str(x7) == os.path.join('d1', 'x7') + + assert x1.File(x4) == x4 + assert x1.Dir(x5) == x5 + assert x1.Entry(x6) == x6 + assert x1.Entry(x7) == x7 + + assert x1.Entry(x5) == x5 + try: + x1.File(x5) + except TypeError: + pass + else: + raise Exception, "did not catch expected TypeError" + + assert x1.Entry(x4) == x4 + try: + x1.Dir(x4) + except TypeError: + pass + else: + raise Exception, "did not catch expected TypeError" + + x6 = x1.File(x6) + assert isinstance(x6, SCons.Node.FS.File) + + x7 = x1.Dir(x7) + assert isinstance(x7, SCons.Node.FS.Dir) + + seps = [os.sep] + if os.sep != '/': + seps = seps + ['/'] + + drive, path = os.path.splitdrive(os.getcwd()) + + def _do_Dir_test(lpath, path_, abspath_, up_path_, sep, fileSys=fs, drive=drive): + dir = fileSys.Dir(string.replace(lpath, '/', sep)) + + if os.sep != '/': + path_ = string.replace(path_, '/', os.sep) + abspath_ = string.replace(abspath_, '/', os.sep) + up_path_ = string.replace(up_path_, '/', os.sep) + + def strip_slash(p, drive=drive): + if p[-1] == os.sep and len(p) > 1: + p = p[:-1] + if p[0] == os.sep: + p = drive + p + return p + path = strip_slash(path_) + abspath = strip_slash(abspath_) + up_path = strip_slash(up_path_) + name = string.split(abspath, os.sep)[-1] + + assert dir.name == name, \ + "dir.name %s != expected name %s" % \ + (dir.name, name) + assert dir.path == path, \ + "dir.path %s != expected path %s" % \ + (dir.path, path) + assert str(dir) == path, \ + "str(dir) %s != expected path %s" % \ + (str(dir), path) + assert dir.get_abspath() == abspath, \ + "dir.abspath %s != expected absolute path %s" % \ + (dir.get_abspath(), abspath) + assert dir.up().path == up_path, \ + "dir.up().path %s != expected parent path %s" % \ + (dir.up().path, up_path) + + for sep in seps: + + def Dir_test(lpath, path_, abspath_, up_path_, sep=sep, func=_do_Dir_test): + return func(lpath, path_, abspath_, up_path_, sep) + + Dir_test('', './', sub_dir, sub) + Dir_test('foo', 'foo/', sub_dir_foo, './') + Dir_test('foo/bar', 'foo/bar/', sub_dir_foo_bar, 'foo/') + Dir_test('/foo', '/foo/', '/foo/', '/') + Dir_test('/foo/bar', '/foo/bar/', '/foo/bar/', '/foo/') + Dir_test('..', sub, sub, wp) + Dir_test('foo/..', './', sub_dir, sub) + Dir_test('../foo', sub_foo, sub_foo, sub) + Dir_test('.', './', sub_dir, sub) + Dir_test('./.', './', sub_dir, sub) + Dir_test('foo/./bar', 'foo/bar/', sub_dir_foo_bar, 'foo/') + Dir_test('#../foo', sub_foo, sub_foo, sub) + Dir_test('#/../foo', sub_foo, sub_foo, sub) + Dir_test('#foo/bar', 'foo/bar/', sub_dir_foo_bar, 'foo/') + Dir_test('#/foo/bar', 'foo/bar/', sub_dir_foo_bar, 'foo/') + Dir_test('#', './', sub_dir, sub) + + try: + f2 = fs.File(string.join(['f1', 'f2'], sep), directory = d1) + except TypeError, x: + assert str(x) == ("Tried to lookup File '%s' as a Dir." % + d1_f1), x + except: + raise + + try: + dir = fs.Dir(string.join(['d1', 'f1'], sep)) + except TypeError, x: + assert str(x) == ("Tried to lookup File '%s' as a Dir." % + d1_f1), x + except: + raise + + try: + f2 = fs.File('d1') + except TypeError, x: + assert str(x) == ("Tried to lookup Dir '%s' as a File." % + 'd1'), x + except: + raise + + # Test that just specifying the drive works to identify + # its root directory. + p = os.path.abspath(test.workpath('root_file')) + drive, path = os.path.splitdrive(p) + if drive: + # The assert below probably isn't correct for the general + # case, but it works for Windows, which covers a lot + # of ground... + dir = fs.Dir(drive) + assert str(dir) == drive + os.sep, str(dir) + + # Make sure that lookups with and without the drive are + # equivalent. + p = os.path.abspath(test.workpath('some/file')) + drive, path = os.path.splitdrive(p) + + e1 = fs.Entry(p) + e2 = fs.Entry(path) + assert e1 is e2, (e1, e2) + assert str(e1) is str(e2), (str(e1), str(e2)) + + # Test for a bug in 0.04 that did not like looking up + # dirs with a trailing slash on Windows. + d=fs.Dir('./') + assert d.path == '.', d.abspath + d=fs.Dir('foo/') + assert d.path == 'foo', d.abspath + + # Test for sub-classing of node building. + global built_it + + built_it = None + assert not built_it + d1.add_source([SCons.Node.Node()]) # XXX FAKE SUBCLASS ATTRIBUTE + d1.builder_set(Builder(fs.File)) + d1.reset_executor() + d1.env_set(Environment()) + d1.build() + assert built_it + + built_it = None + assert not built_it + f1.add_source([SCons.Node.Node()]) # XXX FAKE SUBCLASS ATTRIBUTE + f1.builder_set(Builder(fs.File)) + f1.reset_executor() + f1.env_set(Environment()) + f1.build() + assert built_it + + def match(path, expect): + expect = string.replace(expect, '/', os.sep) + assert path == expect, "path %s != expected %s" % (path, expect) + + e1 = fs.Entry("d1") + assert e1.__class__.__name__ == 'Dir' + match(e1.path, "d1") + match(e1.dir.path, ".") + + e2 = fs.Entry("d1/f1") + assert e2.__class__.__name__ == 'File' + match(e2.path, "d1/f1") + match(e2.dir.path, "d1") + + e3 = fs.Entry("e3") + assert e3.__class__.__name__ == 'Entry' + match(e3.path, "e3") + match(e3.dir.path, ".") + + e4 = fs.Entry("d1/e4") + assert e4.__class__.__name__ == 'Entry' + match(e4.path, "d1/e4") + match(e4.dir.path, "d1") + + e5 = fs.Entry("e3/e5") + assert e3.__class__.__name__ == 'Dir' + match(e3.path, "e3") + match(e3.dir.path, ".") + assert e5.__class__.__name__ == 'Entry' + match(e5.path, "e3/e5") + match(e5.dir.path, "e3") + + e6 = fs.Dir("d1/e4") + assert e6 is e4 + assert e4.__class__.__name__ == 'Dir' + match(e4.path, "d1/e4") + match(e4.dir.path, "d1") + + e7 = fs.File("e3/e5") + assert e7 is e5 + assert e5.__class__.__name__ == 'File' + match(e5.path, "e3/e5") + match(e5.dir.path, "e3") + + fs.chdir(fs.Dir('subdir')) + f11 = fs.File("f11") + match(f11.path, "subdir/f11") + d12 = fs.Dir("d12") + e13 = fs.Entry("subdir/e13") + match(e13.path, "subdir/subdir/e13") + fs.chdir(fs.Dir('..')) + + # Test scanning + f1.builder_set(Builder(fs.File)) + f1.env_set(Environment()) + xyz = fs.File("xyz") + f1.builder.target_scanner = Scanner(xyz) + + f1.scan() + assert f1.implicit[0].path == "xyz" + f1.implicit = [] + f1.scan() + assert f1.implicit == [] + f1.implicit = None + f1.scan() + assert f1.implicit[0].path == "xyz" + + # Test underlying scanning functionality in get_found_includes() + env = Environment() + f12 = fs.File("f12") + t1 = fs.File("t1") + + deps = f12.get_found_includes(env, None, t1) + assert deps == [], deps + + class MyScanner(Scanner): + call_count = 0 + def __call__(self, node, env, path): + self.call_count = self.call_count + 1 + return Scanner.__call__(self, node, env, path) + s = MyScanner(xyz) + + deps = f12.get_found_includes(env, s, t1) + assert deps == [xyz], deps + assert s.call_count == 1, s.call_count + + f12.built() + + deps = f12.get_found_includes(env, s, t1) + assert deps == [xyz], deps + assert s.call_count == 2, s.call_count + + env2 = Environment() + + deps = f12.get_found_includes(env2, s, t1) + assert deps == [xyz], deps + assert s.call_count == 3, s.call_count + + + + # Make sure we can scan this file even if the target isn't + # a file that has a scanner (it might be an Alias, e.g.). + class DummyNode: + pass + + deps = f12.get_found_includes(env, s, DummyNode()) + assert deps == [xyz], deps + + # Test building a file whose directory is not there yet... + f1 = fs.File(test.workpath("foo/bar/baz/ack")) + assert not f1.dir.exists() + f1.prepare() + f1.build() + assert f1.dir.exists() + + os.chdir('..') + + # Test getcwd() + fs = SCons.Node.FS.FS() + assert str(fs.getcwd()) == ".", str(fs.getcwd()) + fs.chdir(fs.Dir('subdir')) + # The cwd's path is always "." + assert str(fs.getcwd()) == ".", str(fs.getcwd()) + assert fs.getcwd().path == 'subdir', fs.getcwd().path + fs.chdir(fs.Dir('../..')) + assert fs.getcwd().path == test.workdir, fs.getcwd().path + + f1 = fs.File(test.workpath("do_i_exist")) + assert not f1.exists() + test.write("do_i_exist","\n") + assert not f1.exists(), "exists() call not cached" + f1.built() + assert f1.exists(), "exists() call caching not reset" + test.unlink("do_i_exist") + assert f1.exists() + f1.built() + assert not f1.exists() + + # For some reason, in Windows, the \x1a character terminates + # the reading of files in text mode. This tests that + # get_contents() returns the binary contents. + test.write("binary_file", "Foo\x1aBar") + f1 = fs.File(test.workpath("binary_file")) + assert f1.get_contents() == "Foo\x1aBar", f1.get_contents() + + try: + # TODO(1.5) + eval('test_string = u"Foo\x1aBar"') + except SyntaxError: + pass + else: + # This tests to make sure we can decode UTF-8 text files. + test.write("utf8_file", test_string.encode('utf-8')) + f1 = fs.File(test.workpath("utf8_file")) + assert eval('f1.get_text_contents() == u"Foo\x1aBar"'), \ + f1.get_text_contents() + + def nonexistent(method, s): + try: + x = method(s, create = 0) + except SCons.Errors.UserError: + pass + else: + raise Exception, "did not catch expected UserError" + + nonexistent(fs.Entry, 'nonexistent') + nonexistent(fs.Entry, 'nonexistent/foo') + + nonexistent(fs.File, 'nonexistent') + nonexistent(fs.File, 'nonexistent/foo') + + nonexistent(fs.Dir, 'nonexistent') + nonexistent(fs.Dir, 'nonexistent/foo') + + test.write("preserve_me", "\n") + assert os.path.exists(test.workpath("preserve_me")) + f1 = fs.File(test.workpath("preserve_me")) + f1.prepare() + assert os.path.exists(test.workpath("preserve_me")) + + test.write("remove_me", "\n") + assert os.path.exists(test.workpath("remove_me")) + f1 = fs.File(test.workpath("remove_me")) + f1.builder = Builder(fs.File) + f1.env_set(Environment()) + f1.prepare() + assert not os.path.exists(test.workpath("remove_me")) + + e = fs.Entry('e_local') + assert not hasattr(e, '_local') + e.set_local() + assert e._local == 1 + f = fs.File('e_local') + assert f._local == 1 + f = fs.File('f_local') + assert f._local == 0 + + #XXX test_is_up_to_date() for directories + + #XXX test_sconsign() for directories + + #XXX test_set_signature() for directories + + #XXX test_build() for directories + + #XXX test_root() + + # test Entry.get_contents() + e = fs.Entry('does_not_exist') + c = e.get_contents() + assert c == "", c + assert e.__class__ == SCons.Node.FS.Entry + + test.write("file", "file\n") + try: + e = fs.Entry('file') + c = e.get_contents() + assert c == "file\n", c + assert e.__class__ == SCons.Node.FS.File + finally: + test.unlink("file") + + # test Entry.get_text_contents() + e = fs.Entry('does_not_exist') + c = e.get_text_contents() + assert c == "", c + assert e.__class__ == SCons.Node.FS.Entry + + test.write("file", "file\n") + try: + e = fs.Entry('file') + c = e.get_text_contents() + assert c == "file\n", c + assert e.__class__ == SCons.Node.FS.File + finally: + test.unlink("file") + + test.subdir("dir") + e = fs.Entry('dir') + c = e.get_contents() + assert c == "", c + assert e.__class__ == SCons.Node.FS.Dir + + c = e.get_text_contents() + try: + eval('assert c == u"", c') + except SyntaxError: + assert c == "" + + if hasattr(os, 'symlink'): + os.symlink('nonexistent', test.workpath('dangling_symlink')) + e = fs.Entry('dangling_symlink') + c = e.get_contents() + assert e.__class__ == SCons.Node.FS.Entry, e.__class__ + assert c == "", c + c = e.get_text_contents() + try: + eval('assert c == u"", c') + except SyntaxError: + assert c == "", c + + test.write("tstamp", "tstamp\n") + try: + # Okay, *this* manipulation accomodates Windows FAT file systems + # that only have two-second granularity on their timestamps. + # We round down the current time to the nearest even integer + # value, subtract two to make sure the timestamp is not "now," + # and then convert it back to a float. + tstamp = float(int(time.time() / 2) * 2) - 2 + os.utime(test.workpath("tstamp"), (tstamp - 2.0, tstamp)) + f = fs.File("tstamp") + t = f.get_timestamp() + assert t == tstamp, "expected %f, got %f" % (tstamp, t) + finally: + test.unlink("tstamp") + + test.subdir('tdir1') + d = fs.Dir('tdir1') + t = d.get_timestamp() + assert t == 0, "expected 0, got %s" % str(t) + + test.subdir('tdir2') + f1 = test.workpath('tdir2', 'file1') + f2 = test.workpath('tdir2', 'file2') + test.write(f1, 'file1\n') + test.write(f2, 'file2\n') + current_time = float(int(time.time() / 2) * 2) + t1 = current_time - 4.0 + t2 = current_time - 2.0 + os.utime(f1, (t1 - 2.0, t1)) + os.utime(f2, (t2 - 2.0, t2)) + d = fs.Dir('tdir2') + fs.File(f1) + fs.File(f2) + t = d.get_timestamp() + assert t == t2, "expected %f, got %f" % (t2, t) + + skey = fs.Entry('eee.x').scanner_key() + assert skey == '.x', skey + skey = fs.Entry('eee.xyz').scanner_key() + assert skey == '.xyz', skey + + skey = fs.File('fff.x').scanner_key() + assert skey == '.x', skey + skey = fs.File('fff.xyz').scanner_key() + assert skey == '.xyz', skey + + skey = fs.Dir('ddd.x').scanner_key() + assert skey is None, skey + + test.write("i_am_not_a_directory", "\n") + try: + exc_caught = 0 + try: + fs.Dir(test.workpath("i_am_not_a_directory")) + except TypeError: + exc_caught = 1 + assert exc_caught, "Should have caught a TypeError" + finally: + test.unlink("i_am_not_a_directory") + + exc_caught = 0 + try: + fs.File(sub_dir) + except TypeError: + exc_caught = 1 + assert exc_caught, "Should have caught a TypeError" + + # XXX test_is_up_to_date() + + d = fs.Dir('dir') + r = d.remove() + assert r is None, r + + f = fs.File('does_not_exist') + r = f.remove() + assert r is None, r + + test.write('exists', "exists\n") + f = fs.File('exists') + r = f.remove() + assert r, r + assert not os.path.exists(test.workpath('exists')), "exists was not removed" + + symlink = test.workpath('symlink') + try: + os.symlink(test.workpath('does_not_exist'), symlink) + assert os.path.islink(symlink) + f = fs.File('symlink') + r = f.remove() + assert r, r + assert not os.path.islink(symlink), "symlink was not removed" + except AttributeError: + pass + + test.write('can_not_remove', "can_not_remove\n") + test.writable(test.workpath('.'), 0) + fp = open(test.workpath('can_not_remove')) + + f = fs.File('can_not_remove') + exc_caught = 0 + try: + r = f.remove() + except OSError: + exc_caught = 1 + + fp.close() + + assert exc_caught, "Should have caught an OSError, r = " + str(r) + + f = fs.Entry('foo/bar/baz') + assert f.for_signature() == 'baz', f.for_signature() + assert f.get_string(0) == os.path.normpath('foo/bar/baz'), \ + f.get_string(0) + assert f.get_string(1) == 'baz', f.get_string(1) + + def test_drive_letters(self): + """Test drive-letter look-ups""" + + test = self.test + + test.subdir('sub', ['sub', 'dir']) + + def drive_workpath(drive, dirs, test=test): + x = apply(test.workpath, dirs) + drive, path = os.path.splitdrive(x) + return 'X:' + path + + wp = drive_workpath('X:', ['']) + + if wp[-1] in (os.sep, '/'): + tmp = os.path.split(wp[:-1])[0] + else: + tmp = os.path.split(wp)[0] + + parent_tmp = os.path.split(tmp)[0] + if parent_tmp == 'X:': + parent_tmp = 'X:' + os.sep + + tmp_foo = os.path.join(tmp, 'foo') + + foo = drive_workpath('X:', ['foo']) + foo_bar = drive_workpath('X:', ['foo', 'bar']) + sub = drive_workpath('X:', ['sub', '']) + sub_dir = drive_workpath('X:', ['sub', 'dir', '']) + sub_dir_foo = drive_workpath('X:', ['sub', 'dir', 'foo', '']) + sub_dir_foo_bar = drive_workpath('X:', ['sub', 'dir', 'foo', 'bar', '']) + sub_foo = drive_workpath('X:', ['sub', 'foo', '']) + + fs = SCons.Node.FS.FS() + + seps = [os.sep] + if os.sep != '/': + seps = seps + ['/'] + + def _do_Dir_test(lpath, path_, up_path_, sep, fileSys=fs): + dir = fileSys.Dir(string.replace(lpath, '/', sep)) + + if os.sep != '/': + path_ = string.replace(path_, '/', os.sep) + up_path_ = string.replace(up_path_, '/', os.sep) + + def strip_slash(p): + if p[-1] == os.sep and len(p) > 3: + p = p[:-1] + return p + path = strip_slash(path_) + up_path = strip_slash(up_path_) + name = string.split(path, os.sep)[-1] + + assert dir.name == name, \ + "dir.name %s != expected name %s" % \ + (dir.name, name) + assert dir.path == path, \ + "dir.path %s != expected path %s" % \ + (dir.path, path) + assert str(dir) == path, \ + "str(dir) %s != expected path %s" % \ + (str(dir), path) + assert dir.up().path == up_path, \ + "dir.up().path %s != expected parent path %s" % \ + (dir.up().path, up_path) + + save_os_path = os.path + save_os_sep = os.sep + try: + import ntpath + os.path = ntpath + os.sep = '\\' + SCons.Node.FS.initialize_do_splitdrive() + SCons.Node.FS.initialize_normpath_check() + + for sep in seps: + + def Dir_test(lpath, path_, up_path_, sep=sep, func=_do_Dir_test): + return func(lpath, path_, up_path_, sep) + + Dir_test('#X:', wp, tmp) + Dir_test('X:foo', foo, wp) + Dir_test('X:foo/bar', foo_bar, foo) + Dir_test('X:/foo', 'X:/foo', 'X:/') + Dir_test('X:/foo/bar', 'X:/foo/bar/', 'X:/foo/') + Dir_test('X:..', tmp, parent_tmp) + Dir_test('X:foo/..', wp, tmp) + Dir_test('X:../foo', tmp_foo, tmp) + Dir_test('X:.', wp, tmp) + Dir_test('X:./.', wp, tmp) + Dir_test('X:foo/./bar', foo_bar, foo) + Dir_test('#X:../foo', tmp_foo, tmp) + Dir_test('#X:/../foo', tmp_foo, tmp) + Dir_test('#X:foo/bar', foo_bar, foo) + Dir_test('#X:/foo/bar', foo_bar, foo) + Dir_test('#X:/', wp, tmp) + finally: + os.path = save_os_path + os.sep = save_os_sep + SCons.Node.FS.initialize_do_splitdrive() + SCons.Node.FS.initialize_normpath_check() + + def test_target_from_source(self): + """Test the method for generating target nodes from sources""" + fs = self.fs + + x = fs.File('x.c') + t = x.target_from_source('pre-', '-suf') + assert str(t) == 'pre-x-suf', str(t) + assert t.__class__ == SCons.Node.FS.Entry + + y = fs.File('dir/y') + t = y.target_from_source('pre-', '-suf') + assert str(t) == os.path.join('dir', 'pre-y-suf'), str(t) + assert t.__class__ == SCons.Node.FS.Entry + + z = fs.File('zz') + t = z.target_from_source('pre-', '-suf', lambda x: x[:-1]) + assert str(t) == 'pre-z-suf', str(t) + assert t.__class__ == SCons.Node.FS.Entry + + d = fs.Dir('ddd') + t = d.target_from_source('pre-', '-suf') + assert str(t) == 'pre-ddd-suf', str(t) + assert t.__class__ == SCons.Node.FS.Entry + + e = fs.Entry('eee') + t = e.target_from_source('pre-', '-suf') + assert str(t) == 'pre-eee-suf', str(t) + assert t.__class__ == SCons.Node.FS.Entry + + def test_same_name(self): + """Test that a local same-named file isn't found for a Dir lookup""" + test = self.test + fs = self.fs + + test.subdir('subdir') + test.write(['subdir', 'build'], "subdir/build\n") + + subdir = fs.Dir('subdir') + fs.chdir(subdir, change_os_dir=1) + self.fs._lookup('#build/file', subdir, SCons.Node.FS.File) + + def test_above_root(self): + """Testing looking up a path above the root directory""" + test = self.test + fs = self.fs + + d1 = fs.Dir('d1') + d2 = d1.Dir('d2') + dirs = string.split(os.path.normpath(d2.abspath), os.sep) + above_path = apply(os.path.join, ['..']*len(dirs) + ['above']) + above = d2.Dir(above_path) + + def test_rel_path(self): + """Test the rel_path() method""" + test = self.test + fs = self.fs + + d1 = fs.Dir('d1') + d1_f = d1.File('f') + d1_d2 = d1.Dir('d2') + d1_d2_f = d1_d2.File('f') + + d3 = fs.Dir('d3') + d3_f = d3.File('f') + d3_d4 = d3.Dir('d4') + d3_d4_f = d3_d4.File('f') + + cases = [ + d1, d1, '.', + d1, d1_f, 'f', + d1, d1_d2, 'd2', + d1, d1_d2_f, 'd2/f', + d1, d3, '../d3', + d1, d3_f, '../d3/f', + d1, d3_d4, '../d3/d4', + d1, d3_d4_f, '../d3/d4/f', + + d1_f, d1, '.', + d1_f, d1_f, 'f', + d1_f, d1_d2, 'd2', + d1_f, d1_d2_f, 'd2/f', + d1_f, d3, '../d3', + d1_f, d3_f, '../d3/f', + d1_f, d3_d4, '../d3/d4', + d1_f, d3_d4_f, '../d3/d4/f', + + d1_d2, d1, '..', + d1_d2, d1_f, '../f', + d1_d2, d1_d2, '.', + d1_d2, d1_d2_f, 'f', + d1_d2, d3, '../../d3', + d1_d2, d3_f, '../../d3/f', + d1_d2, d3_d4, '../../d3/d4', + d1_d2, d3_d4_f, '../../d3/d4/f', + + d1_d2_f, d1, '..', + d1_d2_f, d1_f, '../f', + d1_d2_f, d1_d2, '.', + d1_d2_f, d1_d2_f, 'f', + d1_d2_f, d3, '../../d3', + d1_d2_f, d3_f, '../../d3/f', + d1_d2_f, d3_d4, '../../d3/d4', + d1_d2_f, d3_d4_f, '../../d3/d4/f', + ] + + if sys.platform in ('win32',): + x_d1 = fs.Dir(r'X:\d1') + x_d1_d2 = x_d1.Dir('d2') + y_d1 = fs.Dir(r'Y:\d1') + y_d1_d2 = y_d1.Dir('d2') + y_d2 = fs.Dir(r'Y:\d2') + + win32_cases = [ + x_d1, x_d1, '.', + x_d1, x_d1_d2, 'd2', + x_d1, y_d1, r'Y:\d1', + x_d1, y_d1_d2, r'Y:\d1\d2', + x_d1, y_d2, r'Y:\d2', + ] + + cases.extend(win32_cases) + + failed = 0 + while cases: + dir, other, expect = cases[:3] + expect = os.path.normpath(expect) + del cases[:3] + result = dir.rel_path(other) + if result != expect: + if failed == 0: print + fmt = " dir_path(%(dir)s, %(other)s) => '%(result)s' did not match '%(expect)s'" + print fmt % locals() + failed = failed + 1 + assert failed == 0, "%d rel_path() cases failed" % failed + + def test_proxy(self): + """Test a Node.FS object wrapped in a proxy instance""" + f1 = self.fs.File('fff') + class Proxy: + # Simplest possibly Proxy class that works for our test, + # this is stripped down from SCons.Util.Proxy. + def __init__(self, subject): + self.__subject = subject + def __getattr__(self, name): + return getattr(self.__subject, name) + p = Proxy(f1) + f2 = self.fs.Entry(p) + assert f1 is f2, (f1, f2) + + + +class DirTestCase(_tempdirTestCase): + + def test__morph(self): + """Test handling of actions when morphing an Entry into a Dir""" + test = self.test + e = self.fs.Entry('eee') + x = e.get_executor() + x.add_pre_action('pre') + x.add_post_action('post') + e.must_be_same(SCons.Node.FS.Dir) + a = x.get_action_list() + assert a[0] == 'pre', a + assert a[2] == 'post', a + + def test_subclass(self): + """Test looking up subclass of Dir nodes""" + class DirSubclass(SCons.Node.FS.Dir): + pass + sd = self.fs._lookup('special_dir', None, DirSubclass, create=1) + sd.must_be_same(SCons.Node.FS.Dir) + + def test_get_env_scanner(self): + """Test the Dir.get_env_scanner() method + """ + import SCons.Defaults + d = self.fs.Dir('ddd') + s = d.get_env_scanner(Environment()) + assert s is SCons.Defaults.DirEntryScanner, s + + def test_get_target_scanner(self): + """Test the Dir.get_target_scanner() method + """ + import SCons.Defaults + d = self.fs.Dir('ddd') + s = d.get_target_scanner() + assert s is SCons.Defaults.DirEntryScanner, s + + def test_scan(self): + """Test scanning a directory for in-memory entries + """ + fs = self.fs + + dir = fs.Dir('ddd') + fs.File(os.path.join('ddd', 'f1')) + fs.File(os.path.join('ddd', 'f2')) + fs.File(os.path.join('ddd', 'f3')) + fs.Dir(os.path.join('ddd', 'd1')) + fs.Dir(os.path.join('ddd', 'd1', 'f4')) + fs.Dir(os.path.join('ddd', 'd1', 'f5')) + dir.scan() + kids = map(lambda x: x.path, dir.children(None)) + kids.sort() + assert kids == [os.path.join('ddd', 'd1'), + os.path.join('ddd', 'f1'), + os.path.join('ddd', 'f2'), + os.path.join('ddd', 'f3')], kids + + def test_get_contents(self): + """Test getting the contents for a directory. + """ + test = self.test + + test.subdir('d') + test.write(['d', 'g'], "67890\n") + test.write(['d', 'f'], "12345\n") + test.subdir(['d','sub']) + test.write(['d', 'sub','h'], "abcdef\n") + test.subdir(['d','empty']) + + d = self.fs.Dir('d') + g = self.fs.File(os.path.join('d', 'g')) + f = self.fs.File(os.path.join('d', 'f')) + h = self.fs.File(os.path.join('d', 'sub', 'h')) + e = self.fs.Dir(os.path.join('d', 'empty')) + s = self.fs.Dir(os.path.join('d', 'sub')) + + #TODO(1.5) files = d.get_contents().split('\n') + files = string.split(d.get_contents(), '\n') + + assert e.get_contents() == '', e.get_contents() + assert e.get_text_contents() == '', e.get_text_contents() + assert e.get_csig()+" empty" == files[0], files + assert f.get_csig()+" f" == files[1], files + assert g.get_csig()+" g" == files[2], files + assert s.get_csig()+" sub" == files[3], files + + def test_implicit_re_scans(self): + """Test that adding entries causes a directory to be re-scanned + """ + + fs = self.fs + + dir = fs.Dir('ddd') + + fs.File(os.path.join('ddd', 'f1')) + dir.scan() + kids = map(lambda x: x.path, dir.children()) + kids.sort() + assert kids == [os.path.join('ddd', 'f1')], kids + + fs.File(os.path.join('ddd', 'f2')) + dir.scan() + kids = map(lambda x: x.path, dir.children()) + kids.sort() + assert kids == [os.path.join('ddd', 'f1'), + os.path.join('ddd', 'f2')], kids + + def test_entry_exists_on_disk(self): + """Test the Dir.entry_exists_on_disk() method + """ + test = self.test + + does_not_exist = self.fs.Dir('does_not_exist') + assert not does_not_exist.entry_exists_on_disk('foo') + + test.subdir('d') + test.write(['d', 'exists'], "d/exists\n") + test.write(['d', 'Case-Insensitive'], "d/Case-Insensitive\n") + + d = self.fs.Dir('d') + assert d.entry_exists_on_disk('exists') + assert not d.entry_exists_on_disk('does_not_exist') + + if os.path.normcase("TeSt") != os.path.normpath("TeSt") or sys.platform == "cygwin": + assert d.entry_exists_on_disk('case-insensitive') + + def test_srcdir_list(self): + """Test the Dir.srcdir_list() method + """ + src = self.fs.Dir('src') + bld = self.fs.Dir('bld') + sub1 = bld.Dir('sub') + sub2 = sub1.Dir('sub') + sub3 = sub2.Dir('sub') + self.fs.VariantDir(bld, src, duplicate=0) + self.fs.VariantDir(sub2, src, duplicate=0) + + def check(result, expect): + result = map(str, result) + expect = map(os.path.normpath, expect) + assert result == expect, result + + s = src.srcdir_list() + check(s, []) + + s = bld.srcdir_list() + check(s, ['src']) + + s = sub1.srcdir_list() + check(s, ['src/sub']) + + s = sub2.srcdir_list() + check(s, ['src', 'src/sub/sub']) + + s = sub3.srcdir_list() + check(s, ['src/sub', 'src/sub/sub/sub']) + + self.fs.VariantDir('src/b1/b2', 'src') + b1 = src.Dir('b1') + b1_b2 = b1.Dir('b2') + b1_b2_b1 = b1_b2.Dir('b1') + b1_b2_b1_b2 = b1_b2_b1.Dir('b2') + b1_b2_b1_b2_sub = b1_b2_b1_b2.Dir('sub') + + s = b1.srcdir_list() + check(s, []) + + s = b1_b2.srcdir_list() + check(s, ['src']) + + s = b1_b2_b1.srcdir_list() + check(s, ['src/b1']) + + s = b1_b2_b1_b2.srcdir_list() + check(s, ['src/b1/b2']) + + s = b1_b2_b1_b2_sub.srcdir_list() + check(s, ['src/b1/b2/sub']) + + def test_srcdir_duplicate(self): + """Test the Dir.srcdir_duplicate() method + """ + test = self.test + + test.subdir('src0') + test.write(['src0', 'exists'], "src0/exists\n") + + bld0 = self.fs.Dir('bld0') + src0 = self.fs.Dir('src0') + self.fs.VariantDir(bld0, src0, duplicate=0) + + n = bld0.srcdir_duplicate('does_not_exist') + assert n is None, n + assert not os.path.exists(test.workpath('bld0', 'does_not_exist')) + + n = bld0.srcdir_duplicate('exists') + assert str(n) == os.path.normpath('src0/exists'), str(n) + assert not os.path.exists(test.workpath('bld0', 'exists')) + + test.subdir('src1') + test.write(['src1', 'exists'], "src0/exists\n") + + bld1 = self.fs.Dir('bld1') + src1 = self.fs.Dir('src1') + self.fs.VariantDir(bld1, src1, duplicate=1) + + n = bld1.srcdir_duplicate('does_not_exist') + assert n is None, n + assert not os.path.exists(test.workpath('bld1', 'does_not_exist')) + + n = bld1.srcdir_duplicate('exists') + assert str(n) == os.path.normpath('bld1/exists'), str(n) + assert os.path.exists(test.workpath('bld1', 'exists')) + + def test_srcdir_find_file(self): + """Test the Dir.srcdir_find_file() method + """ + test = self.test + + return_true = lambda: 1 + + test.subdir('src0') + test.write(['src0', 'on-disk-f1'], "src0/on-disk-f1\n") + test.write(['src0', 'on-disk-f2'], "src0/on-disk-f2\n") + test.write(['src0', 'on-disk-e1'], "src0/on-disk-e1\n") + test.write(['src0', 'on-disk-e2'], "src0/on-disk-e2\n") + + bld0 = self.fs.Dir('bld0') + src0 = self.fs.Dir('src0') + self.fs.VariantDir(bld0, src0, duplicate=0) + + derived_f = src0.File('derived-f') + derived_f.is_derived = return_true + exists_f = src0.File('exists-f') + exists_f.exists = return_true + + derived_e = src0.Entry('derived-e') + derived_e.is_derived = return_true + exists_e = src0.Entry('exists-e') + exists_e.exists = return_true + + def check(result, expect): + result = map(str, result) + expect = map(os.path.normpath, expect) + assert result == expect, result + + # First check from the source directory. + n = src0.srcdir_find_file('does_not_exist') + assert n == (None, None), n + + n = src0.srcdir_find_file('derived-f') + check(n, ['src0/derived-f', 'src0']) + n = src0.srcdir_find_file('exists-f') + check(n, ['src0/exists-f', 'src0']) + n = src0.srcdir_find_file('on-disk-f1') + check(n, ['src0/on-disk-f1', 'src0']) + + n = src0.srcdir_find_file('derived-e') + check(n, ['src0/derived-e', 'src0']) + n = src0.srcdir_find_file('exists-e') + check(n, ['src0/exists-e', 'src0']) + n = src0.srcdir_find_file('on-disk-e1') + check(n, ['src0/on-disk-e1', 'src0']) + + # Now check from the variant directory. + n = bld0.srcdir_find_file('does_not_exist') + assert n == (None, None), n + + n = bld0.srcdir_find_file('derived-f') + check(n, ['src0/derived-f', 'bld0']) + n = bld0.srcdir_find_file('exists-f') + check(n, ['src0/exists-f', 'bld0']) + n = bld0.srcdir_find_file('on-disk-f2') + check(n, ['src0/on-disk-f2', 'bld0']) + + n = bld0.srcdir_find_file('derived-e') + check(n, ['src0/derived-e', 'bld0']) + n = bld0.srcdir_find_file('exists-e') + check(n, ['src0/exists-e', 'bld0']) + n = bld0.srcdir_find_file('on-disk-e2') + check(n, ['src0/on-disk-e2', 'bld0']) + + test.subdir('src1') + test.write(['src1', 'on-disk-f1'], "src1/on-disk-f1\n") + test.write(['src1', 'on-disk-f2'], "src1/on-disk-f2\n") + test.write(['src1', 'on-disk-e1'], "src1/on-disk-e1\n") + test.write(['src1', 'on-disk-e2'], "src1/on-disk-e2\n") + + bld1 = self.fs.Dir('bld1') + src1 = self.fs.Dir('src1') + self.fs.VariantDir(bld1, src1, duplicate=1) + + derived_f = src1.File('derived-f') + derived_f.is_derived = return_true + exists_f = src1.File('exists-f') + exists_f.exists = return_true + + derived_e = src1.Entry('derived-e') + derived_e.is_derived = return_true + exists_e = src1.Entry('exists-e') + exists_e.exists = return_true + + # First check from the source directory. + n = src1.srcdir_find_file('does_not_exist') + assert n == (None, None), n + + n = src1.srcdir_find_file('derived-f') + check(n, ['src1/derived-f', 'src1']) + n = src1.srcdir_find_file('exists-f') + check(n, ['src1/exists-f', 'src1']) + n = src1.srcdir_find_file('on-disk-f1') + check(n, ['src1/on-disk-f1', 'src1']) + + n = src1.srcdir_find_file('derived-e') + check(n, ['src1/derived-e', 'src1']) + n = src1.srcdir_find_file('exists-e') + check(n, ['src1/exists-e', 'src1']) + n = src1.srcdir_find_file('on-disk-e1') + check(n, ['src1/on-disk-e1', 'src1']) + + # Now check from the variant directory. + n = bld1.srcdir_find_file('does_not_exist') + assert n == (None, None), n + + n = bld1.srcdir_find_file('derived-f') + check(n, ['bld1/derived-f', 'src1']) + n = bld1.srcdir_find_file('exists-f') + check(n, ['bld1/exists-f', 'src1']) + n = bld1.srcdir_find_file('on-disk-f2') + check(n, ['bld1/on-disk-f2', 'bld1']) + + n = bld1.srcdir_find_file('derived-e') + check(n, ['bld1/derived-e', 'src1']) + n = bld1.srcdir_find_file('exists-e') + check(n, ['bld1/exists-e', 'src1']) + n = bld1.srcdir_find_file('on-disk-e2') + check(n, ['bld1/on-disk-e2', 'bld1']) + + def test_dir_on_disk(self): + """Test the Dir.dir_on_disk() method""" + self.test.subdir('sub', ['sub', 'exists']) + self.test.write(['sub', 'file'], "self/file\n") + sub = self.fs.Dir('sub') + + r = sub.dir_on_disk('does_not_exist') + assert not r, r + + r = sub.dir_on_disk('exists') + assert r, r + + r = sub.dir_on_disk('file') + assert not r, r + + def test_file_on_disk(self): + """Test the Dir.file_on_disk() method""" + self.test.subdir('sub', ['sub', 'dir']) + self.test.write(['sub', 'exists'], "self/exists\n") + sub = self.fs.Dir('sub') + + r = sub.file_on_disk('does_not_exist') + assert not r, r + + r = sub.file_on_disk('exists') + assert r, r + + r = sub.file_on_disk('dir') + assert not r, r + +class EntryTestCase(_tempdirTestCase): + def test_runTest(self): + """Test methods specific to the Entry sub-class. + """ + test = TestCmd(workdir='') + # FS doesn't like the cwd to be something other than its root. + os.chdir(test.workpath("")) + + fs = SCons.Node.FS.FS() + + e1 = fs.Entry('e1') + e1.rfile() + assert e1.__class__ is SCons.Node.FS.File, e1.__class__ + + test.subdir('e3d') + test.write('e3f', "e3f\n") + + e3d = fs.Entry('e3d') + e3d.get_contents() + assert e3d.__class__ is SCons.Node.FS.Dir, e3d.__class__ + + e3f = fs.Entry('e3f') + e3f.get_contents() + assert e3f.__class__ is SCons.Node.FS.File, e3f.__class__ + + e3n = fs.Entry('e3n') + e3n.get_contents() + assert e3n.__class__ is SCons.Node.FS.Entry, e3n.__class__ + + test.subdir('e4d') + test.write('e4f', "e4f\n") + + e4d = fs.Entry('e4d') + exists = e4d.exists() + assert e4d.__class__ is SCons.Node.FS.Dir, e4d.__class__ + assert exists, "e4d does not exist?" + + e4f = fs.Entry('e4f') + exists = e4f.exists() + assert e4f.__class__ is SCons.Node.FS.File, e4f.__class__ + assert exists, "e4f does not exist?" + + e4n = fs.Entry('e4n') + exists = e4n.exists() + assert e4n.__class__ is SCons.Node.FS.File, e4n.__class__ + assert not exists, "e4n exists?" + + class MyCalc: + def __init__(self, val): + self.max_drift = 0 + class M: + def __init__(self, val): + self.val = val + def collect(self, args): + return reduce(lambda x, y: x+y, args) + def signature(self, executor): + return self.val + 222 + self.module = M(val) + + test.subdir('e5d') + test.write('e5f', "e5f\n") + + def test_Entry_Entry_lookup(self): + """Test looking up an Entry within another Entry""" + self.fs.Entry('#topdir') + self.fs.Entry('#topdir/a/b/c') + + + +class FileTestCase(_tempdirTestCase): + + def test_subclass(self): + """Test looking up subclass of File nodes""" + class FileSubclass(SCons.Node.FS.File): + pass + sd = self.fs._lookup('special_file', None, FileSubclass, create=1) + sd.must_be_same(SCons.Node.FS.File) + + def test_Dirs(self): + """Test the File.Dirs() method""" + fff = self.fs.File('subdir/fff') + # This simulates that the SConscript file that defined + # fff is in subdir/. + fff.cwd = self.fs.Dir('subdir') + d1 = self.fs.Dir('subdir/d1') + d2 = self.fs.Dir('subdir/d2') + dirs = fff.Dirs(['d1', 'd2']) + assert dirs == [d1, d2], map(str, dirs) + + def test_exists(self): + """Test the File.exists() method""" + fs = self.fs + test = self.test + + src_f1 = fs.File('src/f1') + assert not src_f1.exists(), "%s apparently exists?" % src_f1 + + test.subdir('src') + test.write(['src', 'f1'], "src/f1\n") + + assert not src_f1.exists(), "%s did not cache previous exists() value" % src_f1 + src_f1.clear() + assert src_f1.exists(), "%s apparently does not exist?" % src_f1 + + test.subdir('build') + fs.VariantDir('build', 'src') + build_f1 = fs.File('build/f1') + + assert build_f1.exists(), "%s did not realize that %s exists" % (build_f1, src_f1) + assert os.path.exists(build_f1.abspath), "%s did not get duplicated on disk" % build_f1.abspath + + test.unlink(['src', 'f1']) + src_f1.clear() # so the next exists() call will look on disk again + + assert build_f1.exists(), "%s did not cache previous exists() value" % build_f1 + build_f1.clear() + build_f1.linked = None + assert not build_f1.exists(), "%s did not realize that %s disappeared" % (build_f1, src_f1) + assert not os.path.exists(build_f1.abspath), "%s did not get removed after %s was removed" % (build_f1, src_f1) + + + +class GlobTestCase(_tempdirTestCase): + def setUp(self): + _tempdirTestCase.setUp(self) + + fs = SCons.Node.FS.FS() + self.fs = fs + + # Make entries on disk that will not have Nodes, so we can verify + # the behavior of looking for things on disk. + self.test.write('disk-bbb', "disk-bbb\n") + self.test.write('disk-aaa', "disk-aaa\n") + self.test.write('disk-ccc', "disk-ccc\n") + self.test.write('#disk-hash', "#disk-hash\n") + self.test.subdir('disk-sub') + self.test.write(['disk-sub', 'disk-ddd'], "disk-sub/disk-ddd\n") + self.test.write(['disk-sub', 'disk-eee'], "disk-sub/disk-eee\n") + self.test.write(['disk-sub', 'disk-fff'], "disk-sub/disk-fff\n") + + # Make some entries that have both Nodes and on-disk entries, + # so we can verify what we do with + self.test.write('both-aaa', "both-aaa\n") + self.test.write('both-bbb', "both-bbb\n") + self.test.write('both-ccc', "both-ccc\n") + self.test.write('#both-hash', "#both-hash\n") + self.test.subdir('both-sub1') + self.test.write(['both-sub1', 'both-ddd'], "both-sub1/both-ddd\n") + self.test.write(['both-sub1', 'both-eee'], "both-sub1/both-eee\n") + self.test.write(['both-sub1', 'both-fff'], "both-sub1/both-fff\n") + self.test.subdir('both-sub2') + self.test.write(['both-sub2', 'both-ddd'], "both-sub2/both-ddd\n") + self.test.write(['both-sub2', 'both-eee'], "both-sub2/both-eee\n") + self.test.write(['both-sub2', 'both-fff'], "both-sub2/both-fff\n") + + self.both_aaa = fs.File('both-aaa') + self.both_bbb = fs.File('both-bbb') + self.both_ccc = fs.File('both-ccc') + self._both_hash = fs.File('./#both-hash') + self.both_sub1 = fs.Dir('both-sub1') + self.both_sub1_both_ddd = self.both_sub1.File('both-ddd') + self.both_sub1_both_eee = self.both_sub1.File('both-eee') + self.both_sub1_both_fff = self.both_sub1.File('both-fff') + self.both_sub2 = fs.Dir('both-sub2') + self.both_sub2_both_ddd = self.both_sub2.File('both-ddd') + self.both_sub2_both_eee = self.both_sub2.File('both-eee') + self.both_sub2_both_fff = self.both_sub2.File('both-fff') + + # Make various Nodes (that don't have on-disk entries) so we + # can verify how we match them. + self.ggg = fs.File('ggg') + self.hhh = fs.File('hhh') + self.iii = fs.File('iii') + self._hash = fs.File('./#hash') + self.subdir1 = fs.Dir('subdir1') + self.subdir1_lll = self.subdir1.File('lll') + self.subdir1_jjj = self.subdir1.File('jjj') + self.subdir1_kkk = self.subdir1.File('kkk') + self.subdir2 = fs.Dir('subdir2') + self.subdir2_lll = self.subdir2.File('lll') + self.subdir2_kkk = self.subdir2.File('kkk') + self.subdir2_jjj = self.subdir2.File('jjj') + self.sub = fs.Dir('sub') + self.sub_dir3 = self.sub.Dir('dir3') + self.sub_dir3_kkk = self.sub_dir3.File('kkk') + self.sub_dir3_jjj = self.sub_dir3.File('jjj') + self.sub_dir3_lll = self.sub_dir3.File('lll') + + + def do_cases(self, cases, **kwargs): + + # First, execute all of the cases with string=True and verify + # that we get the expected strings returned. We do this first + # so the Glob() calls don't add Nodes to the self.fs file system + # hierarchy. + + import copy + strings_kwargs = copy.copy(kwargs) + strings_kwargs['strings'] = True + for input, string_expect, node_expect in cases: + r = apply(self.fs.Glob, (input,), strings_kwargs) + r.sort() + assert r == string_expect, "Glob(%s, strings=True) expected %s, got %s" % (input, string_expect, r) + + # Now execute all of the cases without string=True and look for + # the expected Nodes to be returned. If we don't have a list of + # actual expected Nodes, that means we're expecting a search for + # on-disk-only files to have returned some newly-created nodes. + # Verify those by running the list through str() before comparing + # them with the expected list of strings. + for input, string_expect, node_expect in cases: + r = apply(self.fs.Glob, (input,), kwargs) + if node_expect: + r.sort(lambda a,b: cmp(a.path, b.path)) + result = [] + for n in node_expect: + if type(n) == type(''): + n = self.fs.Entry(n) + result.append(n) + fmt = lambda n: "%s %s" % (repr(n), repr(str(n))) + else: + r = map(str, r) + r.sort() + result = string_expect + fmt = lambda n: n + if r != result: + import pprint + print "Glob(%s) expected:" % repr(input) + pprint.pprint(map(fmt, result)) + print "Glob(%s) got:" % repr(input) + pprint.pprint(map(fmt, r)) + self.fail() + + def test_exact_match(self): + """Test globbing for exact Node matches""" + join = os.path.join + + cases = ( + ('ggg', ['ggg'], [self.ggg]), + + ('subdir1', ['subdir1'], [self.subdir1]), + + ('subdir1/jjj', [join('subdir1', 'jjj')], [self.subdir1_jjj]), + + ('disk-aaa', ['disk-aaa'], None), + + ('disk-sub', ['disk-sub'], None), + + ('both-aaa', ['both-aaa'], []), + ) + + self.do_cases(cases) + + def test_subdir_matches(self): + """Test globbing for exact Node matches in subdirectories""" + join = os.path.join + + cases = ( + ('*/jjj', + [join('subdir1', 'jjj'), join('subdir2', 'jjj')], + [self.subdir1_jjj, self.subdir2_jjj]), + + ('*/disk-ddd', + [join('disk-sub', 'disk-ddd')], + None), + ) + + self.do_cases(cases) + + def test_asterisk1(self): + """Test globbing for simple asterisk Node matches (1)""" + cases = ( + ('h*', + ['hhh'], + [self.hhh]), + + ('*', + ['#both-hash', '#hash', + 'both-aaa', 'both-bbb', 'both-ccc', + 'both-sub1', 'both-sub2', + 'ggg', 'hhh', 'iii', + 'sub', 'subdir1', 'subdir2'], + [self._both_hash, self._hash, + self.both_aaa, self.both_bbb, self.both_ccc, 'both-hash', + self.both_sub1, self.both_sub2, + self.ggg, 'hash', self.hhh, self.iii, + self.sub, self.subdir1, self.subdir2]), + ) + + self.do_cases(cases, ondisk=False) + + def test_asterisk2(self): + """Test globbing for simple asterisk Node matches (2)""" + cases = ( + ('disk-b*', + ['disk-bbb'], + None), + + ('*', + ['#both-hash', '#disk-hash', '#hash', + 'both-aaa', 'both-bbb', 'both-ccc', + 'both-sub1', 'both-sub2', + 'disk-aaa', 'disk-bbb', 'disk-ccc', 'disk-sub', + 'ggg', 'hhh', 'iii', + 'sub', 'subdir1', 'subdir2'], + ['./#both-hash', './#disk-hash', './#hash', + 'both-aaa', 'both-bbb', 'both-ccc', 'both-hash', + 'both-sub1', 'both-sub2', + 'disk-aaa', 'disk-bbb', 'disk-ccc', 'disk-sub', + 'ggg', 'hash', 'hhh', 'iii', + 'sub', 'subdir1', 'subdir2']), + ) + + self.do_cases(cases) + + def test_question_mark(self): + """Test globbing for simple question-mark Node matches""" + join = os.path.join + + cases = ( + ('ii?', + ['iii'], + [self.iii]), + + ('both-sub?/both-eee', + [join('both-sub1', 'both-eee'), join('both-sub2', 'both-eee')], + [self.both_sub1_both_eee, self.both_sub2_both_eee]), + + ('subdir?/jjj', + [join('subdir1', 'jjj'), join('subdir2', 'jjj')], + [self.subdir1_jjj, self.subdir2_jjj]), + + ('disk-cc?', + ['disk-ccc'], + None), + ) + + self.do_cases(cases) + + def test_does_not_exist(self): + """Test globbing for things that don't exist""" + + cases = ( + ('does_not_exist', [], []), + ('no_subdir/*', [], []), + ('subdir?/no_file', [], []), + ) + + self.do_cases(cases) + + def test_subdir_asterisk(self): + """Test globbing for asterisk Node matches in subdirectories""" + join = os.path.join + + cases = ( + ('*/k*', + [join('subdir1', 'kkk'), join('subdir2', 'kkk')], + [self.subdir1_kkk, self.subdir2_kkk]), + + ('both-sub?/*', + [join('both-sub1', 'both-ddd'), + join('both-sub1', 'both-eee'), + join('both-sub1', 'both-fff'), + join('both-sub2', 'both-ddd'), + join('both-sub2', 'both-eee'), + join('both-sub2', 'both-fff')], + [self.both_sub1_both_ddd, + self.both_sub1_both_eee, + self.both_sub1_both_fff, + self.both_sub2_both_ddd, + self.both_sub2_both_eee, + self.both_sub2_both_fff], + ), + + ('subdir?/*', + [join('subdir1', 'jjj'), + join('subdir1', 'kkk'), + join('subdir1', 'lll'), + join('subdir2', 'jjj'), + join('subdir2', 'kkk'), + join('subdir2', 'lll')], + [self.subdir1_jjj, self.subdir1_kkk, self.subdir1_lll, + self.subdir2_jjj, self.subdir2_kkk, self.subdir2_lll]), + + ('sub/*/*', + [join('sub', 'dir3', 'jjj'), + join('sub', 'dir3', 'kkk'), + join('sub', 'dir3', 'lll')], + [self.sub_dir3_jjj, self.sub_dir3_kkk, self.sub_dir3_lll]), + + ('*/k*', + [join('subdir1', 'kkk'), join('subdir2', 'kkk')], + None), + + ('subdir?/*', + [join('subdir1', 'jjj'), + join('subdir1', 'kkk'), + join('subdir1', 'lll'), + join('subdir2', 'jjj'), + join('subdir2', 'kkk'), + join('subdir2', 'lll')], + None), + + ('sub/*/*', + [join('sub', 'dir3', 'jjj'), + join('sub', 'dir3', 'kkk'), + join('sub', 'dir3', 'lll')], + None), + ) + + self.do_cases(cases) + + def test_subdir_question(self): + """Test globbing for question-mark Node matches in subdirectories""" + join = os.path.join + + cases = ( + ('*/?kk', + [join('subdir1', 'kkk'), join('subdir2', 'kkk')], + [self.subdir1_kkk, self.subdir2_kkk]), + + ('subdir?/l?l', + [join('subdir1', 'lll'), join('subdir2', 'lll')], + [self.subdir1_lll, self.subdir2_lll]), + + ('*/disk-?ff', + [join('disk-sub', 'disk-fff')], + None), + + ('subdir?/l?l', + [join('subdir1', 'lll'), join('subdir2', 'lll')], + None), + ) + + self.do_cases(cases) + + def test_sort(self): + """Test whether globbing sorts""" + join = os.path.join + # At least sometimes this should return out-of-order items + # if Glob doesn't sort. + # It's not a very good test though since it depends on the + # order returned by glob, which might already be sorted. + g = self.fs.Glob('disk-sub/*', strings=True) + expect = [ + os.path.join('disk-sub', 'disk-ddd'), + os.path.join('disk-sub', 'disk-eee'), + os.path.join('disk-sub', 'disk-fff'), + ] + assert g == expect, str(g) + " is not sorted, but should be!" + + g = self.fs.Glob('disk-*', strings=True) + expect = [ 'disk-aaa', 'disk-bbb', 'disk-ccc', 'disk-sub' ] + assert g == expect, str(g) + " is not sorted, but should be!" + + +class RepositoryTestCase(_tempdirTestCase): + + def setUp(self): + _tempdirTestCase.setUp(self) + + self.test.subdir('rep1', 'rep2', 'rep3', 'work') + + self.rep1 = self.test.workpath('rep1') + self.rep2 = self.test.workpath('rep2') + self.rep3 = self.test.workpath('rep3') + + os.chdir(self.test.workpath('work')) + + self.fs = SCons.Node.FS.FS() + self.fs.Repository(self.rep1, self.rep2, self.rep3) + + def test_getRepositories(self): + """Test the Dir.getRepositories() method""" + self.fs.Repository('foo') + self.fs.Repository(os.path.join('foo', 'bar')) + self.fs.Repository('bar/foo') + self.fs.Repository('bar') + + expect = [ + self.rep1, + self.rep2, + self.rep3, + 'foo', + os.path.join('foo', 'bar'), + os.path.join('bar', 'foo'), + 'bar' + ] + + rep = self.fs.Dir('#').getRepositories() + r = map(lambda x, np=os.path.normpath: np(str(x)), rep) + assert r == expect, r + + def test_get_all_rdirs(self): + """Test the Dir.get_all_rdirs() method""" + self.fs.Repository('foo') + self.fs.Repository(os.path.join('foo', 'bar')) + self.fs.Repository('bar/foo') + self.fs.Repository('bar') + + expect = [ + '.', + self.rep1, + self.rep2, + self.rep3, + 'foo', + os.path.join('foo', 'bar'), + os.path.join('bar', 'foo'), + 'bar' + ] + + rep = self.fs.Dir('#').get_all_rdirs() + r = map(lambda x, np=os.path.normpath: np(str(x)), rep) + assert r == expect, r + + def test_rentry(self): + """Test the Base.entry() method""" + return_true = lambda: 1 + return_false = lambda: 0 + + d1 = self.fs.Dir('d1') + d2 = self.fs.Dir('d2') + d3 = self.fs.Dir('d3') + + e1 = self.fs.Entry('e1') + e2 = self.fs.Entry('e2') + e3 = self.fs.Entry('e3') + + f1 = self.fs.File('f1') + f2 = self.fs.File('f2') + f3 = self.fs.File('f3') + + self.test.write([self.rep1, 'd2'], "") + self.test.subdir([self.rep2, 'd3']) + self.test.write([self.rep3, 'd3'], "") + + self.test.write([self.rep1, 'e2'], "") + self.test.subdir([self.rep2, 'e3']) + self.test.write([self.rep3, 'e3'], "") + + self.test.write([self.rep1, 'f2'], "") + self.test.subdir([self.rep2, 'f3']) + self.test.write([self.rep3, 'f3'], "") + + r = d1.rentry() + assert r is d1, r + + r = d2.rentry() + assert not r is d2, r + r = str(r) + assert r == os.path.join(self.rep1, 'd2'), r + + r = d3.rentry() + assert not r is d3, r + r = str(r) + assert r == os.path.join(self.rep2, 'd3'), r + + r = e1.rentry() + assert r is e1, r + + r = e2.rentry() + assert not r is e2, r + r = str(r) + assert r == os.path.join(self.rep1, 'e2'), r + + r = e3.rentry() + assert not r is e3, r + r = str(r) + assert r == os.path.join(self.rep2, 'e3'), r + + r = f1.rentry() + assert r is f1, r + + r = f2.rentry() + assert not r is f2, r + r = str(r) + assert r == os.path.join(self.rep1, 'f2'), r + + r = f3.rentry() + assert not r is f3, r + r = str(r) + assert r == os.path.join(self.rep2, 'f3'), r + + def test_rdir(self): + """Test the Dir.rdir() method""" + return_true = lambda: 1 + return_false = lambda: 0 + + d1 = self.fs.Dir('d1') + d2 = self.fs.Dir('d2') + d3 = self.fs.Dir('d3') + + self.test.subdir([self.rep1, 'd2']) + self.test.write([self.rep2, 'd3'], "") + self.test.subdir([self.rep3, 'd3']) + + r = d1.rdir() + assert r is d1, r + + r = d2.rdir() + assert not r is d2, r + r = str(r) + assert r == os.path.join(self.rep1, 'd2'), r + + r = d3.rdir() + assert not r is d3, r + r = str(r) + assert r == os.path.join(self.rep3, 'd3'), r + + e1 = self.fs.Dir('e1') + e1.exists = return_false + e2 = self.fs.Dir('e2') + e2.exists = return_false + + # Make sure we match entries in repositories, + # regardless of whether they're derived or not. + + re1 = self.fs.Entry(os.path.join(self.rep1, 'e1')) + re1.exists = return_true + re1.is_derived = return_true + re2 = self.fs.Entry(os.path.join(self.rep2, 'e2')) + re2.exists = return_true + re2.is_derived = return_false + + r = e1.rdir() + assert r is re1, r + + r = e2.rdir() + assert r is re2, r + + def test_rfile(self): + """Test the File.rfile() method""" + return_true = lambda: 1 + return_false = lambda: 0 + + f1 = self.fs.File('f1') + f2 = self.fs.File('f2') + f3 = self.fs.File('f3') + + self.test.write([self.rep1, 'f2'], "") + self.test.subdir([self.rep2, 'f3']) + self.test.write([self.rep3, 'f3'], "") + + r = f1.rfile() + assert r is f1, r + + r = f2.rfile() + assert not r is f2, r + r = str(r) + assert r == os.path.join(self.rep1, 'f2'), r + + r = f3.rfile() + assert not r is f3, r + r = f3.rstr() + assert r == os.path.join(self.rep3, 'f3'), r + + e1 = self.fs.File('e1') + e1.exists = return_false + e2 = self.fs.File('e2') + e2.exists = return_false + + # Make sure we match entries in repositories, + # regardless of whether they're derived or not. + + re1 = self.fs.Entry(os.path.join(self.rep1, 'e1')) + re1.exists = return_true + re1.is_derived = return_true + re2 = self.fs.Entry(os.path.join(self.rep2, 'e2')) + re2.exists = return_true + re2.is_derived = return_false + + r = e1.rfile() + assert r is re1, r + + r = e2.rfile() + assert r is re2, r + + def test_Rfindalldirs(self): + """Test the Rfindalldirs() methods""" + fs = self.fs + test = self.test + + d1 = fs.Dir('d1') + d2 = fs.Dir('d2') + rep1_d1 = fs.Dir(test.workpath('rep1', 'd1')) + rep2_d1 = fs.Dir(test.workpath('rep2', 'd1')) + rep3_d1 = fs.Dir(test.workpath('rep3', 'd1')) + sub = fs.Dir('sub') + sub_d1 = sub.Dir('d1') + rep1_sub_d1 = fs.Dir(test.workpath('rep1', 'sub', 'd1')) + rep2_sub_d1 = fs.Dir(test.workpath('rep2', 'sub', 'd1')) + rep3_sub_d1 = fs.Dir(test.workpath('rep3', 'sub', 'd1')) + + r = fs.Top.Rfindalldirs((d1,)) + assert r == [d1], map(str, r) + + r = fs.Top.Rfindalldirs((d1, d2)) + assert r == [d1, d2], map(str, r) + + r = fs.Top.Rfindalldirs(('d1',)) + assert r == [d1, rep1_d1, rep2_d1, rep3_d1], map(str, r) + + r = fs.Top.Rfindalldirs(('#d1',)) + assert r == [d1, rep1_d1, rep2_d1, rep3_d1], map(str, r) + + r = sub.Rfindalldirs(('d1',)) + assert r == [sub_d1, rep1_sub_d1, rep2_sub_d1, rep3_sub_d1], map(str, r) + + r = sub.Rfindalldirs(('#d1',)) + assert r == [d1, rep1_d1, rep2_d1, rep3_d1], map(str, r) + + r = fs.Top.Rfindalldirs(('d1', d2)) + assert r == [d1, rep1_d1, rep2_d1, rep3_d1, d2], map(str, r) + + def test_rexists(self): + """Test the Entry.rexists() method""" + fs = self.fs + test = self.test + + test.write([self.rep1, 'f2'], "") + test.write([self.rep2, "i_exist"], "\n") + test.write(["work", "i_exist_too"], "\n") + + fs.VariantDir('build', '.') + + f = fs.File(test.workpath("work", "i_do_not_exist")) + assert not f.rexists() + + f = fs.File(test.workpath("work", "i_exist")) + assert f.rexists() + + f = fs.File(test.workpath("work", "i_exist_too")) + assert f.rexists() + + f1 = fs.File(os.path.join('build', 'f1')) + assert not f1.rexists() + + f2 = fs.File(os.path.join('build', 'f2')) + assert f2.rexists() + + def test_FAT_timestamps(self): + """Test repository timestamps on FAT file systems""" + fs = self.fs + test = self.test + + test.write(["rep2", "tstamp"], "tstamp\n") + try: + # Okay, *this* manipulation accomodates Windows FAT file systems + # that only have two-second granularity on their timestamps. + # We round down the current time to the nearest even integer + # value, subtract two to make sure the timestamp is not "now," + # and then convert it back to a float. + tstamp = float(int(time.time() / 2) * 2) - 2 + os.utime(test.workpath("rep2", "tstamp"), (tstamp - 2.0, tstamp)) + f = fs.File("tstamp") + t = f.get_timestamp() + assert t == tstamp, "expected %f, got %f" % (tstamp, t) + finally: + test.unlink(["rep2", "tstamp"]) + + def test_get_contents(self): + """Ensure get_contents() returns binary contents from Repositories""" + fs = self.fs + test = self.test + + test.write(["rep3", "contents"], "Con\x1aTents\n") + try: + c = fs.File("contents").get_contents() + assert c == "Con\x1aTents\n", "got '%s'" % c + finally: + test.unlink(["rep3", "contents"]) + + def test_get_text_contents(self): + """Ensure get_text_contents() returns text contents from + Repositories""" + fs = self.fs + test = self.test + + # Use a test string that has a file terminator in it to make + # sure we read the entire file, regardless of its contents. + try: + eval('test_string = u"Con\x1aTents\n"') + except SyntaxError: + import UserString + class FakeUnicodeString(UserString.UserString): + def encode(self, encoding): + return str(self) + test_string = FakeUnicodeString("Con\x1aTents\n") + + + # Test with ASCII. + test.write(["rep3", "contents"], test_string.encode('ascii')) + try: + c = fs.File("contents").get_text_contents() + assert test_string == c, "got %s" % repr(c) + finally: + test.unlink(["rep3", "contents"]) + + # Test with utf-8 + test.write(["rep3", "contents"], test_string.encode('utf-8')) + try: + c = fs.File("contents").get_text_contents() + assert test_string == c, "got %s" % repr(c) + finally: + test.unlink(["rep3", "contents"]) + + # Test with utf-16 + test.write(["rep3", "contents"], test_string.encode('utf-16')) + try: + c = fs.File("contents").get_text_contents() + assert test_string == c, "got %s" % repr(c) + finally: + test.unlink(["rep3", "contents"]) + + #def test_is_up_to_date(self): + + + +class find_fileTestCase(unittest.TestCase): + def runTest(self): + """Testing find_file function""" + test = TestCmd(workdir = '') + test.write('./foo', 'Some file\n') + test.write('./foo2', 'Another file\n') + test.subdir('same') + test.subdir('bar') + test.write(['bar', 'on_disk'], 'Another file\n') + test.write(['bar', 'same'], 'bar/same\n') + + fs = SCons.Node.FS.FS(test.workpath("")) + # FS doesn't like the cwd to be something other than its root. + os.chdir(test.workpath("")) + + node_derived = fs.File(test.workpath('bar/baz')) + node_derived.builder_set(1) # Any non-zero value. + node_pseudo = fs.File(test.workpath('pseudo')) + node_pseudo.set_src_builder(1) # Any non-zero value. + + paths = tuple(map(fs.Dir, ['.', 'same', './bar'])) + nodes = [SCons.Node.FS.find_file('foo', paths)] + nodes.append(SCons.Node.FS.find_file('baz', paths)) + nodes.append(SCons.Node.FS.find_file('pseudo', paths)) + nodes.append(SCons.Node.FS.find_file('same', paths)) + + file_names = map(str, nodes) + file_names = map(os.path.normpath, file_names) + expect = ['./foo', './bar/baz', './pseudo', './bar/same'] + expect = map(os.path.normpath, expect) + assert file_names == expect, file_names + + # Make sure we don't blow up if there's already a File in place + # of a directory that we'd otherwise try to search. If this + # is broken, we'll see an exception like "Tried to lookup File + # 'bar/baz' as a Dir. + SCons.Node.FS.find_file('baz/no_file_here', paths) + + import StringIO + save_sys_stdout = sys.stdout + + try: + sio = StringIO.StringIO() + sys.stdout = sio + SCons.Node.FS.find_file('foo2', paths, verbose="xyz") + expect = " xyz: looking for 'foo2' in '.' ...\n" + \ + " xyz: ... FOUND 'foo2' in '.'\n" + c = sio.getvalue() + assert c == expect, c + + sio = StringIO.StringIO() + sys.stdout = sio + SCons.Node.FS.find_file('baz2', paths, verbose=1) + expect = " find_file: looking for 'baz2' in '.' ...\n" + \ + " find_file: looking for 'baz2' in 'same' ...\n" + \ + " find_file: looking for 'baz2' in 'bar' ...\n" + c = sio.getvalue() + assert c == expect, c + + sio = StringIO.StringIO() + sys.stdout = sio + SCons.Node.FS.find_file('on_disk', paths, verbose=1) + expect = " find_file: looking for 'on_disk' in '.' ...\n" + \ + " find_file: looking for 'on_disk' in 'same' ...\n" + \ + " find_file: looking for 'on_disk' in 'bar' ...\n" + \ + " find_file: ... FOUND 'on_disk' in 'bar'\n" + c = sio.getvalue() + assert c == expect, c + finally: + sys.stdout = save_sys_stdout + +class StringDirTestCase(unittest.TestCase): + def runTest(self): + """Test using a string as the second argument of + File() and Dir()""" + + test = TestCmd(workdir = '') + test.subdir('sub') + fs = SCons.Node.FS.FS(test.workpath('')) + + d = fs.Dir('sub', '.') + assert str(d) == 'sub', str(d) + assert d.exists() + f = fs.File('file', 'sub') + assert str(f) == os.path.join('sub', 'file') + assert not f.exists() + +class stored_infoTestCase(unittest.TestCase): + def runTest(self): + """Test how we store build information""" + test = TestCmd(workdir = '') + test.subdir('sub') + fs = SCons.Node.FS.FS(test.workpath('')) + + d = fs.Dir('sub') + f = fs.File('file1', d) + bi = f.get_stored_info() + assert hasattr(bi, 'ninfo') + + class MySConsign: + class Null: + def __init__(self): + self.xyzzy = 7 + def get_entry(self, name): + return self.Null() + + f = fs.File('file2', d) + f.dir.sconsign = MySConsign + bi = f.get_stored_info() + assert bi.xyzzy == 7, bi + +class has_src_builderTestCase(unittest.TestCase): + def runTest(self): + """Test the has_src_builder() method""" + test = TestCmd(workdir = '') + fs = SCons.Node.FS.FS(test.workpath('')) + os.chdir(test.workpath('')) + test.subdir('sub1') + test.subdir('sub2', ['sub2', 'SCCS'], ['sub2', 'RCS']) + + sub1 = fs.Dir('sub1', '.') + f1 = fs.File('f1', sub1) + f2 = fs.File('f2', sub1) + f3 = fs.File('f3', sub1) + sub2 = fs.Dir('sub2', '.') + f4 = fs.File('f4', sub2) + f5 = fs.File('f5', sub2) + f6 = fs.File('f6', sub2) + f7 = fs.File('f7', sub2) + f8 = fs.File('f8', sub2) + + h = f1.has_src_builder() + assert not h, h + h = f1.has_builder() + assert not h, h + + b1 = Builder(fs.File) + sub1.set_src_builder(b1) + + test.write(['sub1', 'f2'], "sub1/f2\n") + h = f1.has_src_builder() # cached from previous call + assert not h, h + h = f1.has_builder() # cached from previous call + assert not h, h + h = f2.has_src_builder() + assert not h, h + h = f2.has_builder() + assert not h, h + h = f3.has_src_builder() + assert h, h + h = f3.has_builder() + assert h, h + assert f3.builder is b1, f3.builder + + f7.set_src_builder(b1) + f8.builder_set(b1) + + test.write(['sub2', 'SCCS', 's.f5'], "sub2/SCCS/s.f5\n") + test.write(['sub2', 'RCS', 'f6,v'], "sub2/RCS/f6,v\n") + h = f4.has_src_builder() + assert not h, h + h = f4.has_builder() + assert not h, h + h = f5.has_src_builder() + assert h, h + h = f5.has_builder() + assert h, h + h = f6.has_src_builder() + assert h, h + h = f6.has_builder() + assert h, h + h = f7.has_src_builder() + assert h, h + h = f7.has_builder() + assert h, h + h = f8.has_src_builder() + assert not h, h + h = f8.has_builder() + assert h, h + +class prepareTestCase(unittest.TestCase): + def runTest(self): + """Test the prepare() method""" + + class MyFile(SCons.Node.FS.File): + def _createDir(self, update=None): + raise SCons.Errors.StopError + def exists(self): + return None + + fs = SCons.Node.FS.FS() + file = MyFile('foo', fs.Dir('.'), fs) + + exc_caught = 0 + try: + file.prepare() + except SCons.Errors.StopError: + exc_caught = 1 + assert exc_caught, "Should have caught a StopError." + + class MkdirAction(Action): + def __init__(self, dir_made): + self.dir_made = dir_made + def __call__(self, target, source, env, executor=None): + if executor: + target = executor.get_all_targets() + source = executor.get_all_sources() + self.dir_made.extend(target) + + dir_made = [] + new_dir = fs.Dir("new_dir") + new_dir.builder = Builder(fs.Dir, action=MkdirAction(dir_made)) + new_dir.reset_executor() + xyz = fs.File(os.path.join("new_dir", "xyz")) + + xyz.set_state(SCons.Node.up_to_date) + xyz.prepare() + assert dir_made == [], dir_made + + xyz.set_state(0) + xyz.prepare() + assert dir_made[0].path == "new_dir", dir_made[0] + + dir = fs.Dir("dir") + dir.prepare() + + + +class SConstruct_dirTestCase(unittest.TestCase): + def runTest(self): + """Test setting the SConstruct directory""" + + fs = SCons.Node.FS.FS() + fs.set_SConstruct_dir(fs.Dir('xxx')) + assert fs.SConstruct_dir.path == 'xxx' + + + +class CacheDirTestCase(unittest.TestCase): + + def test_get_cachedir_csig(self): + fs = SCons.Node.FS.FS() + + f9 = fs.File('f9') + r = f9.get_cachedir_csig() + assert r == 'd41d8cd98f00b204e9800998ecf8427e', r + + + +class clearTestCase(unittest.TestCase): + def runTest(self): + """Test clearing FS nodes of cached data.""" + fs = SCons.Node.FS.FS() + test = TestCmd(workdir='') + + e = fs.Entry('e') + assert not e.exists() + assert not e.rexists() + assert str(e) == 'e', str(d) + e.clear() + assert not e.exists() + assert not e.rexists() + assert str(e) == 'e', str(d) + + d = fs.Dir(test.workpath('d')) + test.subdir('d') + assert d.exists() + assert d.rexists() + assert str(d) == test.workpath('d'), str(d) + fs.rename(test.workpath('d'), test.workpath('gone')) + # Verify caching is active + assert d.exists(), 'caching not active' + assert d.rexists() + assert str(d) == test.workpath('d'), str(d) + # Now verify clear() resets the cache + d.clear() + assert not d.exists() + assert not d.rexists() + assert str(d) == test.workpath('d'), str(d) + + f = fs.File(test.workpath('f')) + test.write(test.workpath('f'), 'file f') + assert f.exists() + assert f.rexists() + assert str(f) == test.workpath('f'), str(f) + # Verify caching is active + test.unlink(test.workpath('f')) + assert f.exists() + assert f.rexists() + assert str(f) == test.workpath('f'), str(f) + # Now verify clear() resets the cache + f.clear() + assert not f.exists() + assert not f.rexists() + assert str(f) == test.workpath('f'), str(f) + + + +class disambiguateTestCase(unittest.TestCase): + def runTest(self): + """Test calling the disambiguate() method.""" + test = TestCmd(workdir='') + + fs = SCons.Node.FS.FS() + + ddd = fs.Dir('ddd') + d = ddd.disambiguate() + assert d is ddd, d + + fff = fs.File('fff') + f = fff.disambiguate() + assert f is fff, f + + test.subdir('edir') + test.write('efile', "efile\n") + + edir = fs.Entry(test.workpath('edir')) + d = edir.disambiguate() + assert d.__class__ is ddd.__class__, d.__class__ + + efile = fs.Entry(test.workpath('efile')) + f = efile.disambiguate() + assert f.__class__ is fff.__class__, f.__class__ + + test.subdir('build') + test.subdir(['build', 'bdir']) + test.write(['build', 'bfile'], "build/bfile\n") + + test.subdir('src') + test.write(['src', 'bdir'], "src/bdir\n") + test.subdir(['src', 'bfile']) + + test.subdir(['src', 'edir']) + test.write(['src', 'efile'], "src/efile\n") + + fs.VariantDir(test.workpath('build'), test.workpath('src')) + + build_bdir = fs.Entry(test.workpath('build/bdir')) + d = build_bdir.disambiguate() + assert d is build_bdir, d + assert d.__class__ is ddd.__class__, d.__class__ + + build_bfile = fs.Entry(test.workpath('build/bfile')) + f = build_bfile.disambiguate() + assert f is build_bfile, f + assert f.__class__ is fff.__class__, f.__class__ + + build_edir = fs.Entry(test.workpath('build/edir')) + d = build_edir.disambiguate() + assert d.__class__ is ddd.__class__, d.__class__ + + build_efile = fs.Entry(test.workpath('build/efile')) + f = build_efile.disambiguate() + assert f.__class__ is fff.__class__, f.__class__ + + build_nonexistant = fs.Entry(test.workpath('build/nonexistant')) + f = build_nonexistant.disambiguate() + assert f.__class__ is fff.__class__, f.__class__ + +class postprocessTestCase(unittest.TestCase): + def runTest(self): + """Test calling the postprocess() method.""" + fs = SCons.Node.FS.FS() + + e = fs.Entry('e') + e.postprocess() + + d = fs.Dir('d') + d.postprocess() + + f = fs.File('f') + f.postprocess() + + + +class SpecialAttrTestCase(unittest.TestCase): + def runTest(self): + """Test special attributes of file nodes.""" + test=TestCmd(workdir='') + fs = SCons.Node.FS.FS(test.workpath('work')) + + f = fs.Entry('foo/bar/baz.blat').get_subst_proxy() + + s = str(f.dir) + assert s == os.path.normpath('foo/bar'), s + assert f.dir.is_literal(), f.dir + for_sig = f.dir.for_signature() + assert for_sig == 'bar', for_sig + + s = str(f.file) + assert s == 'baz.blat', s + assert f.file.is_literal(), f.file + for_sig = f.file.for_signature() + assert for_sig == 'baz.blat_file', for_sig + + s = str(f.base) + assert s == os.path.normpath('foo/bar/baz'), s + assert f.base.is_literal(), f.base + for_sig = f.base.for_signature() + assert for_sig == 'baz.blat_base', for_sig + + s = str(f.filebase) + assert s == 'baz', s + assert f.filebase.is_literal(), f.filebase + for_sig = f.filebase.for_signature() + assert for_sig == 'baz.blat_filebase', for_sig + + s = str(f.suffix) + assert s == '.blat', s + assert f.suffix.is_literal(), f.suffix + for_sig = f.suffix.for_signature() + assert for_sig == 'baz.blat_suffix', for_sig + + s = str(f.abspath) + assert s == test.workpath('work', 'foo', 'bar', 'baz.blat'), s + assert f.abspath.is_literal(), f.abspath + for_sig = f.abspath.for_signature() + assert for_sig == 'baz.blat_abspath', for_sig + + s = str(f.posix) + assert s == 'foo/bar/baz.blat', s + assert f.posix.is_literal(), f.posix + if f.posix != f: + for_sig = f.posix.for_signature() + assert for_sig == 'baz.blat_posix', for_sig + + s = str(f.windows) + assert s == 'foo\\bar\\baz.blat', repr(s) + assert f.windows.is_literal(), f.windows + if f.windows != f: + for_sig = f.windows.for_signature() + assert for_sig == 'baz.blat_windows', for_sig + + # Deprecated synonym for the .windows suffix. + s = str(f.win32) + assert s == 'foo\\bar\\baz.blat', repr(s) + assert f.win32.is_literal(), f.win32 + if f.win32 != f: + for_sig = f.win32.for_signature() + assert for_sig == 'baz.blat_windows', for_sig + + # And now, combinations!!! + s = str(f.srcpath.base) + assert s == os.path.normpath('foo/bar/baz'), s + s = str(f.srcpath.dir) + assert s == str(f.srcdir), s + s = str(f.srcpath.posix) + assert s == 'foo/bar/baz.blat', s + s = str(f.srcpath.windows) + assert s == 'foo\\bar\\baz.blat', s + s = str(f.srcpath.win32) + assert s == 'foo\\bar\\baz.blat', s + + # Test what happens with VariantDir() + fs.VariantDir('foo', 'baz') + + s = str(f.srcpath) + assert s == os.path.normpath('baz/bar/baz.blat'), s + assert f.srcpath.is_literal(), f.srcpath + g = f.srcpath.get() + assert isinstance(g, SCons.Node.FS.File), g.__class__ + + s = str(f.srcdir) + assert s == os.path.normpath('baz/bar'), s + assert f.srcdir.is_literal(), f.srcdir + g = f.srcdir.get() + assert isinstance(g, SCons.Node.FS.Dir), g.__class__ + + # And now what happens with VariantDir() + Repository() + fs.Repository(test.workpath('repository')) + + f = fs.Entry('foo/sub/file.suffix').get_subst_proxy() + test.subdir('repository', + ['repository', 'baz'], + ['repository', 'baz', 'sub']) + + rd = test.workpath('repository', 'baz', 'sub') + rf = test.workpath('repository', 'baz', 'sub', 'file.suffix') + test.write(rf, "\n") + + s = str(f.srcpath) + assert s == os.path.normpath('baz/sub/file.suffix'), s + assert f.srcpath.is_literal(), f.srcpath + g = f.srcpath.get() + # Gets disambiguated to SCons.Node.FS.File by get_subst_proxy(). + assert isinstance(g, SCons.Node.FS.File), g.__class__ + + s = str(f.srcdir) + assert s == os.path.normpath('baz/sub'), s + assert f.srcdir.is_literal(), f.srcdir + g = f.srcdir.get() + assert isinstance(g, SCons.Node.FS.Dir), g.__class__ + + s = str(f.rsrcpath) + assert s == rf, s + assert f.rsrcpath.is_literal(), f.rsrcpath + g = f.rsrcpath.get() + assert isinstance(g, SCons.Node.FS.File), g.__class__ + + s = str(f.rsrcdir) + assert s == rd, s + assert f.rsrcdir.is_literal(), f.rsrcdir + g = f.rsrcdir.get() + assert isinstance(g, SCons.Node.FS.Dir), g.__class__ + + # Check that attempts to access non-existent attributes of the + # subst proxy generate the right exceptions and messages. + caught = None + try: + fs.Dir('ddd').get_subst_proxy().no_such_attr + except AttributeError, e: + assert str(e) == "Dir instance 'ddd' has no attribute 'no_such_attr'", e + caught = 1 + assert caught, "did not catch expected AttributeError" + + caught = None + try: + fs.Entry('eee').get_subst_proxy().no_such_attr + except AttributeError, e: + # Gets disambiguated to File instance by get_subst_proxy(). + assert str(e) == "File instance 'eee' has no attribute 'no_such_attr'", e + caught = 1 + assert caught, "did not catch expected AttributeError" + + caught = None + try: + fs.File('fff').get_subst_proxy().no_such_attr + except AttributeError, e: + assert str(e) == "File instance 'fff' has no attribute 'no_such_attr'", e + caught = 1 + assert caught, "did not catch expected AttributeError" + + + +class SaveStringsTestCase(unittest.TestCase): + def runTest(self): + """Test caching string values of nodes.""" + test=TestCmd(workdir='') + + def setup(fs): + fs.Dir('src') + fs.Dir('d0') + fs.Dir('d1') + + d0_f = fs.File('d0/f') + d1_f = fs.File('d1/f') + d0_b = fs.File('d0/b') + d1_b = fs.File('d1/b') + d1_f.duplicate = 1 + d1_b.duplicate = 1 + d0_b.builder = 1 + d1_b.builder = 1 + + return [d0_f, d1_f, d0_b, d1_b] + + def modify(nodes): + d0_f, d1_f, d0_b, d1_b = nodes + d1_f.duplicate = 0 + d1_b.duplicate = 0 + d0_b.builder = 0 + d1_b.builder = 0 + + fs1 = SCons.Node.FS.FS(test.workpath('fs1')) + nodes = setup(fs1) + fs1.VariantDir('d0', 'src', duplicate=0) + fs1.VariantDir('d1', 'src', duplicate=1) + + s = map(str, nodes) + expect = map(os.path.normpath, ['src/f', 'd1/f', 'd0/b', 'd1/b']) + assert s == expect, s + + modify(nodes) + + s = map(str, nodes) + expect = map(os.path.normpath, ['src/f', 'src/f', 'd0/b', 'd1/b']) + assert s == expect, s + + SCons.Node.FS.save_strings(1) + fs2 = SCons.Node.FS.FS(test.workpath('fs2')) + nodes = setup(fs2) + fs2.VariantDir('d0', 'src', duplicate=0) + fs2.VariantDir('d1', 'src', duplicate=1) + + s = map(str, nodes) + expect = map(os.path.normpath, ['src/f', 'd1/f', 'd0/b', 'd1/b']) + assert s == expect, s + + modify(nodes) + + s = map(str, nodes) + expect = map(os.path.normpath, ['src/f', 'd1/f', 'd0/b', 'd1/b']) + assert s == expect, 'node str() not cached: %s'%s + + +class AbsolutePathTestCase(unittest.TestCase): + def test_root_lookup_equivalence(self): + """Test looking up /fff vs. fff in the / directory""" + test=TestCmd(workdir='') + + fs = SCons.Node.FS.FS('/') + + save_cwd = os.getcwd() + try: + os.chdir('/') + fff1 = fs.File('fff') + fff2 = fs.File('/fff') + assert fff1 is fff2, "fff and /fff returned different Nodes!" + finally: + os.chdir(save_cwd) + + + +if __name__ == "__main__": + suite = unittest.TestSuite() + suite.addTest(VariantDirTestCase()) + suite.addTest(find_fileTestCase()) + suite.addTest(StringDirTestCase()) + suite.addTest(stored_infoTestCase()) + suite.addTest(has_src_builderTestCase()) + suite.addTest(prepareTestCase()) + suite.addTest(SConstruct_dirTestCase()) + suite.addTest(clearTestCase()) + suite.addTest(disambiguateTestCase()) + suite.addTest(postprocessTestCase()) + suite.addTest(SpecialAttrTestCase()) + suite.addTest(SaveStringsTestCase()) + tclasses = [ + AbsolutePathTestCase, + BaseTestCase, + CacheDirTestCase, + DirTestCase, + DirBuildInfoTestCase, + DirNodeInfoTestCase, + EntryTestCase, + FileTestCase, + FileBuildInfoTestCase, + FileNodeInfoTestCase, + FSTestCase, + GlobTestCase, + RepositoryTestCase, + ] + for tclass in tclasses: + names = unittest.getTestCaseNames(tclass, 'test_') + suite.addTests(map(tclass, names)) + if not unittest.TextTestRunner().run(suite).wasSuccessful(): + sys.exit(1) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/NodeTests.py b/src/engine/SCons/Node/NodeTests.py new file mode 100644 index 0000000..7920dcb --- /dev/null +++ b/src/engine/SCons/Node/NodeTests.py @@ -0,0 +1,1317 @@ +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/NodeTests.py 4577 2009/12/27 19:44:43 scons" + +import os +import re +import string +import sys +import types +import unittest +import UserList + +import SCons.Errors +import SCons.Node +import SCons.Util + + + +built_it = None +built_target = None +built_source = None +cycle_detected = None +built_order = 0 + +def _actionAppend(a1, a2): + all = [] + for curr_a in [a1, a2]: + if isinstance(curr_a, MyAction): + all.append(curr_a) + elif isinstance(curr_a, MyListAction): + all.extend(curr_a.list) + elif type(curr_a) == type([1,2]): + all.extend(curr_a) + else: + raise 'Cannot Combine Actions' + return MyListAction(all) + +class MyActionBase: + def __add__(self, other): + return _actionAppend(self, other) + + def __radd__(self, other): + return _actionAppend(other, self) + +class MyAction(MyActionBase): + def __init__(self): + self.order = 0 + + def __call__(self, target, source, env, executor=None): + global built_it, built_target, built_source, built_args, built_order + if executor: + target = executor.get_all_targets() + source = executor.get_all_sources() + built_it = 1 + built_target = target + built_source = source + built_args = env + built_order = built_order + 1 + self.order = built_order + return 0 + + def get_implicit_deps(self, target, source, env): + return [] + +class MyExecutor: + def __init__(self, env=None, targets=[], sources=[]): + self.env = env + self.targets = targets + self.sources = sources + def get_build_env(self): + return self.env + def get_build_scanner_path(self, scanner): + return 'executor would call %s' % scanner + def cleanup(self): + self.cleaned_up = 1 + def scan_targets(self, scanner): + if not scanner: + return + d = scanner(self.targets) + for t in self.targets: + t.implicit.extend(d) + def scan_sources(self, scanner): + if not scanner: + return + d = scanner(self.sources) + for t in self.targets: + t.implicit.extend(d) + +class MyListAction(MyActionBase): + def __init__(self, list): + self.list = list + def __call__(self, target, source, env): + for A in self.list: + A(target, source, env) + +class Environment: + def __init__(self, **kw): + self._dict = {} + self._dict.update(kw) + def __getitem__(self, key): + return self._dict[key] + def Dictionary(self, *args): + return {} + def Override(self, overrides): + d = self._dict.copy() + d.update(overrides) + return apply(Environment, (), d) + def _update(self, dict): + self._dict.update(dict) + def get_factory(self, factory): + return factory or MyNode + def get_scanner(self, scanner_key): + return self._dict['SCANNERS'][0] + +class Builder: + def __init__(self, env=None, is_explicit=1): + if env is None: env = Environment() + self.env = env + self.overrides = {} + self.action = MyAction() + self.source_factory = MyNode + self.is_explicit = is_explicit + self.target_scanner = None + self.source_scanner = None + def targets(self, t): + return [t] + def get_actions(self): + return [self.action] + def get_contents(self, target, source, env): + return 7 + +class NoneBuilder(Builder): + def execute(self, target, source, env): + Builder.execute(self, target, source, env) + return None + +class ListBuilder(Builder): + def __init__(self, *nodes): + Builder.__init__(self) + self.nodes = nodes + def execute(self, target, source, env): + if hasattr(self, 'status'): + return self.status + for n in self.nodes: + n.prepare() + target = self.nodes[0] + self.status = Builder.execute(self, target, source, env) + +class FailBuilder: + def execute(self, target, source, env): + return 1 + +class ExceptBuilder: + def execute(self, target, source, env): + raise SCons.Errors.BuildError + +class ExceptBuilder2: + def execute(self, target, source, env): + raise "foo" + +class Scanner: + called = None + def __call__(self, node): + self.called = 1 + return node.found_includes + def path(self, env, dir, target=None, source=None): + return () + def select(self, node): + return self + def recurse_nodes(self, nodes): + return nodes + +class MyNode(SCons.Node.Node): + """The base Node class contains a number of do-nothing methods that + we expect to be overridden by real, functional Node subclasses. So + simulate a real, functional Node subclass. + """ + def __init__(self, name): + SCons.Node.Node.__init__(self) + self.name = name + self.found_includes = [] + def __str__(self): + return self.name + def get_found_includes(self, env, scanner, target): + return scanner(self) + +class Calculator: + def __init__(self, val): + self.max_drift = 0 + class M: + def __init__(self, val): + self.val = val + def signature(self, args): + return self.val + def collect(self, args): + return reduce(lambda x, y: x+y, args, self.val) + self.module = M(val) + + + +class NodeInfoBaseTestCase(unittest.TestCase): + + def test_merge(self): + """Test merging NodeInfoBase attributes""" + ni1 = SCons.Node.NodeInfoBase(SCons.Node.Node()) + ni2 = SCons.Node.NodeInfoBase(SCons.Node.Node()) + + ni1.a1 = 1 + ni1.a2 = 2 + + ni2.a2 = 222 + ni2.a3 = 333 + + ni1.merge(ni2) + expect = {'a1':1, 'a2':222, 'a3':333, '_version_id':1} + assert ni1.__dict__ == expect, ni1.__dict__ + + def test_update(self): + """Test the update() method""" + ni = SCons.Node.NodeInfoBase(SCons.Node.Node()) + ni.update(SCons.Node.Node()) + + def test_format(self): + """Test the NodeInfoBase.format() method""" + ni1 = SCons.Node.NodeInfoBase(SCons.Node.Node()) + ni1.xxx = 'x' + ni1.yyy = 'y' + ni1.zzz = 'z' + + f = ni1.format() + assert f == ['1', 'x', 'y', 'z'], f + + ni1.field_list = ['xxx', 'zzz', 'aaa'] + + f = ni1.format() + assert f == ['x', 'z', 'None'], f + + + +class BuildInfoBaseTestCase(unittest.TestCase): + + def test___init__(self): + """Test BuildInfoBase initialization""" + n = SCons.Node.Node() + bi = SCons.Node.BuildInfoBase(n) + assert bi + + def test_merge(self): + """Test merging BuildInfoBase attributes""" + n1 = SCons.Node.Node() + bi1 = SCons.Node.BuildInfoBase(n1) + n2 = SCons.Node.Node() + bi2 = SCons.Node.BuildInfoBase(n2) + + bi1.a1 = 1 + bi1.a2 = 2 + + bi2.a2 = 222 + bi2.a3 = 333 + + bi1.merge(bi2) + assert bi1.a1 == 1, bi1.a1 + assert bi1.a2 == 222, bi1.a2 + assert bi1.a3 == 333, bi1.a3 + + +class NodeTestCase(unittest.TestCase): + + def test_build(self): + """Test building a node + """ + global built_it, built_order + + # Make sure it doesn't blow up if no builder is set. + node = MyNode("www") + node.build() + assert built_it is None + node.build(extra_kw_argument = 1) + assert built_it is None + + node = MyNode("xxx") + node.builder_set(Builder()) + node.env_set(Environment()) + node.path = "xxx" + node.sources = ["yyy", "zzz"] + node.build() + assert built_it + assert built_target == [node], built_target + assert built_source == ["yyy", "zzz"], built_source + + built_it = None + node = MyNode("qqq") + node.builder_set(NoneBuilder()) + node.env_set(Environment()) + node.path = "qqq" + node.sources = ["rrr", "sss"] + node.builder.overrides = { "foo" : 1, "bar" : 2 } + node.build() + assert built_it + assert built_target == [node], built_target + assert built_source == ["rrr", "sss"], built_source + assert built_args["foo"] == 1, built_args + assert built_args["bar"] == 2, built_args + + fff = MyNode("fff") + ggg = MyNode("ggg") + lb = ListBuilder(fff, ggg) + e = Environment() + fff.builder_set(lb) + fff.env_set(e) + fff.path = "fff" + ggg.builder_set(lb) + ggg.env_set(e) + ggg.path = "ggg" + fff.sources = ["hhh", "iii"] + ggg.sources = ["hhh", "iii"] + # [Charles C. 1/7/2002] Uhhh, why are there no asserts here? + # [SK, 15 May 2003] I dunno, let's add some... + built_it = None + fff.build() + assert built_it + assert built_target == [fff], built_target + assert built_source == ["hhh", "iii"], built_source + built_it = None + ggg.build() + assert built_it + assert built_target == [ggg], built_target + assert built_source == ["hhh", "iii"], built_source + + built_it = None + jjj = MyNode("jjj") + b = Builder() + jjj.builder_set(b) + # NOTE: No env_set()! We should pull the environment from the builder. + b.env = Environment() + b.overrides = { "on" : 3, "off" : 4 } + e.builder = b + jjj.build() + assert built_it + assert built_target[0] == jjj, built_target[0] + assert built_source == [], built_source + assert built_args["on"] == 3, built_args + assert built_args["off"] == 4, built_args + + def test_get_build_scanner_path(self): + """Test the get_build_scanner_path() method""" + n = SCons.Node.Node() + x = MyExecutor() + n.set_executor(x) + p = n.get_build_scanner_path('fake_scanner') + assert p == "executor would call fake_scanner", p + + def test_get_executor(self): + """Test the get_executor() method""" + n = SCons.Node.Node() + + try: + n.get_executor(0) + except AttributeError: + pass + else: + self.fail("did not catch expected AttributeError") + + class Builder: + action = 'act' + env = 'env1' + overrides = {} + + n = SCons.Node.Node() + n.builder_set(Builder()) + x = n.get_executor() + assert x.env == 'env1', x.env + + n = SCons.Node.Node() + n.builder_set(Builder()) + n.env_set('env2') + x = n.get_executor() + assert x.env == 'env2', x.env + + def test_set_executor(self): + """Test the set_executor() method""" + n = SCons.Node.Node() + n.set_executor(1) + assert n.executor == 1, n.executor + + def test_executor_cleanup(self): + """Test letting the executor cleanup its cache""" + n = SCons.Node.Node() + x = MyExecutor() + n.set_executor(x) + n.executor_cleanup() + assert x.cleaned_up + + def test_reset_executor(self): + """Test the reset_executor() method""" + n = SCons.Node.Node() + n.set_executor(1) + assert n.executor == 1, n.executor + n.reset_executor() + assert not hasattr(n, 'executor'), "unexpected executor attribute" + + def test_built(self): + """Test the built() method""" + class SubNodeInfo(SCons.Node.NodeInfoBase): + def update(self, node): + self.updated = 1 + class SubNode(SCons.Node.Node): + def clear(self): + self.cleared = 1 + + n = SubNode() + n.ninfo = SubNodeInfo(n) + n.built() + assert n.cleared, n.cleared + assert n.ninfo.updated, n.ninfo.cleared + + def test_push_to_cache(self): + """Test the base push_to_cache() method""" + n = SCons.Node.Node() + r = n.push_to_cache() + assert r is None, r + + def test_retrieve_from_cache(self): + """Test the base retrieve_from_cache() method""" + n = SCons.Node.Node() + r = n.retrieve_from_cache() + assert r == 0, r + + def test_visited(self): + """Test the base visited() method + + Just make sure it's there and we can call it. + """ + n = SCons.Node.Node() + n.visited() + + def test_builder_set(self): + """Test setting a Node's Builder + """ + node = SCons.Node.Node() + b = Builder() + node.builder_set(b) + assert node.builder == b + + def test_has_builder(self): + """Test the has_builder() method + """ + n1 = SCons.Node.Node() + assert n1.has_builder() == 0 + n1.builder_set(Builder()) + assert n1.has_builder() == 1 + + def test_has_explicit_builder(self): + """Test the has_explicit_builder() method + """ + n1 = SCons.Node.Node() + assert not n1.has_explicit_builder() + n1.set_explicit(1) + assert n1.has_explicit_builder() + n1.set_explicit(None) + assert not n1.has_explicit_builder() + + def test_get_builder(self): + """Test the get_builder() method""" + n1 = SCons.Node.Node() + b = n1.get_builder() + assert b is None, b + b = n1.get_builder(777) + assert b == 777, b + n1.builder_set(888) + b = n1.get_builder() + assert b == 888, b + b = n1.get_builder(999) + assert b == 888, b + + def test_multiple_side_effect_has_builder(self): + """Test the multiple_side_effect_has_builder() method + """ + n1 = SCons.Node.Node() + assert n1.multiple_side_effect_has_builder() == 0 + n1.builder_set(Builder()) + assert n1.multiple_side_effect_has_builder() == 1 + + def test_is_derived(self): + """Test the is_derived() method + """ + n1 = SCons.Node.Node() + n2 = SCons.Node.Node() + n3 = SCons.Node.Node() + + n2.builder_set(Builder()) + n3.side_effect = 1 + + assert n1.is_derived() == 0 + assert n2.is_derived() == 1 + assert n3.is_derived() == 1 + + def test_alter_targets(self): + """Test the alter_targets() method + """ + n = SCons.Node.Node() + t, m = n.alter_targets() + assert t == [], t + assert m is None, m + + def test_is_up_to_date(self): + """Test the default is_up_to_date() method + """ + node = SCons.Node.Node() + assert node.is_up_to_date() is None + + def test_children_are_up_to_date(self): + """Test the children_are_up_to_date() method used by subclasses + """ + n1 = SCons.Node.Node() + n2 = SCons.Node.Node() + + n1.add_source([n2]) + assert n1.children_are_up_to_date(), "expected up to date" + n2.set_state(SCons.Node.executed) + assert not n1.children_are_up_to_date(), "expected not up to date" + n2.set_state(SCons.Node.up_to_date) + assert n1.children_are_up_to_date(), "expected up to date" + n1.always_build = 1 + assert not n1.children_are_up_to_date(), "expected not up to date" + + def test_env_set(self): + """Test setting a Node's Environment + """ + node = SCons.Node.Node() + e = Environment() + node.env_set(e) + assert node.env == e + + def test_get_actions(self): + """Test fetching a Node's action list + """ + node = SCons.Node.Node() + node.builder_set(Builder()) + a = node.builder.get_actions() + assert isinstance(a[0], MyAction), a[0] + + def test_get_csig(self): + """Test generic content signature calculation + """ + node = SCons.Node.Node() + node.get_contents = lambda: 444 + result = node.get_csig() + assert result == '550a141f12de6341fba65b0ad0433500', result + + def test_get_cachedir_csig(self): + """Test content signature calculation for CacheDir + """ + node = SCons.Node.Node() + node.get_contents = lambda: 555 + result = node.get_cachedir_csig() + assert result == '15de21c670ae7c3f6f3f1f37029303c9', result + + def test_get_binfo(self): + """Test fetching/creating a build information structure + """ + node = SCons.Node.Node() + + binfo = node.get_binfo() + assert isinstance(binfo, SCons.Node.BuildInfoBase), binfo + + node = SCons.Node.Node() + d = SCons.Node.Node() + d.get_ninfo().csig = 777 + i = SCons.Node.Node() + i.get_ninfo().csig = 888 + node.depends = [d] + node.implicit = [i] + + binfo = node.get_binfo() + assert isinstance(binfo, SCons.Node.BuildInfoBase), binfo + assert hasattr(binfo, 'bsources') + assert hasattr(binfo, 'bsourcesigs') + assert binfo.bdepends == [d] + assert hasattr(binfo, 'bdependsigs') + assert binfo.bimplicit == [i] + assert hasattr(binfo, 'bimplicitsigs') + + def test_explain(self): + """Test explaining why a Node must be rebuilt + """ + class testNode(SCons.Node.Node): + def __str__(self): return 'xyzzy' + node = testNode() + node.exists = lambda: None + # Can't do this with new-style classes (python bug #1066490) + #node.__str__ = lambda: 'xyzzy' + result = node.explain() + assert result == "building `xyzzy' because it doesn't exist\n", result + + class testNode2(SCons.Node.Node): + def __str__(self): return 'null_binfo' + class FS: + pass + node = testNode2() + node.fs = FS() + node.fs.Top = SCons.Node.Node() + result = node.explain() + assert result is None, result + + def get_null_info(): + class Null_SConsignEntry: + class Null_BuildInfo: + def prepare_dependencies(self): + pass + binfo = Null_BuildInfo() + return Null_SConsignEntry() + + node.get_stored_info = get_null_info + #see above: node.__str__ = lambda: 'null_binfo' + result = node.explain() + assert result == "Cannot explain why `null_binfo' is being rebuilt: No previous build information found\n", result + + # XXX additional tests for the guts of the functionality some day + + #def test_del_binfo(self): + # """Test deleting the build information from a Node + # """ + # node = SCons.Node.Node() + # node.binfo = None + # node.del_binfo() + # assert not hasattr(node, 'binfo'), node + + def test_store_info(self): + """Test calling the method to store build information + """ + node = SCons.Node.Node() + node.store_info() + + def test_get_stored_info(self): + """Test calling the method to fetch stored build information + """ + node = SCons.Node.Node() + result = node.get_stored_info() + assert result is None, result + + def test_set_always_build(self): + """Test setting a Node's always_build value + """ + node = SCons.Node.Node() + node.set_always_build() + assert node.always_build + node.set_always_build(3) + assert node.always_build == 3 + + def test_set_noclean(self): + """Test setting a Node's noclean value + """ + node = SCons.Node.Node() + node.set_noclean() + assert node.noclean == 1, node.noclean + node.set_noclean(7) + assert node.noclean == 1, node.noclean + node.set_noclean(0) + assert node.noclean == 0, node.noclean + node.set_noclean(None) + assert node.noclean == 0, node.noclean + + def test_set_precious(self): + """Test setting a Node's precious value + """ + node = SCons.Node.Node() + node.set_precious() + assert node.precious + node.set_precious(7) + assert node.precious == 7 + + def test_exists(self): + """Test evaluating whether a Node exists. + """ + node = SCons.Node.Node() + e = node.exists() + assert e == 1, e + + def test_exists(self): + """Test evaluating whether a Node exists locally or in a repository. + """ + node = SCons.Node.Node() + e = node.rexists() + assert e == 1, e + + class MyNode(SCons.Node.Node): + def exists(self): + return 'xyz' + + node = MyNode() + e = node.rexists() + assert e == 'xyz', e + + def test_prepare(self): + """Test preparing a node to be built + + By extension, this also tests the missing() method. + """ + node = SCons.Node.Node() + + n1 = SCons.Node.Node() + n1.builder_set(Builder()) + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n1]) + + node.prepare() # should not throw an exception + + n2 = SCons.Node.Node() + n2.linked = 1 + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n2]) + + node.prepare() # should not throw an exception + + n3 = SCons.Node.Node() + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n3]) + + node.prepare() # should not throw an exception + + class MyNode(SCons.Node.Node): + def rexists(self): + return None + n4 = MyNode() + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n4]) + exc_caught = 0 + try: + node.prepare() + except SCons.Errors.StopError: + exc_caught = 1 + assert exc_caught, "did not catch expected StopError" + + def test_add_dependency(self): + """Test adding dependencies to a Node's list. + """ + node = SCons.Node.Node() + assert node.depends == [] + + zero = SCons.Node.Node() + + one = SCons.Node.Node() + two = SCons.Node.Node() + three = SCons.Node.Node() + four = SCons.Node.Node() + five = SCons.Node.Node() + six = SCons.Node.Node() + + node.add_dependency([zero]) + assert node.depends == [zero] + node.add_dependency([one]) + assert node.depends == [zero, one] + node.add_dependency([two, three]) + assert node.depends == [zero, one, two, three] + node.add_dependency([three, four, one]) + assert node.depends == [zero, one, two, three, four] + + try: + node.add_depends([[five, six]]) + except: + pass + else: + raise "did not catch expected exception" + assert node.depends == [zero, one, two, three, four] + + + def test_add_source(self): + """Test adding sources to a Node's list. + """ + node = SCons.Node.Node() + assert node.sources == [] + + zero = SCons.Node.Node() + one = SCons.Node.Node() + two = SCons.Node.Node() + three = SCons.Node.Node() + four = SCons.Node.Node() + five = SCons.Node.Node() + six = SCons.Node.Node() + + node.add_source([zero]) + assert node.sources == [zero] + node.add_source([one]) + assert node.sources == [zero, one] + node.add_source([two, three]) + assert node.sources == [zero, one, two, three] + node.add_source([three, four, one]) + assert node.sources == [zero, one, two, three, four] + + try: + node.add_source([[five, six]]) + except: + pass + else: + raise "did not catch expected exception" + assert node.sources == [zero, one, two, three, four], node.sources + + def test_add_ignore(self): + """Test adding files whose dependencies should be ignored. + """ + node = SCons.Node.Node() + assert node.ignore == [] + + zero = SCons.Node.Node() + one = SCons.Node.Node() + two = SCons.Node.Node() + three = SCons.Node.Node() + four = SCons.Node.Node() + five = SCons.Node.Node() + six = SCons.Node.Node() + + node.add_ignore([zero]) + assert node.ignore == [zero] + node.add_ignore([one]) + assert node.ignore == [zero, one] + node.add_ignore([two, three]) + assert node.ignore == [zero, one, two, three] + node.add_ignore([three, four, one]) + assert node.ignore == [zero, one, two, three, four] + + try: + node.add_ignore([[five, six]]) + except: + pass + else: + raise "did not catch expected exception" + assert node.ignore == [zero, one, two, three, four] + + def test_get_found_includes(self): + """Test the default get_found_includes() method + """ + node = SCons.Node.Node() + target = SCons.Node.Node() + e = Environment() + deps = node.get_found_includes(e, None, target) + assert deps == [], deps + + def test_get_implicit_deps(self): + """Test get_implicit_deps() + """ + node = MyNode("nnn") + target = MyNode("ttt") + env = Environment() + + # No scanner at all returns [] + deps = node.get_implicit_deps(env, None, target) + assert deps == [], deps + + s = Scanner() + d1 = MyNode("d1") + d2 = MyNode("d2") + node.found_includes = [d1, d2] + + # Simple return of the found includes + deps = node.get_implicit_deps(env, s, target) + assert deps == [d1, d2], deps + + # By default, our fake scanner recurses + e = MyNode("eee") + f = MyNode("fff") + g = MyNode("ggg") + d1.found_includes = [e, f] + d2.found_includes = [e, f] + f.found_includes = [g] + deps = node.get_implicit_deps(env, s, target) + assert deps == [d1, d2, e, f, g], map(str, deps) + + # Recursive scanning eliminates duplicates + e.found_includes = [f] + deps = node.get_implicit_deps(env, s, target) + assert deps == [d1, d2, e, f, g], map(str, deps) + + # Scanner method can select specific nodes to recurse + def no_fff(nodes): + return filter(lambda n: str(n)[0] != 'f', nodes) + s.recurse_nodes = no_fff + deps = node.get_implicit_deps(env, s, target) + assert deps == [d1, d2, e, f], map(str, deps) + + # Scanner method can short-circuit recursing entirely + s.recurse_nodes = lambda nodes: [] + deps = node.get_implicit_deps(env, s, target) + assert deps == [d1, d2], map(str, deps) + + def test_get_env_scanner(self): + """Test fetching the environment scanner for a Node + """ + node = SCons.Node.Node() + scanner = Scanner() + env = Environment(SCANNERS = [scanner]) + s = node.get_env_scanner(env) + assert s == scanner, s + s = node.get_env_scanner(env, {'X':1}) + assert s == scanner, s + + def test_get_target_scanner(self): + """Test fetching the target scanner for a Node + """ + s = Scanner() + b = Builder() + b.target_scanner = s + n = SCons.Node.Node() + n.builder = b + x = n.get_target_scanner() + assert x is s, x + + def test_get_source_scanner(self): + """Test fetching the source scanner for a Node + """ + target = SCons.Node.Node() + source = SCons.Node.Node() + s = target.get_source_scanner(source) + assert isinstance(s, SCons.Util.Null), s + + ts1 = Scanner() + ts2 = Scanner() + ts3 = Scanner() + + class Builder1(Builder): + def __call__(self, source): + r = SCons.Node.Node() + r.builder = self + return [r] + class Builder2(Builder1): + def __init__(self, scanner): + self.source_scanner = scanner + + builder = Builder2(ts1) + + targets = builder([source]) + s = targets[0].get_source_scanner(source) + assert s is ts1, s + + target.builder_set(Builder2(ts1)) + target.builder.source_scanner = ts2 + s = target.get_source_scanner(source) + assert s is ts2, s + + builder = Builder1(env=Environment(SCANNERS = [ts3])) + + targets = builder([source]) + + s = targets[0].get_source_scanner(source) + assert s is ts3, s + + + def test_scan(self): + """Test Scanner functionality + """ + env = Environment() + node = MyNode("nnn") + node.builder = Builder() + node.env_set(env) + x = MyExecutor(env, [node]) + + s = Scanner() + d = MyNode("ddd") + node.found_includes = [d] + + node.builder.target_scanner = s + assert node.implicit is None + + node.scan() + assert s.called + assert node.implicit == [d], node.implicit + + # Check that scanning a node with some stored implicit + # dependencies resets internal attributes appropriately + # if the stored dependencies need recalculation. + class StoredNode(MyNode): + def get_stored_implicit(self): + return [MyNode('implicit1'), MyNode('implicit2')] + + save_implicit_cache = SCons.Node.implicit_cache + save_implicit_deps_changed = SCons.Node.implicit_deps_changed + save_implicit_deps_unchanged = SCons.Node.implicit_deps_unchanged + SCons.Node.implicit_cache = 1 + SCons.Node.implicit_deps_changed = None + SCons.Node.implicit_deps_unchanged = None + try: + sn = StoredNode("eee") + sn.builder_set(Builder()) + sn.builder.target_scanner = s + + sn.scan() + + assert sn.implicit == [], sn.implicit + assert sn.children() == [], sn.children() + + finally: + SCons.Node.implicit_cache = save_implicit_cache + SCons.Node.implicit_deps_changed = save_implicit_deps_changed + SCons.Node.implicit_deps_unchanged = save_implicit_deps_unchanged + + def test_scanner_key(self): + """Test that a scanner_key() method exists""" + assert SCons.Node.Node().scanner_key() is None + + def test_children(self): + """Test fetching the non-ignored "children" of a Node. + """ + node = SCons.Node.Node() + n1 = SCons.Node.Node() + n2 = SCons.Node.Node() + n3 = SCons.Node.Node() + n4 = SCons.Node.Node() + n5 = SCons.Node.Node() + n6 = SCons.Node.Node() + n7 = SCons.Node.Node() + n8 = SCons.Node.Node() + n9 = SCons.Node.Node() + n10 = SCons.Node.Node() + n11 = SCons.Node.Node() + n12 = SCons.Node.Node() + + node.add_source([n1, n2, n3]) + node.add_dependency([n4, n5, n6]) + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n7, n8, n9]) + node._add_child(node.implicit, node.implicit_set, [n10, n11, n12]) + node.add_ignore([n2, n5, n8, n11]) + + kids = node.children() + for kid in [n1, n3, n4, n6, n7, n9, n10, n12]: + assert kid in kids, kid + for kid in [n2, n5, n8, n11]: + assert not kid in kids, kid + + def test_all_children(self): + """Test fetching all the "children" of a Node. + """ + node = SCons.Node.Node() + n1 = SCons.Node.Node() + n2 = SCons.Node.Node() + n3 = SCons.Node.Node() + n4 = SCons.Node.Node() + n5 = SCons.Node.Node() + n6 = SCons.Node.Node() + n7 = SCons.Node.Node() + n8 = SCons.Node.Node() + n9 = SCons.Node.Node() + n10 = SCons.Node.Node() + n11 = SCons.Node.Node() + n12 = SCons.Node.Node() + + node.add_source([n1, n2, n3]) + node.add_dependency([n4, n5, n6]) + node.implicit = [] + node.implicit_set = set() + node._add_child(node.implicit, node.implicit_set, [n7, n8, n9]) + node._add_child(node.implicit, node.implicit_set, [n10, n11, n12]) + node.add_ignore([n2, n5, n8, n11]) + + kids = node.all_children() + for kid in [n1, n2, n3, n4, n5, n6, n7, n8, n9, n10, n11, n12]: + assert kid in kids, kid + + def test_state(self): + """Test setting and getting the state of a node + """ + node = SCons.Node.Node() + assert node.get_state() == SCons.Node.no_state + node.set_state(SCons.Node.executing) + assert node.get_state() == SCons.Node.executing + assert SCons.Node.pending < SCons.Node.executing + assert SCons.Node.executing < SCons.Node.up_to_date + assert SCons.Node.up_to_date < SCons.Node.executed + assert SCons.Node.executed < SCons.Node.failed + + def test_walker(self): + """Test walking a Node tree. + """ + + n1 = MyNode("n1") + + nw = SCons.Node.Walker(n1) + assert not nw.is_done() + assert nw.next().name == "n1" + assert nw.is_done() + assert nw.next() is None + + n2 = MyNode("n2") + n3 = MyNode("n3") + n1.add_source([n2, n3]) + + nw = SCons.Node.Walker(n1) + n = nw.next() + assert n.name == "n2", n.name + n = nw.next() + assert n.name == "n3", n.name + n = nw.next() + assert n.name == "n1", n.name + n = nw.next() + assert n is None, n + + n4 = MyNode("n4") + n5 = MyNode("n5") + n6 = MyNode("n6") + n7 = MyNode("n7") + n2.add_source([n4, n5]) + n3.add_dependency([n6, n7]) + + nw = SCons.Node.Walker(n1) + assert nw.next().name == "n4" + assert nw.next().name == "n5" + assert nw.history.has_key(n2) + assert nw.next().name == "n2" + assert nw.next().name == "n6" + assert nw.next().name == "n7" + assert nw.history.has_key(n3) + assert nw.next().name == "n3" + assert nw.history.has_key(n1) + assert nw.next().name == "n1" + assert nw.next() is None + + n8 = MyNode("n8") + n8.add_dependency([n3]) + n7.add_dependency([n8]) + + def cycle(node, stack): + global cycle_detected + cycle_detected = 1 + + global cycle_detected + + nw = SCons.Node.Walker(n3, cycle_func = cycle) + n = nw.next() + assert n.name == "n6", n.name + n = nw.next() + assert n.name == "n8", n.name + assert cycle_detected + cycle_detected = None + n = nw.next() + assert n.name == "n7", n.name + n = nw.next() + assert nw.next() is None + + def test_abspath(self): + """Test the get_abspath() method.""" + n = MyNode("foo") + assert n.get_abspath() == str(n), n.get_abspath() + + def test_for_signature(self): + """Test the for_signature() method.""" + n = MyNode("foo") + assert n.for_signature() == str(n), n.get_abspath() + + def test_get_string(self): + """Test the get_string() method.""" + class TestNode(MyNode): + def __init__(self, name, sig): + MyNode.__init__(self, name) + self.sig = sig + + def for_signature(self): + return self.sig + + n = TestNode("foo", "bar") + assert n.get_string(0) == "foo", n.get_string(0) + assert n.get_string(1) == "bar", n.get_string(1) + + def test_literal(self): + """Test the is_literal() function.""" + n=SCons.Node.Node() + assert n.is_literal() + + def test_Annotate(self): + """Test using an interface-specific Annotate function.""" + def my_annotate(node, self=self): + node.annotation = self.node_string + + save_Annotate = SCons.Node.Annotate + SCons.Node.Annotate = my_annotate + + try: + self.node_string = '#1' + n = SCons.Node.Node() + assert n.annotation == '#1', n.annotation + + self.node_string = '#2' + n = SCons.Node.Node() + assert n.annotation == '#2', n.annotation + finally: + SCons.Node.Annotate = save_Annotate + + def test_clear(self): + """Test clearing all cached state information.""" + n = SCons.Node.Node() + + n.set_state(3) + n.binfo = 'xyz' + n.includes = 'testincludes' + n.found_include = {'testkey':'testvalue'} + n.implicit = 'testimplicit' + + x = MyExecutor() + n.set_executor(x) + + n.clear() + + assert n.includes is None, n.includes + assert x.cleaned_up + + def test_get_subst_proxy(self): + """Test the get_subst_proxy method.""" + n = MyNode("test") + + assert n.get_subst_proxy() == n, n.get_subst_proxy() + + def test_new_binfo(self): + """Test the new_binfo() method""" + n = SCons.Node.Node() + result = n.new_binfo() + assert isinstance(result, SCons.Node.BuildInfoBase), result + + def test_get_suffix(self): + """Test the base Node get_suffix() method""" + n = SCons.Node.Node() + s = n.get_suffix() + assert s == '', s + + def test_postprocess(self): + """Test calling the base Node postprocess() method""" + n = SCons.Node.Node() + n.waiting_parents = set( ['foo','bar'] ) + + n.postprocess() + assert n.waiting_parents == set(), n.waiting_parents + + def test_add_to_waiting_parents(self): + """Test the add_to_waiting_parents() method""" + n1 = SCons.Node.Node() + n2 = SCons.Node.Node() + assert n1.waiting_parents == set(), n1.waiting_parents + r = n1.add_to_waiting_parents(n2) + assert r == 1, r + assert n1.waiting_parents == set((n2,)), n1.waiting_parents + r = n1.add_to_waiting_parents(n2) + assert r == 0, r + + +class NodeListTestCase(unittest.TestCase): + def test___str__(self): + """Test""" + n1 = MyNode("n1") + n2 = MyNode("n2") + n3 = MyNode("n3") + nl = SCons.Node.NodeList([n3, n2, n1]) + + l = [1] + ul = UserList.UserList([2]) + try: + l.extend(ul) + except TypeError: + # An older version of Python (*cough* 1.5.2 *cough*) + # that doesn't allow UserList objects to extend lists. + pass + else: + s = str(nl) + assert s == "['n3', 'n2', 'n1']", s + + r = repr(nl) + r = re.sub('at (0[xX])?[0-9a-fA-F]+', 'at 0x', r) + # Don't care about ancestry: just leaf value of MyNode + r = re.sub('<.*?\.MyNode', '<MyNode', r) + # New-style classes report as "object"; classic classes report + # as "instance"... + r = re.sub("object", "instance", r) + l = string.join(["<MyNode instance at 0x>"]*3, ", ") + assert r == '[%s]' % l, r + + + +if __name__ == "__main__": + suite = unittest.TestSuite() + tclasses = [ BuildInfoBaseTestCase, + NodeInfoBaseTestCase, + NodeTestCase, + NodeListTestCase ] + for tclass in tclasses: + names = unittest.getTestCaseNames(tclass, 'test_') + suite.addTests(map(tclass, names)) + if not unittest.TextTestRunner().run(suite).wasSuccessful(): + sys.exit(1) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/Python.py b/src/engine/SCons/Node/Python.py new file mode 100644 index 0000000..941b4ed --- /dev/null +++ b/src/engine/SCons/Node/Python.py @@ -0,0 +1,128 @@ +"""scons.Node.Python + +Python nodes. + +""" + +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/Python.py 4577 2009/12/27 19:44:43 scons" + +import SCons.Node + +class ValueNodeInfo(SCons.Node.NodeInfoBase): + current_version_id = 1 + + field_list = ['csig'] + + def str_to_node(self, s): + return Value(s) + +class ValueBuildInfo(SCons.Node.BuildInfoBase): + current_version_id = 1 + +class Value(SCons.Node.Node): + """A class for Python variables, typically passed on the command line + or generated by a script, but not from a file or some other source. + """ + + NodeInfo = ValueNodeInfo + BuildInfo = ValueBuildInfo + + def __init__(self, value, built_value=None): + SCons.Node.Node.__init__(self) + self.value = value + if built_value is not None: + self.built_value = built_value + + def str_for_display(self): + return repr(self.value) + + def __str__(self): + return str(self.value) + + def make_ready(self): + self.get_csig() + + def build(self, **kw): + if not hasattr(self, 'built_value'): + apply (SCons.Node.Node.build, (self,), kw) + + is_up_to_date = SCons.Node.Node.children_are_up_to_date + + def is_under(self, dir): + # Make Value nodes get built regardless of + # what directory scons was run from. Value nodes + # are outside the filesystem: + return 1 + + def write(self, built_value): + """Set the value of the node.""" + self.built_value = built_value + + def read(self): + """Return the value. If necessary, the value is built.""" + self.build() + if not hasattr(self, 'built_value'): + self.built_value = self.value + return self.built_value + + def get_text_contents(self): + """By the assumption that the node.built_value is a + deterministic product of the sources, the contents of a Value + are the concatenation of all the contents of its sources. As + the value need not be built when get_contents() is called, we + cannot use the actual node.built_value.""" + ###TODO: something reasonable about universal newlines + contents = str(self.value) + for kid in self.children(None): + contents = contents + kid.get_contents() + return contents + + get_contents = get_text_contents ###TODO should return 'bytes' value + + def changed_since_last_build(self, target, prev_ni): + cur_csig = self.get_csig() + try: + return cur_csig != prev_ni.csig + except AttributeError: + return 1 + + def get_csig(self, calc=None): + """Because we're a Python value node and don't have a real + timestamp, we get to ignore the calculator and just use the + value contents.""" + try: + return self.ninfo.csig + except AttributeError: + pass + contents = self.get_contents() + self.get_ninfo().csig = contents + return contents + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/PythonTests.py b/src/engine/SCons/Node/PythonTests.py new file mode 100644 index 0000000..01844ac --- /dev/null +++ b/src/engine/SCons/Node/PythonTests.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/PythonTests.py 4577 2009/12/27 19:44:43 scons" + +import sys +import unittest + +import SCons.Errors +import SCons.Node.Python + +class ValueTestCase(unittest.TestCase): + + def test_Value(self): + """Test creating a Value() object + """ + v1 = SCons.Node.Python.Value('a') + assert v1.value == 'a', v1.value + + value2 = 'a' + v2 = SCons.Node.Python.Value(value2) + assert v2.value == value2, v2.value + assert v2.value is value2, v2.value + + assert not v1 is v2 + assert v1.value == v2.value + + v3 = SCons.Node.Python.Value('c', 'cb') + assert v3.built_value == 'cb' + + def test_build(self): + """Test "building" a Value Node + """ + class fake_executor: + def __call__(self, node): + node.write('faked') + + v1 = SCons.Node.Python.Value('b', 'built') + v1.executor = fake_executor() + v1.build() + assert v1.built_value == 'built', v1.built_value + + v2 = SCons.Node.Python.Value('b') + v2.executor = fake_executor() + v2.build() + assert v2.built_value == 'faked', v2.built_value + + def test_read(self): + """Test the Value.read() method + """ + v1 = SCons.Node.Python.Value('a') + x = v1.read() + assert x == 'a', x + + def test_write(self): + """Test the Value.write() method + """ + v1 = SCons.Node.Python.Value('a') + assert v1.value == 'a', v1.value + assert not hasattr(v1, 'built_value') + + v1.write('new') + assert v1.value == 'a', v1.value + assert v1.built_value == 'new', v1.built_value + + def test_get_csig(self): + """Test calculating the content signature of a Value() object + """ + v1 = SCons.Node.Python.Value('aaa') + csig = v1.get_csig(None) + assert csig == 'aaa', csig + + v2 = SCons.Node.Python.Value(7) + csig = v2.get_csig(None) + assert csig == '7', csig + + v3 = SCons.Node.Python.Value(None) + csig = v3.get_csig(None) + assert csig == 'None', csig + +class ValueNodeInfoTestCase(unittest.TestCase): + def test___init__(self): + """Test ValueNodeInfo initialization""" + vvv = SCons.Node.Python.Value('vvv') + ni = SCons.Node.Python.ValueNodeInfo(vvv) + +class ValueBuildInfoTestCase(unittest.TestCase): + def test___init__(self): + """Test ValueBuildInfo initialization""" + vvv = SCons.Node.Python.Value('vvv') + bi = SCons.Node.Python.ValueBuildInfo(vvv) + +if __name__ == "__main__": + suite = unittest.TestSuite() + tclasses = [ + ValueTestCase, + ValueBuildInfoTestCase, + ValueNodeInfoTestCase, + ] + for tclass in tclasses: + names = unittest.getTestCaseNames(tclass, 'test_') + suite.addTests(map(tclass, names)) + if not unittest.TextTestRunner().run(suite).wasSuccessful(): + sys.exit(1) + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: diff --git a/src/engine/SCons/Node/__init__.py b/src/engine/SCons/Node/__init__.py new file mode 100644 index 0000000..94fd4df --- /dev/null +++ b/src/engine/SCons/Node/__init__.py @@ -0,0 +1,1341 @@ +"""SCons.Node + +The Node package for the SCons software construction utility. + +This is, in many ways, the heart of SCons. + +A Node is where we encapsulate all of the dependency information about +any thing that SCons can build, or about any thing which SCons can use +to build some other thing. The canonical "thing," of course, is a file, +but a Node can also represent something remote (like a web page) or +something completely abstract (like an Alias). + +Each specific type of "thing" is specifically represented by a subclass +of the Node base class: Node.FS.File for files, Node.Alias for aliases, +etc. Dependency information is kept here in the base class, and +information specific to files/aliases/etc. is in the subclass. The +goal, if we've done this correctly, is that any type of "thing" should +be able to depend on any other type of "thing." + +""" + +# +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 The SCons Foundation +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# + +__revision__ = "src/engine/SCons/Node/__init__.py 4577 2009/12/27 19:44:43 scons" + +import copy +from itertools import chain, izip +import string +import UserList + +from SCons.Debug import logInstanceCreation +import SCons.Executor +import SCons.Memoize +import SCons.Util + +from SCons.Debug import Trace + +def classname(obj): + return string.split(str(obj.__class__), '.')[-1] + +# Node states +# +# These are in "priority" order, so that the maximum value for any +# child/dependency of a node represents the state of that node if +# it has no builder of its own. The canonical example is a file +# system directory, which is only up to date if all of its children +# were up to date. +no_state = 0 +pending = 1 +executing = 2 +up_to_date = 3 +executed = 4 +failed = 5 + +StateString = { + 0 : "no_state", + 1 : "pending", + 2 : "executing", + 3 : "up_to_date", + 4 : "executed", + 5 : "failed", +} + +# controls whether implicit dependencies are cached: +implicit_cache = 0 + +# controls whether implicit dep changes are ignored: +implicit_deps_unchanged = 0 + +# controls whether the cached implicit deps are ignored: +implicit_deps_changed = 0 + +# A variable that can be set to an interface-specific function be called +# to annotate a Node with information about its creation. +def do_nothing(node): pass + +Annotate = do_nothing + +# Classes for signature info for Nodes. + +class NodeInfoBase: + """ + The generic base class for signature information for a Node. + + Node subclasses should subclass NodeInfoBase to provide their own + logic for dealing with their own Node-specific signature information. + """ + current_version_id = 1 + def __init__(self, node): + # Create an object attribute from the class attribute so it ends up + # in the pickled data in the .sconsign file. + self._version_id = self.current_version_id + def update(self, node): + try: + field_list = self.field_list + except AttributeError: + return + for f in field_list: + try: + delattr(self, f) + except AttributeError: + pass + try: + func = getattr(node, 'get_' + f) + except AttributeError: + pass + else: + setattr(self, f, func()) + def convert(self, node, val): + pass + def merge(self, other): + self.__dict__.update(other.__dict__) + def format(self, field_list=None, names=0): + if field_list is None: + try: + field_list = self.field_list + except AttributeError: + field_list = self.__dict__.keys() + field_list.sort() + fields = [] + for field in field_list: + try: + f = getattr(self, field) + except AttributeError: + f = None + f = str(f) + if names: + f = field + ': ' + f + fields.append(f) + return fields + +class BuildInfoBase: + """ + The generic base class for build information for a Node. + + This is what gets stored in a .sconsign file for each target file. + It contains a NodeInfo instance for this node (signature information + that's specific to the type of Node) and direct attributes for the + generic build stuff we have to track: sources, explicit dependencies, + implicit dependencies, and action information. + """ + current_version_id = 1 + def __init__(self, node): + # Create an object attribute from the class attribute so it ends up + # in the pickled data in the .sconsign file. + self._version_id = self.current_version_id + self.bsourcesigs = [] + self.bdependsigs = [] + self.bimplicitsigs = [] + self.bactsig = None + def merge(self, other): + self.__dict__.update(other.__dict__) + +class Node: + """The base Node class, for entities that we know how to + build, or use to build other Nodes. + """ + + if SCons.Memoize.use_memoizer: + __metaclass__ = SCons.Memoize.Memoized_Metaclass + + memoizer_counters = [] + + class Attrs: + pass + + def __init__(self): + if __debug__: logInstanceCreation(self, 'Node.Node') + # Note that we no longer explicitly initialize a self.builder + # attribute to None here. That's because the self.builder + # attribute may be created on-the-fly later by a subclass (the + # canonical example being a builder to fetch a file from a + # source code system like CVS or Subversion). + + # Each list of children that we maintain is accompanied by a + # dictionary used to look up quickly whether a node is already + # present in the list. Empirical tests showed that it was + # fastest to maintain them as side-by-side Node attributes in + # this way, instead of wrapping up each list+dictionary pair in + # a class. (Of course, we could always still do that in the + # future if we had a good reason to...). + self.sources = [] # source files used to build node + self.sources_set = set() + self._specific_sources = False + self.depends = [] # explicit dependencies (from Depends) + self.depends_set = set() + self.ignore = [] # dependencies to ignore + self.ignore_set = set() + self.prerequisites = SCons.Util.UniqueList() + self.implicit = None # implicit (scanned) dependencies (None means not scanned yet) + self.waiting_parents = set() + self.waiting_s_e = set() + self.ref_count = 0 + self.wkids = None # Kids yet to walk, when it's an array + + self.env = None + self.state = no_state + self.precious = None + self.noclean = 0 + self.nocache = 0 + self.always_build = None + self.includes = None + self.attributes = self.Attrs() # Generic place to stick information about the Node. + self.side_effect = 0 # true iff this node is a side effect + self.side_effects = [] # the side effects of building this target + self.linked = 0 # is this node linked to the variant directory? + + self.clear_memoized_values() + + # Let the interface in which the build engine is embedded + # annotate this Node with its own info (like a description of + # what line in what file created the node, for example). + Annotate(self) + + def disambiguate(self, must_exist=None): + return self + + def get_suffix(self): + return '' + + memoizer_counters.append(SCons.Memoize.CountValue('get_build_env')) + + def get_build_env(self): + """Fetch the appropriate Environment to build this node. + """ + try: + return self._memo['get_build_env'] + except KeyError: + pass + result = self.get_executor().get_build_env() + self._memo['get_build_env'] = result + return result + + def get_build_scanner_path(self, scanner): + """Fetch the appropriate scanner path for this node.""" + return self.get_executor().get_build_scanner_path(scanner) + + def set_executor(self, executor): + """Set the action executor for this node.""" + self.executor = executor + + def get_executor(self, create=1): + """Fetch the action executor for this node. Create one if + there isn't already one, and requested to do so.""" + try: + executor = self.executor + except AttributeError: + if not create: + raise + try: + act = self.builder.action + except AttributeError: + executor = SCons.Executor.Null(targets=[self]) + else: + executor = SCons.Executor.Executor(act, + self.env or self.builder.env, + [self.builder.overrides], + [self], + self.sources) + self.executor = executor + return executor + + def executor_cleanup(self): + """Let the executor clean up any cached information.""" + try: + executor = self.get_executor(create=None) + except AttributeError: + pass + else: + executor.cleanup() + + def reset_executor(self): + "Remove cached executor; forces recompute when needed." + try: + delattr(self, 'executor') + except AttributeError: + pass + + def push_to_cache(self): + """Try to push a node into a cache + """ + pass + + def retrieve_from_cache(self): + """Try to retrieve the node's content from a cache + + This method is called from multiple threads in a parallel build, + so only do thread safe stuff here. Do thread unsafe stuff in + built(). + + Returns true iff the node was successfully retrieved. + """ + return 0 + + # + # Taskmaster interface subsystem + # + + def make_ready(self): + """Get a Node ready for evaluation. + + This is called before the Taskmaster decides if the Node is + up-to-date or not. Overriding this method allows for a Node + subclass to be disambiguated if necessary, or for an implicit + source builder to be attached. + """ + pass + + def prepare(self): + """Prepare for this Node to be built. + + This is called after the Taskmaster has decided that the Node + is out-of-date and must be rebuilt, but before actually calling + the method to build the Node. + + This default implementation checks that explicit or implicit + dependencies either exist or are derived, and initializes the + BuildInfo structure that will hold the information about how + this node is, uh, built. + + (The existence of source files is checked separately by the + Executor, which aggregates checks for all of the targets built + by a specific action.) + + Overriding this method allows for for a Node subclass to remove + the underlying file from the file system. Note that subclass + methods should call this base class method to get the child + check and the BuildInfo structure. + """ + for d in self.depends: + if d.missing(): + msg = "Explicit dependency `%s' not found, needed by target `%s'." + raise SCons.Errors.StopError, msg % (d, self) + if self.implicit is not None: + for i in self.implicit: + if i.missing(): + msg = "Implicit dependency `%s' not found, needed by target `%s'." + raise SCons.Errors.StopError, msg % (i, self) + self.binfo = self.get_binfo() + + def build(self, **kw): + """Actually build the node. + + This is called by the Taskmaster after it's decided that the + Node is out-of-date and must be rebuilt, and after the prepare() + method has gotten everything, uh, prepared. + + This method is called from multiple threads in a parallel build, + so only do thread safe stuff here. Do thread unsafe stuff + in built(). + + """ + try: + apply(self.get_executor(), (self,), kw) + except SCons.Errors.BuildError, e: + e.node = self + raise + + def built(self): + """Called just after this node is successfully built.""" + + # Clear the implicit dependency caches of any Nodes + # waiting for this Node to be built. + for parent in self.waiting_parents: + parent.implicit = None + + self.clear() + + self.ninfo.update(self) + + def visited(self): + """Called just after this node has been visited (with or + without a build).""" + try: + binfo = self.binfo + except AttributeError: + # Apparently this node doesn't need build info, so + # don't bother calculating or storing it. + pass + else: + self.ninfo.update(self) + self.store_info() + + # + # + # + + def add_to_waiting_s_e(self, node): + self.waiting_s_e.add(node) + + def add_to_waiting_parents(self, node): + """ + Returns the number of nodes added to our waiting parents list: + 1 if we add a unique waiting parent, 0 if not. (Note that the + returned values are intended to be used to increment a reference + count, so don't think you can "clean up" this function by using + True and False instead...) + """ + wp = self.waiting_parents + if node in wp: + return 0 + wp.add(node) + return 1 + + def postprocess(self): + """Clean up anything we don't need to hang onto after we've + been built.""" + self.executor_cleanup() + self.waiting_parents = set() + + def clear(self): + """Completely clear a Node of all its cached state (so that it + can be re-evaluated by interfaces that do continuous integration + builds). + """ + # The del_binfo() call here isn't necessary for normal execution, + # but is for interactive mode, where we might rebuild the same + # target and need to start from scratch. + self.del_binfo() + self.clear_memoized_values() + self.ninfo = self.new_ninfo() + self.executor_cleanup() + try: + delattr(self, '_calculated_sig') + except AttributeError: + pass + self.includes = None + + def clear_memoized_values(self): + self._memo = {} + + def builder_set(self, builder): + self.builder = builder + try: + del self.executor + except AttributeError: + pass + + def has_builder(self): + """Return whether this Node has a builder or not. + + In Boolean tests, this turns out to be a *lot* more efficient + than simply examining the builder attribute directly ("if + node.builder: ..."). When the builder attribute is examined + directly, it ends up calling __getattr__ for both the __len__ + and __nonzero__ attributes on instances of our Builder Proxy + class(es), generating a bazillion extra calls and slowing + things down immensely. + """ + try: + b = self.builder + except AttributeError: + # There was no explicit builder for this Node, so initialize + # the self.builder attribute to None now. + b = self.builder = None + return b is not None + + def set_explicit(self, is_explicit): + self.is_explicit = is_explicit + + def has_explicit_builder(self): + """Return whether this Node has an explicit builder + + This allows an internal Builder created by SCons to be marked + non-explicit, so that it can be overridden by an explicit + builder that the user supplies (the canonical example being + directories).""" + try: + return self.is_explicit + except AttributeError: + self.is_explicit = None + return self.is_explicit + + def get_builder(self, default_builder=None): + """Return the set builder, or a specified default value""" + try: + return self.builder + except AttributeError: + return default_builder + + multiple_side_effect_has_builder = has_builder + + def is_derived(self): + """ + Returns true iff this node is derived (i.e. built). + + This should return true only for nodes whose path should be in + the variant directory when duplicate=0 and should contribute their build + signatures when they are used as source files to other derived files. For + example: source with source builders are not derived in this sense, + and hence should not return true. + """ + return self.has_builder() or self.side_effect + + def alter_targets(self): + """Return a list of alternate targets for this Node. + """ + return [], None + + def get_found_includes(self, env, scanner, path): + """Return the scanned include lines (implicit dependencies) + found in this node. + + The default is no implicit dependencies. We expect this method + to be overridden by any subclass that can be scanned for + implicit dependencies. + """ + return [] + + def get_implicit_deps(self, env, scanner, path): + """Return a list of implicit dependencies for this node. + + This method exists to handle recursive invocation of the scanner + on the implicit dependencies returned by the scanner, if the + scanner's recursive flag says that we should. + """ + if not scanner: + return [] + + # Give the scanner a chance to select a more specific scanner + # for this Node. + #scanner = scanner.select(self) + + nodes = [self] + seen = {} + seen[self] = 1 + deps = [] + while nodes: + n = nodes.pop(0) + d = filter(lambda x, seen=seen: not seen.has_key(x), + n.get_found_includes(env, scanner, path)) + if d: + deps.extend(d) + for n in d: + seen[n] = 1 + nodes.extend(scanner.recurse_nodes(d)) + + return deps + + def get_env_scanner(self, env, kw={}): + return env.get_scanner(self.scanner_key()) + + def get_target_scanner(self): + return self.builder.target_scanner + + def get_source_scanner(self, node): + """Fetch the source scanner for the specified node + + NOTE: "self" is the target being built, "node" is + the source file for which we want to fetch the scanner. + + Implies self.has_builder() is true; again, expect to only be + called from locations where this is already verified. + + This function may be called very often; it attempts to cache + the scanner found to improve performance. + """ + scanner = None + try: + scanner = self.builder.source_scanner + except AttributeError: + pass + if not scanner: + # The builder didn't have an explicit scanner, so go look up + # a scanner from env['SCANNERS'] based on the node's scanner + # key (usually the file extension). + scanner = self.get_env_scanner(self.get_build_env()) + if scanner: + scanner = scanner.select(node) + return scanner + + def add_to_implicit(self, deps): + if not hasattr(self, 'implicit') or self.implicit is None: + self.implicit = [] + self.implicit_set = set() + self._children_reset() + self._add_child(self.implicit, self.implicit_set, deps) + + def scan(self): + """Scan this node's dependents for implicit dependencies.""" + # Don't bother scanning non-derived files, because we don't + # care what their dependencies are. + # Don't scan again, if we already have scanned. + if self.implicit is not None: + return + self.implicit = [] + self.implicit_set = set() + self._children_reset() + if not self.has_builder(): + return + + build_env = self.get_build_env() + executor = self.get_executor() + + # Here's where we implement --implicit-cache. + if implicit_cache and not implicit_deps_changed: + implicit = self.get_stored_implicit() + if implicit is not None: + # We now add the implicit dependencies returned from the + # stored .sconsign entry to have already been converted + # to Nodes for us. (We used to run them through a + # source_factory function here.) + + # Update all of the targets with them. This + # essentially short-circuits an N*M scan of the + # sources for each individual target, which is a hell + # of a lot more efficient. + for tgt in executor.get_all_targets(): + tgt.add_to_implicit(implicit) + + if implicit_deps_unchanged or self.is_up_to_date(): + return + # one of this node's sources has changed, + # so we must recalculate the implicit deps: + self.implicit = [] + self.implicit_set = set() + + # Have the executor scan the sources. + executor.scan_sources(self.builder.source_scanner) + + # If there's a target scanner, have the executor scan the target + # node itself and associated targets that might be built. + scanner = self.get_target_scanner() + if scanner: + executor.scan_targets(scanner) + + def scanner_key(self): + return None + + def select_scanner(self, scanner): + """Selects a scanner for this Node. + + This is a separate method so it can be overridden by Node + subclasses (specifically, Node.FS.Dir) that *must* use their + own Scanner and don't select one the Scanner.Selector that's + configured for the target. + """ + return scanner.select(self) + + def env_set(self, env, safe=0): + if safe and self.env: + return + self.env = env + + # + # SIGNATURE SUBSYSTEM + # + + NodeInfo = NodeInfoBase + BuildInfo = BuildInfoBase + + def new_ninfo(self): + ninfo = self.NodeInfo(self) + return ninfo + + def get_ninfo(self): + try: + return self.ninfo + except AttributeError: + self.ninfo = self.new_ninfo() + return self.ninfo + + def new_binfo(self): + binfo = self.BuildInfo(self) + return binfo + + def get_binfo(self): + """ + Fetch a node's build information. + + node - the node whose sources will be collected + cache - alternate node to use for the signature cache + returns - the build signature + + This no longer handles the recursive descent of the + node's children's signatures. We expect that they're + already built and updated by someone else, if that's + what's wanted. + """ + try: + return self.binfo + except AttributeError: + pass + + binfo = self.new_binfo() + self.binfo = binfo + + executor = self.get_executor() + ignore_set = self.ignore_set + + if self.has_builder(): + binfo.bact = str(executor) + binfo.bactsig = SCons.Util.MD5signature(executor.get_contents()) + + if self._specific_sources: + sources = [] + for s in self.sources: + if s not in ignore_set: + sources.append(s) + else: + sources = executor.get_unignored_sources(self, self.ignore) + seen = set() + bsources = [] + bsourcesigs = [] + for s in sources: + if not s in seen: + seen.add(s) + bsources.append(s) + bsourcesigs.append(s.get_ninfo()) + binfo.bsources = bsources + binfo.bsourcesigs = bsourcesigs + + depends = self.depends + dependsigs = [] + for d in depends: + if d not in ignore_set: + dependsigs.append(d.get_ninfo()) + binfo.bdepends = depends + binfo.bdependsigs = dependsigs + + implicit = self.implicit or [] + implicitsigs = [] + for i in implicit: + if i not in ignore_set: + implicitsigs.append(i.get_ninfo()) + binfo.bimplicit = implicit + binfo.bimplicitsigs = implicitsigs + + return binfo + + def del_binfo(self): + """Delete the build info from this node.""" + try: + delattr(self, 'binfo') + except AttributeError: + pass + + def get_csig(self): + try: + return self.ninfo.csig + except AttributeError: + ninfo = self.get_ninfo() + ninfo.csig = SCons.Util.MD5signature(self.get_contents()) + return self.ninfo.csig + + def get_cachedir_csig(self): + return self.get_csig() + + def store_info(self): + """Make the build signature permanent (that is, store it in the + .sconsign file or equivalent).""" + pass + + def do_not_store_info(self): + pass + + def get_stored_info(self): + return None + + def get_stored_implicit(self): + """Fetch the stored implicit dependencies""" + return None + + # + # + # + + def set_precious(self, precious = 1): + """Set the Node's precious value.""" + self.precious = precious + + def set_noclean(self, noclean = 1): + """Set the Node's noclean value.""" + # Make sure noclean is an integer so the --debug=stree + # output in Util.py can use it as an index. + self.noclean = noclean and 1 or 0 + + def set_nocache(self, nocache = 1): + """Set the Node's nocache value.""" + # Make sure nocache is an integer so the --debug=stree + # output in Util.py can use it as an index. + self.nocache = nocache and 1 or 0 + + def set_always_build(self, always_build = 1): + """Set the Node's always_build value.""" + self.always_build = always_build + + def exists(self): + """Does this node exists?""" + # All node exist by default: + return 1 + + def rexists(self): + """Does this node exist locally or in a repositiory?""" + # There are no repositories by default: + return self.exists() + + def missing(self): + return not self.is_derived() and \ + not self.linked and \ + not self.rexists() + + def remove(self): + """Remove this Node: no-op by default.""" + return None + + def add_dependency(self, depend): + """Adds dependencies.""" + try: + self._add_child(self.depends, self.depends_set, depend) + except TypeError, e: + e = e.args[0] + if SCons.Util.is_List(e): + s = map(str, e) + else: + s = str(e) + raise SCons.Errors.UserError("attempted to add a non-Node dependency to %s:\n\t%s is a %s, not a Node" % (str(self), s, type(e))) + + def add_prerequisite(self, prerequisite): + """Adds prerequisites""" + self.prerequisites.extend(prerequisite) + self._children_reset() + + def add_ignore(self, depend): + """Adds dependencies to ignore.""" + try: + self._add_child(self.ignore, self.ignore_set, depend) + except TypeError, e: + e = e.args[0] + if SCons.Util.is_List(e): + s = map(str, e) + else: + s = str(e) + raise SCons.Errors.UserError("attempted to ignore a non-Node dependency of %s:\n\t%s is a %s, not a Node" % (str(self), s, type(e))) + + def add_source(self, source): + """Adds sources.""" + if self._specific_sources: + return + try: + self._add_child(self.sources, self.sources_set, source) + except TypeError, e: + e = e.args[0] + if SCons.Util.is_List(e): + s = map(str, e) + else: + s = str(e) + raise SCons.Errors.UserError("attempted to add a non-Node as source of %s:\n\t%s is a %s, not a Node" % (str(self), s, type(e))) + + def _add_child(self, collection, set, child): + """Adds 'child' to 'collection', first checking 'set' to see if it's + already present.""" + #if type(child) is not type([]): + # child = [child] + #for c in child: + # if not isinstance(c, Node): + # raise TypeError, c + added = None + for c in child: + if c not in set: + set.add(c) + collection.append(c) + added = 1 + if added: + self._children_reset() + + def set_specific_source(self, source): + self.add_source(source) + self._specific_sources = True + + def add_wkid(self, wkid): + """Add a node to the list of kids waiting to be evaluated""" + if self.wkids is not None: + self.wkids.append(wkid) + + def _children_reset(self): + self.clear_memoized_values() + # We need to let the Executor clear out any calculated + # build info that it's cached so we can re-calculate it. + self.executor_cleanup() + + memoizer_counters.append(SCons.Memoize.CountValue('_children_get')) + + def _children_get(self): + try: + return self._memo['children_get'] + except KeyError: + pass + + # The return list may contain duplicate Nodes, especially in + # source trees where there are a lot of repeated #includes + # of a tangle of .h files. Profiling shows, however, that + # eliminating the duplicates with a brute-force approach that + # preserves the order (that is, something like: + # + # u = [] + # for n in list: + # if n not in u: + # u.append(n)" + # + # takes more cycles than just letting the underlying methods + # hand back cached values if a Node's information is requested + # multiple times. (Other methods of removing duplicates, like + # using dictionary keys, lose the order, and the only ordered + # dictionary patterns I found all ended up using "not in" + # internally anyway...) + if self.ignore_set: + if self.implicit is None: + iter = chain(self.sources,self.depends) + else: + iter = chain(self.sources, self.depends, self.implicit) + + children = [] + for i in iter: + if i not in self.ignore_set: + children.append(i) + else: + if self.implicit is None: + children = self.sources + self.depends + else: + children = self.sources + self.depends + self.implicit + + self._memo['children_get'] = children + return children + + def all_children(self, scan=1): + """Return a list of all the node's direct children.""" + if scan: + self.scan() + + # The return list may contain duplicate Nodes, especially in + # source trees where there are a lot of repeated #includes + # of a tangle of .h files. Profiling shows, however, that + # eliminating the duplicates with a brute-force approach that + # preserves the order (that is, something like: + # + # u = [] + # for n in list: + # if n not in u: + # u.append(n)" + # + # takes more cycles than just letting the underlying methods + # hand back cached values if a Node's information is requested + # multiple times. (Other methods of removing duplicates, like + # using dictionary keys, lose the order, and the only ordered + # dictionary patterns I found all ended up using "not in" + # internally anyway...) + if self.implicit is None: + return self.sources + self.depends + else: + return self.sources + self.depends + self.implicit + + def children(self, scan=1): + """Return a list of the node's direct children, minus those + that are ignored by this node.""" + if scan: + self.scan() + return self._children_get() + + def set_state(self, state): + self.state = state + + def get_state(self): + return self.state + + def state_has_changed(self, target, prev_ni): + return (self.state != SCons.Node.up_to_date) + + def get_env(self): + env = self.env + if not env: + import SCons.Defaults + env = SCons.Defaults.DefaultEnvironment() + return env + + def changed_since_last_build(self, target, prev_ni): + """ + + Must be overridden in a specific subclass to return True if this + Node (a dependency) has changed since the last time it was used + to build the specified target. prev_ni is this Node's state (for + example, its file timestamp, length, maybe content signature) + as of the last time the target was built. + + Note that this method is called through the dependency, not the + target, because a dependency Node must be able to use its own + logic to decide if it changed. For example, File Nodes need to + obey if we're configured to use timestamps, but Python Value Nodes + never use timestamps and always use the content. If this method + were called through the target, then each Node's implementation + of this method would have to have more complicated logic to + handle all the different Node types on which it might depend. + """ + raise NotImplementedError + + def Decider(self, function): + SCons.Util.AddMethod(self, function, 'changed_since_last_build') + + def changed(self, node=None): + """ + Returns if the node is up-to-date with respect to the BuildInfo + stored last time it was built. The default behavior is to compare + it against our own previously stored BuildInfo, but the stored + BuildInfo from another Node (typically one in a Repository) + can be used instead. + + Note that we now *always* check every dependency. We used to + short-circuit the check by returning as soon as we detected + any difference, but we now rely on checking every dependency + to make sure that any necessary Node information (for example, + the content signature of an #included .h file) is updated. + """ + t = 0 + if t: Trace('changed(%s [%s], %s)' % (self, classname(self), node)) + if node is None: + node = self + + result = False + + bi = node.get_stored_info().binfo + then = bi.bsourcesigs + bi.bdependsigs + bi.bimplicitsigs + children = self.children() + + diff = len(children) - len(then) + if diff: + # The old and new dependency lists are different lengths. + # This always indicates that the Node must be rebuilt. + # We also extend the old dependency list with enough None + # entries to equal the new dependency list, for the benefit + # of the loop below that updates node information. + then.extend([None] * diff) + if t: Trace(': old %s new %s' % (len(then), len(children))) + result = True + + for child, prev_ni in izip(children, then): + if child.changed_since_last_build(self, prev_ni): + if t: Trace(': %s changed' % child) + result = True + + contents = self.get_executor().get_contents() + if self.has_builder(): + import SCons.Util + newsig = SCons.Util.MD5signature(contents) + if bi.bactsig != newsig: + if t: Trace(': bactsig %s != newsig %s' % (bi.bactsig, newsig)) + result = True + + if not result: + if t: Trace(': up to date') + + if t: Trace('\n') + + return result + + def is_up_to_date(self): + """Default check for whether the Node is current: unknown Node + subtypes are always out of date, so they will always get built.""" + return None + + def children_are_up_to_date(self): + """Alternate check for whether the Node is current: If all of + our children were up-to-date, then this Node was up-to-date, too. + + The SCons.Node.Alias and SCons.Node.Python.Value subclasses + rebind their current() method to this method.""" + # Allow the children to calculate their signatures. + self.binfo = self.get_binfo() + if self.always_build: + return None + state = 0 + for kid in self.children(None): + s = kid.get_state() + if s and (not state or s > state): + state = s + return (state == 0 or state == SCons.Node.up_to_date) + + def is_literal(self): + """Always pass the string representation of a Node to + the command interpreter literally.""" + return 1 + + def render_include_tree(self): + """ + Return a text representation, suitable for displaying to the + user, of the include tree for the sources of this node. + """ + if self.is_derived() and self.env: + env = self.get_build_env() + for s in self.sources: + scanner = self.get_source_scanner(s) + if scanner: + path = self.get_build_scanner_path(scanner) + else: + path = None + def f(node, env=env, scanner=scanner, path=path): + return node.get_found_includes(env, scanner, path) + return SCons.Util.render_tree(s, f, 1) + else: + return None + + def get_abspath(self): + """ + Return an absolute path to the Node. This will return simply + str(Node) by default, but for Node types that have a concept of + relative path, this might return something different. + """ + return str(self) + + def for_signature(self): + """ + Return a string representation of the Node that will always + be the same for this particular Node, no matter what. This + is by contrast to the __str__() method, which might, for + instance, return a relative path for a file Node. The purpose + of this method is to generate a value to be used in signature + calculation for the command line used to build a target, and + we use this method instead of str() to avoid unnecessary + rebuilds. This method does not need to return something that + would actually work in a command line; it can return any kind of + nonsense, so long as it does not change. + """ + return str(self) + + def get_string(self, for_signature): + """This is a convenience function designed primarily to be + used in command generators (i.e., CommandGeneratorActions or + Environment variables that are callable), which are called + with a for_signature argument that is nonzero if the command + generator is being called to generate a signature for the + command line, which determines if we should rebuild or not. + + Such command generators should use this method in preference + to str(Node) when converting a Node to a string, passing + in the for_signature parameter, such that we will call + Node.for_signature() or str(Node) properly, depending on whether + we are calculating a signature or actually constructing a + command line.""" + if for_signature: + return self.for_signature() + return str(self) + + def get_subst_proxy(self): + """ + This method is expected to return an object that will function + exactly like this Node, except that it implements any additional + special features that we would like to be in effect for + Environment variable substitution. The principle use is that + some Nodes would like to implement a __getattr__() method, + but putting that in the Node type itself has a tendency to kill + performance. We instead put it in a proxy and return it from + this method. It is legal for this method to return self + if no new functionality is needed for Environment substitution. + """ + return self + + def explain(self): + if not self.exists(): + return "building `%s' because it doesn't exist\n" % self + + if self.always_build: + return "rebuilding `%s' because AlwaysBuild() is specified\n" % self + + old = self.get_stored_info() + if old is None: + return None + + old = old.binfo + old.prepare_dependencies() + + try: + old_bkids = old.bsources + old.bdepends + old.bimplicit + old_bkidsigs = old.bsourcesigs + old.bdependsigs + old.bimplicitsigs + except AttributeError: + return "Cannot explain why `%s' is being rebuilt: No previous build information found\n" % self + + new = self.get_binfo() + + new_bkids = new.bsources + new.bdepends + new.bimplicit + new_bkidsigs = new.bsourcesigs + new.bdependsigs + new.bimplicitsigs + + osig = dict(izip(old_bkids, old_bkidsigs)) + nsig = dict(izip(new_bkids, new_bkidsigs)) + + # The sources and dependencies we'll want to report are all stored + # as relative paths to this target's directory, but we want to + # report them relative to the top-level SConstruct directory, + # so we only print them after running them through this lambda + # to turn them into the right relative Node and then return + # its string. + def stringify( s, E=self.dir.Entry ) : + if hasattr( s, 'dir' ) : + return str(E(s)) + return str(s) + + lines = [] + + removed = filter(lambda x, nk=new_bkids: not x in nk, old_bkids) + if removed: + removed = map(stringify, removed) + fmt = "`%s' is no longer a dependency\n" + lines.extend(map(lambda s, fmt=fmt: fmt % s, removed)) + + for k in new_bkids: + if not k in old_bkids: + lines.append("`%s' is a new dependency\n" % stringify(k)) + elif k.changed_since_last_build(self, osig[k]): + lines.append("`%s' changed\n" % stringify(k)) + + if len(lines) == 0 and old_bkids != new_bkids: + lines.append("the dependency order changed:\n" + + "%sold: %s\n" % (' '*15, map(stringify, old_bkids)) + + "%snew: %s\n" % (' '*15, map(stringify, new_bkids))) + + if len(lines) == 0: + def fmt_with_title(title, strlines): + lines = string.split(strlines, '\n') + sep = '\n' + ' '*(15 + len(title)) + return ' '*15 + title + string.join(lines, sep) + '\n' + if old.bactsig != new.bactsig: + if old.bact == new.bact: + lines.append("the contents of the build action changed\n" + + fmt_with_title('action: ', new.bact)) + else: + lines.append("the build action changed:\n" + + fmt_with_title('old: ', old.bact) + + fmt_with_title('new: ', new.bact)) + + if len(lines) == 0: + return "rebuilding `%s' for unknown reasons\n" % self + + preamble = "rebuilding `%s' because" % self + if len(lines) == 1: + return "%s %s" % (preamble, lines[0]) + else: + lines = ["%s:\n" % preamble] + lines + return string.join(lines, ' '*11) + +try: + [].extend(UserList.UserList([])) +except TypeError: + # Python 1.5.2 doesn't allow a list to be extended by list-like + # objects (such as UserList instances), so just punt and use + # real lists. + def NodeList(l): + return l +else: + class NodeList(UserList.UserList): + def __str__(self): + return str(map(str, self.data)) + +def get_children(node, parent): return node.children() +def ignore_cycle(node, stack): pass +def do_nothing(node, parent): pass + +class Walker: + """An iterator for walking a Node tree. + + This is depth-first, children are visited before the parent. + The Walker object can be initialized with any node, and + returns the next node on the descent with each next() call. + 'kids_func' is an optional function that will be called to + get the children of a node instead of calling 'children'. + 'cycle_func' is an optional function that will be called + when a cycle is detected. + + This class does not get caught in node cycles caused, for example, + by C header file include loops. + """ + def __init__(self, node, kids_func=get_children, + cycle_func=ignore_cycle, + eval_func=do_nothing): + self.kids_func = kids_func + self.cycle_func = cycle_func + self.eval_func = eval_func + node.wkids = copy.copy(kids_func(node, None)) + self.stack = [node] + self.history = {} # used to efficiently detect and avoid cycles + self.history[node] = None + + def next(self): + """Return the next node for this walk of the tree. + + This function is intentionally iterative, not recursive, + to sidestep any issues of stack size limitations. + """ + + while self.stack: + if self.stack[-1].wkids: + node = self.stack[-1].wkids.pop(0) + if not self.stack[-1].wkids: + self.stack[-1].wkids = None + if self.history.has_key(node): + self.cycle_func(node, self.stack) + else: + node.wkids = copy.copy(self.kids_func(node, self.stack[-1])) + self.stack.append(node) + self.history[node] = None + else: + node = self.stack.pop() + del self.history[node] + if node: + if self.stack: + parent = self.stack[-1] + else: + parent = None + self.eval_func(node, parent) + return node + return None + + def is_done(self): + return not self.stack + + +arg2nodes_lookups = [] + +# Local Variables: +# tab-width:4 +# indent-tabs-mode:nil +# End: +# vim: set expandtab tabstop=4 shiftwidth=4: |