Fix deadlock when stopping from an async context

Reentrant locks can only be acquired multiple from within the same thread. When
our _run_async_method() invoked start() or stop() we deadlocked.
This commit is contained in:
Damian Johnson 2020-07-12 17:01:18 -07:00
parent 8b539f2fac
commit a2e0da9866
2 changed files with 18 additions and 2 deletions

View File

@ -350,9 +350,11 @@ class Synchronous(object):
async def convert_generator(generator: AsyncIterator) -> Iterator:
return iter([d async for d in generator])
return asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop).result()
future = asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop)
else:
return asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop).result()
future = asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop)
return future.result()
def __iter__(self) -> Iterator:
return self._run_async_method('__aiter__')

View File

@ -118,6 +118,20 @@ class TestSynchronous(unittest.TestCase):
sync_test()
asyncio.run(async_test())
def test_stop_from_async(self):
"""
Ensure we can stop our instance from within an async method without
deadlock.
"""
class AsyncStop(Synchronous):
async def call_stop(self):
self.stop()
instance = AsyncStop()
instance.call_stop()
self.assertRaises(RuntimeError, instance.call_stop)
def test_resuming(self):
"""
Resume a previously stopped instance.