Bug 1703921 - Ensure h2 server terminates threads when connections are closed r=twisniewski

Move the location where we send the "close thread" message so it will
always happen when we're waiting on the thread to join. Also stop
blocking forever on queues, but spin the loop every second to check if
the connection closed flag got set.

Differential Revision: https://phabricator.services.mozilla.com/D111323
This commit is contained in:
James Graham 2021-04-10 20:04:49 +00:00
parent a1c854d385
commit 5d7d6a92d3

View File

@ -10,7 +10,7 @@ import time
import traceback
import uuid
from collections import OrderedDict
from queue import Queue
from queue import Empty, Queue
from h2.config import H2Configuration
from h2.connection import H2Connection
@ -412,12 +412,11 @@ class Http2WebTestRequestHandler(BaseWebTestRequestHandler):
self.logger.error('(%s) Closing Connection - \n%s' % (self.uid, str(e)))
if not self.close_connection:
self.close_connection = True
for stream_id, (thread, queue) in stream_queues.items():
queue.put(None)
except Exception as e:
self.logger.error('(%s) Unexpected Error - \n%s' % (self.uid, str(e)))
finally:
for stream_id, (thread, queue) in stream_queues.items():
queue.put(None)
thread.join()
def _is_extended_connect_frame(self, frame):
@ -459,6 +458,9 @@ class Http2WebTestRequestHandler(BaseWebTestRequestHandler):
def _stream_ws_thread(self, stream_id, queue):
frame = queue.get(True, None)
if frame is None:
return
rfile, wfile = os.pipe()
rfile, wfile = os.fdopen(rfile, 'rb'), os.fdopen(wfile, 'wb', 0) # needs to be unbuffer for websockets
stream_handler = H2HandlerCopy(self, frame, rfile)
@ -511,7 +513,10 @@ class Http2WebTestRequestHandler(BaseWebTestRequestHandler):
t.start()
while not self.close_connection:
frame = queue.get(True, None)
try:
frame = queue.get(True, 1)
except Empty:
continue
if isinstance(frame, DataReceived):
wfile.write(frame.data)
@ -558,8 +563,11 @@ class Http2WebTestRequestHandler(BaseWebTestRequestHandler):
response = None
req_handler = None
while not self.close_connection:
# Wait for next frame, blocking
frame = queue.get(True, None)
try:
frame = queue.get(True, 1)
except Empty:
# Restart to check for close_connection
continue
self.logger.debug('(%s - %s) %s' % (self.uid, stream_id, str(frame)))
@ -598,7 +606,12 @@ class Http2WebTestRequestHandler(BaseWebTestRequestHandler):
request.frames.append(frame)
if hasattr(frame, "stream_ended") and frame.stream_ended:
self.finish_handling(request, response, req_handler)
try:
self.finish_handling(request, response, req_handler)
except StreamClosedError:
self.logger.debug('(%s - %s) Unable to write response; stream closed' %
(self.uid, stream_id))
break
def frame_handler(self, request, response, handler):
try: