Remove hardcoded buffer size from ORPort sockets

When reading ORPort data that exceeded a hardcode (and arbitrary) buffer size
we cropped the content. This was caught by starlight when attempting to use
one of our demo scripts...

  https://trac.torproject.org/projects/tor/ticket/28961
  https://stem.torproject.org/tutorials/examples/download_descriptor.html

  Original traceback:
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 589, in _download_descriptors
      self.content, self.reply_headers = _download_from_orport(endpoint, self.compression, self.resource)
    File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 998, in _download_from_orport
      response = b''.join([cell.data for cell in circ.send(RelayCommand.DATA, request, stream_id = 1)])
    File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 268, in send
      decrypted_cell, backward_key, backward_digest = stem.client.cell.RelayCell.decrypt(self.relay.link_protocol, encrypted_cell, self.backward_key, self.backward_digest)
    File "/home/atagar/Desktop/stem/stem/client/cell.py", line 412, in decrypt
      raise stem.ProtocolError('RELAY cells should be %i bytes, but received %i' % (link_protocol.fixed_cell_length, len(content)))
  ProtocolError: RELAY cells should be 512 bytes, but received 464

I'm unhappy with this approach, but after three days of chewing on this it's
the least bad approach I've come up with and seems to work. Patches welcome if
there's a smarter way of going about this.
This commit is contained in:
Damian Johnson 2019-01-02 12:32:23 -08:00
parent 0724fa5273
commit e2d8575ce4
2 changed files with 56 additions and 13 deletions

View File

@ -130,6 +130,38 @@ class Relay(object):
return Relay(conn, link_protocol)
def _msg(self, cell):
"""
Sends a cell on the ORPort and provides the response we receive in reply.
Unfortunately unlike control sockets, ORPorts don't have generalized rules
for predictable message IO. With control sockets...
* Each message we send receives a single reply.
* We may also receive asynchronous events marked with a 650 status.
ORPorts by contrast receive variable length cells with differing rules on
their arrival. As such making a best effort attempt at a send-and-receive
method in which we do the following...
* Discard any existing unread data from the socket.
* Send our request.
* Await up to a second for a reply.
It's quite possible this is a stupid approach. If so, patches welcome.
:param stem.client.cell.Cell cell: cell to be sent
:returns: **generator** with the cells received in reply
"""
self._orport.recv(timeout = 0) # discard unread data
self._orport.send(cell.pack(self.link_protocol))
response = self._orport.recv(timeout = 1)
for received_cell in stem.client.cell.Cell.pop(response, self.link_protocol):
yield received_cell
def is_alive(self):
"""
Checks if our socket is currently connected. This is a pass-through for our
@ -170,15 +202,16 @@ class Relay(object):
circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id)
self._orport.send(create_fast_cell.pack(self.link_protocol))
created_fast_cell = None
response = stem.client.cell.Cell.unpack(self._orport.recv(), self.link_protocol)
created_fast_cells = filter(lambda cell: isinstance(cell, stem.client.cell.CreatedFastCell), response)
for cell in self._msg(create_fast_cell):
if isinstance(cell, stem.client.cell.CreatedFastCell):
created_fast_cell = cell
break
if not created_fast_cells:
if not created_fast_cell:
raise ValueError('We should get a CREATED_FAST response from a CREATE_FAST request')
created_fast_cell = list(created_fast_cells)[0]
kdf = KDF.from_value(create_fast_cell.key_material + created_fast_cell.key_material)
if created_fast_cell.derivative_key != kdf.key_hash:

View File

@ -91,10 +91,6 @@ ERROR_MSG = 'Error while receiving a control message (%s): %s'
TRUNCATE_LOGS = 10
# maximum number of bytes to read at a time from a relay socket
MAX_READ_BUFFER_LEN = 10 * 1024 * 1024
class BaseSocket(object):
"""
@ -389,10 +385,13 @@ class RelaySocket(BaseSocket):
self._send(message, lambda s, sf, msg: _write_to_socket(sf, msg))
def recv(self):
def recv(self, timeout = None):
"""
Receives a message from the relay.
:param float timeout: maxiumum number of seconds to await a response, this
blocks indefinitely if **None**
:returns: bytes for the message received
:raises:
@ -400,10 +399,21 @@ class RelaySocket(BaseSocket):
* :class:`stem.SocketClosed` if the socket closes before we receive a complete message
"""
# TODO: Is MAX_READ_BUFFER_LEN defined in the spec? Not sure where it came
# from.
def wrapped_recv(s, sf):
if timeout is None:
return s.recv()
else:
s.setblocking(0)
s.settimeout(timeout)
return self._recv(lambda s, sf: s.recv(MAX_READ_BUFFER_LEN))
try:
return s.recv()
except ssl.SSLWantReadError:
return None
finally:
s.setblocking(1)
return self._recv(wrapped_recv)
def is_localhost(self):
return self.address == '127.0.0.1'