diff --git a/testing/mozbase/mozprocess/mozprocess/processhandler.py b/testing/mozbase/mozprocess/mozprocess/processhandler.py index 65f6e5177437..2aa64c4246de 100644 --- a/testing/mozbase/mozprocess/mozprocess/processhandler.py +++ b/testing/mozbase/mozprocess/mozprocess/processhandler.py @@ -11,7 +11,7 @@ import sys import threading import time import traceback -from Queue import Queue +from Queue import Queue, Empty from datetime import datetime __all__ = ['ProcessHandlerMixin', 'ProcessHandler'] @@ -40,7 +40,13 @@ class ProcessHandlerMixin(object): :param env: is the environment to use for the process (defaults to os.environ). :param ignore_children: causes system to ignore child processes when True, defaults to False (which tracks child processes). :param kill_on_timeout: when True, the process will be killed when a timeout is reached. When False, the caller is responsible for killing the process. Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.) - :param processOutputLine: function or list of functions to be called for each line of output produced by the process (defaults to None). + :param processOutputLine: function or list of functions to be called for + each line of output produced by the process (defaults to an empty + list). + :param processStderrLine: function or list of functions to be called + for each line of error output - stderr - produced by the process + (defaults to an empty list). If this is not specified, stderr lines + will be sent to the *processOutputLine* callbacks. :param onTimeout: function or list of functions to be called when the process times out. :param onFinish: function or list of functions to be called when the process terminates normally without timing out. :param kwargs: additional keyword args to pass directly into Popen. @@ -594,6 +600,7 @@ falling back to not using job objects for managing child processes""" ignore_children = False, kill_on_timeout = True, processOutputLine=(), + processStderrLine=(), onTimeout=(), onFinish=(), **kwargs): @@ -602,9 +609,7 @@ falling back to not using job objects for managing child processes""" self.cwd = cwd self.didTimeout = False self._ignore_children = ignore_children - self._kill_on_timeout = kill_on_timeout self.keywordargs = kwargs - self.outThread = None self.read_buffer = '' if env is None: @@ -612,15 +617,29 @@ falling back to not using job objects for managing child processes""" self.env = env # handlers - if callable(processOutputLine): - processOutputLine = [processOutputLine] - self.processOutputLineHandlers = list(processOutputLine) - if callable(onTimeout): - onTimeout = [onTimeout] - self.onTimeoutHandlers = list(onTimeout) - if callable(onFinish): - onFinish = [onFinish] - self.onFinishHandlers = list(onFinish) + def to_callable_list(arg): + if callable(arg): + arg = [arg] + return CallableList(arg) + + processOutputLine = to_callable_list(processOutputLine) + processStderrLine = to_callable_list(processStderrLine) + onTimeout = to_callable_list(onTimeout) + onFinish = to_callable_list(onFinish) + + def on_timeout(): + self.didTimeout = True + if kill_on_timeout: + self.kill() + onTimeout.insert(0, on_timeout) + + self._stderr = subprocess.STDOUT + if processStderrLine: + self._stderr = subprocess.PIPE + self.reader = ProcessReader(stdout_callback=processOutputLine, + stderr_callback=processStderrLine, + finished_callback=onFinish, + timeout_callback=onTimeout) # It is common for people to pass in the entire array with the cmd and # the args together since this is how Popen uses it. Allow for that. @@ -654,11 +673,10 @@ falling back to not using job objects for managing child processes""" being killed. """ self.didTimeout = False - self.startTime = datetime.now() # default arguments args = dict(stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, + stderr=self._stderr, cwd=self.cwd, env=self.env, ignore_children=self._ignore_children) @@ -690,7 +708,7 @@ falling back to not using job objects for managing child processes""" self.proc.kill(sig=sig) # When we kill the the managed process we also have to wait for the - # outThread to be finished. Otherwise consumers would have to assume + # reader thread to be finished. Otherwise consumers would have to assume # that it still has not completely shutdown. return self.wait() except AttributeError: @@ -700,36 +718,6 @@ falling back to not using job objects for managing child processes""" else: raise - def readWithTimeout(self, f, timeout): - """ - Try to read a line of output from the file object *f*. - - *f* must be a pipe, like the *stdout* member of a subprocess.Popen - object created with stdout=PIPE. If no output - is received within *timeout* seconds, return a blank line. - - Returns a tuple (line, did_timeout), where *did_timeout* is True - if the read timed out, and False otherwise. - """ - # Calls a private member because this is a different function based on - # the OS - return self._readWithTimeout(f, timeout) - - def processOutputLine(self, line): - """Called for each line of output that a process sends to stdout/stderr.""" - for handler in self.processOutputLineHandlers: - handler(line) - - def onTimeout(self): - """Called when a process times out.""" - for handler in self.onTimeoutHandlers: - handler() - - def onFinish(self): - """Called when a process finishes without a timeout.""" - for handler in self.onFinishHandlers: - handler() - def poll(self): """Check if child process has terminated @@ -739,10 +727,10 @@ falling back to not using job objects for managing child processes""" - '0' if the process ended without failures """ - # Ensure that we first check for the outputThread status. Otherwise + # Ensure that we first check for the reader status. Otherwise # we might mark the process as finished while output is still getting # processed. - if self.outThread and self.outThread.isAlive(): + if self.reader.is_alive(): return None elif hasattr(self.proc, "returncode"): return self.proc.returncode @@ -760,43 +748,15 @@ falling back to not using job objects for managing child processes""" for that number of seconds without producing any output before being killed. """ - def _processOutput(): - self.didTimeout = False - logsource = self.proc.stdout - - lineReadTimeout = None - if timeout: - lineReadTimeout = timeout - (datetime.now() - self.startTime).seconds - elif outputTimeout: - lineReadTimeout = outputTimeout - - (lines, self.didTimeout) = self.readWithTimeout(logsource, lineReadTimeout) - while lines != "": - for line in lines.splitlines(): - self.processOutputLine(line.rstrip()) - - if self.didTimeout: - break - - if timeout: - lineReadTimeout = timeout - (datetime.now() - self.startTime).seconds - (lines, self.didTimeout) = self.readWithTimeout(logsource, lineReadTimeout) - - if self.didTimeout: - if self._kill_on_timeout: - self.proc.kill() - self.onTimeout() - else: - self.onFinish() - + # this method is kept for backward compatibility if not hasattr(self, 'proc'): - self.run() - - if not self.outThread: - self.outThread = threading.Thread(target=_processOutput) - self.outThread.daemon = True - self.outThread.start() - + self.run(timeout=timeout, outputTimeout=outputTimeout) + # self.run will call this again + return + if not self.reader.is_alive(): + self.reader.timeout = timeout + self.reader.output_timeout = outputTimeout + self.reader.start(self.proc) def wait(self, timeout=None): """ @@ -813,12 +773,12 @@ falling back to not using job objects for managing child processes""" - '0' if the process ended without failures """ - if self.outThread and self.outThread is not threading.current_thread(): - # Thread.join() blocks the main thread until outThread is finished + if self.reader.thread and self.reader.thread is not threading.current_thread(): + # Thread.join() blocks the main thread until the reader thread is finished # wake up once a second in case a keyboard interrupt is sent count = 0 - while self.outThread.isAlive(): - self.outThread.join(timeout=1) + while self.reader.is_alive(): + self.reader.thread.join(timeout=1) count += 1 if timeout and count > timeout: return None @@ -832,77 +792,111 @@ falling back to not using job objects for managing child processes""" "use ProcessHandler.wait() instead" return self.wait(timeout=timeout) - - ### Private methods from here on down. Thar be dragons. - - if isWin: - # Windows Specific private functions are defined in this block - PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe - GetLastError = ctypes.windll.kernel32.GetLastError - - @staticmethod - def _normalize_newline(line): - # adb on windows returns \r\r\n at the end of each line, to get around - # this normalize all newlines to have a unix-style '\n' - # http://src.chromium.org/viewvc/chrome/trunk/src/build/android/pylib/android_commands.py#l1944 - return re.sub(r'\r+\n?$', '\n', line) - - def _readWithTimeout(self, f, timeout): - if timeout is None: - # shortcut to allow callers to pass in "None" for no timeout. - return (self._normalize_newline(f.readline()), False) - x = msvcrt.get_osfhandle(f.fileno()) - l = ctypes.c_long() - done = time.time() + timeout - while time.time() < done: - if self.PeekNamedPipe(x, None, 0, None, ctypes.byref(l), None) == 0: - err = self.GetLastError() - if err == 38 or err == 109: # ERROR_HANDLE_EOF || ERROR_BROKEN_PIPE - return ('', False) - else: - raise OSError("readWithTimeout got error: %d", err) - if l.value > 0: - # we're assuming that the output is line-buffered, - # which is not unreasonable - return (self._normalize_newline(f.readline()), False) - time.sleep(0.01) - return ('', True) - - else: - # Generic - def _readWithTimeout(self, f, timeout): - while True: - try: - (r, w, e) = select.select([f], [], [], timeout) - except: - # return a blank line - return ('', True) - - if len(r) == 0: - return ('', True) - - output = os.read(f.fileno(), 4096) - if not output: - output = self.read_buffer - self.read_buffer = '' - return (output, False) - self.read_buffer += output - if '\n' not in self.read_buffer: - time.sleep(0.01) - continue - tmp = self.read_buffer.split('\n') - lines, self.read_buffer = tmp[:-1], tmp[-1] - real_lines = [x for x in lines if x != ''] - if not real_lines: - time.sleep(0.01) - continue - break - return ('\n'.join(lines), False) - @property def pid(self): return self.proc.pid +class CallableList(list): + def __call__(self, *args, **kwargs): + for e in self: + e(*args, **kwargs) + + def __add__(self, lst): + return CallableList(list.__add__(self, lst)) + +class ProcessReader(object): + def __init__(self, stdout_callback=None, stderr_callback=None, + finished_callback=None, timeout_callback=None, + timeout=None, output_timeout=None): + self.stdout_callback = stdout_callback or (lambda line: True) + self.stderr_callback = stderr_callback or (lambda line: True) + self.finished_callback = finished_callback or (lambda: True) + self.timeout_callback = timeout_callback or (lambda: True) + self.timeout = timeout + self.output_timeout = output_timeout + self.thread = None + + def _create_stream_reader(self, name, stream, queue, callback): + thread = threading.Thread(name=name, + target=self._read_stream, + args=(stream, queue, callback)) + thread.daemon = True + thread.start() + return thread + + def _read_stream(self, stream, queue, callback): + for line in iter(stream.readline, b''): + queue.put((line, callback)) + stream.close() + + def start(self, proc): + queue = Queue() + stdout_reader = None + if proc.stdout: + stdout_reader = self._create_stream_reader('ProcessReaderStdout', + proc.stdout, + queue, + self.stdout_callback) + stderr_reader = None + if proc.stderr and proc.stderr != proc.stdout: + stderr_reader = self._create_stream_reader('ProcessReaderStderr', + proc.stderr, + queue, + self.stderr_callback) + self.thread = threading.Thread(name='ProcessReader', + target=self._read, + args=(stdout_reader, + stderr_reader, + queue)) + self.thread.daemon = True + self.thread.start() + + def _read(self, stdout_reader, stderr_reader, queue): + start_time = time.time() + timed_out = False + timeout = self.timeout + if timeout is not None: + timeout += start_time + output_timeout = self.output_timeout + if output_timeout is not None: + output_timeout += start_time + + while (stdout_reader and stdout_reader.is_alive()) \ + or (stderr_reader and stderr_reader.is_alive()): + has_line = True + try: + line, callback = queue.get(True, 0.02) + except Empty: + has_line = False + now = time.time() + if not has_line: + if output_timeout is not None and now > output_timeout: + timed_out = True + break + else: + if output_timeout is not None: + output_timeout = now + self.output_timeout + callback(line.rstrip()) + if timeout is not None and now > timeout: + timed_out = True + break + # process remaining lines to read + while not queue.empty(): + line, callback = queue.get(False) + callback(line.rstrip()) + if timed_out: + self.timeout_callback() + if stdout_reader: + stdout_reader.join() + if stderr_reader: + stderr_reader.join() + if not timed_out: + self.finished_callback() + + def is_alive(self): + if self.thread: + return self.thread.is_alive() + return False ### default output handlers ### these should be callables that take the output line diff --git a/testing/mozbase/mozprocess/tests/manifest.ini b/testing/mozbase/mozprocess/tests/manifest.ini index a61bdfd2477a..d869952e3f36 100644 --- a/testing/mozbase/mozprocess/tests/manifest.ini +++ b/testing/mozbase/mozprocess/tests/manifest.ini @@ -15,3 +15,4 @@ disabled = bug 921632 [test_mozprocess_wait.py] [test_mozprocess_output.py] [test_mozprocess_params.py] +[test_process_reader.py] diff --git a/testing/mozbase/mozprocess/tests/test_process_reader.py b/testing/mozbase/mozprocess/tests/test_process_reader.py new file mode 100644 index 000000000000..5fd17b4be72e --- /dev/null +++ b/testing/mozbase/mozprocess/tests/test_process_reader.py @@ -0,0 +1,95 @@ +import unittest +import subprocess +import sys +from mozprocess.processhandler import ProcessReader, StoreOutput + +def run_python(str_code, stdout=subprocess.PIPE, stderr=subprocess.PIPE): + cmd = [sys.executable, '-c', str_code] + return subprocess.Popen(cmd, stdout=stdout, stderr=stderr) + +class TestProcessReader(unittest.TestCase): + def setUp(self): + self.out = StoreOutput() + self.err = StoreOutput() + self.finished = False + def on_finished(): + self.finished = True + self.timeout = False + def on_timeout(): + self.timeout = True + self.reader = ProcessReader(stdout_callback=self.out, + stderr_callback=self.err, + finished_callback=on_finished, + timeout_callback=on_timeout) + + def test_stdout_callback(self): + proc = run_python('print 1; print 2') + self.reader.start(proc) + self.reader.thread.join() + + self.assertEqual(self.out.output, ['1', '2']) + self.assertEqual(self.err.output, []) + + def test_stderr_callback(self): + proc = run_python('import sys; sys.stderr.write("hello world\\n")') + self.reader.start(proc) + self.reader.thread.join() + + self.assertEqual(self.out.output, []) + self.assertEqual(self.err.output, ['hello world']) + + def test_stdout_and_stderr_callbacks(self): + proc = run_python('import sys; sys.stderr.write("hello world\\n"); print 1; print 2') + self.reader.start(proc) + self.reader.thread.join() + + self.assertEqual(self.out.output, ['1', '2']) + self.assertEqual(self.err.output, ['hello world']) + + def test_finished_callback(self): + self.assertFalse(self.finished) + proc = run_python('') + self.reader.start(proc) + self.reader.thread.join() + self.assertTrue(self.finished) + + def test_timeout(self): + self.reader.timeout = 0.05 + self.assertFalse(self.timeout) + proc = run_python('import time; time.sleep(0.1)') + self.reader.start(proc) + self.reader.thread.join() + self.assertTrue(self.timeout) + self.assertFalse(self.finished) + + def test_output_timeout(self): + self.reader.output_timeout = 0.05 + self.assertFalse(self.timeout) + proc = run_python('import time; time.sleep(0.1)') + self.reader.start(proc) + self.reader.thread.join() + self.assertTrue(self.timeout) + self.assertFalse(self.finished) + + def test_read_without_eol(self): + proc = run_python('import sys; sys.stdout.write("1")') + self.reader.start(proc) + self.reader.thread.join() + self.assertEqual(self.out.output, ['1']) + + def test_read_with_strange_eol(self): + proc = run_python('import sys; sys.stdout.write("1\\r\\r\\r\\n")') + self.reader.start(proc) + self.reader.thread.join() + self.assertEqual(self.out.output, ['1']) + + def test_mixed_stdout_stderr(self): + proc = run_python('import sys; sys.stderr.write("hello world\\n"); print 1; print 2', stderr=subprocess.STDOUT) + self.reader.start(proc) + self.reader.thread.join() + + self.assertEqual(sorted(self.out.output), sorted(['1', '2', 'hello world'])) + self.assertEqual(self.err.output, []) + +if __name__ == '__main__': + unittest.main()