Refactored the test suite; updated include and library paths in setup.cfg.

This commit is contained in:
Kirill Simonov 2008-12-28 20:16:50 +00:00
parent a667b61247
commit aff84ff195
34 changed files with 1639 additions and 1590 deletions

View File

@ -1,4 +1,4 @@
include README LICENSE setup.py
recursive-include examples *.py *.cfg *.yaml
#recursive-include tests *.py
#recursive-include tests/data *
recursive-include tests *.py
recursive-include tests/data *

View File

@ -29,6 +29,9 @@ test: build
testext: buildext
${PYTHON} tests/test_build_ext.py ${TEST}
testall:
${PYTHON} setup.py test
dist:
${PYTHON} setup.py --with-libyaml sdist --formats=zip,gztar

View File

@ -17,18 +17,19 @@
# The following options are used to build PyYAML Windows installer
# for Python 2.3 on my PC:
#include_dirs=../../libyaml/branches/stable/include
#library_dirs=../../libyaml/branches/stable/win32/vc6/output/release/lib
#include_dirs=../../../libyaml/tags/0.1.2/include
#library_dirs=../../../libyaml/tags/0.1.2/win32/vc6/output/release/lib
#define=YAML_DECLARE_STATIC
# The following options are used to build PyYAML Windows installer
# for Python 2.4 and Python 2.5 on my PC:
#include_dirs=../../libyaml/branches/stable/include
#library_dirs=../../libyaml/branches/stable/win32/vs2003/output/release/lib
#include_dirs=../../../libyaml/tags/0.1.2/include
#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2003/output/release/lib
#define=YAML_DECLARE_STATIC
# The following options are used to build PyYAML Windows installer
# for Python 2.6 on my PC:
#include_dirs=../../libyaml/branches/stable/include
#library_dirs=../../libyaml/branches/stable/win32/vs2008/output/release/lib
#include_dirs=../../../libyaml/tags/0.1.2/include
#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2008/output/release/lib
#define=YAML_DECLARE_STATIC

View File

@ -269,6 +269,25 @@ class bdist_rpm(_bdist_rpm):
return spec_file
class test(Command):
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
build_cmd = self.get_finalized_command('build')
build_cmd.run()
sys.path.insert(0, build_cmd.build_lib)
sys.path.insert(0, 'tests')
import test_all
test_all.main([])
if __name__ == '__main__':
setup(
@ -296,6 +315,7 @@ if __name__ == '__main__':
cmdclass={
'build_ext': build_ext,
'bdist_rpm': bdist_rpm,
'test': test,
},
)

357
tests/canonical.py Normal file
View File

@ -0,0 +1,357 @@
import yaml, yaml.composer, yaml.constructor, yaml.resolver
class CanonicalError(yaml.YAMLError):
pass
class CanonicalScanner:
def __init__(self, data):
try:
self.data = unicode(data, 'utf-8')+u'\0'
except UnicodeDecodeError:
raise CanonicalError("utf-8 stream is expected")
self.index = 0
self.tokens = []
self.scanned = False
def check_token(self, *choices):
if not self.scanned:
self.scan()
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
if not self.scanned:
self.scan()
if self.tokens:
return self.tokens[0]
def get_token(self, choice=None):
if not self.scanned:
self.scan()
token = self.tokens.pop(0)
if choice and not isinstance(token, choice):
raise CanonicalError("unexpected token "+repr(token))
return token
def get_token_value(self):
token = self.get_token()
return token.value
def scan(self):
self.tokens.append(yaml.StreamStartToken(None, None))
while True:
self.find_token()
ch = self.data[self.index]
if ch == u'\0':
self.tokens.append(yaml.StreamEndToken(None, None))
break
elif ch == u'%':
self.tokens.append(self.scan_directive())
elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
self.index += 3
self.tokens.append(yaml.DocumentStartToken(None, None))
elif ch == u'[':
self.index += 1
self.tokens.append(yaml.FlowSequenceStartToken(None, None))
elif ch == u'{':
self.index += 1
self.tokens.append(yaml.FlowMappingStartToken(None, None))
elif ch == u']':
self.index += 1
self.tokens.append(yaml.FlowSequenceEndToken(None, None))
elif ch == u'}':
self.index += 1
self.tokens.append(yaml.FlowMappingEndToken(None, None))
elif ch == u'?':
self.index += 1
self.tokens.append(yaml.KeyToken(None, None))
elif ch == u':':
self.index += 1
self.tokens.append(yaml.ValueToken(None, None))
elif ch == u',':
self.index += 1
self.tokens.append(yaml.FlowEntryToken(None, None))
elif ch == u'*' or ch == u'&':
self.tokens.append(self.scan_alias())
elif ch == u'!':
self.tokens.append(self.scan_tag())
elif ch == u'"':
self.tokens.append(self.scan_scalar())
else:
raise CanonicalError("invalid token")
self.scanned = True
DIRECTIVE = u'%YAML 1.1'
def scan_directive(self):
if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
self.index += len(self.DIRECTIVE)
return yaml.DirectiveToken('YAML', (1, 1), None, None)
else:
raise CanonicalError("invalid directive")
def scan_alias(self):
if self.data[self.index] == u'*':
TokenClass = yaml.AliasToken
else:
TokenClass = yaml.AnchorToken
self.index += 1
start = self.index
while self.data[self.index] not in u', \n\0':
self.index += 1
value = self.data[start:self.index]
return TokenClass(value, None, None)
def scan_tag(self):
self.index += 1
start = self.index
while self.data[self.index] not in u' \n\0':
self.index += 1
value = self.data[start:self.index]
if not value:
value = u'!'
elif value[0] == u'!':
value = 'tag:yaml.org,2002:'+value[1:]
elif value[0] == u'<' and value[-1] == u'>':
value = value[1:-1]
else:
value = u'!'+value
return yaml.TagToken(value, None, None)
QUOTE_CODES = {
'x': 2,
'u': 4,
'U': 8,
}
QUOTE_REPLACES = {
u'\\': u'\\',
u'\"': u'\"',
u' ': u' ',
u'a': u'\x07',
u'b': u'\x08',
u'e': u'\x1B',
u'f': u'\x0C',
u'n': u'\x0A',
u'r': u'\x0D',
u't': u'\x09',
u'v': u'\x0B',
u'N': u'\u0085',
u'L': u'\u2028',
u'P': u'\u2029',
u'_': u'_',
u'0': u'\x00',
}
def scan_scalar(self):
self.index += 1
chunks = []
start = self.index
ignore_spaces = False
while self.data[self.index] != u'"':
if self.data[self.index] == u'\\':
ignore_spaces = False
chunks.append(self.data[start:self.index])
self.index += 1
ch = self.data[self.index]
self.index += 1
if ch == u'\n':
ignore_spaces = True
elif ch in self.QUOTE_CODES:
length = self.QUOTE_CODES[ch]
code = int(self.data[self.index:self.index+length], 16)
chunks.append(unichr(code))
self.index += length
else:
if ch not in self.QUOTE_REPLACES:
raise CanonicalError("invalid escape code")
chunks.append(self.QUOTE_REPLACES[ch])
start = self.index
elif self.data[self.index] == u'\n':
chunks.append(self.data[start:self.index])
chunks.append(u' ')
self.index += 1
start = self.index
ignore_spaces = True
elif ignore_spaces and self.data[self.index] == u' ':
self.index += 1
start = self.index
else:
ignore_spaces = False
self.index += 1
chunks.append(self.data[start:self.index])
self.index += 1
return yaml.ScalarToken(u''.join(chunks), False, None, None)
def find_token(self):
found = False
while not found:
while self.data[self.index] in u' \t':
self.index += 1
if self.data[self.index] == u'#':
while self.data[self.index] != u'\n':
self.index += 1
if self.data[self.index] == u'\n':
self.index += 1
else:
found = True
class CanonicalParser:
def __init__(self):
self.events = []
self.parsed = False
# stream: STREAM-START document* STREAM-END
def parse_stream(self):
self.get_token(yaml.StreamStartToken)
self.events.append(yaml.StreamStartEvent(None, None))
while not self.check_token(yaml.StreamEndToken):
if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken):
self.parse_document()
else:
raise CanonicalError("document is expected, got "+repr(self.tokens[0]))
self.get_token(yaml.StreamEndToken)
self.events.append(yaml.StreamEndEvent(None, None))
# document: DIRECTIVE? DOCUMENT-START node
def parse_document(self):
node = None
if self.check_token(yaml.DirectiveToken):
self.get_token(yaml.DirectiveToken)
self.get_token(yaml.DocumentStartToken)
self.events.append(yaml.DocumentStartEvent(None, None))
self.parse_node()
self.events.append(yaml.DocumentEndEvent(None, None))
# node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
def parse_node(self):
if self.check_token(yaml.AliasToken):
self.events.append(yaml.AliasEvent(self.get_token_value(), None, None))
else:
anchor = None
if self.check_token(yaml.AnchorToken):
anchor = self.get_token_value()
tag = None
if self.check_token(yaml.TagToken):
tag = self.get_token_value()
if self.check_token(yaml.ScalarToken):
self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
elif self.check_token(yaml.FlowSequenceStartToken):
self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None))
self.parse_sequence()
elif self.check_token(yaml.FlowMappingStartToken):
self.events.append(yaml.MappingStartEvent(anchor, tag, None, None))
self.parse_mapping()
else:
raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0]))
# sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
def parse_sequence(self):
self.get_token(yaml.FlowSequenceStartToken)
if not self.check_token(yaml.FlowSequenceEndToken):
self.parse_node()
while not self.check_token(yaml.FlowSequenceEndToken):
self.get_token(yaml.FlowEntryToken)
if not self.check_token(yaml.FlowSequenceEndToken):
self.parse_node()
self.get_token(yaml.FlowSequenceEndToken)
self.events.append(yaml.SequenceEndEvent(None, None))
# mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
def parse_mapping(self):
self.get_token(yaml.FlowMappingStartToken)
if not self.check_token(yaml.FlowMappingEndToken):
self.parse_map_entry()
while not self.check_token(yaml.FlowMappingEndToken):
self.get_token(yaml.FlowEntryToken)
if not self.check_token(yaml.FlowMappingEndToken):
self.parse_map_entry()
self.get_token(yaml.FlowMappingEndToken)
self.events.append(yaml.MappingEndEvent(None, None))
# map_entry: KEY node VALUE node
def parse_map_entry(self):
self.get_token(yaml.KeyToken)
self.parse_node()
self.get_token(yaml.ValueToken)
self.parse_node()
def parse(self):
self.parse_stream()
self.parsed = True
def get_event(self):
if not self.parsed:
self.parse()
return self.events.pop(0)
def check_event(self, *choices):
if not self.parsed:
self.parse()
if self.events:
if not choices:
return True
for choice in choices:
if isinstance(self.events[0], choice):
return True
return False
def peek_event(self):
if not self.parsed:
self.parse()
return self.events[0]
class CanonicalLoader(CanonicalScanner, CanonicalParser,
yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver):
def __init__(self, stream):
if hasattr(stream, 'read'):
stream = stream.read()
CanonicalScanner.__init__(self, stream)
CanonicalParser.__init__(self)
yaml.composer.Composer.__init__(self)
yaml.constructor.Constructor.__init__(self)
yaml.resolver.Resolver.__init__(self)
yaml.CanonicalLoader = CanonicalLoader
def canonical_scan(stream):
return yaml.scan(stream, Loader=CanonicalLoader)
yaml.canonical_scan = canonical_scan
def canonical_parse(stream):
return yaml.parse(stream, Loader=CanonicalLoader)
yaml.canonical_parse = canonical_parse
def canonical_compose(stream):
return yaml.compose(stream, Loader=CanonicalLoader)
yaml.canonical_compose = canonical_compose
def canonical_compose_all(stream):
return yaml.compose_all(stream, Loader=CanonicalLoader)
yaml.canonical_compose_all = canonical_compose_all
def canonical_load(stream):
return yaml.load(stream, Loader=CanonicalLoader)
yaml.canonical_load = canonical_load
def canonical_load_all(stream):
return yaml.load_all(stream, Loader=CanonicalLoader)
yaml.canonical_load_all = canonical_load_all

View File

@ -1 +1 @@
[file, Loader, dump, abs, yaml.tokens]
[file, yaml.Loader, yaml.dump, abs, yaml.tokens]

View File

View File

@ -1,18 +1,18 @@
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
***************************************************************
Invalid byte ('\xFF'): ÿ <--
-------------------------------------------------------------------------------------------------------------------------------
***************************************************************

Binary file not shown.

View File

@ -1,3 +1,3 @@
dumper = Dumper(StringIO.StringIO())
dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.open()

