All client usage failed with ValueErrors

When our Relay class sends a message it first drains its socket of unread
data...

  async def _msg(self, cell):
    await self._orport.recv(timeout = 0)
    await self._orport.send(cell.pack(self.link_protocol))
    response = await self._orport.recv(timeout = 1)
    yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]

This in turn called asyncio.wait_for() with a timeout value of zero which
returns immediately, leaving our socket undrained.

Our following recv() is then polluted with unexpected data. For instance,
this is caused anything that uses create_circuit() (such as descriptor
downloads) to fail with confusing exceptions such as...

  ======================================================================
  ERROR: test_downloading_via_orport
  ----------------------------------------------------------------------
  Traceback (most recent call last):
    File "/home/atagar/Desktop/stem/test/require.py", line 60, in wrapped
      return func(self, *args, **kwargs)
    File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
      return func(self, *args, **kwargs)
    File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
      return func(self, *args, **kwargs)
    File "/home/atagar/Desktop/stem/test/integ/descriptor/remote.py", line 27, in test_downloading_via_orport
      fall_back_to_authority = False,
    File "/home/atagar/Desktop/stem/stem/util/__init__.py", line 363, in _run_async_method
      return future.result()
    File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 432, in result
      return self.__get_result()
    File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 384, in __get_result
      raise self._exception
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in run
      return [desc async for desc in self._run(suppress)]
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in <listcomp>
      return [desc async for desc in self._run(suppress)]
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 482, in _run
      raise self.error
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 549, in _download_descriptors
      response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
    File "/home/atagar/Python-3.7.0/Lib/asyncio/tasks.py", line 384, in wait_for
      return await fut
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 588, in _download_from
      async with await relay.create_circuit() as circ:
    File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 270, in create_circuit
      async for cell in self._msg(create_fast_cell):
    File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 226, in _msg
      yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
    File "/home/atagar/Desktop/stem/stem/client/cell.py", line 182, in pop
      cls = Cell.by_value(command)
    File "/home/atagar/Desktop/stem/stem/client/cell.py", line 139, in by_value
      raise ValueError("'%s' isn't a valid cell value" % value)
  ValueError: '65' isn't a valid cell value

This also reverts our Relay's '_orport_lock' back to a threaded RLock because
asyncio locks are not reentrant, causing methods such as directory() (which
call _send()) to deadlock upon themselves. We might drop this lock entirely in
the future (thread safety should be moot now that the stem.client module is
fully asynchronous).
This commit is contained in:
Damian Johnson 2020-07-22 18:11:41 -07:00
parent 1e1f3665ae
commit 5d29a26313
3 changed files with 13 additions and 14 deletions

View File

@ -25,8 +25,8 @@ a wrapper for :class:`~stem.socket.RelaySocket`, much the same way as
+- close - closes this circuit
"""
import asyncio
import hashlib
import threading
import stem
import stem.client.cell
@ -71,7 +71,7 @@ class Relay(object):
self.link_protocol = LinkProtocol(link_protocol)
self._orport = orport
self._orport_buffer = b'' # unread bytes
self._orport_lock = asyncio.Lock()
self._orport_lock = threading.RLock()
self._circuits = {} # type: Dict[int, stem.client.Circuit]
@staticmethod
@ -162,7 +162,7 @@ class Relay(object):
:returns: next :class:`~stem.client.cell.Cell`
"""
async with self._orport_lock:
with self._orport_lock:
# cells begin with [circ_id][cell_type][...]
circ_id_size = self.link_protocol.circ_id_size.size
@ -253,7 +253,7 @@ class Relay(object):
:func:`~stem.socket.BaseSocket.close` method.
"""
async with self._orport_lock:
with self._orport_lock:
return await self._orport.close()
async def create_circuit(self) -> 'stem.client.Circuit':
@ -261,7 +261,7 @@ class Relay(object):
Establishes a new circuit.
"""
async with self._orport_lock:
with self._orport_lock:
circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id)
@ -286,7 +286,7 @@ class Relay(object):
return circ
async def __aiter__(self) -> AsyncIterator['stem.client.Circuit']:
async with self._orport_lock:
with self._orport_lock:
for circ in self._circuits.values():
yield circ
@ -338,7 +338,7 @@ class Circuit(object):
:returns: **str** with the requested descriptor data
"""
async with self.relay._orport_lock:
with self.relay._orport_lock:
await self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
await self._send(RelayCommand.DATA, request, stream_id = stream_id)
@ -372,7 +372,7 @@ class Circuit(object):
:param stream_id: specific stream this concerns
"""
async with self.relay._orport_lock:
with self.relay._orport_lock:
# Encrypt and send the cell. Our digest/key only updates if the cell is
# successfully sent.
@ -384,7 +384,7 @@ class Circuit(object):
self.forward_key = forward_key
async def close(self) -> None:
async with self.relay._orport_lock:
with self.relay._orport_lock:
await self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol))
del self.relay._circuits[self.id]

View File

@ -362,13 +362,12 @@ class RelaySocket(BaseSocket):
"""
async def wrapped_recv(reader: asyncio.StreamReader) -> Optional[bytes]:
read_coroutine = reader.read(1024)
if timeout is None:
return await read_coroutine
return await reader.read(1024)
else:
try:
return await asyncio.wait_for(read_coroutine, timeout)
except (asyncio.TimeoutError, ssl.SSLError, ssl.SSLWantReadError):
return await asyncio.wait_for(reader.read(1024), max(timeout, 0.0001))
except asyncio.TimeoutError:
return None
return await self._recv(wrapped_recv)

View File

@ -98,7 +98,7 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertEqual('moria1', desc.nickname)
self.assertEqual('128.31.0.34', desc.address)
self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
self.assertEqual(TEST_DESCRIPTOR, desc.get_bytes())
self.assertEqual(TEST_DESCRIPTOR.rstrip(), desc.get_bytes())
reply.stop()