2009-09-08 05:31:18 +00:00
|
|
|
import itertools
|
|
|
|
|
|
|
|
from ShCommands import Command, Pipeline
|
|
|
|
|
|
|
|
def tcl_preprocess(data):
|
|
|
|
# Tcl has a preprocessing step to replace escaped newlines.
|
|
|
|
i = data.find('\\\n')
|
|
|
|
if i == -1:
|
|
|
|
return data
|
|
|
|
|
|
|
|
# Replace '\\\n' and subsequent whitespace by a single space.
|
|
|
|
n = len(data)
|
|
|
|
str = data[:i]
|
|
|
|
i += 2
|
|
|
|
while i < n and data[i] in ' \t':
|
|
|
|
i += 1
|
|
|
|
return str + ' ' + data[i:]
|
|
|
|
|
|
|
|
class TclLexer:
|
|
|
|
"""TclLexer - Lex a string into "words", following the Tcl syntax."""
|
|
|
|
|
|
|
|
def __init__(self, data):
|
|
|
|
self.data = tcl_preprocess(data)
|
|
|
|
self.pos = 0
|
|
|
|
self.end = len(self.data)
|
|
|
|
|
|
|
|
def at_end(self):
|
|
|
|
return self.pos == self.end
|
|
|
|
|
|
|
|
def eat(self):
|
|
|
|
c = self.data[self.pos]
|
|
|
|
self.pos += 1
|
|
|
|
return c
|
|
|
|
|
|
|
|
def look(self):
|
|
|
|
return self.data[self.pos]
|
|
|
|
|
|
|
|
def maybe_eat(self, c):
|
|
|
|
"""
|
|
|
|
maybe_eat(c) - Consume the character c if it is the next character,
|
|
|
|
returning True if a character was consumed. """
|
|
|
|
if self.data[self.pos] == c:
|
|
|
|
self.pos += 1
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def escape(self, c):
|
|
|
|
if c == 'a':
|
|
|
|
return '\x07'
|
|
|
|
elif c == 'b':
|
|
|
|
return '\x08'
|
|
|
|
elif c == 'f':
|
|
|
|
return '\x0c'
|
|
|
|
elif c == 'n':
|
|
|
|
return '\n'
|
|
|
|
elif c == 'r':
|
|
|
|
return '\r'
|
|
|
|
elif c == 't':
|
|
|
|
return '\t'
|
|
|
|
elif c == 'v':
|
|
|
|
return '\x0b'
|
|
|
|
elif c in 'uxo':
|
|
|
|
raise ValueError,'Invalid quoted character %r' % c
|
|
|
|
else:
|
|
|
|
return c
|
|
|
|
|
|
|
|
def lex_braced(self):
|
|
|
|
# Lex until whitespace or end of string, the opening brace has already
|
|
|
|
# been consumed.
|
|
|
|
|
|
|
|
str = ''
|
|
|
|
while 1:
|
|
|
|
if self.at_end():
|
|
|
|
raise ValueError,"Unterminated '{' quoted word"
|
|
|
|
|
|
|
|
c = self.eat()
|
|
|
|
if c == '}':
|
|
|
|
break
|
|
|
|
elif c == '{':
|
|
|
|
str += '{' + self.lex_braced() + '}'
|
|
|
|
elif c == '\\' and self.look() in '{}':
|
|
|
|
str += self.eat()
|
|
|
|
else:
|
|
|
|
str += c
|
|
|
|
|
|
|
|
return str
|
|
|
|
|
|
|
|
def lex_quoted(self):
|
|
|
|
str = ''
|
|
|
|
|
|
|
|
while 1:
|
|
|
|
if self.at_end():
|
|
|
|
raise ValueError,"Unterminated '\"' quoted word"
|
|
|
|
|
|
|
|
c = self.eat()
|
|
|
|
if c == '"':
|
|
|
|
break
|
|
|
|
elif c == '\\':
|
|
|
|
if self.at_end():
|
|
|
|
raise ValueError,'Missing quoted character'
|
|
|
|
|
|
|
|
str += self.escape(self.eat())
|
|
|
|
else:
|
|
|
|
str += c
|
|
|
|
|
|
|
|
return str
|
|
|
|
|
|
|
|
def lex_unquoted(self, process_all=False):
|
|
|
|
# Lex until whitespace or end of string.
|
|
|
|
str = ''
|
|
|
|
while not self.at_end():
|
|
|
|
if not process_all:
|
|
|
|
if self.look().isspace() or self.look() == ';':
|
|
|
|
break
|
|
|
|
|
|
|
|
c = self.eat()
|
|
|
|
if c == '\\':
|
|
|
|
if self.at_end():
|
|
|
|
raise ValueError,'Missing quoted character'
|
|
|
|
|
|
|
|
str += self.escape(self.eat())
|
|
|
|
elif c == '[':
|
|
|
|
raise NotImplementedError, ('Command substitution is '
|
|
|
|
'not supported')
|
|
|
|
elif c == '$' and not self.at_end() and (self.look().isalpha() or
|
|
|
|
self.look() == '{'):
|
|
|
|
raise NotImplementedError, ('Variable substitution is '
|
|
|
|
'not supported')
|
|
|
|
else:
|
|
|
|
str += c
|
|
|
|
|
|
|
|
return str
|
|
|
|
|
|
|
|
def lex_one_token(self):
|
|
|
|
if self.maybe_eat('"'):
|
|
|
|
return self.lex_quoted()
|
|
|
|
elif self.maybe_eat('{'):
|
|
|
|
# Check for argument substitution.
|
|
|
|
if not self.maybe_eat('*'):
|
|
|
|
return self.lex_braced()
|
|
|
|
|
|
|
|
if not self.maybe_eat('}'):
|
|
|
|
return '*' + self.lex_braced()
|
|
|
|
|
|
|
|
if self.at_end() or self.look().isspace():
|
|
|
|
return '*'
|
|
|
|
|
|
|
|
raise NotImplementedError, "Argument substitution is unsupported"
|
|
|
|
else:
|
|
|
|
return self.lex_unquoted()
|
|
|
|
|
|
|
|
def lex(self):
|
|
|
|
while not self.at_end():
|
|
|
|
c = self.look()
|
|
|
|
if c in ' \t':
|
|
|
|
self.eat()
|
|
|
|
elif c in ';\n':
|
|
|
|
self.eat()
|
|
|
|
yield (';',)
|
|
|
|
else:
|
|
|
|
yield self.lex_one_token()
|
|
|
|
|
|
|
|
class TclExecCommand:
|
|
|
|
kRedirectPrefixes1 = ('<', '>')
|
|
|
|
kRedirectPrefixes2 = ('<@', '<<', '2>', '>&', '>>', '>@')
|
|
|
|
kRedirectPrefixes3 = ('2>@', '2>>', '>>&', '>&@')
|
|
|
|
kRedirectPrefixes4 = ('2>@1',)
|
|
|
|
|
|
|
|
def __init__(self, args):
|
|
|
|
self.args = iter(args)
|
|
|
|
|
|
|
|
def lex(self):
|
|
|
|
try:
|
|
|
|
return self.args.next()
|
|
|
|
except StopIteration:
|
|
|
|
return None
|
|
|
|
|
|
|
|
def look(self):
|
|
|
|
next = self.lex()
|
|
|
|
if next is not None:
|
|
|
|
self.args = itertools.chain([next], self.args)
|
|
|
|
return next
|
|
|
|
|
|
|
|
def parse_redirect(self, tok, length):
|
|
|
|
if len(tok) == length:
|
|
|
|
arg = self.lex()
|
2009-09-08 05:46:28 +00:00
|
|
|
if arg is None:
|
2009-09-08 05:31:18 +00:00
|
|
|
raise ValueError,'Missing argument to %r redirection' % tok
|
|
|
|
else:
|
|
|
|
tok,arg = tok[:length],tok[length:]
|
|
|
|
|
|
|
|
if tok[0] == '2':
|
|
|
|
op = (tok[1:],2)
|
|
|
|
else:
|
|
|
|
op = (tok,)
|
|
|
|
return (op, arg)
|
|
|
|
|
|
|
|
def parse_pipeline(self):
|
|
|
|
if self.look() is None:
|
|
|
|
raise ValueError,"Expected at least one argument to exec"
|
|
|
|
|
|
|
|
commands = [Command([],[])]
|
|
|
|
while 1:
|
|
|
|
arg = self.lex()
|
|
|
|
if arg is None:
|
|
|
|
break
|
|
|
|
elif arg == '|':
|
|
|
|
commands.append(Command([],[]))
|
|
|
|
elif arg == '|&':
|
|
|
|
# Write this as a redirect of stderr; it must come first because
|
|
|
|
# stdout may have already been redirected.
|
|
|
|
commands[-1].redirects.insert(0, (('>&',2),'1'))
|
|
|
|
commands.append(Command([],[]))
|
|
|
|
elif arg[:4] in TclExecCommand.kRedirectPrefixes4:
|
|
|
|
commands[-1].redirects.append(self.parse_redirect(arg, 4))
|
|
|
|
elif arg[:3] in TclExecCommand.kRedirectPrefixes3:
|
|
|
|
commands[-1].redirects.append(self.parse_redirect(arg, 3))
|
|
|
|
elif arg[:2] in TclExecCommand.kRedirectPrefixes2:
|
|
|
|
commands[-1].redirects.append(self.parse_redirect(arg, 2))
|
|
|
|
elif arg[:1] in TclExecCommand.kRedirectPrefixes1:
|
|
|
|
commands[-1].redirects.append(self.parse_redirect(arg, 1))
|
|
|
|
else:
|
|
|
|
commands[-1].args.append(arg)
|
|
|
|
|
|
|
|
return Pipeline(commands, False, pipe_err=True)
|
|
|
|
|
|
|
|
def parse(self):
|
|
|
|
ignoreStderr = False
|
|
|
|
keepNewline = False
|
|
|
|
|
|
|
|
# Parse arguments.
|
|
|
|
while 1:
|
|
|
|
next = self.look()
|
|
|
|
if not isinstance(next, str) or next[0] != '-':
|
|
|
|
break
|
|
|
|
|
|
|
|
if next == '--':
|
|
|
|
self.lex()
|
|
|
|
break
|
|
|
|
elif next == '-ignorestderr':
|
|
|
|
ignoreStderr = True
|
|
|
|
elif next == '-keepnewline':
|
|
|
|
keepNewline = True
|
|
|
|
else:
|
|
|
|
raise ValueError,"Invalid exec argument %r" % next
|
|
|
|
|
|
|
|
return (ignoreStderr, keepNewline, self.parse_pipeline())
|
|
|
|
|
|
|
|
###
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
|
|
|
|
class TestTclLexer(unittest.TestCase):
|
|
|
|
def lex(self, str, *args, **kwargs):
|
|
|
|
return list(TclLexer(str, *args, **kwargs).lex())
|
|
|
|
|
|
|
|
def test_preprocess(self):
|
|
|
|
self.assertEqual(tcl_preprocess('a b'), 'a b')
|
|
|
|
self.assertEqual(tcl_preprocess('a\\\nb c'), 'a b c')
|
|
|
|
|
|
|
|
def test_unquoted(self):
|
|
|
|
self.assertEqual(self.lex('a b c'),
|
|
|
|
['a', 'b', 'c'])
|
|
|
|
self.assertEqual(self.lex(r'a\nb\tc\ '),
|
|
|
|
['a\nb\tc '])
|
|
|
|
self.assertEqual(self.lex(r'a \\\$b c $\\'),
|
|
|
|
['a', r'\$b', 'c', '$\\'])
|
|
|
|
|
|
|
|
def test_braced(self):
|
|
|
|
self.assertEqual(self.lex('a {b c} {}'),
|
|
|
|
['a', 'b c', ''])
|
|
|
|
self.assertEqual(self.lex(r'a {b {c\n}}'),
|
|
|
|
['a', 'b {c\\n}'])
|
|
|
|
self.assertEqual(self.lex(r'a {b\{}'),
|
|
|
|
['a', 'b{'])
|
|
|
|
self.assertEqual(self.lex(r'{*}'), ['*'])
|
|
|
|
self.assertEqual(self.lex(r'{*} a'), ['*', 'a'])
|
|
|
|
self.assertEqual(self.lex(r'{*} a'), ['*', 'a'])
|
|
|
|
self.assertEqual(self.lex('{a\\\n b}'),
|
|
|
|
['a b'])
|
|
|
|
|
|
|
|
def test_quoted(self):
|
|
|
|
self.assertEqual(self.lex('a "b c"'),
|
|
|
|
['a', 'b c'])
|
|
|
|
|
|
|
|
def test_terminators(self):
|
|
|
|
self.assertEqual(self.lex('a\nb'),
|
|
|
|
['a', (';',), 'b'])
|
|
|
|
self.assertEqual(self.lex('a;b'),
|
|
|
|
['a', (';',), 'b'])
|
|
|
|
self.assertEqual(self.lex('a ; b'),
|
|
|
|
['a', (';',), 'b'])
|
|
|
|
|
|
|
|
class TestTclExecCommand(unittest.TestCase):
|
|
|
|
def parse(self, str):
|
|
|
|
return TclExecCommand(list(TclLexer(str).lex())).parse()
|
|
|
|
|
|
|
|
def test_basic(self):
|
|
|
|
self.assertEqual(self.parse('echo hello'),
|
|
|
|
(False, False,
|
|
|
|
Pipeline([Command(['echo', 'hello'], [])],
|
|
|
|
False, True)))
|
|
|
|
self.assertEqual(self.parse('echo hello | grep hello'),
|
|
|
|
(False, False,
|
|
|
|
Pipeline([Command(['echo', 'hello'], []),
|
|
|
|
Command(['grep', 'hello'], [])],
|
|
|
|
False, True)))
|
|
|
|
|
|
|
|
def test_redirect(self):
|
|
|
|
self.assertEqual(self.parse('echo hello > a >b >>c 2> d |& e'),
|
|
|
|
(False, False,
|
|
|
|
Pipeline([Command(['echo', 'hello'],
|
2009-09-08 05:46:28 +00:00
|
|
|
[(('>&',2),'1'),
|
|
|
|
(('>',),'a'),
|
2009-09-08 05:31:18 +00:00
|
|
|
(('>',),'b'),
|
|
|
|
(('>>',),'c'),
|
2009-09-08 05:46:28 +00:00
|
|
|
(('>',2),'d')]),
|
2009-09-08 05:31:18 +00:00
|
|
|
Command(['e'], [])],
|
|
|
|
False, True)))
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main()
|