View File

@ -1,4 +1,4 @@
dumper = Dumper(StringIO.StringIO())
dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.close()
dumper.open()

View File

@ -1,4 +1,4 @@
dumper = Dumper(StringIO.StringIO())
dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.close()
dumper.serialize(ScalarNode(tag='!foo', value='bar'))
dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))

View File

@ -1,2 +1,2 @@
dumper = Dumper(StringIO.StringIO())
dumper = yaml.Dumper(StringIO.StringIO())
dumper.close()

View File

@ -1,2 +1,2 @@
dumper = Dumper(StringIO.StringIO())
dumper.serialize(ScalarNode(tag='!foo', value='bar'))
dumper = yaml.Dumper(StringIO.StringIO())
dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))

View File

@ -1 +1 @@
safe_dump(object)
yaml.safe_dump(object)

View File

@ -1,14 +1,14 @@
import unittest
import sys, yaml, test_appliance
def main():
import yaml
names = ['test_yaml']
if yaml.__libyaml__:
names.append('test_yaml_ext')
suite = unittest.defaultTestLoader.loadTestsFromNames(names)
runner = unittest.TextTestRunner()
runner.run(suite)
def main(args=None):
collections = []
import test_yaml
collections.append(test_yaml)
if yaml.__with_libyaml__:
import test_yaml_ext
collections.append(test_yaml_ext)
test_appliance.run(collections, args)
if __name__ == '__main__':
main()

View File

@ -1,358 +1,145 @@
import unittest, os
import sys, os, os.path, types, traceback, pprint
from yaml import *
from yaml.composer import *
from yaml.constructor import *
from yaml.resolver import *
DATA = 'tests/data'
class TestAppliance(unittest.TestCase):
def find_test_functions(collections):
if not isinstance(collections, list):
collections = [collections]
functions = []
for collection in collections:
if not isinstance(collection, dict):
collection = vars(collection)
keys = collection.keys()
keys.sort()
for key in keys:
value = collection[key]
if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
functions.append(value)
return functions
DATA = 'tests/data'
SKIP_EXT = '.skip'
def find_test_filenames(directory):
filenames = {}
for filename in os.listdir(directory):
if os.path.isfile(os.path.join(directory, filename)):
base, ext = os.path.splitext(filename)
filenames.setdefault(base, []).append(ext)
filenames = filenames.items()
filenames.sort()
return filenames
all_tests = {}
for filename in os.listdir(DATA):
if os.path.isfile(os.path.join(DATA, filename)):
root, ext = os.path.splitext(filename)
all_tests.setdefault(root, []).append(ext)
def parse_arguments(args):
if args is None:
args = sys.argv[1:]
verbose = False
if '-v' in args:
verbose = True
args.remove('-v')
if '--verbose' in args:
verbose = True
if 'YAML_TEST_VERBOSE' in os.environ:
verbose = True
include_functions = []
if args:
include_functions.append(args.pop(0))
if 'YAML_TEST_FUNCTIONS' in os.environ:
include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
include_filenames = []
include_filenames.extend(args)
if 'YAML_TEST_FILENAMES' in os.environ:
include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
return include_functions, include_filenames, verbose
def add_tests(cls, method_name, *extensions):
for test in cls.all_tests:
available_extensions = cls.all_tests[test]
if cls.SKIP_EXT in available_extensions:
continue
for ext in extensions:
if ext not in available_extensions:
break
else:
filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
def test_method(self, test=test, filenames=filenames):
getattr(self, '_'+method_name)(test, *filenames)
test = test.replace('-', '_').replace('.', '_')
try:
test_method.__name__ = '%s_%s' % (method_name, test)
except TypeError:
import new
test_method = new.function(test_method.func_code, test_method.func_globals,
'%s_%s' % (method_name, test), test_method.func_defaults,
test_method.func_closure)
setattr(cls, test_method.__name__, test_method)
add_tests = classmethod(add_tests)
class Error(Exception):
pass
class CanonicalScanner:
def __init__(self, data):
self.data = unicode(data, 'utf-8')+u'\0'
self.index = 0
self.scan()
def check_token(self, *choices):
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
if self.tokens:
return self.tokens[0]
def get_token(self, choice=None):
token = self.tokens.pop(0)
if choice and not isinstance(token, choice):
raise Error("unexpected token "+repr(token))
return token
def get_token_value(self):
token = self.get_token()
return token.value
def scan(self):
self.tokens = []
self.tokens.append(StreamStartToken(None, None))
while True:
self.find_token()
ch = self.data[self.index]
if ch == u'\0':
self.tokens.append(StreamEndToken(None, None))
break
elif ch == u'%':
self.tokens.append(self.scan_directive())
elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
self.index += 3
self.tokens.append(DocumentStartToken(None, None))
elif ch == u'[':
self.index += 1
self.tokens.append(FlowSequenceStartToken(None, None))
elif ch == u'{':
self.index += 1
self.tokens.append(FlowMappingStartToken(None, None))
elif ch == u']':
self.index += 1
self.tokens.append(FlowSequenceEndToken(None, None))
elif ch == u'}':
self.index += 1
self.tokens.append(FlowMappingEndToken(None, None))
elif ch == u'?':
self.index += 1
self.tokens.append(KeyToken(None, None))
elif ch == u':':
self.index += 1
self.tokens.append(ValueToken(None, None))
elif ch == u',':
self.index += 1
self.tokens.append(FlowEntryToken(None, None))
elif ch == u'*' or ch == u'&':
self.tokens.append(self.scan_alias())
elif ch == u'!':
self.tokens.append(self.scan_tag())
elif ch == u'"':
self.tokens.append(self.scan_scalar())
else:
raise Error("invalid token")
DIRECTIVE = u'%YAML 1.1'
def scan_directive(self):
if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
self.index += len(self.DIRECTIVE)
return DirectiveToken('YAML', (1, 1), None, None)
def scan_alias(self):
if self.data[self.index] == u'*':
TokenClass = AliasToken
def execute(function, filenames, verbose):
if verbose:
sys.stdout.write('='*75+'\n')
sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames)))
try:
function(verbose=verbose, *filenames)
except Exception, exc:
info = sys.exc_info()
if isinstance(exc, AssertionError):
kind = 'FAILURE'
else:
TokenClass = AnchorToken
self.index += 1
start = self.index
while self.data[self.index] not in u', \n\0':
self.index += 1
value = self.data[start:self.index]
return TokenClass(value, None, None)
def scan_tag(self):
self.index += 1
start = self.index
while self.data[self.index] not in u' \n\0':
self.index += 1
value = self.data[start:self.index]
if value[0] == u'!':
value = 'tag:yaml.org,2002:'+value[1:]
elif value[0] == u'<' and value[-1] == u'>':
value = value[1:-1]
kind = 'ERROR'
if verbose:
traceback.print_exc(limit=1, file=sys.stdout)
else:
value = u'!'+value
return TagToken(value, None, None)
sys.stdout.write(kind[0])
sys.stdout.flush()
else:
kind = 'SUCCESS'
info = None
if not verbose:
sys.stdout.write('.')
sys.stdout.flush()
return (function, filenames, kind, info)
QUOTE_CODES = {
'x': 2,
'u': 4,
'U': 8,
}
def display(results, verbose):
if results and not verbose:
sys.stdout.write('\n')
total = len(results)
failures = 0
errors = 0
for function, filenames, kind, info in results:
if kind == 'SUCCESS':
continue
if kind == 'FAILURE':
failures += 1
if kind == 'ERROR':
errors += 1
sys.stdout.write('='*75+'\n')
sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind))
if kind == 'ERROR':
traceback.print_exception(file=sys.stdout, *info)
else:
sys.stdout.write('Traceback (most recent call last):\n')
traceback.print_tb(info[2], file=sys.stdout)
sys.stdout.write('%s: see below\n' % info[0].__name__)
sys.stdout.write('~'*75+'\n')
for arg in info[1].args:
pprint.pprint(arg, stream=sys.stdout, indent=2)
for filename in filenames:
sys.stdout.write('-'*75+'\n')
sys.stdout.write('%s:\n' % filename)
data = open(filename, 'rb').read()
sys.stdout.write(data)
if data and data[-1] != '\n':
sys.stdout.write('\n')
sys.stdout.write('='*75+'\n')
sys.stdout.write('TESTS: %s\n' % total)
if failures:
sys.stdout.write('FAILURES: %s\n' % failures)
if errors:
sys.stdout.write('ERRORS: %s\n' % errors)
QUOTE_REPLACES = {
u'\\': u'\\',
u'\"': u'\"',
u' ': u' ',
u'a': u'\x07',
u'b': u'\x08',
u'e': u'\x1B',
u'f': u'\x0C',
u'n': u'\x0A',
u'r': u'\x0D',
u't': u'\x09',
u'v': u'\x0B',
u'N': u'\u0085',
u'L': u'\u2028',
u'P': u'\u2029',
u'_': u'_',
u'0': u'\x00',
}
def scan_scalar(self):
self.index += 1
chunks = []
start = self.index
ignore_spaces = False
while self.data[self.index] != u'"':
if self.data[self.index] == u'\\':
ignore_spaces = False
chunks.append(self.data[start:self.index])
self.index += 1
ch = self.data[self.index]
self.index += 1
if ch == u'\n':
ignore_spaces = True
elif ch in self.QUOTE_CODES:
length = self.QUOTE_CODES[ch]
code = int(self.data[self.index:self.index+length], 16)
chunks.append(unichr(code))
self.index += length
def run(collections, args=None):
test_functions = find_test_functions(collections)
test_filenames = find_test_filenames(DATA)
include_functions, include_filenames, verbose = parse_arguments(args)
results = []
for function in test_functions:
if include_functions and function.func_name not in include_functions:
continue
if function.unittest:
for base, exts in test_filenames:
if include_filenames and base not in include_filenames:
continue
filenames = []
for ext in function.unittest:
if ext not in exts:
break
filenames.append(os.path.join(DATA, base+ext))
else:
chunks.append(self.QUOTE_REPLACES[ch])
start = self.index
elif self.data[self.index] == u'\n':
chunks.append(self.data[start:self.index])
chunks.append(u' ')
self.index += 1
start = self.index
ignore_spaces = True
elif ignore_spaces and self.data[self.index] == u' ':
self.index += 1
start = self.index
else:
ignore_spaces = False
self.index += 1
chunks.append(self.data[start:self.index])
self.index += 1
return ScalarToken(u''.join(chunks), False, None, None)
def find_token(self):
found = False
while not found:
while self.data[self.index] in u' \t':
self.index += 1
if self.data[self.index] == u'#':
while self.data[self.index] != u'\n':
self.index += 1
if self.data[self.index] == u'\n':
self.index += 1
else:
found = True
class CanonicalParser:
def __init__(self):
self.events = []
self.parse()
# stream: STREAM-START document* STREAM-END
def parse_stream(self):
self.get_token(StreamStartToken)
self.events.append(StreamStartEvent(None, None))
while not self.check_token(StreamEndToken):
if self.check_token(DirectiveToken, DocumentStartToken):
self.parse_document()
else:
raise Error("document is expected, got "+repr(self.tokens[self.index]))
self.get_token(StreamEndToken)
self.events.append(StreamEndEvent(None, None))
# document: DIRECTIVE? DOCUMENT-START node
def parse_document(self):
node = None
if self.check_token(DirectiveToken):
self.get_token(DirectiveToken)
self.get_token(DocumentStartToken)
self.events.append(DocumentStartEvent(None, None))
self.parse_node()
self.events.append(DocumentEndEvent(None, None))
# node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
def parse_node(self):
if self.check_token(AliasToken):
self.events.append(AliasEvent(self.get_token_value(), None, None))
skip_exts = getattr(function, 'skip', [])
for skip_ext in skip_exts:
if skip_ext in exts:
break
else:
result = execute(function, filenames, verbose)
results.append(result)
else:
anchor = None
if self.check_token(AnchorToken):
anchor = self.get_token_value()
tag = None
if self.check_token(TagToken):
tag = self.get_token_value()
if self.check_token(ScalarToken):
self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
elif self.check_token(FlowSequenceStartToken):
self.events.append(SequenceStartEvent(anchor, tag, None, None))
self.parse_sequence()
elif self.check_token(FlowMappingStartToken):
self.events.append(MappingStartEvent(anchor, tag, None, None))
self.parse_mapping()
else:
raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
# sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
def parse_sequence(self):
self.get_token(FlowSequenceStartToken)
if not self.check_token(FlowSequenceEndToken):
self.parse_node()
while not self.check_token(FlowSequenceEndToken):
self.get_token(FlowEntryToken)
if not self.check_token(FlowSequenceEndToken):
self.parse_node()
self.get_token(FlowSequenceEndToken)
self.events.append(SequenceEndEvent(None, None))
# mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
def parse_mapping(self):
self.get_token(FlowMappingStartToken)
if not self.check_token(FlowMappingEndToken):
self.parse_map_entry()
while not self.check_token(FlowMappingEndToken):
self.get_token(FlowEntryToken)
if not self.check_token(FlowMappingEndToken):
self.parse_map_entry()
self.get_token(FlowMappingEndToken)
self.events.append(MappingEndEvent(None, None))
# map_entry: KEY node VALUE node
def parse_map_entry(self):
self.get_token(KeyToken)
self.parse_node()
self.get_token(ValueToken)
self.parse_node()
def parse(self):
self.parse_stream()
def get_event(self):
return self.events.pop(0)
def check_event(self, *choices):
if self.events:
if not choices:
return True
for choice in choices:
if isinstance(self.events[0], choice):
return True
return False
def peek_event(self):
return self.events[0]
class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
def __init__(self, stream):
if hasattr(stream, 'read'):
stream = stream.read()
CanonicalScanner.__init__(self, stream)
CanonicalParser.__init__(self)
Composer.__init__(self)
Constructor.__init__(self)
Resolver.__init__(self)
def canonical_scan(stream):
return scan(stream, Loader=CanonicalLoader)
def canonical_parse(stream):
return parse(stream, Loader=CanonicalLoader)
def canonical_compose(stream):
return compose(stream, Loader=CanonicalLoader)
def canonical_compose_all(stream):
return compose_all(stream, Loader=CanonicalLoader)
def canonical_load(stream):
return load(stream, Loader=CanonicalLoader)
def canonical_load_all(stream):
return load_all(stream, Loader=CanonicalLoader)
result = execute(function, [], verbose)
results.append(result)
display(results, verbose=verbose)

