Bug 794984 - [mozprocess] Add ability to separate stderr from stdout, r=ahal

A new parameter called 'processStderrLine' is added. When specified, stdout gets processed by the
'processOutputLine' callbacks and stderr is processed by the 'processStderrLine' callbacks. When
not specified, stderr is redirected to stdout which is the same default behaviour.

A side effect of this is that mozprocess now uses three threads to process output. One thread each
for stdout and stderr that reads output lines and stores them in a Queue as fast as possible, this
makes sure there is no blocking in stdout.write(). A third thread executes the callbacks.

--HG--
extra : rebase_source : d4601a6ae21ca61bfdca308c68155ce2c2e09e43
This commit is contained in:
Julien Pagès 2014-12-18 11:53:14 -05:00
parent f428f84463
commit ae6214cb51
3 changed files with 245 additions and 155 deletions

View File

@ -11,7 +11,7 @@ import sys
import threading
import time
import traceback
from Queue import Queue
from Queue import Queue, Empty
from datetime import datetime
__all__ = ['ProcessHandlerMixin', 'ProcessHandler']
@ -40,7 +40,13 @@ class ProcessHandlerMixin(object):
:param env: is the environment to use for the process (defaults to os.environ).
:param ignore_children: causes system to ignore child processes when True, defaults to False (which tracks child processes).
:param kill_on_timeout: when True, the process will be killed when a timeout is reached. When False, the caller is responsible for killing the process. Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.)
:param processOutputLine: function or list of functions to be called for each line of output produced by the process (defaults to None).
:param processOutputLine: function or list of functions to be called for
each line of output produced by the process (defaults to an empty
list).
:param processStderrLine: function or list of functions to be called
for each line of error output - stderr - produced by the process
(defaults to an empty list). If this is not specified, stderr lines
will be sent to the *processOutputLine* callbacks.
:param onTimeout: function or list of functions to be called when the process times out.
:param onFinish: function or list of functions to be called when the process terminates normally without timing out.
:param kwargs: additional keyword args to pass directly into Popen.
@ -594,6 +600,7 @@ falling back to not using job objects for managing child processes"""
ignore_children = False,
kill_on_timeout = True,
processOutputLine=(),
processStderrLine=(),
onTimeout=(),
onFinish=(),
**kwargs):
@ -602,9 +609,7 @@ falling back to not using job objects for managing child processes"""
self.cwd = cwd
self.didTimeout = False
self._ignore_children = ignore_children
self._kill_on_timeout = kill_on_timeout
self.keywordargs = kwargs
self.outThread = None
self.read_buffer = ''
if env is None:
@ -612,15 +617,29 @@ falling back to not using job objects for managing child processes"""
self.env = env
# handlers
if callable(processOutputLine):
processOutputLine = [processOutputLine]
self.processOutputLineHandlers = list(processOutputLine)
if callable(onTimeout):
onTimeout = [onTimeout]
self.onTimeoutHandlers = list(onTimeout)
if callable(onFinish):
onFinish = [onFinish]
self.onFinishHandlers = list(onFinish)
def to_callable_list(arg):
if callable(arg):
arg = [arg]
return CallableList(arg)
processOutputLine = to_callable_list(processOutputLine)
processStderrLine = to_callable_list(processStderrLine)
onTimeout = to_callable_list(onTimeout)
onFinish = to_callable_list(onFinish)
def on_timeout():
self.didTimeout = True
if kill_on_timeout:
self.kill()
onTimeout.insert(0, on_timeout)
self._stderr = subprocess.STDOUT
if processStderrLine:
self._stderr = subprocess.PIPE
self.reader = ProcessReader(stdout_callback=processOutputLine,
stderr_callback=processStderrLine,
finished_callback=onFinish,
timeout_callback=onTimeout)
# It is common for people to pass in the entire array with the cmd and
# the args together since this is how Popen uses it. Allow for that.
@ -654,11 +673,10 @@ falling back to not using job objects for managing child processes"""
being killed.
"""
self.didTimeout = False
self.startTime = datetime.now()
# default arguments
args = dict(stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stderr=self._stderr,
cwd=self.cwd,
env=self.env,
ignore_children=self._ignore_children)
@ -690,7 +708,7 @@ falling back to not using job objects for managing child processes"""
self.proc.kill(sig=sig)
# When we kill the the managed process we also have to wait for the
# outThread to be finished. Otherwise consumers would have to assume
# reader thread to be finished. Otherwise consumers would have to assume
# that it still has not completely shutdown.
return self.wait()
except AttributeError:
@ -700,36 +718,6 @@ falling back to not using job objects for managing child processes"""
else:
raise
def readWithTimeout(self, f, timeout):
"""
Try to read a line of output from the file object *f*.
*f* must be a pipe, like the *stdout* member of a subprocess.Popen
object created with stdout=PIPE. If no output
is received within *timeout* seconds, return a blank line.
Returns a tuple (line, did_timeout), where *did_timeout* is True
if the read timed out, and False otherwise.
"""
# Calls a private member because this is a different function based on
# the OS
return self._readWithTimeout(f, timeout)
def processOutputLine(self, line):
"""Called for each line of output that a process sends to stdout/stderr."""
for handler in self.processOutputLineHandlers:
handler(line)
def onTimeout(self):
"""Called when a process times out."""
for handler in self.onTimeoutHandlers:
handler()
def onFinish(self):
"""Called when a process finishes without a timeout."""
for handler in self.onFinishHandlers:
handler()
def poll(self):
"""Check if child process has terminated
@ -739,10 +727,10 @@ falling back to not using job objects for managing child processes"""
- '0' if the process ended without failures
"""
# Ensure that we first check for the outputThread status. Otherwise
# Ensure that we first check for the reader status. Otherwise
# we might mark the process as finished while output is still getting
# processed.
if self.outThread and self.outThread.isAlive():
if self.reader.is_alive():
return None
elif hasattr(self.proc, "returncode"):
return self.proc.returncode
@ -760,43 +748,15 @@ falling back to not using job objects for managing child processes"""
for that number of seconds without producing any output before
being killed.
"""
def _processOutput():
self.didTimeout = False
logsource = self.proc.stdout
lineReadTimeout = None
if timeout:
lineReadTimeout = timeout - (datetime.now() - self.startTime).seconds
elif outputTimeout:
lineReadTimeout = outputTimeout
(lines, self.didTimeout) = self.readWithTimeout(logsource, lineReadTimeout)
while lines != "":
for line in lines.splitlines():
self.processOutputLine(line.rstrip())
if self.didTimeout:
break
if timeout:
lineReadTimeout = timeout - (datetime.now() - self.startTime).seconds
(lines, self.didTimeout) = self.readWithTimeout(logsource, lineReadTimeout)
if self.didTimeout:
if self._kill_on_timeout:
self.proc.kill()
self.onTimeout()
else:
self.onFinish()
# this method is kept for backward compatibility
if not hasattr(self, 'proc'):
self.run()
if not self.outThread:
self.outThread = threading.Thread(target=_processOutput)
self.outThread.daemon = True
self.outThread.start()
self.run(timeout=timeout, outputTimeout=outputTimeout)
# self.run will call this again
return
if not self.reader.is_alive():
self.reader.timeout = timeout
self.reader.output_timeout = outputTimeout
self.reader.start(self.proc)
def wait(self, timeout=None):
"""
@ -813,12 +773,12 @@ falling back to not using job objects for managing child processes"""
- '0' if the process ended without failures
"""
if self.outThread and self.outThread is not threading.current_thread():
# Thread.join() blocks the main thread until outThread is finished
if self.reader.thread and self.reader.thread is not threading.current_thread():
# Thread.join() blocks the main thread until the reader thread is finished
# wake up once a second in case a keyboard interrupt is sent
count = 0
while self.outThread.isAlive():
self.outThread.join(timeout=1)
while self.reader.is_alive():
self.reader.thread.join(timeout=1)
count += 1
if timeout and count > timeout:
return None
@ -832,77 +792,111 @@ falling back to not using job objects for managing child processes"""
"use ProcessHandler.wait() instead"
return self.wait(timeout=timeout)
### Private methods from here on down. Thar be dragons.
if isWin:
# Windows Specific private functions are defined in this block
PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
GetLastError = ctypes.windll.kernel32.GetLastError
@staticmethod
def _normalize_newline(line):
# adb on windows returns \r\r\n at the end of each line, to get around
# this normalize all newlines to have a unix-style '\n'
# http://src.chromium.org/viewvc/chrome/trunk/src/build/android/pylib/android_commands.py#l1944
return re.sub(r'\r+\n?$', '\n', line)
def _readWithTimeout(self, f, timeout):
if timeout is None:
# shortcut to allow callers to pass in "None" for no timeout.
return (self._normalize_newline(f.readline()), False)
x = msvcrt.get_osfhandle(f.fileno())
l = ctypes.c_long()
done = time.time() + timeout
while time.time() < done:
if self.PeekNamedPipe(x, None, 0, None, ctypes.byref(l), None) == 0:
err = self.GetLastError()
if err == 38 or err == 109: # ERROR_HANDLE_EOF || ERROR_BROKEN_PIPE
return ('', False)
else:
raise OSError("readWithTimeout got error: %d", err)
if l.value > 0:
# we're assuming that the output is line-buffered,
# which is not unreasonable
return (self._normalize_newline(f.readline()), False)
time.sleep(0.01)
return ('', True)
else:
# Generic
def _readWithTimeout(self, f, timeout):
while True:
try:
(r, w, e) = select.select([f], [], [], timeout)
except:
# return a blank line
return ('', True)
if len(r) == 0:
return ('', True)
output = os.read(f.fileno(), 4096)
if not output:
output = self.read_buffer
self.read_buffer = ''
return (output, False)
self.read_buffer += output
if '\n' not in self.read_buffer:
time.sleep(0.01)
continue
tmp = self.read_buffer.split('\n')
lines, self.read_buffer = tmp[:-1], tmp[-1]
real_lines = [x for x in lines if x != '']
if not real_lines:
time.sleep(0.01)
continue
break
return ('\n'.join(lines), False)
@property
def pid(self):
return self.proc.pid
class CallableList(list):
def __call__(self, *args, **kwargs):
for e in self:
e(*args, **kwargs)
def __add__(self, lst):
return CallableList(list.__add__(self, lst))
class ProcessReader(object):
def __init__(self, stdout_callback=None, stderr_callback=None,
finished_callback=None, timeout_callback=None,
timeout=None, output_timeout=None):
self.stdout_callback = stdout_callback or (lambda line: True)
self.stderr_callback = stderr_callback or (lambda line: True)
self.finished_callback = finished_callback or (lambda: True)
self.timeout_callback = timeout_callback or (lambda: True)
self.timeout = timeout
self.output_timeout = output_timeout
self.thread = None
def _create_stream_reader(self, name, stream, queue, callback):
thread = threading.Thread(name=name,
target=self._read_stream,
args=(stream, queue, callback))
thread.daemon = True
thread.start()
return thread
def _read_stream(self, stream, queue, callback):
for line in iter(stream.readline, b''):
queue.put((line, callback))
stream.close()
def start(self, proc):
queue = Queue()
stdout_reader = None
if proc.stdout:
stdout_reader = self._create_stream_reader('ProcessReaderStdout',
proc.stdout,
queue,
self.stdout_callback)
stderr_reader = None
if proc.stderr and proc.stderr != proc.stdout:
stderr_reader = self._create_stream_reader('ProcessReaderStderr',
proc.stderr,
queue,
self.stderr_callback)
self.thread = threading.Thread(name='ProcessReader',
target=self._read,
args=(stdout_reader,
stderr_reader,
queue))
self.thread.daemon = True
self.thread.start()
def _read(self, stdout_reader, stderr_reader, queue):
start_time = time.time()
timed_out = False
timeout = self.timeout
if timeout is not None:
timeout += start_time
output_timeout = self.output_timeout
if output_timeout is not None:
output_timeout += start_time
while (stdout_reader and stdout_reader.is_alive()) \
or (stderr_reader and stderr_reader.is_alive()):
has_line = True
try:
line, callback = queue.get(True, 0.02)
except Empty:
has_line = False
now = time.time()
if not has_line:
if output_timeout is not None and now > output_timeout:
timed_out = True
break
else:
if output_timeout is not None:
output_timeout = now + self.output_timeout
callback(line.rstrip())
if timeout is not None and now > timeout:
timed_out = True
break
# process remaining lines to read
while not queue.empty():
line, callback = queue.get(False)
callback(line.rstrip())
if timed_out:
self.timeout_callback()
if stdout_reader:
stdout_reader.join()
if stderr_reader:
stderr_reader.join()
if not timed_out:
self.finished_callback()
def is_alive(self):
if self.thread:
return self.thread.is_alive()
return False
### default output handlers
### these should be callables that take the output line

