Emitter is done!!!

This commit is contained in:
Kirill Simonov 2006-04-09 19:51:02 +00:00
parent 01b5e1925b
commit 2d457ba718
15 changed files with 768 additions and 81 deletions

View File

@ -6,6 +6,7 @@ from parser import *
from composer import *
from resolver import *
from constructor import *
from emitter import *
from tokens import *
from events import *

View File

@ -81,7 +81,7 @@ class Composer:
def compose_scalar_node(self):
event = self.parser.get()
return ScalarNode(event.tag, event.value,
return ScalarNode(event.tag, event.value, event.implicit,
event.start_mark, event.end_mark)
def compose_sequence_node(self):

View File

@ -14,6 +14,19 @@ from events import *
class EmitterError(YAMLError):
pass
class ScalarAnalysis:
    """Plain record holding the outcome of analysing one scalar value.

    It stores the scalar itself, whether it is empty, whether it spans
    several lines, and one flag per output style saying whether that
    style is allowed for the scalar.
    """

    def __init__(self, scalar, empty, multiline,
            allow_flow_plain, allow_block_plain,
            allow_single_quoted, allow_double_quoted, allow_block):
        # The analysed value and its basic shape.
        self.scalar = scalar
        self.empty = empty
        self.multiline = multiline
        # Plain (unquoted) style, in flow and in block context.
        self.allow_flow_plain = allow_flow_plain
        self.allow_block_plain = allow_block_plain
        # Quoted styles.
        self.allow_single_quoted = allow_single_quoted
        self.allow_double_quoted = allow_double_quoted
        # Block (literal / folded) styles.
        self.allow_block = allow_block
class Emitter:
DEFAULT_TAG_PREFIXES = {
@ -53,7 +66,6 @@ class Emitter:
# Characteristics of the last emitted character:
# - current position.
# - is it a line break?
# - is it a whitespace?
# - is it an indention character
# (indentation space, '-', '?', or ':')?
@ -69,44 +81,46 @@ class Emitter:
self.best_width = 80
self.tag_prefixes = None
# Scalar analysis.
self.analysis = None
# Analyses cache.
self.anchor_text = None
self.tag_text = None
self.scalar_analysis = None
self.scalar_style = None
def emit(self, event):
if self.events:
self.events.append(event)
event = self.events.pop(0)
self.event = event
if self.need_more_events():
self.event.insert(0, event)
return
self.state()
self.event = None
self.events.append(event)
while not self.need_more_events():
self.event = self.events.pop(0)
self.state()
self.event = None
# In some cases, we wait for a few next events before emitting.
def need_more_events(self):
if isinstance(self.event, DocumentStartEvent):
if not self.events:
return True
event = self.events[0]
if isinstance(event, DocumentStartEvent):
return self.need_events(1)
elif isinstance(self.event, SequenceStartEvent):
elif isinstance(event, SequenceStartEvent):
return self.need_events(2)
elif isinstance(self.event, MappingStartEvent):
elif isinstance(event, MappingStartEvent):
return self.need_events(3)
else:
return False
def need_events(self, count):
level = 0
for event in self.events:
if isinstance(event, (DocumentStart, CollectionStart)):
for event in self.events[1:]:
if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
level += 1
elif isinstance(event, (DocumentEnd, CollectionEnd)):
elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
level -= 1
elif isinstance(event, StreamEnd):
elif isinstance(event, StreamEndEvent):
level = -1
if level < 0:
return False
return (len(self.events) < count)
return (len(self.events) < count+1)
def increase_indent(self, flow=False, indentless=False):
self.indents.append(self.indent)
@ -124,8 +138,8 @@ class Emitter:
def expect_stream_start(self):
if isinstance(self.event, StreamStartEvent):
self.encoding = event.encoding
self.canonical = event.canonical
self.encoding = self.event.encoding
self.canonical = self.event.canonical
if self.event.indent and self.event.indent > 1:
self.best_indent = self.event.indent
if self.event.width and self.event.width > self.best_indent:
@ -149,16 +163,21 @@ class Emitter:
def expect_document_start(self, first=False):
if isinstance(self.event, DocumentStartEvent):
if self.event.version:
self.write_version_directive(self.event.version)
version_text = self.analyze_version(self.event.version)
self.write_version_directive(version_text)
self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
if self.event.tags:
for handle in self.event.tags:
handles = self.event.tags.keys()
handles.sort()
for handle in handles:
prefix = self.event.tags[handle]
self.tag_prefixes[prefix] = handle
self.write_tag_directive(handle, prefix)
implicit = (first and self.event.implicit and not self.canonical
handle_text = self.analyze_tag_handle(handle)
prefix_text = self.analyze_tag_prefix(prefix)
self.write_tag_directive(handle_text, prefix_text)
implicit = (first and not self.event.explicit and not self.canonical
and not self.event.version and not self.event.tags
and not self.check_next_empty_scalar())
and not self.check_empty_document())
if not implicit:
self.write_indent()
self.write_indicator(u'---', True)
@ -175,7 +194,7 @@ class Emitter:
def expect_document_end(self):
if isinstance(self.event, DocumentEndEvent):
self.write_indent()
if not event.implicit:
if self.event.explicit:
self.write_indicator(u'...', True)
self.write_indent()
self.state = self.expect_document_start
@ -184,6 +203,7 @@ class Emitter:
% self.event)
def expect_document_root(self):
self.states.append(self.expect_document_end)
self.expect_node(root=True)
# Node handlers.
@ -196,18 +216,18 @@ class Emitter:
self.simple_key_context = simple_key
if isinstance(self.event, AliasEvent):
self.expect_alias()
elif isinstance(event, (ScalarEvent, CollectionEvent)):
self.process_anchor()
elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
self.process_anchor(u'&')
self.process_tag()
if isinstance(self.event, ScalarEvent):
self.expect_scalar()
elif isinstance(self.event, SequenceEvent):
elif isinstance(self.event, SequenceStartEvent):
if self.flow_level or self.canonical or self.event.flow_style \
or self.check_empty_sequence():
self.expect_flow_sequence()
else:
self.expect_block_sequence()
elif isinstance(self.event, MappingEvent):
elif isinstance(self.event, MappingStartEvent):
if self.flow_level or self.canonical or self.event.flow_style \
or self.check_empty_mapping():
self.expect_flow_mapping()
@ -217,7 +237,9 @@ class Emitter:
raise EmitterError("expected NodeEvent, but got %s" % self.event)
def expect_alias(self):
self.write_anchor(u'*', self.event.anchor)
if self.event.anchor is None:
raise EmitterError("anchor is not specified for alias")
self.process_anchor(u'*')
self.state = self.states.pop()
def expect_scalar(self):
@ -350,7 +372,7 @@ class Emitter:
return self.expect_block_mapping_key(first=True)
def expect_block_mapping_key(self, first=False):
if not first and isinstance(self.event, SequenceEndEvent):
if not first and isinstance(self.event, MappingEndEvent):
self.indent = self.indents.pop()
self.state = self.states.pop()
else:
@ -374,6 +396,320 @@ class Emitter:
self.states.append(self.expect_block_mapping_key)
self.expect_node(mapping=True)
# Checkers.
def check_empty_sequence(self):
    """Tell whether the current event opens a sequence that closes at once.

    The result is true only when the current event is a
    SequenceStartEvent and the first queued event is a SequenceEndEvent.
    """
    if not isinstance(self.event, SequenceStartEvent):
        return False
    queued = self.events
    if not queued:
        # Hand back the (falsy) queue itself, as the chained `and` did.
        return queued
    return isinstance(queued[0], SequenceEndEvent)
def check_empty_mapping(self):
    """Tell whether the current event opens a mapping that closes at once.

    The result is true only when the current event is a
    MappingStartEvent and the first queued event is a MappingEndEvent.
    """
    if not isinstance(self.event, MappingStartEvent):
        return False
    queued = self.events
    if not queued:
        # Hand back the (falsy) queue itself, as the chained `and` did.
        return queued
    return isinstance(queued[0], MappingEndEvent)
def check_empty_document(self):
    """Tell whether the document being started holds only an empty scalar.

    The current event must be a DocumentStartEvent and the first queued
    event must be an implicit ScalarEvent with no anchor, no tag and an
    empty value.
    """
    if not (isinstance(self.event, DocumentStartEvent) and self.events):
        return False
    upcoming = self.events[0]
    if not isinstance(upcoming, ScalarEvent):
        return False
    if upcoming.anchor is not None or upcoming.tag is not None:
        return False
    return upcoming.implicit and upcoming.value == u''
def check_simple_key(self):
    """Decide whether the current event can be written as a simple key.

    The combined length of the anchor, tag and scalar texts must stay
    below 128, and the event must be an alias, a single-line scalar, or
    an empty sequence or mapping.  The anchor text, tag text and scalar
    analysis computed here are stored on the emitter so that the
    processors can reuse them.
    """
    event = self.event
    total = 0
    if isinstance(event, NodeEvent) and event.anchor is not None:
        if self.anchor_text is None:
            self.anchor_text = self.analyze_anchor(event.anchor)
        total += len(self.anchor_text)
    has_tag = isinstance(event, (ScalarEvent, CollectionStartEvent)) \
            and event.tag is not None
    if has_tag:
        if self.tag_text is None:
            self.tag_text = self.analyze_tag(event.tag)
        total += len(self.tag_text)
    if isinstance(event, ScalarEvent):
        if self.scalar_analysis is None:
            self.scalar_analysis = self.analyze_scalar(event.value)
        total += len(self.scalar_analysis.scalar)
    if not (total < 128):
        return False
    if isinstance(event, AliasEvent):
        return True
    if isinstance(event, ScalarEvent) and not self.scalar_analysis.multiline:
        return True
    return self.check_empty_sequence() or self.check_empty_mapping()
# Anchor, Tag, and Scalar processors.
def process_anchor(self, indicator):
    """Write the current event's anchor, prefixed with `indicator`.

    Nothing happens when the event has no anchor.  A cached anchor text
    is reused when present; otherwise it is produced by analyze_anchor.
    The cache is cleared afterwards.
    """
    anchor = self.event.anchor
    if anchor is None:
        return
    if self.anchor_text is None:
        self.anchor_text = self.analyze_anchor(anchor)
    text = self.anchor_text
    if text:
        self.write_indicator(indicator + text, True)
    self.anchor_text = None
def process_tag(self):
    """Write the tag of the current event, when one has to be written.

    Nothing is written when the event has no tag, or when the event is a
    scalar whose best style is plain (the empty style string).  A cached
    tag text is reused when present; the cache is cleared afterwards.
    """
    event = self.event
    if event.tag is None:
        return
    if isinstance(event, ScalarEvent) and self.best_scalar_style() == '':
        return
    if self.tag_text is None:
        self.tag_text = self.analyze_tag(event.tag)
    text = self.tag_text
    if text:
        self.write_indicator(text, True)
    self.tag_text = None
def best_scalar_style(self):
    """Pick the style used to write the current scalar event.

    Returns '' for plain, '\\'' for single-quoted, '|' or '>' for block
    styles, and '"' for double-quoted.  The scalar analysis is computed
    on demand and left cached on the emitter.

    Selection order:
    - canonical output always uses double quotes;
    - an implicit scalar without a requested style is plain when the
      analysis allows plain in the current (flow or block) context; an
      empty scalar is plain only outside flow context and outside a
      simple key;
    - a requested single-quoted or block style is honoured when the
      analysis allows it (block styles only outside flow context);
    - anything else falls back to double quotes.
    """
    if self.scalar_analysis is None:
        self.scalar_analysis = self.analyze_scalar(self.event.value)
    if self.canonical:
        return '"'
    analysis = self.scalar_analysis
    if (self.event.implicit and not self.event.style
            and ((self.flow_level and analysis.allow_flow_plain)
                or (not self.flow_level and analysis.allow_block_plain))
            and (len(analysis.scalar) > 0
                or (not self.flow_level and not self.simple_key_context))):
        return ''
    if self.event.style == '\'' and analysis.allow_single_quoted:
        return '\''
    if self.event.style in ['|', '>'] and not self.flow_level \
            and analysis.allow_block:
        return self.event.style
    # Every branch above returns; the former trailing `return style`
    # was unreachable and referred to an undefined name.
    return '"'
def process_scalar(self):
    """Write the current scalar event in its best style.

    A multi-line scalar outside a simple key gets a fresh indent first,
    unless it is written in a block style.  Quoted and plain writers may
    split long lines only outside a simple key.  The cached analysis is
    cleared at the end.
    """
    if self.scalar_analysis is None:
        self.scalar_analysis = self.analyze_scalar(self.event.value)
    style = self.best_scalar_style()
    in_block_style = style in ['|', '>']
    if self.scalar_analysis.multiline and not self.simple_key_context \
            and not in_block_style:
        self.write_indent()
    text = self.scalar_analysis.scalar
    if style == '"':
        self.write_double_quoted(text, split=(not self.simple_key_context))
    elif style == '\'':
        self.write_single_quoted(text, split=(not self.simple_key_context))
    elif style == '>':
        self.write_folded(text)
    elif style == '|':
        self.write_literal(text)
    else:
        self.write_plain(text, split=(not self.simple_key_context))
    self.scalar_analysis = None
# Analyzers.
def analyze_version(self, version):
    """Render a (major, minor) version pair as the text 'major.minor'.

    Raises EmitterError when the major version is not 1.
    """
    major, minor = version
    if major == 1:
        return u'%d.%d' % (major, minor)
    raise EmitterError("unsupported YAML version: %d.%d" % (major, minor))
def analyze_tag_handle(self, handle):
    """Validate a tag handle and return it unchanged.

    The handle must be non-empty, start and end with '!', and contain
    only ASCII letters, digits, '-' or '_' in between.  Raises
    EmitterError otherwise.
    """
    if not handle:
        raise EmitterError("tag handle must not be empty")
    if not (handle[0] == u'!' and handle[-1] == u'!'):
        raise EmitterError("tag handle must start and end with '!': %r"
                % (handle.encode('utf-8')))
    for ch in handle[1:-1]:
        if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
                or ch in u'-_':
            continue
        raise EmitterError("invalid character %r in the tag handle: %r"
                % (ch.encode('utf-8'), handle.encode('utf-8')))
    return handle
def analyze_tag_prefix(self, prefix):
    """Return `prefix` with disallowed characters percent-escaped.

    A leading '!' is always kept as is.  Characters outside the allowed
    set are replaced by '%XX' for every byte of their UTF-8 encoding.
    Raises EmitterError for an empty prefix.
    """
    if not prefix:
        raise EmitterError("tag prefix must not be empty")
    pieces = []
    run_start = 0
    pos = 0
    if prefix[0] == u'!':
        pos = 1
    while pos < len(prefix):
        ch = prefix[pos]
        if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
                or ch in u'-;/?!:@&=+$,_.~*\'()[]':
            pos += 1
            continue
        # Flush the pending run of allowed characters, then escape `ch`.
        if run_start < pos:
            pieces.append(prefix[run_start:pos])
        pos += 1
        run_start = pos
        for octet in ch.encode('utf-8'):
            pieces.append(u'%%%02X' % ord(octet))
    if run_start < pos:
        pieces.append(prefix[run_start:pos])
    return u''.join(pieces)
def analyze_tag(self, tag):
    """Return the text used to write `tag`.

    Every registered prefix in self.tag_prefixes that `tag` starts with
    (and that is either '!' or shorter than the tag) selects its handle;
    the last match in iteration order wins.  Disallowed characters of
    the remaining suffix are percent-escaped per UTF-8 byte.  With a
    handle the result is handle + suffix, otherwise '!<suffix>'.
    Raises EmitterError for an empty tag.
    """
    if not tag:
        raise EmitterError("tag must not be empty")
    handle = None
    suffix = tag
    for prefix in self.tag_prefixes:
        usable = (prefix == u'!' or len(prefix) < len(tag))
        if tag.startswith(prefix) and usable:
            handle = self.tag_prefixes[prefix]
            suffix = tag[len(prefix):]
    pieces = []
    run_start = 0
    pos = 0
    while pos < len(suffix):
        ch = suffix[pos]
        if u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
                or ch in u'-;/?:@&=+$,_.~*\'()[]' \
                or (ch == u'!' and handle != u'!'):
            pos += 1
            continue
        # Flush the pending run of allowed characters, then escape `ch`.
        if run_start < pos:
            pieces.append(suffix[run_start:pos])
        pos += 1
        run_start = pos
        for octet in ch.encode('utf-8'):
            pieces.append(u'%%%02X' % ord(octet))
    if run_start < pos:
        pieces.append(suffix[run_start:pos])
    suffix_text = u''.join(pieces)
    if not handle:
        return u'!<%s>' % suffix_text
    return u'%s%s' % (handle, suffix_text)
def analyze_anchor(self, anchor):
    """Validate an anchor name and return it unchanged.

    The anchor must be non-empty and consist only of ASCII letters,
    digits, '-' or '_'.  Raises EmitterError otherwise.
    """
    if not anchor:
        raise EmitterError("anchor must not be empty")
    for ch in anchor:
        if not (u'0' <= ch <= u'9' or u'A' <= ch <= 'Z' or u'a' <= ch <= 'z' \
                or ch in u'-_'):
            # Report the offending anchor itself; the message used to
            # format an undefined name and raised NameError instead.
            raise EmitterError("invalid character %r in the anchor: %r"
                    % (ch.encode('utf-8'), anchor.encode('utf-8')))
    return anchor
def analyze_scalar(self, scalar):   # It begs for refactoring.
    """Inspect `scalar` and report which output styles can represent it.

    Returns a ScalarAnalysis.  An empty scalar is reported as empty,
    single-line, allowed as block-context plain and as quoted, and not
    allowed as flow-context plain or as a block scalar.

    For a non-empty scalar the text is scanned once, recording:
    - indicator characters that rule out the plain style in flow and/or
      block context;
    - line breaks (the scalar is then multiline);
    - characters other than '\\n' and printable ASCII ("special");
    - where runs of spaces and line breaks occur (leading, inline,
      trailing, or mixed).
    The allow_* flags of the result are derived from those findings.
    """
    if not scalar:
        return ScalarAnalysis(scalar=scalar, empty=True, multiline=False,
                allow_flow_plain=False, allow_block_plain=True,
                allow_single_quoted=True, allow_double_quoted=True,
                allow_block=False)
    contains_block_indicator = False
    contains_flow_indicator = False
    contains_line_breaks = False
    contains_unicode_characters = False
    contains_special_characters = False
    contains_inline_spaces = False          # non-space space+ non-space
    contains_inline_breaks = False          # non-space break+ non-space
    contains_leading_spaces = False         # ^ space+ (non-space | $)
    contains_leading_breaks = False         # ^ break+ (non-space | $)
    contains_trailing_spaces = False        # non-space space+ $
    contains_trailing_breaks = False        # non-space break+ $
    contains_inline_breaks_spaces = False   # non-space break+ space+ non-space
    contains_mixed_breaks_spaces = False    # anything else
    # A scalar that looks like a document marker can never be plain.
    if scalar.startswith(u'---') or scalar.startswith(u'...'):
        contains_block_indicator = True
        contains_flow_indicator = True
    first = True
    last = (len(scalar) == 1)
    preceeded_by_space = False
    followed_by_space = (len(scalar) > 1 and
            scalar[1] in u'\0 \t\r\n\x85\u2028\u2029')
    # State of the current run of spaces / line breaks.
    spaces = breaks = mixed = leading = False
    index = 0
    while index < len(scalar):
        ch = scalar[index]
        if first:
            if ch in u'#,[]{}#&*!|>\'\"%@`':
                contains_flow_indicator = True
                contains_block_indicator = True
            if ch in u'?:':
                contains_flow_indicator = True
                if followed_by_space or last:
                    contains_block_indicator = True
            # Parenthesised on purpose: only a leading '-' that stands
            # alone or is followed by whitespace is an indicator.  The
            # unparenthesised form flagged every one-character scalar.
            if ch == u'-' and (followed_by_space or last):
                contains_flow_indicator = True
                contains_block_indicator = True
        else:
            if ch in u',?[]{}':
                contains_flow_indicator = True
            if ch == u':':
                contains_flow_indicator = True
                if followed_by_space or last:
                    contains_block_indicator = True
            if ch == u'#' and preceeded_by_space:
                contains_flow_indicator = True
                contains_block_indicator = True
        if ch in u'\n\x85\u2028\u2029':
            contains_line_breaks = True
        if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'):
            if ch < u'\x80':
                contains_special_characters = True
            else:
                contains_special_characters = True
                # TODO: We need an option to allow unescaped unicode
                # characters.
                contains_unicode_characters = True
        if ch == u' ':
            if not spaces and not breaks:
                leading = first
            spaces = True
        elif ch in u'\n\x85\u2028\u2029':
            if not spaces and not breaks:
                leading = first
            breaks = True
            if spaces:
                # A break that follows spaces within the same run.
                mixed = True
        if ch not in u' \n\x85\u2028\u2029':
            # A non-space character closes the current run; classify it.
            if leading:
                if spaces and breaks:
                    contains_mixed_breaks_spaces = True
                elif spaces:
                    contains_leading_spaces = True
                elif breaks:
                    contains_leading_breaks = True
            else:
                if mixed:
                    contains_mixed_breaks_spaces = True
                elif spaces and breaks:
                    contains_inline_breaks_spaces = True
                elif spaces:
                    contains_inline_spaces = True
                elif breaks:
                    contains_inline_breaks = True
            spaces = breaks = mixed = leading = False
        elif last:
            # The scalar ends inside a run of spaces / breaks.
            if spaces and breaks:
                contains_mixed_breaks_spaces = True
            elif spaces:
                if leading:
                    contains_leading_spaces = True
                else:
                    contains_trailing_spaces = True
            elif breaks:
                if leading:
                    contains_leading_breaks = True
                else:
                    contains_trailing_breaks = True
        index += 1
        first = False
        last = (index+1 == len(scalar))
        preceeded_by_space = (ch in u'\0 \t\r\n\x85\u2028\u2029')
        followed_by_space = (index+1 < len(scalar) and
                scalar[index+1] in u'\0 \t\r\n\x85\u2028\u2029')
    allow_flow_plain = not (contains_flow_indicator or contains_special_characters
        or contains_leading_spaces or contains_leading_breaks
        or contains_trailing_spaces or contains_trailing_breaks
        or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
    allow_block_plain = not (contains_block_indicator or contains_special_characters
        or contains_leading_spaces or contains_leading_breaks
        or contains_trailing_spaces or contains_trailing_breaks
        or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
    allow_single_quoted = not (contains_special_characters
        or contains_inline_breaks_spaces or contains_mixed_breaks_spaces)
    allow_double_quoted = True
    allow_block = not (contains_special_characters
        or contains_leading_spaces or contains_leading_breaks
        or contains_trailing_spaces or contains_mixed_breaks_spaces)
    return ScalarAnalysis(scalar=scalar, empty=False, multiline=contains_line_breaks,
            allow_flow_plain=allow_flow_plain, allow_block_plain=allow_block_plain,
            allow_single_quoted=allow_single_quoted, allow_double_quoted=allow_double_quoted,
            allow_block=allow_block)
# Writers.
def write_stream_start(self):
@ -387,11 +723,11 @@ class Emitter:
def write_indicator(self, indicator, need_whitespace,
whitespace=False, indention=False):
if self.whitespace:
if self.whitespace or not need_whitespace:
data = indicator
else:
data = u' '+indicator
self.writespace = whitespace
self.whitespace = whitespace
self.indention = self.indention and indention
self.column += len(data)
if self.encoding:
@ -400,17 +736,20 @@ class Emitter:
def write_indent(self):
indent = self.indent or 0
if not self.indention or self.column > indent:
if not self.indention or self.column > indent \
or (self.column == indent and not self.whitespace):
self.write_line_break()
if self.column < indent:
self.whitespace = True
data = u' '*(indent-self.column)
self.column = indent
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
def write_line_break(self):
data = self.best_line_break
def write_line_break(self, data=None):
if data is None:
data = self.best_line_break
self.whitespace = True
self.indention = True
self.line += 1
@ -419,3 +758,294 @@ class Emitter:
data = data.encode(self.encoding)
self.writer.write(data)
def write_version_directive(self, version_text):
    """Write a '%YAML <version>' directive followed by a line break.

    The text is encoded with self.encoding first when one is set.
    """
    line = u'%%YAML %s' % version_text
    if not self.encoding:
        self.writer.write(line)
    else:
        self.writer.write(line.encode(self.encoding))
    self.write_line_break()
def write_tag_directive(self, handle_text, prefix_text):
    """Write a '%TAG <handle> <prefix>' directive followed by a line break.

    The text is encoded with self.encoding first when one is set.
    """
    line = u'%%TAG %s %s' % (handle_text, prefix_text)
    if not self.encoding:
        self.writer.write(line)
    else:
        self.writer.write(line.encode(self.encoding))
    self.write_line_break()
# Scalar writers.
# Write `text` as a single-quoted scalar: an opening "'" indicator, the
# text, and a closing "'" indicator.  Every "'" inside the text is written
# doubled.  `split` allows a single space to be replaced by a fresh indent
# (a line fold) once the column has passed self.best_width.
# The text is scanned with a [start, end) window; `spaces` / `breaks` say
# whether the previous character was a space or a line break.
# NOTE(review): indentation is lost in this view, so the exact nesting of
# the statements below (in particular the quote-doubling branch) cannot be
# confirmed from here.
def write_single_quoted(self, text, split=True):
self.write_indicator(u'\'', True)
spaces = False
breaks = False
start = end = 0
# The loop runs one step past the end (ch is None) to flush the tail.
while end <= len(text):
ch = None
if end < len(text):
ch = text[end]
if spaces:
# End of a run of spaces.
if ch is None or ch != u' ':
# A lone space that is neither first nor last in the text becomes a fold
# when the line is already too long and splitting is allowed.
if start+1 == end and self.column > self.best_width and split \
and start != 0 and end != len(text):
self.write_indent()
else:
data = text[start:end]
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
start = end
elif breaks:
# End of a run of line breaks.
if ch is None or ch not in u'\n\x85\u2028\u2029':
# A run starting with '\n' gets one extra line break written first.
if text[start] == u'\n':
self.write_line_break()
for br in text[start:end]:
if br == u'\n':
self.write_line_break()
else:
self.write_line_break(br)
self.write_indent()
start = end
else:
# Ordinary characters: flush the pending run when a space, a line break,
# a quote, or the end of the text is reached.
if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u'\'':
if start < end:
data = text[start:end]
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
start = end
# A quote character is written as two quotes.
if ch == u'\'':
data = u'\'\''
self.column += 2
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
start = end + 1
if ch is not None:
spaces = (ch == u' ')
breaks = (ch in u'\n\x85\u2028\u2029')
end += 1
self.write_indicator(u'\'', False)
# Maps a character to the character written after the backslash in its
# short escape; write_double_quoted consults this table before falling
# back to numeric \x, \u and \U escapes.
ESCAPE_REPLACEMENTS = {
u'\0': u'0',
u'\x07': u'a',
u'\x08': u'b',
u'\x09': u't',
u'\x0A': u'n',
u'\x0B': u'v',
u'\x0C': u'f',
u'\x0D': u'r',
u'\x1B': u'e',
u'\"': u'\"',
u'\\': u'\\',
u'\x85': u'N',
u'\xA0': u'_',
u'\u2028': u'L',
u'\u2029': u'P',
}
# Write `text` as a double-quoted scalar: an opening '"' indicator, the
# text with escapes applied, and a closing '"' indicator.  Characters
# outside the range \x20-\x7E, as well as '"' and '\', are written as
# escapes.  `split` allows long lines to be broken with a trailing
# backslash once the column would pass self.best_width.
# NOTE(review): indentation is lost in this view, so the exact nesting of
# the statements below cannot be confirmed from here.
def write_double_quoted(self, text, split=True):
self.write_indicator(u'"', True)
start = end = 0
# The loop runs one step past the end (ch is None) to flush the tail.
while end <= len(text):
ch = None
if end < len(text):
ch = text[end]
if ch is None or not (u'\x20' <= ch <= u'\x7E') or ch in u'"\\':
# Flush the pending run of characters that need no escaping.
if start < end:
data = text[start:end]
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
start = end
if ch is not None:
# Prefer a short escape from the table; otherwise use the narrowest
# numeric escape that fits the code point.
if ch in self.ESCAPE_REPLACEMENTS:
data = u'\\'+self.ESCAPE_REPLACEMENTS[ch]
elif ch <= u'\xFF':
data = u'\\x%02X' % ord(ch)
elif ch <= u'\uFFFF':
data = u'\\u%04X' % ord(ch)
else:
data = u'\\U%08X' % ord(ch)
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
start = end+1
# Line splitting: never at the first or last character, only at a space
# or right after an escape, and only when the line would become too long.
if 0 < end < len(text)-1 and (ch == u' ' or start >= end) \
and self.column+(end-start) > self.best_width and split:
# Write what is pending plus a backslash, then continue on a new line.
data = text[start:end]+u'\\'
if start < end:
start = end
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
self.write_indent()
self.whitespace = False
self.indention = False
# When the split happens at a space, a backslash is written at the start
# of the continuation line as well.
if ch == u' ':
data = u'\\'
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
end += 1
self.write_indicator(u'"', False)
def determine_chomp(self, text):
    """Return the chomping indicator that matches the end of `text`.

    u'-' when the text does not end with a line break, u'' when it ends
    with exactly one, and u'+' when its last two characters are both
    line breaks.
    """
    line_breaks = u'\n\x85\u2028\u2029'
    # Left-pad with spaces so that two trailing characters always exist.
    padded = (u'  ' + text)[-2:]
    before_last, last = padded[0], padded[1]
    if last not in line_breaks:
        return u'-'
    if before_last in line_breaks:
        return u'+'
    return u''
def write_folded(self, text):
    """Write `text` as a folded block scalar ('>' plus chomping indicator).

    The text is scanned with a [start, end) window; `breaks` / `spaces`
    say whether the previous character was a line break or a space.  A
    single space is replaced by a fresh indent (a fold) once the column
    has passed self.best_width.
    """
    chomp = self.determine_chomp(text)
    self.write_indicator(u'>'+chomp, True)
    self.write_indent()
    leading_space = False
    spaces = False
    breaks = False
    start = end = 0
    # The loop runs one step past the end (ch is None) to flush the tail.
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if breaks:
            # End of a run of line breaks.
            if ch is None or ch not in u'\n\x85\u2028\u2029':
                # Between two lines that do not start with a space, a run
                # beginning with '\n' gets one extra line break.
                if not leading_space and ch is not None and ch != u' ' \
                        and text[start] == u'\n':
                    self.write_line_break()
                leading_space = (ch == u' ')
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                if ch is not None:
                    self.write_indent()
                start = end
        elif spaces:
            # End of a run of spaces.
            if ch != u' ':
                if start+1 == end and self.column > self.best_width:
                    self.write_indent()
                else:
                    data = text[start:end]
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                start = end
        else:
            if ch is None or ch in u' \n\x85\u2028\u2029':
                data = text[start:end]
                # Keep the column in step with what is written; without
                # this the width check above works from a stale column.
                self.column += len(data)
                if self.encoding:
                    data = data.encode(self.encoding)
                self.writer.write(data)
                if ch is None:
                    self.write_line_break()
                start = end
        if ch is not None:
            breaks = (ch in u'\n\x85\u2028\u2029')
            spaces = (ch == u' ')
        end += 1
# Write `text` as a literal block scalar: a '|' indicator followed by the
# chomping indicator from determine_chomp, a fresh indent, and then the
# text with its line breaks reproduced.  No folding is done.
# The text is scanned with a [start, end) window; `breaks` says whether
# the previous character was a line break.
# NOTE(review): indentation is lost in this view, so the exact nesting of
# the statements below cannot be confirmed from here.
def write_literal(self, text):
chomp = self.determine_chomp(text)
self.write_indicator(u'|'+chomp, True)
self.write_indent()
breaks = False
start = end = 0
# The loop runs one step past the end (ch is None) to flush the tail.
while end <= len(text):
ch = None
if end < len(text):
ch = text[end]
if breaks:
# End of a run of line breaks: reproduce each one, '\n' as the emitter's
# default break and any other break character as itself.
if ch is None or ch not in u'\n\x85\u2028\u2029':
for br in text[start:end]:
if br == u'\n':
self.write_line_break()
else:
self.write_line_break(br)
# More text follows, so indent the next line.
if ch is not None:
self.write_indent()
start = end
else:
# End of a run of ordinary characters: write it out unchanged.
if ch is None or ch in u'\n\x85\u2028\u2029':
data = text[start:end]
if self.encoding:
data = data.encode(self.encoding)
self.writer.write(data)
# Text that does not end with a break still gets a final line break.
if ch is None:
self.write_line_break()
start = end
if ch is not None:
breaks = (ch in u'\n\x85\u2028\u2029')
end += 1
def write_plain(self, text, split=True):
    """Write `text` as a plain (unquoted) scalar.

    Nothing is written for empty text.  A separating space is written
    first unless the previous output was whitespace.  `split` allows a
    single space to be replaced by a fresh indent (a fold) once the
    column has passed self.best_width.

    The text is scanned with a [start, end) window; `spaces` / `breaks`
    say whether the previous character was a space or a line break.
    """
    if not text:
        return
    if not self.whitespace:
        data = u' '
        self.column += len(data)
        if self.encoding:
            data = data.encode(self.encoding)
        self.writer.write(data)
    # The scalar text itself is neither whitespace nor indentation.
    # (This used to assign to a misspelt attribute, `writespace`, which
    # left self.whitespace untouched.)
    self.whitespace = False
    self.indention = False
    spaces = False
    breaks = False
    start = end = 0
    # The loop runs one step past the end (ch is None) to flush the tail.
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if spaces:
            # End of a run of spaces.
            if ch != u' ':
                if start+1 == end and self.column > self.best_width and split:
                    self.write_indent()
                    self.whitespace = False
                    self.indention = False
                else:
                    data = text[start:end]
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.writer.write(data)
                start = end
        elif breaks:
            # End of a run of line breaks.
            if ch not in u'\n\x85\u2028\u2029':
                # A run starting with '\n' gets one extra line break first.
                if text[start] == u'\n':
                    self.write_line_break()
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                self.write_indent()
                self.whitespace = False
                self.indention = False
                start = end
        else:
            if ch is None or ch in u' \n\x85\u2028\u2029':
                data = text[start:end]
                self.column += len(data)
                if self.encoding:
                    data = data.encode(self.encoding)
                self.writer.write(data)
                start = end
        if ch is not None:
            spaces = (ch == u' ')
            breaks = (ch in u'\n\x85\u2028\u2029')
        end += 1

View File

@ -49,19 +49,19 @@ class StreamEndEvent(Event):
class DocumentStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
implicit=None, version=None, tags=None):
explicit=None, version=None, tags=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.implicit = implicit
self.explicit = explicit
self.version = version
self.tags = tags
class DocumentEndEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
implicit=None):
explicit=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.implicit = implicit
self.explicit = explicit
class AliasEvent(NodeEvent):
pass

View File

@ -23,6 +23,12 @@ class Node:
class ScalarNode(Node):
    """Node for a scalar value.

    Besides the tag, value and source marks it records `implicit`, the
    flag passed in by the caller for this scalar.
    """

    id = 'scalar'

    def __init__(self, tag, value, implicit, start_mark, end_mark):
        # What the node is.
        self.tag = tag
        self.value = value
        self.implicit = implicit
        # Where it came from.
        self.start_mark = start_mark
        self.end_mark = end_mark
class CollectionNode(Node):
pass

View File

@ -140,18 +140,18 @@ class Parser:
token = self.scanner.peek()
start_mark = end_mark = token.start_mark
yield DocumentStartEvent(start_mark, end_mark,
implicit=True)
explicit=False)
for event in self.parse_block_node():
yield event
token = self.scanner.peek()
start_mark = end_mark = token.start_mark
implicit = True
explicit = False
while self.scanner.check(DocumentEndToken):
token = self.scanner.get()
end_mark = token.end_mark
implicit = True
explicit = True
yield DocumentEndEvent(start_mark, end_mark,
implicit=implicit)
explicit=explicit)
# Parse explicit documents.
while not self.scanner.check(StreamEndToken):
@ -166,7 +166,7 @@ class Parser:
token = self.scanner.get()
end_mark = token.end_mark
yield DocumentStartEvent(start_mark, end_mark,
implicit=False, version=version, tags=tags)
explicit=True, version=version, tags=tags)
if self.scanner.check(DirectiveToken,
DocumentStartToken, DocumentEndToken, StreamEndToken):
yield self.process_empty_scalar(token.end_mark)
@ -175,13 +175,13 @@ class Parser:
yield event
token = self.scanner.peek()
start_mark = end_mark = token.start_mark
implicit=True
explicit = False
while self.scanner.check(DocumentEndToken):
token = self.scanner.get()
end_mark = token.end_mark
implicit=False
explicit=True
yield DocumentEndEvent(start_mark, end_mark,
implicit=implicit)
explicit=explicit)
# Parse end of stream.
token = self.scanner.get()
@ -273,10 +273,10 @@ class Parser:
tag = self.tag_handles[handle]+suffix
else:
tag = suffix
if tag is None:
if not (self.scanner.check(ScalarToken) and
self.scanner.peek().implicit):
tag = u'!'
#if tag is None:
# if not (self.scanner.check(ScalarToken) and
# self.scanner.peek().implicit):
# tag = u'!'
if start_mark is None:
start_mark = end_mark = self.scanner.peek().start_mark
event = None
@ -289,9 +289,10 @@ class Parser:
if self.scanner.check(ScalarToken):
token = self.scanner.get()
end_mark = token.end_mark
implicit = (tag is None and token.implicit)
event = ScalarEvent(anchor, tag, token.value,
start_mark, end_mark,
implicit=token.implicit, style=token.style)
implicit=implicit, style=token.style)
elif self.scanner.check(FlowSequenceStartToken):
end_mark = self.scanner.peek().end_mark
event = SequenceStartEvent(anchor, tag, start_mark, end_mark,
@ -410,7 +411,7 @@ class Parser:
while not self.scanner.check(FlowSequenceEndToken):
if self.scanner.check(KeyToken):
token = self.scanner.get()
yield MappingStartEvent(None, u'!',
yield MappingStartEvent(None, None, # u'!',
token.start_mark, token.end_mark,
flow_style=True)
if not self.scanner.check(ValueToken,

View File

@ -56,7 +56,7 @@ class BaseResolver:
self.resolve_node(path+[node, key], node.value[key])
def resolve_scalar(self, path, node):
if node.tag is None:
if node.tag is None and node.implicit:
node.tag = self.detect_scalar(node.value)
if node.tag is None or node.tag == u'!':
node.tag = self.DEFAULT_SCALAR_TAG

View File

@ -753,7 +753,7 @@ class Scanner:
ch = self.reader.peek()
return ch not in u'\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`' \
or (self.reader.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029'
and (ch == '-' or (not self.flow_level and ch in u'?:')))
and (ch == u'-' or (not self.flow_level and ch in u'?:')))
# Scanners.

View File

@ -1,11 +1,11 @@
- !StreamStart
- !DocumentStart
- !Scalar { implicit: true }
- !DocumentStart { explicit: false }
- !Scalar { implicit: true, value: 'data' }
- !DocumentEnd
- !DocumentStart
- !Scalar { implicit: true }
- !DocumentEnd
- !DocumentStart
- !DocumentStart { version: [1,1], tags: { '!': '!foo', '!yaml!': 'tag:yaml.org,2002:', '!ugly!': '!!!!!!!' } }
- !Scalar { implicit: true }
- !DocumentEnd
- !StreamEnd

View File

@ -24,7 +24,7 @@
- !MappingEnd
- !MappingEnd
- !Scalar { implicit: true, value: 'flow mapping' }
- !MappingStart { flow: true }
- !MappingStart { flow_style: true }
- !Scalar { implicit: true, value: 'key' }
- !Scalar { implicit: true, value: 'value' }
- !MappingStart

View File

@ -64,7 +64,7 @@
- !DocumentStart
- !SequenceStart
- !SequenceStart { flow: true }
- !SequenceStart { flow_style: true }
- !SequenceStart
- !SequenceEnd
- !Scalar

12
tests/data/tags.events Normal file
View File

@ -0,0 +1,12 @@
- !StreamStart
- !DocumentStart
- !SequenceStart
- !Scalar { value: 'data' }
- !Scalar { tag: '!', value: 'data' }
- !Scalar { tag: 'tag:yaml.org,2002:str', value: 'data' }
- !Scalar { tag: '!myfunnytag', value: 'data' }
- !Scalar { tag: '!my!ugly!tag', value: 'data' }
- !Scalar { tag: 'tag:my.domain.org,2002:data!? #', value: 'data' }
- !SequenceEnd
- !DocumentEnd
- !StreamEnd

View File

@ -237,7 +237,7 @@ class CanonicalParser:
anchor = None
if self.test_token(AnchorToken):
anchor = self.get_value()
tag = u'!'
tag = None
if self.test_token(TagToken):
tag = self.get_value()
if self.test_token(ScalarToken):

View File

@ -4,25 +4,54 @@ import test_appliance, sys, StringIO
from yaml import *
import yaml
class TestEmitterOnCanonical(test_appliance.TestAppliance):
class TestEmitter(test_appliance.TestAppliance):
def _testEmitterOnCanonical(self, test_name, canonical_filename):
events = list(iter(Parser(Scanner(Reader(file(canonical_filename, 'rb'))))))
#writer = sys.stdout
def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
self._testEmitter(test_name, data_filename)
def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
self._testEmitter(test_name, canonical_filename, False)
def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
self._testEmitter(test_name, canonical_filename, True)
def _testEmitter(self, test_name, filename, canonical=None):
events = list(iter(Parser(Scanner(Reader(file(filename, 'rb'))))))
if canonical is not None:
events[0].canonical = canonical
#self._dump(filename, events)
writer = StringIO.StringIO()
emitter = Emitter(writer)
#print "-"*30
#print "ORIGINAL DATA:"
#print file(canonical_filename, 'rb').read()
for event in events:
emitter.emit(event)
data = writer.getvalue()
new_events = list(parse(data))
self.failUnlessEqual(len(events), len(new_events))
for event, new_event in zip(events, new_events):
self.failUnlessEqual(event.__class__, new_event.__class__)
if isinstance(event, NodeEvent):
self.failUnlessEqual(event.anchor, new_event.anchor)
if isinstance(event, CollectionStartEvent):
self.failUnlessEqual(event.tag, new_event.tag)
if isinstance(event, ScalarEvent):
#self.failUnlessEqual(event.implicit, new_event.implicit)
if not event.implicit and not new_event.implicit:
self.failUnlessEqual(event.tag, new_event.tag)
self.failUnlessEqual(event.value, new_event.value)
TestEmitterOnCanonical.add_tests('testEmitterOnCanonical', '.canonical')
def _dump(self, filename, events):
writer = sys.stdout
emitter = Emitter(writer)
print "="*30
print "ORIGINAL DOCUMENT:"
print file(filename, 'rb').read()
print '-'*30
print "EMITTED DOCUMENT:"
for event in events:
emitter.emit(event)
TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
#TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
#TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
class EventsConstructor(Constructor):
@ -43,11 +72,11 @@ class EventsConstructor(Constructor):
EventsConstructor.add_constructor(None, EventsConstructor.construct_event)
class TestEmitter(test_appliance.TestAppliance):
class TestEmitterEvents(test_appliance.TestAppliance):
def _testEmitter(self, test_name, events_filename):
events = load_document(file(events_filename, 'rb'), Constructor=EventsConstructor)
self._dump(events_filename, events)
def _testEmitterEvents(self, test_name, events_filename):
events = list(load_document(file(events_filename, 'rb'), Constructor=EventsConstructor))
#self._dump(events_filename, events)
writer = StringIO.StringIO()
emitter = Emitter(writer)
for event in events:
@ -57,6 +86,14 @@ class TestEmitter(test_appliance.TestAppliance):
self.failUnlessEqual(len(events), len(new_events))
for event, new_event in zip(events, new_events):
self.failUnlessEqual(event.__class__, new_event.__class__)
if isinstance(event, NodeEvent):
self.failUnlessEqual(event.anchor, new_event.anchor)
if isinstance(event, CollectionStartEvent):
self.failUnlessEqual(event.tag, new_event.tag)
if isinstance(event, ScalarEvent):
self.failUnless(event.implicit == new_event.implicit
or event.tag == new_event.tag)
self.failUnlessEqual(event.value, new_event.value)
def _dump(self, events_filename, events):
writer = sys.stdout
@ -69,5 +106,5 @@ class TestEmitter(test_appliance.TestAppliance):
for event in events:
emitter.emit(event)
TestEmitter.add_tests('testEmitter', '.events')
TestEmitterEvents.add_tests('testEmitterEvents', '.events')

View File

@ -9,7 +9,7 @@ from test_structure import *
from test_errors import *
from test_detector import *
from test_constructor import *
#from test_emitter import *
from test_emitter import *
from test_syck import *
def main(module='__main__'):