View File

@ -1,13 +1,10 @@
def main():
if __name__ == '__main__':
import sys, os, distutils.util
build_lib = 'build/lib'
build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
sys.path.insert(0, build_lib)
sys.path.insert(0, build_lib_ext)
import test_yaml
test_yaml.main('test_yaml')
if __name__ == '__main__':
main()
import test_yaml, test_appliance
test_appliance.run(test_yaml)

View File

@ -1,14 +1,11 @@
def main():
if __name__ == '__main__':
import sys, os, distutils.util
build_lib = 'build/lib'
build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
sys.path.insert(0, build_lib)
sys.path.insert(0, build_lib_ext)
import test_yaml_ext
test_yaml_ext.main('test_yaml_ext')
if __name__ == '__main__':
main()
import test_yaml_ext, test_appliance
test_appliance.run(test_yaml_ext)

View File

@ -1,20 +1,40 @@
import test_appliance
import yaml, canonical
class TestCanonicalAppliance(test_appliance.TestAppliance):
def test_canonical_scanner(canonical_filename, verbose=False):
data = open(canonical_filename, 'rb').read()
tokens = list(yaml.canonical_scan(data))
assert tokens, tokens
if verbose:
for token in tokens:
print token
def _testCanonicalScanner(self, test_name, canonical_filename):
data = file(canonical_filename, 'rb').read()
tokens = list(test_appliance.canonical_scan(data))
#for token in tokens:
# print token
test_canonical_scanner.unittest = ['.canonical']
def _testCanonicalParser(self, test_name, canonical_filename):
data = file(canonical_filename, 'rb').read()
event = list(test_appliance.canonical_parse(data))
#for event in events:
# print event
def test_canonical_parser(canonical_filename, verbose=False):
data = open(canonical_filename, 'rb').read()
events = list(yaml.canonical_parse(data))
assert events, events
if verbose:
for event in events:
print event
TestCanonicalAppliance.add_tests('testCanonicalScanner', '.canonical')
TestCanonicalAppliance.add_tests('testCanonicalParser', '.canonical')
test_canonical_parser.unittest = ['.canonical']
def test_canonical_error(data_filename, canonical_filename, verbose=False):
data = open(data_filename, 'rb').read()
try:
output = list(yaml.canonical_load_all(data))
except yaml.YAMLError, exc:
if verbose:
print exc
else:
raise AssertionError("expected an exception")
test_canonical_error.unittest = ['.data', '.canonical']
test_canonical_error.skip = ['.empty']
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,313 +1,271 @@
import test_appliance
import yaml
import pprint
import datetime
try:
set
except NameError:
from sets import Set as set
from yaml import *
import yaml.tokens
class MyLoader(Loader):
pass
class MyDumper(Dumper):
pass
class MyTestClass1:
def __init__(self, x, y=0, z=0):
self.x = x
self.y = y
self.z = z
def __eq__(self, other):
if isinstance(other, MyTestClass1):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
def construct1(constructor, node):
mapping = constructor.construct_mapping(node)
return MyTestClass1(**mapping)
def represent1(representer, native):
return representer.represent_mapping("!tag1", native.__dict__)
add_constructor("!tag1", construct1, Loader=MyLoader)
add_representer(MyTestClass1, represent1, Dumper=MyDumper)
class MyTestClass2(MyTestClass1, YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = "!tag2"
def from_yaml(cls, constructor, node):
x = constructor.construct_yaml_int(node)
return cls(x=x)
from_yaml = classmethod(from_yaml)
def to_yaml(cls, representer, native):
return representer.represent_scalar(cls.yaml_tag, str(native.x))
to_yaml = classmethod(to_yaml)
class MyTestClass3(MyTestClass2):
yaml_tag = "!tag3"
def from_yaml(cls, constructor, node):
mapping = constructor.construct_mapping(node)
if '=' in mapping:
x = mapping['=']
del mapping['=']
mapping['x'] = x
return cls(**mapping)
from_yaml = classmethod(from_yaml)
def to_yaml(cls, representer, native):
return representer.represent_mapping(cls.yaml_tag, native.__dict__)
to_yaml = classmethod(to_yaml)
class YAMLObject1(YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = '!foo'
def __init__(self, my_parameter=None, my_another_parameter=None):
self.my_parameter = my_parameter
self.my_another_parameter = my_another_parameter
def __eq__(self, other):
if isinstance(other, YAMLObject1):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
class YAMLObject2(YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = '!bar'
def __init__(self, foo=1, bar=2, baz=3):
self.foo = foo
self.bar = bar
self.baz = baz
def __getstate__(self):
return {1: self.foo, 2: self.bar, 3: self.baz}
def __setstate__(self, state):
self.foo = state[1]
self.bar = state[2]
self.baz = state[3]
def __eq__(self, other):
if isinstance(other, YAMLObject2):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
class AnObject(object):
def __new__(cls, foo=None, bar=None, baz=None):
self = object.__new__(cls)
self.foo = foo
self.bar = bar
self.baz = baz
return self
def __cmp__(self, other):
return cmp((type(self), self.foo, self.bar, self.baz),
(type(other), other.foo, other.bar, other.baz))
def __eq__(self, other):
return type(self) is type(other) and \
(self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
class AnInstance:
def __init__(self, foo=None, bar=None, baz=None):
self.foo = foo
self.bar = bar
self.baz = baz
def __cmp__(self, other):
return cmp((type(self), self.foo, self.bar, self.baz),
(type(other), other.foo, other.bar, other.baz))
def __eq__(self, other):
return type(self) is type(other) and \
(self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
class AState(AnInstance):
def __getstate__(self):
return {
'_foo': self.foo,
'_bar': self.bar,
'_baz': self.baz,
}
def __setstate__(self, state):
self.foo = state['_foo']
self.bar = state['_bar']
self.baz = state['_baz']
class ACustomState(AnInstance):
def __getstate__(self):
return (self.foo, self.bar, self.baz)
def __setstate__(self, state):
self.foo, self.bar, self.baz = state
class InitArgs(AnInstance):
def __getinitargs__(self):
return (self.foo, self.bar, self.baz)
def __getstate__(self):
return {}
class InitArgsWithState(AnInstance):
def __getinitargs__(self):
return (self.foo, self.bar)
def __getstate__(self):
return self.baz
def __setstate__(self, state):
self.baz = state
class NewArgs(AnObject):
def __getnewargs__(self):
return (self.foo, self.bar, self.baz)
def __getstate__(self):
return {}
class NewArgsWithState(AnObject):
def __getnewargs__(self):
return (self.foo, self.bar)
def __getstate__(self):
return self.baz
def __setstate__(self, state):
self.baz = state
class Reduce(AnObject):
def __reduce__(self):
return self.__class__, (self.foo, self.bar, self.baz)
class ReduceWithState(AnObject):
def __reduce__(self):
return self.__class__, (self.foo, self.bar), self.baz
def __setstate__(self, state):
self.baz = state
class MyInt(int):
def __eq__(self, other):
return type(self) is type(other) and int(self) == int(other)
class MyList(list):
def __init__(self, n=1):
self.extend([None]*n)
def __eq__(self, other):
return type(self) is type(other) and list(self) == list(other)
class MyDict(dict):
def __init__(self, n=1):
for k in range(n):
self[k] = None
def __eq__(self, other):
return type(self) is type(other) and dict(self) == dict(other)
class FixedOffset(datetime.tzinfo):
def __init__(self, offset, name):
self.__offset = datetime.timedelta(minutes=offset)
self.__name = name
def utcoffset(self, dt):
return self.__offset
def tzname(self, dt):
return self.__name
def dst(self, dt):
return datetime.timedelta(0)
def execute(code):
exec code
return value
class TestConstructorTypes(test_appliance.TestAppliance):
def _make_objects():
global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLObject1, YAMLObject2, \
AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, \
NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, \
FixedOffset, execute
def _testTypes(self, test_name, data_filename, code_filename):
data1 = None
data2 = None
class MyLoader(yaml.Loader):
pass
class MyDumper(yaml.Dumper):
pass
class MyTestClass1:
def __init__(self, x, y=0, z=0):
self.x = x
self.y = y
self.z = z
def __eq__(self, other):
if isinstance(other, MyTestClass1):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
def construct1(constructor, node):
mapping = constructor.construct_mapping(node)
return MyTestClass1(**mapping)
def represent1(representer, native):
return representer.represent_mapping("!tag1", native.__dict__)
yaml.add_constructor("!tag1", construct1, Loader=MyLoader)
yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper)
class MyTestClass2(MyTestClass1, yaml.YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = "!tag2"
def from_yaml(cls, constructor, node):
x = constructor.construct_yaml_int(node)
return cls(x=x)
from_yaml = classmethod(from_yaml)
def to_yaml(cls, representer, native):
return representer.represent_scalar(cls.yaml_tag, str(native.x))
to_yaml = classmethod(to_yaml)
class MyTestClass3(MyTestClass2):
yaml_tag = "!tag3"
def from_yaml(cls, constructor, node):
mapping = constructor.construct_mapping(node)
if '=' in mapping:
x = mapping['=']
del mapping['=']
mapping['x'] = x
return cls(**mapping)
from_yaml = classmethod(from_yaml)
def to_yaml(cls, representer, native):
return representer.represent_mapping(cls.yaml_tag, native.__dict__)
to_yaml = classmethod(to_yaml)
class YAMLObject1(yaml.YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = '!foo'
def __init__(self, my_parameter=None, my_another_parameter=None):
self.my_parameter = my_parameter
self.my_another_parameter = my_another_parameter
def __eq__(self, other):
if isinstance(other, YAMLObject1):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
class YAMLObject2(yaml.YAMLObject):
yaml_loader = MyLoader
yaml_dumper = MyDumper
yaml_tag = '!bar'
def __init__(self, foo=1, bar=2, baz=3):
self.foo = foo
self.bar = bar
self.baz = baz
def __getstate__(self):
return {1: self.foo, 2: self.bar, 3: self.baz}
def __setstate__(self, state):
self.foo = state[1]
self.bar = state[2]
self.baz = state[3]
def __eq__(self, other):
if isinstance(other, YAMLObject2):
return self.__class__, self.__dict__ == other.__class__, other.__dict__
else:
return False
class AnObject(object):
def __new__(cls, foo=None, bar=None, baz=None):
self = object.__new__(cls)
self.foo = foo
self.bar = bar
self.baz = baz
return self
def __cmp__(self, other):
return cmp((type(self), self.foo, self.bar, self.baz),
(type(other), other.foo, other.bar, other.baz))
def __eq__(self, other):
return type(self) is type(other) and \
(self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
class AnInstance:
def __init__(self, foo=None, bar=None, baz=None):
self.foo = foo
self.bar = bar
self.baz = baz
def __cmp__(self, other):
return cmp((type(self), self.foo, self.bar, self.baz),
(type(other), other.foo, other.bar, other.baz))
def __eq__(self, other):
return type(self) is type(other) and \
(self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
class AState(AnInstance):
def __getstate__(self):
return {
'_foo': self.foo,
'_bar': self.bar,
'_baz': self.baz,
}
def __setstate__(self, state):
self.foo = state['_foo']
self.bar = state['_bar']
self.baz = state['_baz']
class ACustomState(AnInstance):
def __getstate__(self):
return (self.foo, self.bar, self.baz)
def __setstate__(self, state):
self.foo, self.bar, self.baz = state
class InitArgs(AnInstance):
def __getinitargs__(self):
return (self.foo, self.bar, self.baz)
def __getstate__(self):
return {}
class InitArgsWithState(AnInstance):
def __getinitargs__(self):
return (self.foo, self.bar)
def __getstate__(self):
return self.baz
def __setstate__(self, state):
self.baz = state
class NewArgs(AnObject):
def __getnewargs__(self):
return (self.foo, self.bar, self.baz)
def __getstate__(self):
return {}
class NewArgsWithState(AnObject):
def __getnewargs__(self):
return (self.foo, self.bar)
def __getstate__(self):
return self.baz
def __setstate__(self, state):
self.baz = state
class Reduce(AnObject):
def __reduce__(self):
return self.__class__, (self.foo, self.bar, self.baz)
class ReduceWithState(AnObject):
def __reduce__(self):
return self.__class__, (self.foo, self.bar), self.baz
def __setstate__(self, state):
self.baz = state
class MyInt(int):
def __eq__(self, other):
return type(self) is type(other) and int(self) == int(other)
class MyList(list):
def __init__(self, n=1):
self.extend([None]*n)
def __eq__(self, other):
return type(self) is type(other) and list(self) == list(other)
class MyDict(dict):
def __init__(self, n=1):
for k in range(n):
self[k] = None
def __eq__(self, other):
return type(self) is type(other) and dict(self) == dict(other)
class FixedOffset(datetime.tzinfo):
def __init__(self, offset, name):
self.__offset = datetime.timedelta(minutes=offset)
self.__name = name
def utcoffset(self, dt):
return self.__offset
def tzname(self, dt):
return self.__name
def dst(self, dt):
return datetime.timedelta(0)
def _load_code(expression):
return eval(expression)
def _serialize_value(data):
if isinstance(data, list):
return '[%s]' % ', '.join(map(_serialize_value, data))
elif isinstance(data, dict):
items = []
for key, value in data.items():
key = _serialize_value(key)
value = _serialize_value(value)
items.append("%s: %s" % (key, value))
items.sort()
return '{%s}' % ', '.join(items)
elif isinstance(data, datetime.datetime):
return repr(data.utctimetuple())
elif isinstance(data, unicode):
return data.encode('utf-8')
else:
return str(data)
def test_constructor_types(data_filename, code_filename, verbose=False):
_make_objects()
native1 = None
native2 = None
try:
native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
if len(native1) == 1:
native1 = native1[0]
native2 = _load_code(open(code_filename, 'rb').read())
try:
data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))
if len(data1) == 1:
data1 = data1[0]
data2 = eval(file(code_filename, 'rb').read())
self.failUnlessEqual(type(data1), type(data2))
try:
self.failUnlessEqual(data1, data2)
except (AssertionError, TypeError):
if isinstance(data1, dict):
data1 = [(repr(key), value) for key, value in data1.items()]
data1.sort()
data1 = repr(data1)
data2 = [(repr(key), value) for key, value in data2.items()]
data2.sort()
data2 = repr(data2)
if data1 != data2:
raise
elif isinstance(data1, list):
self.failUnlessEqual(type(data1), type(data2))
self.failUnlessEqual(len(data1), len(data2))
for item1, item2 in zip(data1, data2):
if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \
(item2 != item2 or (item2 == 0.0 and item2 == 1.0)):
continue
if isinstance(item1, datetime.datetime) \
and isinstance(item2, datetime.datetime):
self.failUnlessEqual(item1.microsecond,
item2.microsecond)
if isinstance(item1, datetime.datetime):
item1 = item1.utctimetuple()
if isinstance(item2, datetime.datetime):
item2 = item2.utctimetuple()
self.failUnlessEqual(item1, item2)
else:
raise
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "CODE:"
print file(code_filename, 'rb').read()
print "NATIVES1:", data1
print "NATIVES2:", data2
raise
if native1 == native2:
return
except TypeError:
pass
if verbose:
print "SERIALIZED NATIVE1:"
print _serialize_value(native1)
print "SERIALIZED NATIVE2:"
print _serialize_value(native2)
assert _serialize_value(native1) == _serialize_value(native2), (native1, native2)
finally:
if verbose:
print "NATIVE1:"
pprint.pprint(native1)
print "NATIVE2:"
pprint.pprint(native2)
TestConstructorTypes.add_tests('testTypes', '.data', '.code')
test_constructor_types.unittest = ['.data', '.code']
if __name__ == '__main__':
import sys, test_constructor
sys.modules['test_constructor'] = sys.modules['__main__']
import test_appliance
test_appliance.run(globals())

View File

@ -1,91 +1,72 @@
import test_appliance, sys, StringIO
from yaml import *
import yaml
class TestEmitter(test_appliance.TestAppliance):
def _compare_events(events1, events2):
assert len(events1) == len(events2), (events1, events2)
for event1, event2 in zip(events1, events2):
assert event1.__class__ == event2.__class__, (event1, event2)
if isinstance(event1, yaml.NodeEvent):
assert event1.anchor == event2.anchor, (event1, event2)
if isinstance(event1, yaml.CollectionStartEvent):
assert event1.tag == event2.tag, (event1, event2)
if isinstance(event1, yaml.ScalarEvent):
if True not in event1.implicit+event2.implicit:
assert event1.tag == event2.tag, (event1, event2)
assert event1.value == event2.value, (event1, event2)
def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
self._testEmitter(test_name, data_filename)
def test_emitter_on_data(data_filename, canonical_filename, verbose=False):
events = list(yaml.parse(open(data_filename, 'rb')))
output = yaml.emit(events)
if verbose:
print "OUTPUT:"
print output
new_events = list(yaml.parse(output))
_compare_events(events, new_events)
def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
self._testEmitter(test_name, canonical_filename, False)
test_emitter_on_data.unittest = ['.data', '.canonical']
def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
self._testEmitter(test_name, canonical_filename, True)
def test_emitter_on_canonical(canonical_filename, verbose=False):
events = list(yaml.parse(open(canonical_filename, 'rb')))
for canonical in [False, True]:
output = yaml.emit(events, canonical=canonical)
if verbose:
print "OUTPUT (canonical=%s):" % canonical
print output
new_events = list(yaml.parse(output))
_compare_events(events, new_events)
def _testEmitter(self, test_name, filename, canonical=None):
events = list(parse(file(filename, 'rb')))
#self._dump(filename, events, canonical)
stream = StringIO.StringIO()
emit(events, stream, canonical=canonical)
data = stream.getvalue()
new_events = list(parse(data))
for event, new_event in zip(events, new_events):
self.failUnlessEqual(event.__class__, new_event.__class__)
if isinstance(event, NodeEvent):
self.failUnlessEqual(event.anchor, new_event.anchor)
if isinstance(event, CollectionStartEvent):
self.failUnlessEqual(event.tag, new_event.tag)
if isinstance(event, ScalarEvent):
#self.failUnlessEqual(event.implicit, new_event.implicit)
if True not in event.implicit+new_event.implicit:
self.failUnlessEqual(event.tag, new_event.tag)
self.failUnlessEqual(event.value, new_event.value)
test_emitter_on_canonical.unittest = ['.canonical']
def _testEmitterStyles(self, test_name, canonical_filename, data_filename):
for filename in [canonical_filename, data_filename]:
events = list(parse(file(filename, 'rb')))
for flow_style in [False, True]:
for style in ['|', '>', '"', '\'', '']:
styled_events = []
for event in events:
if isinstance(event, ScalarEvent):
event = ScalarEvent(event.anchor, event.tag,
event.implicit, event.value, style=style)
elif isinstance(event, SequenceStartEvent):
event = SequenceStartEvent(event.anchor, event.tag,
event.implicit, flow_style=flow_style)
elif isinstance(event, MappingStartEvent):
event = MappingStartEvent(event.anchor, event.tag,
event.implicit, flow_style=flow_style)
styled_events.append(event)
stream = StringIO.StringIO()
emit(styled_events, stream)
data = stream.getvalue()
#print data
new_events = list(parse(data))
for event, new_event in zip(events, new_events):
self.failUnlessEqual(event.__class__, new_event.__class__)
if isinstance(event, NodeEvent):
self.failUnlessEqual(event.anchor, new_event.anchor)
if isinstance(event, CollectionStartEvent):
self.failUnlessEqual(event.tag, new_event.tag)
if isinstance(event, ScalarEvent):
#self.failUnlessEqual(event.implicit, new_event.implicit)
if True not in event.implicit+new_event.implicit:
self.failUnlessEqual(event.tag, new_event.tag)
self.failUnlessEqual(event.value, new_event.value)
def test_emitter_styles(data_filename, canonical_filename, verbose=False):
for filename in [data_filename, canonical_filename]:
events = list(yaml.parse(open(filename, 'rb')))
for flow_style in [False, True]:
for style in ['|', '>', '"', '\'', '']:
styled_events = []
for event in events:
if isinstance(event, yaml.ScalarEvent):
event = yaml.ScalarEvent(event.anchor, event.tag,
event.implicit, event.value, style=style)
elif isinstance(event, yaml.SequenceStartEvent):
event = yaml.SequenceStartEvent(event.anchor, event.tag,
event.implicit, flow_style=flow_style)
elif isinstance(event, yaml.MappingStartEvent):
event = yaml.MappingStartEvent(event.anchor, event.tag,
event.implicit, flow_style=flow_style)
styled_events.append(event)
output = yaml.emit(styled_events)
if verbose:
print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style)
print output
new_events = list(yaml.parse(output))
_compare_events(events, new_events)
test_emitter_styles.unittest = ['.data', '.canonical']
def _dump(self, filename, events, canonical):
print "="*30
print "ORIGINAL DOCUMENT:"
print file(filename, 'rb').read()
print '-'*30
print "EMITTED DOCUMENT:"
emit(events, sys.stdout, canonical=canonical)
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data')
class EventsLoader(Loader):
class EventsLoader(yaml.Loader):
def construct_event(self, node):
if isinstance(node, ScalarNode):
if isinstance(node, yaml.ScalarNode):
mapping = {}
else:
mapping = self.construct_mapping(node)
@ -104,34 +85,16 @@ class EventsLoader(Loader):
EventsLoader.add_constructor(None, EventsLoader.construct_event)
class TestEmitterEvents(test_appliance.TestAppliance):
def _testEmitterEvents(self, test_name, events_filename):
events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
#self._dump(events_filename, events)
stream = StringIO.StringIO()
emit(events, stream)
data = stream.getvalue()
new_events = list(parse(data))
self.failUnlessEqual(len(events), len(new_events))
for event, new_event in zip(events, new_events):
self.failUnlessEqual(event.__class__, new_event.__class__)
if isinstance(event, NodeEvent):
self.failUnlessEqual(event.anchor, new_event.anchor)
if isinstance(event, CollectionStartEvent):
self.failUnlessEqual(event.tag, new_event.tag)
if isinstance(event, ScalarEvent):
self.failUnless(event.implicit == new_event.implicit
or event.tag == new_event.tag)
self.failUnlessEqual(event.value, new_event.value)
def _dump(self, events_filename, events):
print "="*30
print "EVENTS:"
print file(events_filename, 'rb').read()
print '-'*30
def test_emitter_events(events_filename, verbose=False):
events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader))
output = yaml.emit(events)
if verbose:
print "OUTPUT:"
emit(events, sys.stdout)
TestEmitterEvents.add_tests('testEmitterEvents', '.events')
print output
new_events = list(yaml.parse(output))
_compare_events(events, new_events)
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,91 +1,66 @@
import test_appliance
import test_emitter
import yaml, test_emitter
import StringIO
def test_loader_error(error_filename, verbose=False):
try:
list(yaml.load_all(open(error_filename, 'rb')))
except yaml.YAMLError, exc:
if verbose:
print "%s:" % exc.__class__.__name__, exc
else:
raise AssertionError("expected an exception")
from yaml import *
test_loader_error.unittest = ['.loader-error']
class TestErrors(test_appliance.TestAppliance):
def test_loader_error_string(error_filename, verbose=False):
try:
list(yaml.load_all(open(error_filename, 'rb').read()))
except yaml.YAMLError, exc:
if verbose:
print "%s:" % exc.__class__.__name__, exc
else:
raise AssertionError("expected an exception")
def _testLoaderErrors(self, test_name, invalid_filename):
#self._load(invalid_filename)
self.failUnlessRaises(YAMLError, lambda: self._load(invalid_filename))
test_loader_error_string.unittest = ['.loader-error']
def _testLoaderStringErrors(self, test_name, invalid_filename):
#self._load_string(invalid_filename)
self.failUnlessRaises(YAMLError, lambda: self._load_string(invalid_filename))
def test_loader_error_single(error_filename, verbose=False):
try:
yaml.load(open(error_filename, 'rb').read())
except yaml.YAMLError, exc:
if verbose:
print "%s:" % exc.__class__.__name__, exc
else:
raise AssertionError("expected an exception")
def _testLoaderSingleErrors(self, test_name, invalid_filename):
#self._load_single(invalid_filename)
self.failUnlessRaises(YAMLError, lambda: self._load_single(invalid_filename))
test_loader_error_single.unittest = ['.single-loader-error']
def _testEmitterErrors(self, test_name, invalid_filename):
events = list(load(file(invalid_filename, 'rb').read(),
Loader=test_emitter.EventsLoader))
self.failUnlessRaises(YAMLError, lambda: self._emit(events))
def test_emitter_error(error_filename, verbose=False):
events = list(yaml.load(open(error_filename, 'rb'),
Loader=test_emitter.EventsLoader))
try:
yaml.emit(events)
except yaml.YAMLError, exc:
if verbose:
print "%s:" % exc.__class__.__name__, exc
else:
raise AssertionError("expected an exception")
def _testDumperErrors(self, test_name, invalid_filename):
code = file(invalid_filename, 'rb').read()
self.failUnlessRaises(YAMLError, lambda: self._dump(code))
test_emitter_error.unittest = ['.emitter-error']
def _dump(self, code):
try:
exec code
except YAMLError, exc:
#print '.'*70
#print "%s:" % exc.__class__.__name__, exc
raise
def test_dumper_error(error_filename, verbose=False):
code = open(error_filename, 'rb').read()
try:
import yaml, StringIO
exec code
except yaml.YAMLError, exc:
if verbose:
print "%s:" % exc.__class__.__name__, exc
else:
raise AssertionError("expected an exception")
def _emit(self, events):
try:
emit(events)
except YAMLError, exc:
#print '.'*70
#print "%s:" % exc.__class__.__name__, exc
raise
test_dumper_error.unittest = ['.dumper-error']
def _load(self, filename):
try:
return list(load_all(file(filename, 'rb')))
except YAMLError, exc:
#except ScannerError, exc:
#except ParserError, exc:
#except ComposerError, exc:
#except ConstructorError, exc:
#print '.'*70
#print "%s:" % exc.__class__.__name__, exc
raise
def _load_string(self, filename):
try:
return list(load_all(file(filename, 'rb').read()))
except YAMLError, exc:
#except ScannerError, exc:
#except ParserError, exc:
#except ComposerError, exc:
#except ConstructorError, exc:
#print '.'*70
#print "%s:" % filename
#print "%s:" % exc.__class__.__name__, exc
raise
def _load_single(self, filename):
try:
return load(file(filename, 'rb').read())
except YAMLError, exc:
#except ScannerError, exc:
#except ParserError, exc:
#except ComposerError, exc:
#except ConstructorError, exc:
#print '.'*70
#print "%s:" % filename
#print "%s:" % exc.__class__.__name__, exc
raise
TestErrors.add_tests('testLoaderErrors', '.loader-error')
TestErrors.add_tests('testLoaderStringErrors', '.loader-error')
TestErrors.add_tests('testLoaderSingleErrors', '.single-loader-error')
TestErrors.add_tests('testEmitterErrors', '.emitter-error')
TestErrors.add_tests('testDumperErrors', '.dumper-error')
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,34 +1,32 @@
import test_appliance
import yaml
from yaml.reader import Mark
def test_marks(marks_filename, verbose=False):
inputs = open(marks_filename, 'rb').read().split('---\n')[1:]
for input in inputs:
index = 0
line = 0
column = 0
while input[index] != '*':
if input[index] == '\n':
line += 1
column = 0
else:
column += 1
index += 1
mark = yaml.Mark(marks_filename, index, line, column, unicode(input), index)
snippet = mark.get_snippet(indent=2, max_length=79)
if verbose:
print snippet
assert isinstance(snippet, str), type(snippet)
assert snippet.count('\n') == 1, snippet.count('\n')
data, pointer = snippet.split('\n')
assert len(data) < 82, len(data)
assert data[len(pointer)-1] == '*', data[len(pointer)-1]
class TestMark(test_appliance.TestAppliance):
test_marks.unittest = ['.marks']
def _testMarks(self, test_name, marks_filename):
inputs = file(marks_filename, 'rb').read().split('---\n')[1:]
for input in inputs:
index = 0
line = 0
column = 0
while input[index] != '*':
if input[index] == '\n':
line += 1
column = 0
else:
column += 1
index += 1
mark = Mark(test_name, index, line, column, unicode(input), index)
snippet = mark.get_snippet(indent=2, max_length=79)
#print "INPUT:"
#print input
#print "SNIPPET:"
#print snippet
self.failUnless(isinstance(snippet, str))
self.failUnlessEqual(snippet.count('\n'), 1)
data, pointer = snippet.split('\n')
self.failUnless(len(data) < 82)
self.failUnlessEqual(data[len(pointer)-1], '*')
TestMark.add_tests('testMarks', '.marks')
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,44 +1,35 @@
import test_appliance
from yaml.reader import Reader, ReaderError
import yaml.reader
import codecs
class TestReaderErrors(test_appliance.TestAppliance):
def _testReaderUnicodeErrors(self, test_name, stream_filename):
for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
try:
data = unicode(file(stream_filename, 'rb').read(), encoding)
break
except:
pass
else:
return
#self._load(data)
self.failUnlessRaises(ReaderError,
lambda: self._load(data))
#self._load(codecs.open(stream_filename, encoding=encoding))
self.failUnlessRaises(ReaderError,
lambda: self._load(codecs.open(stream_filename, encoding=encoding)))
def _testReaderStringErrors(self, test_name, stream_filename):
data = file(stream_filename, 'rb').read()
#self._load(data)
self.failUnlessRaises(ReaderError, lambda: self._load(data))
def _testReaderFileErrors(self, test_name, stream_filename):
data = file(stream_filename, 'rb')
#self._load(data)
self.failUnlessRaises(ReaderError, lambda: self._load(data))
def _load(self, data):
stream = Reader(data)
def _run_reader(data, verbose):
try:
stream = yaml.reader.Reader(data)
while stream.peek() != u'\0':
stream.forward()
except yaml.reader.ReaderError, exc:
if verbose:
print exc
else:
raise AssertionError("expected an exception")
TestReaderErrors.add_tests('testReaderUnicodeErrors', '.stream-error')
TestReaderErrors.add_tests('testReaderStringErrors', '.stream-error')
TestReaderErrors.add_tests('testReaderFileErrors', '.stream-error')
def test_stream_error(error_filename, verbose=False):
_run_reader(open(error_filename, 'rb'), verbose)
_run_reader(open(error_filename, 'rb').read(), verbose)
for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
try:
data = unicode(open(error_filename, 'rb').read(), encoding)
break
except UnicodeDecodeError:
pass
else:
return
_run_reader(data, verbose)
_run_reader(codecs.open(error_filename, encoding=encoding), verbose)
test_stream_error.unittest = ['.stream-error']
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,7 +1,5 @@
import test_appliance
from yaml import *
import yaml
class AnInstance:
@ -24,29 +22,29 @@ class AnInstanceWithState(AnInstance):
def __setstate__(self, state):
self.foo, self.bar = state['attributes']
class TestRecursive(test_appliance.TestAppliance):
def _testRecursive(self, test_name, recursive_filename):
exec file(recursive_filename, 'r').read()
value1 = value
output1 = None
value2 = None
output2 = None
try:
output1 = dump(value1)
#print "OUTPUT %s:" % test_name
#print output1
value2 = load(output1)
output2 = dump(value2)
self.failUnlessEqual(output1, output2)
except:
def test_recursive(recursive_filename, verbose=False):
exec open(recursive_filename, 'rb').read()
value1 = value
output1 = None
value2 = None
output2 = None
try:
output1 = yaml.dump(value1)
value2 = yaml.load(output1)
output2 = yaml.dump(value2)
assert output1 == output2, (output1, output2)
finally:
if verbose:
print "VALUE1:", value1
print "VALUE2:", value2
print "OUTPUT1:"
print output1
print "OUTPUT2:"
print output2
raise
TestRecursive.add_tests('testRecursive', '.recursive')
test_recursive.unittest = ['.recursive']
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,60 +1,42 @@
import test_appliance
from test_constructor import *
import yaml
import test_constructor
import pprint
from yaml import *
class TestRepresenterTypes(test_appliance.TestAppliance):
def _testTypesUnicode(self, test_name, data_filename, code_filename):
return self._testTypes(test_name, data_filename, code_filename, allow_unicode=True)
def _testTypes(self, test_name, data_filename, code_filename, allow_unicode=False):
data1 = eval(file(code_filename, 'rb').read())
data2 = None
output = None
def test_representer_types(code_filename, verbose=False):
test_constructor._make_objects()
for allow_unicode in [False, True]:
native1 = test_constructor._load_code(open(code_filename, 'rb').read())
native2 = None
try:
output = dump(data1, Dumper=MyDumper, allow_unicode=allow_unicode)
data2 = load(output, Loader=MyLoader)
self.failUnlessEqual(type(data1), type(data2))
output = yaml.dump(native1, Dumper=test_constructor.MyDumper,
allow_unicode=allow_unicode)
native2 = yaml.load(output, Loader=test_constructor.MyLoader)
try:
self.failUnlessEqual(data1, data2)
except (AssertionError, TypeError):
if isinstance(data1, dict):
data1 = [(repr(key), value) for key, value in data1.items()]
data1.sort()
data1 = repr(data1)
data2 = [(repr(key), value) for key, value in data2.items()]
data2.sort()
data2 = repr(data2)
if data1 != data2:
raise
elif isinstance(data1, list):
self.failUnlessEqual(type(data1), type(data2))
self.failUnlessEqual(len(data1), len(data2))
for item1, item2 in zip(data1, data2):
if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \
(item2 != item2 or (item2 == 0.0 and item2 == 1.0)):
continue
if isinstance(item1, datetime.datetime) \
and isinstance(item2, datetime.datetime):
self.failUnlessEqual(item1.microsecond,
item2.microsecond)
if isinstance(item1, datetime.datetime):
item1 = item1.utctimetuple()
if isinstance(item2, datetime.datetime):
item2 = item2.utctimetuple()
self.failUnlessEqual(item1, item2)
else:
raise
except:
print
print "OUTPUT:"
print output
print "NATIVES1:", data1
print "NATIVES2:", data2
raise
if native1 == native2:
continue
except TypeError:
pass
value1 = test_constructor._serialize_value(native1)
value2 = test_constructor._serialize_value(native2)
if verbose:
print "SERIALIZED NATIVE1:"
print value1
print "SERIALIZED NATIVE2:"
print value2
assert value1 == value2, (native1, native2)
finally:
if verbose:
print "NATIVE1:"
pprint.pprint(native1)
print "NATIVE2:"
pprint.pprint(native2)
print "OUTPUT:"
print output
TestRepresenterTypes.add_tests('testTypes', '.data', '.code')
TestRepresenterTypes.add_tests('testTypesUnicode', '.data', '.code')
test_representer_types.unittest = ['.code']
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,83 +1,92 @@
import test_appliance
import yaml
import pprint
from yaml import *
def test_implicit_resolver(data_filename, detect_filename, verbose=False):
correct_tag = None
node = None
try:
correct_tag = open(detect_filename, 'rb').read().strip()
node = yaml.compose(open(data_filename, 'rb'))
assert isinstance(node, yaml.SequenceNode), node
for scalar in node.value:
assert isinstance(scalar, yaml.ScalarNode), scalar
assert scalar.tag == correct_tag, (scalar.tag, correct_tag)
finally:
if verbose:
print "CORRECT TAG:", correct_tag
if hasattr(node, 'value'):
print "CHILDREN:"
pprint.pprint(node.value)
class MyLoader(Loader):
pass
test_implicit_resolver.unittest = ['.data', '.detect']
class MyDumper(Dumper):
pass
def _make_path_loader_and_dumper():
global MyLoader, MyDumper
add_path_resolver(u'!root', [],
Loader=MyLoader, Dumper=MyDumper)
class MyLoader(yaml.Loader):
pass
class MyDumper(yaml.Dumper):
pass
add_path_resolver(u'!root/scalar', [], str,
Loader=MyLoader, Dumper=MyDumper)
yaml.add_path_resolver(u'!root', [],
Loader=MyLoader, Dumper=MyDumper)
yaml.add_path_resolver(u'!root/scalar', [], str,
Loader=MyLoader, Dumper=MyDumper)
yaml.add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'],
Loader=MyLoader, Dumper=MyDumper)
yaml.add_path_resolver(u'!root/key21/1/*', ['key21', 1],
Loader=MyLoader, Dumper=MyDumper)
yaml.add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict,
Loader=MyLoader, Dumper=MyDumper)
add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'],
Loader=MyLoader, Dumper=MyDumper)
return MyLoader, MyDumper
add_path_resolver(u'!root/key21/1/*', ['key21', 1],
Loader=MyLoader, Dumper=MyDumper)
def _convert_node(node):
if isinstance(node, yaml.ScalarNode):
return (node.tag, node.value)
elif isinstance(node, yaml.SequenceNode):
value = []
for item in node.value:
value.append(_convert_node(item))
return (node.tag, value)
elif isinstance(node, yaml.MappingNode):
value = []
for key, item in node.value:
value.append((_convert_node(key), _convert_node(item)))
return (node.tag, value)
add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict,
Loader=MyLoader, Dumper=MyDumper)
class TestResolver(test_appliance.TestAppliance):
def _testImplicitResolver(self, test_name, data_filename, detect_filename):
node = None
correct_tag = None
try:
correct_tag = file(detect_filename, 'rb').read().strip()
node = compose(file(data_filename, 'rb'))
self.failUnless(isinstance(node, SequenceNode))
for scalar in node.value:
self.failUnless(isinstance(scalar, ScalarNode))
self.failUnlessEqual(scalar.tag, correct_tag)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "CORRECT_TAG:"
print file(detect_filename, 'rb').read()
print "ROOT NODE:", node
print "SCALAR NODES:", node.value
raise
def _testPathResolverLoader(self, test_name, data_filename, path_filename):
#print serialize_all(compose_all(file(data_filename, 'rb').read(), Loader=MyLoader))
nodes1 = compose_all(file(data_filename, 'rb').read(), Loader=MyLoader)
nodes2 = compose_all(file(path_filename, 'rb').read())
def test_path_resolver_loader(data_filename, path_filename, verbose=False):
_make_path_loader_and_dumper()
nodes1 = list(yaml.compose_all(open(data_filename, 'rb').read(), Loader=MyLoader))
nodes2 = list(yaml.compose_all(open(path_filename, 'rb').read()))
try:
for node1, node2 in zip(nodes1, nodes2):
self.failUnlessEqual(self._convert(node1), self._convert(node2))
data1 = _convert_node(node1)
data2 = _convert_node(node2)
assert data1 == data2, (data1, data2)
finally:
if verbose:
print yaml.serialize_all(nodes1)
def _testPathResolverDumper(self, test_name, data_filename, path_filename):
for filename in [data_filename, path_filename]:
output = serialize_all(compose_all(file(filename, 'rb').read()), Dumper=MyDumper)
#print output
nodes1 = compose_all(output)
nodes2 = compose_all(file(data_filename, 'rb').read())
for node1, node2 in zip(nodes1, nodes2):
self.failUnlessEqual(self._convert(node1), self._convert(node2))
test_path_resolver_loader.unittest = ['.data', '.path']
def _convert(self, node):
if isinstance(node, ScalarNode):
return node.tag, node.value
elif isinstance(node, SequenceNode):
value = []
for item in node.value:
value.append(self._convert(item))
return node.tag, value
elif isinstance(node, MappingNode):
value = []
for key, item in node.value:
value.append((self._convert(key), self._convert(item)))
value.sort()
return node.tag, value
def test_path_resolver_dumper(data_filename, path_filename, verbose=False):
_make_path_loader_and_dumper()
for filename in [data_filename, path_filename]:
output = yaml.serialize_all(yaml.compose_all(open(filename, 'rb')), Dumper=MyDumper)
if verbose:
print output
nodes1 = yaml.compose_all(output)
nodes2 = yaml.compose_all(open(data_filename, 'rb'))
for node1, node2 in zip(nodes1, nodes2):
data1 = _convert_node(node1)
data2 = _convert_node(node2)
assert data1 == data2, (data1, data2)
TestResolver.add_tests('testImplicitResolver', '.data', '.detect')
TestResolver.add_tests('testPathResolverLoader', '.data', '.path')
TestResolver.add_tests('testPathResolverDumper', '.data', '.path')
test_path_resolver_dumper.unittest = ['.data', '.path']
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,228 +1,187 @@
import test_appliance
import yaml, canonical
import pprint
from yaml import *
class TestStructure(test_appliance.TestAppliance):
def _testStructure(self, test_name, data_filename, structure_filename):
node1 = None
node2 = eval(file(structure_filename, 'rb').read())
try:
loader = Loader(file(data_filename, 'rb'))
node1 = []
while not loader.check_event(StreamEndEvent):
if not loader.check_event(StreamStartEvent, DocumentStartEvent, DocumentEndEvent):
node1.append(self._convert(loader))
else:
loader.get_event()
loader.get_event()
if len(node1) == 1:
node1 = node1[0]
self.failUnlessEqual(node1, node2)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "NODE1:", node1
print "NODE2:", node2
raise
def _convert(self, loader):
if loader.check_event(ScalarEvent):
event = loader.get_event()
if event.tag or event.anchor or event.value:
return True
else:
return None
elif loader.check_event(SequenceStartEvent):
loader.get_event()
sequence = []
while not loader.check_event(SequenceEndEvent):
sequence.append(self._convert(loader))
loader.get_event()
return sequence
elif loader.check_event(MappingStartEvent):
loader.get_event()
mapping = []
while not loader.check_event(MappingEndEvent):
key = self._convert(loader)
value = self._convert(loader)
mapping.append((key, value))
loader.get_event()
return mapping
elif loader.check_event(AliasEvent):
loader.get_event()
return '*'
def _convert_structure(loader):
if loader.check_event(yaml.ScalarEvent):
event = loader.get_event()
if event.tag or event.anchor or event.value:
return True
else:
loader.get_event()
return '?'
return None
elif loader.check_event(yaml.SequenceStartEvent):
loader.get_event()
sequence = []
while not loader.check_event(yaml.SequenceEndEvent):
sequence.append(_convert_structure(loader))
loader.get_event()
return sequence
elif loader.check_event(yaml.MappingStartEvent):
loader.get_event()
mapping = []
while not loader.check_event(yaml.MappingEndEvent):
key = _convert_structure(loader)
value = _convert_structure(loader)
mapping.append((key, value))
loader.get_event()
return mapping
elif loader.check_event(yaml.AliasEvent):
loader.get_event()
return '*'
else:
loader.get_event()
return '?'
TestStructure.add_tests('testStructure', '.data', '.structure')
def test_structure(data_filename, structure_filename, verbose=False):
nodes1 = []
nodes2 = eval(open(structure_filename, 'rb').read())
try:
loader = yaml.Loader(open(data_filename, 'rb'))
while loader.check_event():
if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent,
yaml.DocumentStartEvent, yaml.DocumentEndEvent):
loader.get_event()
continue
nodes1.append(_convert_structure(loader))
if len(nodes1) == 1:
nodes1 = nodes1[0]
assert nodes1 == nodes2, (nodes1, nodes2)
finally:
if verbose:
print "NODES1:"
pprint.pprint(nodes1)
print "NODES2:"
pprint.pprint(nodes2)
class TestParser(test_appliance.TestAppliance):
test_structure.unittest = ['.data', '.structure']
def _testParser(self, test_name, data_filename, canonical_filename):
events1 = None
events2 = None
try:
events1 = list(parse(file(data_filename, 'rb')))
events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb')))
self._compare(events1, events2)
except:
print
print "DATA1:"
print file(data_filename, 'rb').read()
print "DATA2:"
print file(canonical_filename, 'rb').read()
print "EVENTS1:", events1
print "EVENTS2:", events2
raise
def _compare_events(events1, events2, full=False):
assert len(events1) == len(events2), (len(events1), len(events2))
for event1, event2 in zip(events1, events2):
assert event1.__class__ == event2.__class__, (event1, event2)
if isinstance(event1, yaml.AliasEvent) and full:
assert event1.anchor == event2.anchor, (event1, event2)
if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)):
if (event1.tag not in [None, u'!'] and event2.tag not in [None, u'!']) or full:
assert event1.tag == event2.tag, (event1, event2)
if isinstance(event1, yaml.ScalarEvent):
assert event1.value == event2.value, (event1, event2)
def _compare(self, events1, events2):
self.failUnlessEqual(len(events1), len(events2))
for event1, event2 in zip(events1, events2):
self.failUnlessEqual(event1.__class__, event2.__class__)
if isinstance(event1, AliasEvent):
#self.failUnlessEqual(event1.name, event2.name)
pass
elif isinstance(event1, ScalarEvent):
#self.failUnlessEqual(event1.anchor, event2.anchor)
#self.failUnlessEqual(event1.tag, event2.tag)
self.failUnlessEqual(event1.value, event2.value)
if isinstance(event1, CollectionStartEvent):
#self.failUnlessEqual(event1.anchor, event2.anchor)
#self.failUnlessEqual(event1.tag, event2.tag)
pass
def test_parser(data_filename, canonical_filename, verbose=False):
events1 = None
events2 = None
try:
events1 = list(yaml.parse(open(data_filename, 'rb')))
events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb')))
_compare_events(events1, events2)
finally:
if verbose:
print "EVENTS1:"
pprint.pprint(events1)
print "EVENTS2:"
pprint.pprint(events2)
TestParser.add_tests('testParser', '.data', '.canonical')
test_parser.unittest = ['.data', '.canonical']
class TestResolver(test_appliance.TestAppliance):
def test_parser_on_canonical(canonical_filename, verbose=False):
events1 = None
events2 = None
try:
events1 = list(yaml.parse(open(canonical_filename, 'rb')))
events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb')))
_compare_events(events1, events2, full=True)
finally:
if verbose:
print "EVENTS1:"
pprint.pprint(events1)
print "EVENTS2:"
pprint.pprint(events2)
def _testResolver(self, test_name, data_filename, canonical_filename):
nodes1 = None
nodes2 = None
try:
nodes1 = list(compose_all(file(data_filename, 'rb')))
nodes2 = list(test_appliance.canonical_compose_all(file(canonical_filename, 'rb')))
self.failUnlessEqual(len(nodes1), len(nodes2))
for node1, node2 in zip(nodes1, nodes2):
self._compare(node1, node2)
except:
print
print "DATA1:"
print file(data_filename, 'rb').read()
print "DATA2:"
print file(canonical_filename, 'rb').read()
print "NODES1:", nodes1
print "NODES2:", nodes2
raise
test_parser_on_canonical.unittest = ['.canonical']
def _compare(self, node1, node2):
self.failUnlessEqual(node1.__class__, node2.__class__)
if isinstance(node1, ScalarNode):
#self.failUnlessEqual(node1.tag, node2.tag)
self.failUnlessEqual(node1.value, node2.value)
elif isinstance(node1, SequenceNode):
self.failUnlessEqual(len(node1.value), len(node2.value))
for item1, item2 in zip(node1.value, node2.value):
self._compare(item1, item2)
elif isinstance(node1, MappingNode):
self.failUnlessEqual(len(node1.value), len(node2.value))
items1 = node1.value.items()
items1.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value),
(k2.tag,k2.value,v2.tag,v2.value)))
items2 = node2.value.items()
items2.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value),
(k2.tag,k2.value,v2.tag,v2.value)))
for (key1, value1), (key2, value2) in zip(items1, items2):
self._compare(key1, key2)
self._compare(value1, value2)
def _compare_nodes(node1, node2):
assert node1.__class__ == node2.__class__, (node1, node2)
assert node1.tag == node2.tag, (node1, node2)
if isinstance(node1, yaml.ScalarNode):
assert node1.value == node2.value, (node1, node2)
else:
assert len(node1.value) == len(node2.value), (node1, node2)
for item1, item2 in zip(node1.value, node2.value):
if not isinstance(item1, tuple):
item1 = (item1,)
item2 = (item2,)
for subnode1, subnode2 in zip(item1, item2):
_compare_nodes(subnode1, subnode2)
TestResolver.add_tests('testResolver', '.data', '.canonical')
def test_composer(data_filename, canonical_filename, verbose=False):
nodes1 = None
nodes2 = None
try:
nodes1 = list(yaml.compose_all(open(data_filename, 'rb')))
nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb')))
assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2))
for node1, node2 in zip(nodes1, nodes2):
_compare_nodes(node1, node2)
finally:
if verbose:
print "NODES1:"
pprint.pprint(nodes1)
print "NODES2:"
pprint.pprint(nodes2)
class MyLoader(Loader):
def construct_sequence(self, node):
return tuple(Loader.construct_sequence(self, node))
test_composer.unittest = ['.data', '.canonical']
def construct_mapping(self, node):
pairs = self.construct_pairs(node)
pairs.sort()
return pairs
def _make_loader():
global MyLoader
def construct_undefined(self, node):
return self.construct_scalar(node)
class MyLoader(yaml.Loader):
def construct_sequence(self, node):
return tuple(yaml.Loader.construct_sequence(self, node))
def construct_mapping(self, node):
pairs = self.construct_pairs(node)
pairs.sort()
return pairs
def construct_undefined(self, node):
return self.construct_scalar(node)
MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping)
MyLoader.add_constructor(None, MyLoader.construct_undefined)
MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping)
MyLoader.add_constructor(None, MyLoader.construct_undefined)
class MyCanonicalLoader(test_appliance.CanonicalLoader):
def _make_canonical_loader():
global MyCanonicalLoader
def construct_sequence(self, node):
return tuple(test_appliance.CanonicalLoader.construct_sequence(self, node))
class MyCanonicalLoader(yaml.CanonicalLoader):
def construct_sequence(self, node):
return tuple(yaml.CanonicalLoader.construct_sequence(self, node))
def construct_mapping(self, node):
pairs = self.construct_pairs(node)
pairs.sort()
return pairs
def construct_undefined(self, node):
return self.construct_scalar(node)
def construct_mapping(self, node):
pairs = self.construct_pairs(node)
pairs.sort()
return pairs
MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping)
MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
def construct_undefined(self, node):
return self.construct_scalar(node)
def test_constructor(data_filename, canonical_filename, verbose=False):
_make_loader()
_make_canonical_loader()
native1 = None
native2 = None
try:
native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader))
assert native1 == native2, (native1, native2)
finally:
if verbose:
print "NATIVE1:"
pprint.pprint(native1)
print "NATIVE2:"
pprint.pprint(native2)
MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping)
MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
test_constructor.unittest = ['.data', '.canonical']
class TestConstructor(test_appliance.TestAppliance):
def _testConstructor(self, test_name, data_filename, canonical_filename):
data1 = None
data2 = None
try:
data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))
data2 = list(load_all(file(canonical_filename, 'rb'), Loader=MyCanonicalLoader))
self.failUnlessEqual(data1, data2)
except:
print
print "DATA1:"
print file(data_filename, 'rb').read()
print "DATA2:"
print file(canonical_filename, 'rb').read()
print "NATIVES1:", data1
print "NATIVES2:", data2
raise
TestConstructor.add_tests('testConstructor', '.data', '.canonical')
class TestParserOnCanonical(test_appliance.TestAppliance):
def _testParserOnCanonical(self, test_name, canonical_filename):
events1 = None
events2 = None
try:
events1 = list(parse(file(canonical_filename, 'rb')))
events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb')))
self._compare(events1, events2)
except:
print
print "DATA:"
print file(canonical_filename, 'rb').read()
print "EVENTS1:", events1
print "EVENTS2:", events2
raise
def _compare(self, events1, events2):
self.failUnlessEqual(len(events1), len(events2))
for event1, event2 in zip(events1, events2):
self.failUnlessEqual(event1.__class__, event2.__class__)
if isinstance(event1, AliasEvent):
self.failUnlessEqual(event1.anchor, event2.anchor)
elif isinstance(event1, ScalarEvent):
self.failUnlessEqual(event1.anchor, event2.anchor)
self.failUnlessEqual(event1.tag, event2.tag)
self.failUnlessEqual(event1.value, event2.value)
if isinstance(event1, CollectionStartEvent):
self.failUnlessEqual(event1.anchor, event2.anchor)
self.failUnlessEqual(event1.tag, event2.tag)
TestParserOnCanonical.add_tests('testParserOnCanonical', '.canonical')
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,30 +0,0 @@
import test_appliance
class TestSyck(test_appliance.TestAppliance):
def _testSyckOnTokenTests(self, test_name, data_filename, tokens_filename):
try:
syck.parse(file(data_filename, 'rb'))
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
raise
def _testSyckOnCanonicalTests(self, test_name, data_filename, canonical_filename):
try:
syck.parse(file(data_filename, 'rb'))
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
raise
try:
import syck
#TestSyck.add_tests('testSyckOnTokenTests', '.data', '.tokens')
#TestSyck.add_tests('testSyckOnCanonicalTests', '.data', '.canonical')
except ImportError:
pass