View File

@ -15,3 +15,4 @@ disabled = bug 921632
[test_mozprocess_wait.py]
[test_mozprocess_output.py]
[test_mozprocess_params.py]
[test_process_reader.py]

View File

@ -0,0 +1,95 @@
import unittest
import subprocess
import sys
from mozprocess.processhandler import ProcessReader, StoreOutput
def run_python(str_code, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
cmd = [sys.executable, '-c', str_code]
return subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
class TestProcessReader(unittest.TestCase):
def setUp(self):
self.out = StoreOutput()
self.err = StoreOutput()
self.finished = False
def on_finished():
self.finished = True
self.timeout = False
def on_timeout():
self.timeout = True
self.reader = ProcessReader(stdout_callback=self.out,
stderr_callback=self.err,
finished_callback=on_finished,
timeout_callback=on_timeout)
def test_stdout_callback(self):
proc = run_python('print 1; print 2')
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(self.out.output, ['1', '2'])
self.assertEqual(self.err.output, [])
def test_stderr_callback(self):
proc = run_python('import sys; sys.stderr.write("hello world\\n")')
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(self.out.output, [])
self.assertEqual(self.err.output, ['hello world'])
def test_stdout_and_stderr_callbacks(self):
proc = run_python('import sys; sys.stderr.write("hello world\\n"); print 1; print 2')
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(self.out.output, ['1', '2'])
self.assertEqual(self.err.output, ['hello world'])
def test_finished_callback(self):
self.assertFalse(self.finished)
proc = run_python('')
self.reader.start(proc)
self.reader.thread.join()
self.assertTrue(self.finished)
def test_timeout(self):
self.reader.timeout = 0.05
self.assertFalse(self.timeout)
proc = run_python('import time; time.sleep(0.1)')
self.reader.start(proc)
self.reader.thread.join()
self.assertTrue(self.timeout)
self.assertFalse(self.finished)
def test_output_timeout(self):
self.reader.output_timeout = 0.05
self.assertFalse(self.timeout)
proc = run_python('import time; time.sleep(0.1)')
self.reader.start(proc)
self.reader.thread.join()
self.assertTrue(self.timeout)
self.assertFalse(self.finished)
def test_read_without_eol(self):
proc = run_python('import sys; sys.stdout.write("1")')
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(self.out.output, ['1'])
def test_read_with_strange_eol(self):
proc = run_python('import sys; sys.stdout.write("1\\r\\r\\r\\n")')
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(self.out.output, ['1'])
def test_mixed_stdout_stderr(self):
proc = run_python('import sys; sys.stderr.write("hello world\\n"); print 1; print 2', stderr=subprocess.STDOUT)
self.reader.start(proc)
self.reader.thread.join()
self.assertEqual(sorted(self.out.output), sorted(['1', '2', 'hello world']))
self.assertEqual(self.err.output, [])
if __name__ == '__main__':
unittest.main()