View File

@ -1,86 +1,77 @@
import test_appliance
import yaml
import pprint
from yaml import *
# Tokens mnemonic:
# directive: %
# document_start: ---
# document_end: ...
# alias: *
# anchor: &
# tag: !
# scalar _
# block_sequence_start: [[
# block_mapping_start: {{
# block_end: ]}
# flow_sequence_start: [
# flow_sequence_end: ]
# flow_mapping_start: {
# flow_mapping_end: }
# entry: ,
# key: ?
# value: :
class TestTokens(test_appliance.TestAppliance):
_replaces = {
yaml.DirectiveToken: '%',
yaml.DocumentStartToken: '---',
yaml.DocumentEndToken: '...',
yaml.AliasToken: '*',
yaml.AnchorToken: '&',
yaml.TagToken: '!',
yaml.ScalarToken: '_',
yaml.BlockSequenceStartToken: '[[',
yaml.BlockMappingStartToken: '{{',
yaml.BlockEndToken: ']}',
yaml.FlowSequenceStartToken: '[',
yaml.FlowSequenceEndToken: ']',
yaml.FlowMappingStartToken: '{',
yaml.FlowMappingEndToken: '}',
yaml.BlockEntryToken: ',',
yaml.FlowEntryToken: ',',
yaml.KeyToken: '?',
yaml.ValueToken: ':',
}
# Tokens mnemonic:
# directive: %
# document_start: ---
# document_end: ...
# alias: *
# anchor: &
# tag: !
# scalar _
# block_sequence_start: [[
# block_mapping_start: {{
# block_end: ]}
# flow_sequence_start: [
# flow_sequence_end: ]
# flow_mapping_start: {
# flow_mapping_end: }
# entry: ,
# key: ?
# value: :
def test_tokens(data_filename, tokens_filename, verbose=False):
tokens1 = []
tokens2 = open(tokens_filename, 'rb').read().split()
try:
for token in yaml.scan(open(data_filename, 'rb')):
if not isinstance(token, (yaml.StreamStartToken, yaml.StreamEndToken)):
tokens1.append(_replaces[token.__class__])
finally:
if verbose:
print "TOKENS1:", ' '.join(tokens1)
print "TOKENS2:", ' '.join(tokens2)
assert len(tokens1) == len(tokens2), (tokens1, tokens2)
for token1, token2 in zip(tokens1, tokens2):
assert token1 == token2, (token1, token2)
replaces = {
DirectiveToken: '%',
DocumentStartToken: '---',
DocumentEndToken: '...',
AliasToken: '*',
AnchorToken: '&',
TagToken: '!',
ScalarToken: '_',
BlockSequenceStartToken: '[[',
BlockMappingStartToken: '{{',
BlockEndToken: ']}',
FlowSequenceStartToken: '[',
FlowSequenceEndToken: ']',
FlowMappingStartToken: '{',
FlowMappingEndToken: '}',
BlockEntryToken: ',',
FlowEntryToken: ',',
KeyToken: '?',
ValueToken: ':',
}
test_tokens.unittest = ['.data', '.tokens']
def _testTokens(self, test_name, data_filename, tokens_filename):
tokens1 = None
tokens2 = file(tokens_filename, 'rb').read().split()
def test_scanner(data_filename, canonical_filename, verbose=False):
for filename in [data_filename, canonical_filename]:
tokens = []
try:
tokens1 = []
for token in scan(file(data_filename, 'rb')):
if not isinstance(token, (StreamStartToken, StreamEndToken)):
tokens1.append(token)
tokens1 = [self.replaces[t.__class__] for t in tokens1]
self.failUnlessEqual(tokens1, tokens2)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "TOKENS1:", tokens1
print "TOKENS2:", tokens2
raise
for token in yaml.scan(open(filename, 'rb')):
tokens.append(token.__class__.__name__)
finally:
if verbose:
pprint.pprint(tokens)
TestTokens.add_tests('testTokens', '.data', '.tokens')
test_scanner.unittest = ['.data', '.canonical']
class TestScanner(test_appliance.TestAppliance):
def _testScanner(self, test_name, data_filename, canonical_filename):
for filename in [canonical_filename, data_filename]:
tokens = None
try:
tokens = []
for token in scan(file(filename, 'rb')):
if not isinstance(token, (StreamStartToken, StreamEndToken)):
tokens.append(token.__class__.__name__)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "TOKENS:", tokens
raise
TestScanner.add_tests('testScanner', '.data', '.canonical')
if __name__ == '__main__':
import test_appliance
test_appliance.run(globals())

View File

@ -1,6 +1,4 @@
import unittest
from test_mark import *
from test_reader import *
from test_canonical import *
@ -12,11 +10,8 @@ from test_constructor import *
from test_emitter import *
from test_representer import *
from test_recursive import *
from test_syck import *
def main(module='__main__'):
unittest.main(module)
if __name__ == '__main__':
main()
import test_appliance
test_appliance.run(globals())

View File

@ -1,195 +1,273 @@
import unittest, test_appliance
import _yaml, yaml
import types, pprint
test_appliance.TestAppliance.SKIP_EXT = '.skip-ext'
yaml.PyBaseLoader = yaml.BaseLoader
yaml.PySafeLoader = yaml.SafeLoader
yaml.PyLoader = yaml.Loader
yaml.PyBaseDumper = yaml.BaseDumper
yaml.PySafeDumper = yaml.SafeDumper
yaml.PyDumper = yaml.Dumper
class TestCVersion(unittest.TestCase):
def testCVersion(self):
self.failUnlessEqual("%s.%s.%s" % _yaml.get_version(), _yaml.get_version_string())
class TestCLoader(test_appliance.TestAppliance):
def _testCScannerFileInput(self, test_name, data_filename, canonical_filename):
self._testCScanner(test_name, data_filename, canonical_filename, True)
def _testCScanner(self, test_name, data_filename, canonical_filename, file_input=False, Loader=yaml.Loader):
if file_input:
data = file(data_filename, 'r')
else:
data = file(data_filename, 'r').read()
tokens = list(yaml.scan(data, Loader=Loader))
ext_tokens = []
try:
if file_input:
data = file(data_filename, 'r')
for token in yaml.scan(data, Loader=yaml.CLoader):
ext_tokens.append(token)
self.failUnlessEqual(len(tokens), len(ext_tokens))
for token, ext_token in zip(tokens, ext_tokens):
self.failUnlessEqual(token.__class__, ext_token.__class__)
if not isinstance(token, yaml.StreamEndToken):
self.failUnlessEqual((token.start_mark.index, token.start_mark.line, token.start_mark.column),
(ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column))
self.failUnlessEqual((token.end_mark.index, token.end_mark.line, token.end_mark.column),
(ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column))
if hasattr(token, 'value'):
self.failUnlessEqual(token.value, ext_token.value)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "TOKENS:", tokens
print "EXT_TOKENS:", ext_tokens
raise
def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
data = file(data_filename, 'r').read()
events = list(yaml.parse(data, Loader=Loader))
ext_events = []
try:
for event in yaml.parse(data, Loader=yaml.CLoader):
ext_events.append(event)
#print "EVENT:", event
self.failUnlessEqual(len(events), len(ext_events))
for event, ext_event in zip(events, ext_events):
self.failUnlessEqual(event.__class__, ext_event.__class__)
if hasattr(event, 'anchor'):
self.failUnlessEqual(event.anchor, ext_event.anchor)
if hasattr(event, 'tag'):
self.failUnlessEqual(event.tag, ext_event.tag)
if hasattr(event, 'implicit'):
self.failUnlessEqual(event.implicit, ext_event.implicit)
if hasattr(event, 'value'):
self.failUnlessEqual(event.value, ext_event.value)
if hasattr(event, 'explicit'):
self.failUnlessEqual(event.explicit, ext_event.explicit)
if hasattr(event, 'version'):
self.failUnlessEqual(event.version, ext_event.version)
if hasattr(event, 'tags'):
self.failUnlessEqual(event.tags, ext_event.tags)
except:
print
print "DATA:"
print file(data_filename, 'rb').read()
print "EVENTS:", events
print "EXT_EVENTS:", ext_events
raise
TestCLoader.add_tests('testCScanner', '.data', '.canonical')
TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical')
TestCLoader.add_tests('testCParser', '.data', '.canonical')
class TestCEmitter(test_appliance.TestAppliance):
def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
data1 = file(data_filename, 'r').read()
events = list(yaml.parse(data1, Loader=Loader))
data2 = yaml.emit(events, Dumper=yaml.CDumper)
ext_events = []
try:
for event in yaml.parse(data2):
ext_events.append(event)
self.failUnlessEqual(len(events), len(ext_events))
for event, ext_event in zip(events, ext_events):
self.failUnlessEqual(event.__class__, ext_event.__class__)
if hasattr(event, 'anchor'):
self.failUnlessEqual(event.anchor, ext_event.anchor)
if hasattr(event, 'tag'):
if not (event.tag in ['!', None] and ext_event.tag in ['!', None]):
self.failUnlessEqual(event.tag, ext_event.tag)
if hasattr(event, 'implicit'):
self.failUnlessEqual(event.implicit, ext_event.implicit)
if hasattr(event, 'value'):
self.failUnlessEqual(event.value, ext_event.value)
if hasattr(event, 'explicit') and event.explicit:
self.failUnlessEqual(event.explicit, ext_event.explicit)
if hasattr(event, 'version'):
self.failUnlessEqual(event.version, ext_event.version)
if hasattr(event, 'tags'):
self.failUnlessEqual(event.tags, ext_event.tags)
except:
print
print "DATA1:"
print data1
print "DATA2:"
print data2
print "EVENTS:", events
print "EXT_EVENTS:", ext_events
raise
TestCEmitter.add_tests('testCEmitter', '.data', '.canonical')
yaml.BaseLoader = yaml.CBaseLoader
yaml.SafeLoader = yaml.CSafeLoader
yaml.Loader = yaml.CLoader
yaml.BaseDumper = yaml.CBaseDumper
yaml.SafeDumper = yaml.CSafeDumper
yaml.Dumper = yaml.CDumper
old_scan = yaml.scan
def scan(stream, Loader=yaml.CLoader):
def new_scan(stream, Loader=yaml.CLoader):
return old_scan(stream, Loader)
yaml.scan = scan
old_parse = yaml.parse
def parse(stream, Loader=yaml.CLoader):
def new_parse(stream, Loader=yaml.CLoader):
return old_parse(stream, Loader)
yaml.parse = parse
old_compose = yaml.compose
def compose(stream, Loader=yaml.CLoader):
def new_compose(stream, Loader=yaml.CLoader):
return old_compose(stream, Loader)
yaml.compose = compose
old_compose_all = yaml.compose_all
def compose_all(stream, Loader=yaml.CLoader):
def new_compose_all(stream, Loader=yaml.CLoader):
return old_compose_all(stream, Loader)
yaml.compose_all = compose_all
old_load_all = yaml.load_all
def load_all(stream, Loader=yaml.CLoader):
return old_load_all(stream, Loader)
yaml.load_all = load_all
old_load = yaml.load
def load(stream, Loader=yaml.CLoader):
def new_load(stream, Loader=yaml.CLoader):
return old_load(stream, Loader)
yaml.load = load
def safe_load_all(stream):
return yaml.load_all(stream, yaml.CSafeLoader)
yaml.safe_load_all = safe_load_all
def safe_load(stream):
return yaml.load(stream, yaml.CSafeLoader)
yaml.safe_load = safe_load
old_load_all = yaml.load_all
def new_load_all(stream, Loader=yaml.CLoader):
return old_load_all(stream, Loader)
old_safe_load = yaml.safe_load
def new_safe_load(stream):
return old_load(stream, yaml.CSafeLoader)
old_safe_load_all = yaml.safe_load_all
def new_safe_load_all(stream):
return old_load_all(stream, yaml.CSafeLoader)
old_emit = yaml.emit
def emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
return old_emit(events, stream, Dumper, **kwds)
yaml.emit = emit
old_serialize_all = yaml.serialize_all
def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
return old_serialize_all(nodes, stream, Dumper, **kwds)
yaml.serialize_all = serialize_all
old_serialize = yaml.serialize
def serialize(node, stream, Dumper=yaml.CDumper, **kwds):
def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds):
return old_serialize(node, stream, Dumper, **kwds)
yaml.serialize = serialize
old_dump_all = yaml.dump_all
def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
return old_dump_all(documents, stream, Dumper, **kwds)
yaml.dump_all = dump_all
old_serialize_all = yaml.serialize_all
def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
return old_serialize_all(nodes, stream, Dumper, **kwds)
old_dump = yaml.dump
def dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
return old_dump(data, stream, Dumper, **kwds)
yaml.dump = dump
def safe_dump_all(documents, stream=None, **kwds):
return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds)
yaml.safe_dump_all = safe_dump_all
def safe_dump(data, stream=None, **kwds):
return yaml.dump(data, stream, yaml.CSafeDumper, **kwds)
yaml.safe_dump = safe_dump
from test_yaml import *
old_dump_all = yaml.dump_all
def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
return old_dump_all(documents, stream, Dumper, **kwds)
def main(module='__main__'):
unittest.main(module)
old_safe_dump = yaml.safe_dump
def new_safe_dump(data, stream=None, **kwds):
return old_dump(data, stream, yaml.CSafeDumper, **kwds)
old_safe_dump_all = yaml.safe_dump_all
def new_safe_dump_all(documents, stream=None, **kwds):
return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds)
def _set_up():
yaml.BaseLoader = yaml.CBaseLoader
yaml.SafeLoader = yaml.CSafeLoader
yaml.Loader = yaml.CLoader
yaml.BaseDumper = yaml.CBaseDumper
yaml.SafeDumper = yaml.CSafeDumper
yaml.Dumper = yaml.CDumper
yaml.scan = new_scan
yaml.parse = new_parse
yaml.compose = new_compose
yaml.compose_all = new_compose_all
yaml.load = new_load
yaml.load_all = new_load_all
yaml.safe_load = new_safe_load
yaml.safe_load_all = new_safe_load_all
yaml.emit = new_emit
yaml.serialize = new_serialize
yaml.serialize_all = new_serialize_all
yaml.dump = new_dump
yaml.dump_all = new_dump_all
yaml.safe_dump = new_safe_dump
yaml.safe_dump_all = new_safe_dump_all
def _tear_down():
yaml.BaseLoader = yaml.PyBaseLoader
yaml.SafeLoader = yaml.PySafeLoader
yaml.Loader = yaml.PyLoader
yaml.BaseDumper = yaml.PyBaseDumper
yaml.SafeDumper = yaml.PySafeDumper
yaml.Dumper = yaml.PyDumper
yaml.scan = old_scan
yaml.parse = old_parse
yaml.compose = old_compose
yaml.compose_all = old_compose_all
yaml.load = old_load
yaml.load_all = old_load_all
yaml.safe_load = old_safe_load
yaml.safe_load_all = old_safe_load_all
yaml.emit = old_emit
yaml.serialize = old_serialize
yaml.serialize_all = old_serialize_all
yaml.dump = old_dump
yaml.dump_all = old_dump_all
yaml.safe_dump = old_safe_dump
yaml.safe_dump_all = old_safe_dump_all
def test_c_version(verbose=False):
if verbose:
print _yaml.get_version()
print _yaml.get_version_string()
assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \
(_yaml.get_version(), _yaml.get_version_string())
def _compare_scanners(py_data, c_data, verbose):
py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader))
c_tokens = []
try:
for token in yaml.scan(c_data, Loader=yaml.CLoader):
c_tokens.append(token)
assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens))
for py_token, c_token in zip(py_tokens, c_tokens):
assert py_token.__class__ == c_token.__class__, (py_token, c_token)
if hasattr(py_token, 'value'):
assert py_token.value == c_token.value, (py_token, c_token)
if isinstance(py_token, yaml.StreamEndToken):
continue
py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column)
py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column)
c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column)
c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column)
assert py_start == c_start, (py_start, c_start)
assert py_end == c_end, (py_end, c_end)
finally:
if verbose:
print "PY_TOKENS:"
pprint.pprint(py_tokens)
print "C_TOKENS:"
pprint.pprint(c_tokens)
def test_c_scanner(data_filename, canonical_filename, verbose=False):
_compare_scanners(open(data_filename, 'rb'),
open(data_filename, 'rb'), verbose)
_compare_scanners(open(data_filename, 'rb').read(),
open(data_filename, 'rb').read(), verbose)
_compare_scanners(open(canonical_filename, 'rb'),
open(canonical_filename, 'rb'), verbose)
_compare_scanners(open(canonical_filename, 'rb').read(),
open(canonical_filename, 'rb').read(), verbose)
test_c_scanner.unittest = ['.data', '.canonical']
test_c_scanner.skip = ['.skip-ext']
def _compare_parsers(py_data, c_data, verbose):
py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader))
c_events = []
try:
for event in yaml.parse(c_data, Loader=yaml.CLoader):
c_events.append(event)
assert len(py_events) == len(c_events), (len(py_events), len(c_events))
for py_event, c_event in zip(py_events, c_events):
for attribute in ['__class__', 'anchor', 'tag', 'implicit',
'value', 'explicit', 'version', 'tags']:
py_value = getattr(py_event, attribute, None)
c_value = getattr(c_event, attribute, None)
assert py_value == c_value, (py_event, c_event, attribute)
finally:
if verbose:
print "PY_EVENTS:"
pprint.pprint(py_events)
print "C_EVENTS:"
pprint.pprint(c_events)
def test_c_parser(data_filename, canonical_filename, verbose=False):
_compare_parsers(open(data_filename, 'rb'),
open(data_filename, 'rb'), verbose)
_compare_parsers(open(data_filename, 'rb').read(),
open(data_filename, 'rb').read(), verbose)
_compare_parsers(open(canonical_filename, 'rb'),
open(canonical_filename, 'rb'), verbose)
_compare_parsers(open(canonical_filename, 'rb').read(),
open(canonical_filename, 'rb').read(), verbose)
test_c_parser.unittest = ['.data', '.canonical']
test_c_parser.skip = ['.skip-ext']
def _compare_emitters(data, verbose):
events = list(yaml.parse(data, Loader=yaml.PyLoader))
c_data = yaml.emit(events, Dumper=yaml.CDumper)
if verbose:
print c_data
py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
try:
assert len(events) == len(py_events), (len(events), len(py_events))
assert len(events) == len(c_events), (len(events), len(c_events))
for event, py_event, c_event in zip(events, py_events, c_events):
for attribute in ['__class__', 'anchor', 'tag', 'implicit',
'value', 'explicit', 'version', 'tags']:
value = getattr(event, attribute, None)
py_value = getattr(py_event, attribute, None)
c_value = getattr(c_event, attribute, None)
if attribute == 'tag' and value in [None, u'!'] \
and py_value in [None, u'!'] and c_value in [None, u'!']:
continue
if attribute == 'explicit' and (py_value or c_value):
continue
assert value == py_value, (event, py_event, attribute)
assert value == c_value, (event, c_event, attribute)
finally:
if verbose:
print "EVENTS:"
pprint.pprint(events)
print "PY_EVENTS:"
pprint.pprint(py_events)
print "C_EVENTS:"
pprint.pprint(c_events)
def test_c_emitter(data_filename, canonical_filename, verbose=False):
_compare_emitters(open(data_filename, 'rb').read(), verbose)
_compare_emitters(open(canonical_filename, 'rb').read(), verbose)
test_c_emitter.unittest = ['.data', '.canonical']
test_c_emitter.skip = ['.skip-ext']
def wrap_ext_function(function):
def wrapper(*args, **kwds):
_set_up()
try:
function(*args, **kwds)
finally:
_tear_down()
wrapper.func_name = '%s_ext' % function.func_name
wrapper.unittest = function.unittest
wrapper.skip = getattr(function, 'skip', [])+['.skip-ext']
return wrapper
def wrap_ext(collections):
functions = []
if not isinstance(collections, list):
collections = [collections]
for collection in collections:
if not isinstance(collection, dict):
collection = vars(collection)
keys = collection.keys()
keys.sort()
for key in keys:
value = collection[key]
if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
functions.append(wrap_ext_function(value))
for function in functions:
assert function.func_name not in globals()
globals()[function.func_name] = function
import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \
test_emitter, test_representer, test_recursive
wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor,
test_emitter, test_representer, test_recursive])
if __name__ == '__main__':
main()
import test_appliance
test_appliance.run(globals())