Long story short
The suggested testing pattern requires boilerplate duplication of app code and doesn’t show testing the app directly.
https://docs.aiohttp.org/en/stable/testing.html#pytest
Expected behaviour
I should be able to pass my app module directly to test_client to test my app.
Actual behaviour
The docs show building the app from handlers on the fly before each test. Passing the real app directly to test_client works fine the first time, then fails with «RuntimeError: web.Application instance initialized with different loop».
Steps to reproduce
# handler.py async def foo(): return 'foo'
# app.py from aiohttp import web from .handler import foo app = web.Application() app.add_routes([web.get('/foo/', foo)])
# test.py from .app import app async def test_foo_status(test_client): response = await client.get('/foo/') assert response.status == 200 async def test_foo_body(test_client): response = await client.get('/foo/') body = await response.text() assert body == 'foo'
Your environment
- OS: Ubuntu 18.04
- Python: 3.6.5
- aiohttp: 3.3.2
Work around
I hacked around the issue for now by peeking at the source code and determining the cause of the error.
if self._loop is not None and self._loop is not loop: raise RuntimeError( "web.Application instance initialized with different loop")
https://github.com/aio-libs/aiohttp/blob/e561eaa/aiohttp/web_app.py#L134
It is not immediately clear to my why this code is necessary. I could not find any context in the git history. Setting app._loop = None in my test cleanup method seems to avoid the problem without any negative side effects. Is this code actually necessary? Could this fix be integrated into test_client?
Hi I’m using AsyncIOMotorClient for asynchronous db calls to mongoDb.
Below is my code.
xyz.py
async def insertMany(self,collection_name,documents_to_insert):
try:
collection=self.database[collection_name]
document_inserted = await collection.insert_many(documents_to_insert)
return document_inserted
except Exception:
raise
def insertManyFn(self,collection_name,documents_to_insert):
try:
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop1=asyncio.get_event_loop()
inserted_documents_count = loop1.run_until_complete(self.insertMany(collection_name, documents_to_insert))
if inserted_documents_count==len(documents_to_insert):
document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
except Exception:
raise
xyz1.py
t=Timer(10,xyz.insertManyFn,
(collection_name,documents_to_insert))
t.start()
While running this I’m getting an exception
RuntimeError: Task <Task pending coro=<xyz.insertMany() running at <my workspace location>/xyz.py:144> cb=[_run_until_complete_cb() at /usr/lib64/python3.5/asyncio/base_events.py:164]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib64/python3.5/asyncio/futures.py:431]> attached to a different loop
In the above program insertManyFn will be called after 10sec and do the insert operation. But when it make the first call to insertMany I’m getting an exception.
Короче
Предлагаемый шаблон тестирования требует стандартного дублирования кода приложения и не показывает тестирование приложения напрямую.
https://docs.aiohttp.org/en/stable/testing.html#pytest
Ожидаемое поведение
Я должен иметь возможность передать свой модуль приложения напрямую test_client для тестирования моего приложения.
Фактическое поведение
В документации показано создание приложения из обработчиков «на лету» перед каждым тестом. Передача реального app непосредственно в test_client работает нормально в первый раз, затем завершается ошибкой «RuntimeError: экземпляр web.Application инициализирован другим циклом».
Действия по воспроизведению
# handler.py
async def foo():
return 'foo'
# app.py
from aiohttp import web
from .handler import foo
app = web.Application()
app.add_routes([web.get('/foo/', foo)])
# test.py
from .app import app
async def test_foo_status(test_client):
response = await client.get('/foo/')
assert response.status == 200
async def test_foo_body(test_client):
response = await client.get('/foo/')
body = await response.text()
assert body == 'foo'
Ваше окружение
- ОС: Ubuntu 18.04
- Python: 3.6.5
- aiohttp: 3.3.2
Работать вокруг
На данный момент я решил проблему, заглянув в исходный код и определив причину ошибки.
if self._loop is not None and self._loop is not loop:
raise RuntimeError(
"web.Application instance initialized with different loop")
https://github.com/aio-libs/aiohttp/blob/e561eaa/aiohttp/web_app.py#L134
Мне не сразу понятно, зачем нужен этот код. Я не смог найти никакого контекста в истории git. Установка app._loop = None в моем методе тестовой очистки, похоже, позволяет избежать проблемы без каких-либо негативных побочных эффектов. Действительно ли этот код необходим? Можно ли интегрировать это исправление в test_client ?
Application — объект с отслеживанием состояния, его следует воссоздавать для каждого теста из набора модульных тестов.
Срок службы приложения должен быть короче, чем цикл обработки событий.
Подсказка: замените app.app на async def app.make_app() coroutine, которая возвращает новое приложение при каждом вызове.
Добавить def make_app() в app.py ? В этом есть смысл.
FWIW tornado продвигает шаблон make_app() в своем самом первом примере. Я не думаю, что издевательство над приложением в тестовом файле — хороший образец для продвижения документации.
http://www.tornadoweb.org/en/stable/#hello -world
Честно говоря, я не думаю, что издевательство над любым объектом aiohttp — хороший шаблон
Этот поток был автоматически заблокирован, поскольку после его закрытия в последнее время не было никаких действий. Пожалуйста, откройте [новый выпуск], чтобы узнать об ошибках.
Если вы чувствуете, что в ходе этого обсуждения были затронуты важные моменты, включите эти выдержки в этот [новый выпуск].
Мой код:
import asyncio
async def coro(arg):
return arg
async def main():
loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop) - пробовал, не помогает
task = asyncio.ensure_future(coro(1), loop=loop)
res = await asyncio.gather(*asyncio.all_tasks(loop=loop))
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main())
Ошибка:
Traceback (most recent call last):
File "/Users/lifr0m/Documents/Proxies/test.py", line 18, in <module>
event_loop.run_until_complete(main())
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/Users/lifr0m/Documents/Proxies/test.py", line 13, in main
res = await asyncio.gather(*asyncio.all_tasks(loop=loop))
RuntimeError: Task <Task pending name='Task-1' coro=<main() running at /Users/lifr0m/Documents/Proxies/test.py:13> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py:184]> got Future <_GatheringFuture pending> attached to a different loop
Собственно, что я делаю не так? Я разве не могу запустить таск от одного лупа и в нём создать второй луп? Я в аргументах указал второй луп.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
import asyncio
import warnings
from collections import MutableMapping
from functools import partial
from . import hdrs
from .abc import AbstractAccessLogger, AbstractMatchInfo, AbstractRouter
from .frozenlist import FrozenList
from .helpers import DEBUG, AccessLogger
from .log import web_logger
from .signals import Signal
from .web_middlewares import _fix_request_current_app
from .web_request import Request
from .web_response import StreamResponse
from .web_server import Server
from .web_urldispatcher import PrefixedSubAppResource, UrlDispatcher
__all__ = ('Application', 'CleanupError')
class Application(MutableMapping):
ATTRS = frozenset([
'logger', '_debug', '_router', '_loop', '_handler_args',
'_middlewares', '_middlewares_handlers', '_run_middlewares',
'_state', '_frozen', '_subapps',
'_on_response_prepare', '_on_startup', '_on_shutdown',
'_on_cleanup', '_client_max_size', '_cleanup_ctx'])
def __init__(self, *,
logger=web_logger,
router=None,
middlewares=(),
handler_args=None,
client_max_size=1024**2,
loop=None,
debug=...):
if router is None:
router = UrlDispatcher()
else:
warnings.warn("router argument is deprecated", DeprecationWarning,
stacklevel=2)
assert isinstance(router, AbstractRouter), router
if loop is not None:
warnings.warn("loop argument is deprecated", DeprecationWarning,
stacklevel=2)
self._debug = debug
self._router = router
self._loop = loop
self._handler_args = handler_args
self.logger = logger
self._middlewares = FrozenList(middlewares)
self._middlewares_handlers = None # initialized on freezing
self._run_middlewares = None # initialized on freezing
self._state = {}
self._frozen = False
self._subapps = []
self._on_response_prepare = Signal(self)
self._on_startup = Signal(self)
self._on_shutdown = Signal(self)
self._on_cleanup = Signal(self)
self._cleanup_ctx = CleanupContext()
self._on_startup.append(self._cleanup_ctx._on_startup)
self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
self._client_max_size = client_max_size
def __init_subclass__(cls):
warnings.warn("Inheritance class {} from web.Application "
"is discouraged".format(cls.__name__),
DeprecationWarning,
stacklevel=2)
if DEBUG:
def __setattr__(self, name, val):
if name not in self.ATTRS:
warnings.warn("Setting custom web.Application.{} attribute "
"is discouraged".format(name),
DeprecationWarning,
stacklevel=2)
super().__setattr__(name, val)
# MutableMapping API
def __eq__(self, other):
return self is other
def __getitem__(self, key):
return self._state[key]
def _check_frozen(self):
if self._frozen:
warnings.warn("Changing state of started or joined "
"application is deprecated",
DeprecationWarning,
stacklevel=3)
def __setitem__(self, key, value):
self._check_frozen()
self._state[key] = value
def __delitem__(self, key):
self._check_frozen()
del self._state[key]
def __len__(self):
return len(self._state)
def __iter__(self):
return iter(self._state)
########
@property
def loop(self):
return self._loop
def _set_loop(self, loop):
if loop is None:
loop = asyncio.get_event_loop()
if self._loop is not None and self._loop is not loop:
raise RuntimeError(
"web.Application instance initialized with different loop")
self._loop = loop
# set loop debug
if self._debug is ...:
self._debug = loop.get_debug()
# set loop to sub applications
for subapp in self._subapps:
subapp._set_loop(loop)
@property
def frozen(self):
return self._frozen
def freeze(self):
if self._frozen:
return
self._frozen = True
self._middlewares.freeze()
self._router.freeze()
self._on_response_prepare.freeze()
self._cleanup_ctx.freeze()
self._on_startup.freeze()
self._on_shutdown.freeze()
self._on_cleanup.freeze()
self._middlewares_handlers = tuple(self._prepare_middleware())
# If current app and any subapp do not have middlewares avoid run all
# of the code footprint that it implies, which have a middleware
# hardcoded per app that sets up the current_app attribute. If no
# middlewares are configured the handler will receive the proper
# current_app without needing all of this code.
self._run_middlewares = True if self.middlewares else False
for subapp in self._subapps:
subapp.freeze()
self._run_middlewares =
self._run_middlewares or subapp._run_middlewares
@property
def debug(self):
return self._debug
def _reg_subapp_signals(self, subapp):
def reg_handler(signame):
subsig = getattr(subapp, signame)
async def handler(app):
await subsig.send(subapp)
appsig = getattr(self, signame)
appsig.append(handler)
reg_handler('on_startup')
reg_handler('on_shutdown')
reg_handler('on_cleanup')
def add_subapp(self, prefix, subapp):
if self.frozen:
raise RuntimeError(
"Cannot add sub application to frozen application")
if subapp.frozen:
raise RuntimeError("Cannot add frozen application")
if prefix.endswith('/'):
prefix = prefix[:-1]
if prefix in ('', '/'):
raise ValueError("Prefix cannot be empty")
resource = PrefixedSubAppResource(prefix, subapp)
self.router.register_resource(resource)
self._reg_subapp_signals(subapp)
self._subapps.append(subapp)
subapp.freeze()
if self._loop is not None:
subapp._set_loop(self._loop)
return resource
def add_routes(self, routes):
self.router.add_routes(routes)
@property
def on_response_prepare(self):
return self._on_response_prepare
@property
def on_startup(self):
return self._on_startup
@property
def on_shutdown(self):
return self._on_shutdown
@property
def on_cleanup(self):
return self._on_cleanup
@property
def cleanup_ctx(self):
return self._cleanup_ctx
@property
def router(self):
return self._router
@property
def middlewares(self):
return self._middlewares
def _make_handler(self, *,
loop=None,
access_log_class=AccessLogger,
**kwargs):
if not issubclass(access_log_class, AbstractAccessLogger):
raise TypeError(
'access_log_class must be subclass of '
'aiohttp.abc.AbstractAccessLogger, got {}'.format(
access_log_class))
self._set_loop(loop)
self.freeze()
kwargs['debug'] = self.debug
if self._handler_args:
for k, v in self._handler_args.items():
kwargs[k] = v
return Server(self._handle, request_factory=self._make_request,
access_log_class=access_log_class,
loop=self.loop, **kwargs)
def make_handler(self, *,
loop=None,
access_log_class=AccessLogger,
**kwargs):
warnings.warn("Application.make_handler(...) is deprecated, "
"use AppRunner API instead",
DeprecationWarning,
stacklevel=2)
return self._make_handler(loop=loop,
access_log_class=access_log_class,
**kwargs)
async def startup(self):
"""Causes on_startup signal
Should be called in the event loop along with the request handler.
"""
await self.on_startup.send(self)
async def shutdown(self):
"""Causes on_shutdown signal
Should be called before cleanup()
"""
await self.on_shutdown.send(self)
async def cleanup(self):
"""Causes on_cleanup signal
Should be called after shutdown()
"""
await self.on_cleanup.send(self)
def _make_request(self, message, payload, protocol, writer, task,
_cls=Request):
return _cls(
message, payload, protocol, writer, task,
self._loop,
client_max_size=self._client_max_size)
def _prepare_middleware(self):
for m in reversed(self._middlewares):
if getattr(m, '__middleware_version__', None) == 1:
yield m, True
else:
warnings.warn('old-style middleware "{!r}" deprecated, '
'see #2252'.format(m),
DeprecationWarning, stacklevel=2)
yield m, False
yield _fix_request_current_app(self), True
async def _handle(self, request):
match_info = await self._router.resolve(request)
if DEBUG: # pragma: no cover
if not isinstance(match_info, AbstractMatchInfo):
raise TypeError("match_info should be AbstractMAtchInfo "
"instance, not {!r}".format(match_info))
match_info.add_app(self)
match_info.freeze()
resp = None
request._match_info = match_info
expect = request.headers.get(hdrs.EXPECT)
if expect:
resp = await match_info.expect_handler(request)
await request.writer.drain()
if resp is None:
handler = match_info.handler
if self._run_middlewares:
for app in match_info.apps[::-1]:
for m, new_style in app._middlewares_handlers:
if new_style:
handler = partial(m, handler=handler)
else:
handler = await m(app, handler)
resp = await handler(request)
if DEBUG:
if not isinstance(resp, StreamResponse):
msg = ("Handler {!r} should return response instance, "
Loading …
The autoreload doesn’t work, it doesn’t detect the changes of the backend side. I use the following adev command:
$ DEBUG=DEBUG adev runserver --no-debug-toolbar datalibrary/api.py
[12:19:17] pre-check enabled, checking app factory
2017-12-19 12:19:17,078 [6652] INFO datalibrary.api: boot datalibrary-api server
2017-12-19 12:19:17,079 [6652] DEBUG datalibrary.api: setup asyncpg, using postgres://datalibrary:datalibrary@localhost:5432/datalibrary
[12:19:17] Starting aux server at http://localhost:8001 ◆
2017-12-19 12:19:17,093 [6652] INFO adev.server.dft: Starting aux server at http://localhost:8001 ◆
[12:19:17] Starting dev server at http://localhost:8000 ●
2017-12-19 12:19:17,094 [6652] INFO adev.server.dft: Starting dev server at http://localhost:8000 ●
2017-12-19 12:19:17,102 [6652] DEBUG watchgod.main: time=1ms files=40 changes=0
2017-12-19 12:19:17,502 [6652] DEBUG watchgod.main: time=1ms files=40 changes=0
2017-12-19 12:19:17,768 [6678] INFO datalibrary.api: boot datalibrary-api server
2017-12-19 12:19:17,770 [6678] DEBUG datalibrary.api: setup asyncpg, using postgres://datalibrary:datalibrary@localhost:5432/datalibrary
2017-12-19 12:19:17,904 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:18,305 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:18,706 [6652] DEBUG watchgod.main: time=3ms files=40 changes=0
2017-12-19 12:19:19,107 [6652] DEBUG watchgod.main: time=3ms files=40 changes=0
2017-12-19 12:19:19,507 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:19,909 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:20,309 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:20,711 [6652] DEBUG watchgod.main: time=3ms files=40 changes=0
2017-12-19 12:19:21,111 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:21,512 [6652] DEBUG watchgod.main: time=3ms files=40 changes=0
2017-12-19 12:19:21,913 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
2017-12-19 12:19:22,314 [6652] DEBUG watchgod.main: time=3ms files=40 changes=0
2017-12-19 12:19:22,715 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
The watchgod is kicking and then when a file changes it stops…
2017-12-19 12:19:22,715 [6652] DEBUG watchgod.main: time=2ms files=40 changes=0
^C[12:20:23] shutting down server...
2017-12-19 12:20:23,377 [6652] INFO adev.server.dft: shutting down server...
2017-12-19 12:20:23,464 [6652] CRITICAL root: Traceback (most recent call last):
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/bin/adev", line 11, in <module>
sys.exit(cli())
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp_devtools/cli.py", line 88, in runserver
run_app(*_runserver(**active_config))
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp_devtools/runserver/main.py", line 26, in run_app
loop.run_until_complete(app.shutdown())
File "/usr/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp/web.py", line 272, in shutdown
yield from self.on_shutdown.send(self)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp/signals.py", line 51, in send
yield from self._send(*args, **kwargs)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp/signals.py", line 17, in _send
yield from res
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp_devtools/runserver/watch.py", line 108, in close
await super().close()
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp_devtools/runserver/watch.py", line 32, in close
self._task.result()
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/aiohttp_devtools/runserver/watch.py", line 48, in _run
async for changes in self._awatch:
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/watchgod/main.py", line 89, in __anext__
changes = await self._loop.run_in_executor(None, self._w.check)
File "/usr/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/watchgod/watcher.py", line 43, in check
self._walk(str(self.root_path), changes, new_files)
File "/home/amirouche/.local/share/virtualenvs/west2-bifHViav/lib/python3.6/site-packages/watchgod/watcher.py", line 32, in _walk
mtime = entry.stat().st_mtime
FileNotFoundError: [Errno 2] No such file or directory: '/home/amirouche/src/namr/datalibrary/west2/datalibrary/.#api.py'
2017-12-19 12:20:23,497 [6652] ERROR asyncio: Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7ff6416b6a58>
(west2-bifHViav)
Я пытаюсь настроить коннектор REST для шлюза Thingsboard IoT Gateway, но при инициализации возникают некоторые ошибки. Моя конфигурация rest.json:
{
"host": "127.0.0.1",
"port": "5000",
"SSL": false,
"mapping": [
{
"endpoint": "/sizerdata",
"HTTPMethods": [
"POST"
],
"security": {
"type": "anonymous"
},
"converter": {
"type": "json",
"deviceNameExpression": "Sizer ${name}",
"deviceTypeExpression": "default",
"attributes": [
{
"type": "string",
"key": "serialNumber",
"value": "${serialNumber}"
}
],
"timeseries": [
{
"type": "integer",
"key": "cupfill",
"value": "${cupfill}"
},
{
"type": "integer",
"key": "packsPerHour",
"value": "${packsPerHour}"
},
{
"type": "integer",
"key": "totalFruitPerMinute",
"value": "${totalFruitPerMinute}"
}
]
}
}
]
}
Когда я запускаю службу thingsboard-gateway, я получаю следующие ошибки в журналах:
""2021-12-29 13:09:07" - |ERROR| - [rest_connector.py] - rest_connector - run - 152 - web.Application instance initialized with different loop"
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/thingsboard_gateway/connectors/rest/rest_connector.py", line 150, in run
self.__run_server()
File "/usr/lib/python3/dist-packages/thingsboard_gateway/connectors/rest/rest_connector.py", line 143, in __run_server
web.run_app(self._app, host=self.__config['host'], port=self.__config['port'], handle_signals=False,
File "/var/lib/thingsboard_gateway/.local/lib/python3.8/site-packages/aiohttp/web.py", line 514, in run_app
loop.run_until_complete(main_task)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/var/lib/thingsboard_gateway/.local/lib/python3.8/site-packages/aiohttp/web.py", line 321, in _run_app
await runner.setup()
File "/var/lib/thingsboard_gateway/.local/lib/python3.8/site-packages/aiohttp/web_runner.py", line 279, in setup
self._server = await self._make_server()
File "/var/lib/thingsboard_gateway/.local/lib/python3.8/site-packages/aiohttp/web_runner.py", line 373, in _make_server
self._app._set_loop(loop)
File "/var/lib/thingsboard_gateway/.local/lib/python3.8/site-packages/aiohttp/web_app.py", line 223, in _set_loop
raise RuntimeError(
RuntimeError: web.Application instance initialized with different loop
Я пробовал это на установке Ubuntu и контейнере Docker и получил тот же результат. Есть идеи, почему это не работает?
dbrattli / aioreactive
Goto Github
PK
View Code? Open in Web Editor
NEW
20.0
18.0
641 KB
Async/await reactive tools for Python 3.9+
License: MIT License
async
asyncio
streams
asynciterator
expression
functional
functional-programming
aioreactive’s Introduction
aioreactive — ReactiveX for asyncio using async and await
NEWS: Project rebooted Nov. 2020. Rebuilt using Expression.
Aioreactive is RxPY for asyncio.
It’s an asynchronous and reactive Python library for asyncio using async
and await. Aioreactive is built on the
Expression functional library
and, integrates naturally with the Python language.
aioreactive is the unification of RxPY and reactive programming with
asyncio using async and await.
The design goals for aioreactive:
- Python 3.9+ only. We have a hard dependency PEP
585, Type Hinting Generics
In Standard Collections, data classes and type variables. - All operators and tools are implemented as plain old functions.
- Everything is
async. Sending values is async, subscribing to
observables is async. Disposing subscriptions is async. - One scheduler to rule them all. Everything runs on the asyncio base
event-loop. - No multi-threading. Only async and await with concurrency using
asyncio. Threads are hard, and in many cases it doesn’t make sense to
use multi-threading in Python applications. If you need to use threads
you may wrap them with
concurrent.futures
and compose them into the chain withflat_map()or similar. See
parallel.py
for an example. - Simple, clean and use few abstractions. Try to align with the
itertools package, and reuse as much from the Python standard library
as possible. - Support type hints and static type checking using Pylance.
- Implicit synchronous back-pressure ™. Producers of events will
simply be awaited until the event can be processed by the down-stream
consumers.
AsyncObservable and AsyncObserver
With aioreactive you subscribe observers to observables, and the key
abstractions of aioreactive can be seen in this single line of code:
subscription = await observable.subscribe_async(observer)
The difference from RxPY can be seen with the await expression.
Aioreactive is built around the asynchronous duals, or opposites of the
AsyncIterable and AsyncIterator abstract base classes. These async
classes are called AsyncObservable and AsyncObserver.
AsyncObservable is a producer of events. It may be seen as the dual or
opposite of AsyncIterable and provides a single setter method called
subscribe_async() that is the dual of the __aiter__() getter method:
from abc import ABC, abstractmethod class AsyncObservable(ABC): @abstractmethod async def subscribe_async(self, observer): return NotImplemented
AsyncObserver is a consumer of events and is modeled after the
so-called consumer interface, the
enhanced generator interface in
PEP-342 and async
generators in PEP-525. It
is the dual of the AsyncIterator __anext__() method, and expands to
three async methods asend(), that is the opposite of __anext__(),
athrow() that is the opposite of an raise Exception() and aclose()
that is the opposite of raise StopAsyncIteration:
from abc import ABC, abstractmethod class AsyncObserver(ABC): @abstractmethod async def asend(self, value): return NotImplemented @abstractmethod async def athrow(self, error): return NotImplemented @abstractmethod async def aclose(self): return NotImplemented
Subscribing to observables
An observable becomes hot and starts streaming items by using the
subscribe_async() method. The subscribe_async() method takes an
observable and returns a disposable subscription. So the
subscribe_async() method is used to attach a observer to the
observable.
async def asend(value): print(value) disposable = await subscribe_async(source, AsyncAnonymousObserver(asend))
AsyncAnonymousObserver is an anonymous observer that constructs an
AsyncObserver out of plain async functions, so you don’t have to
implement a new named observer every time you need one.
The subscription returned by subscribe_async() is disposable, so to
unsubscribe you need to await the dispose_async() method on the
subscription.
await subscription.dispose_async()
Asynchronous iteration
Even more interesting, with to_async_iterable you can flip around from
AsyncObservable to an AsyncIterable and use async-for to consume
the stream of events.
obv = AsyncIteratorObserver() subscription = subscribe_async(source, obv) async for x in obv: print(x)
They effectively transform us from an async push model to an async pull
model, and lets us use the awesome new language features such as async for and async-with. We do this without any queueing, as a push by the
AsyncObservable will await the pull by the `AsyncIterator. This
effectively applies so-called «back-pressure» up the subscription as the
producer will await the iterator to pick up the item send.
The for-loop may be wrapped with async-with to control the lifetime of
the subscription:
import aioreactive as rx xs = rx.from_iterable([1, 2, 3]) result = [] obv = rx.AsyncIteratorObserver() async with await xs.subscribe_async(obv) as subscription: async for x in obv: result.append(x) assert result == [1, 2, 3]
Async streams
An async stream is both an async observer and an async observable.
Aioreactive lets you create streams explicitly.
import aioreactive as rx stream = AsyncSubject() # Alias for AsyncMultiStream sink = rx.AsyncAnonymousObserver() await stream.subscribe_async(sink) await stream.asend(42)
You can create streams directly from AsyncMultiStream or
AsyncSingleStream. AsyncMultiStream supports multiple observers, and
is hot in the sense that it will drop any event that is sent if there
are currently no observers attached. AsyncSingleStream on the other
hand supports a single observer, and is cold in the sense that it will
await any producer until there is an observer attached.
Operators
The Rx operators in aioreactive are all plain old functions. You can
apply them to an observable and compose it into a transformed, filtered,
aggregated or combined observable. This transformed observable can be
streamed into an observer.
Observable -> Operator -> Operator -> Operator -> Observer
Aioreactive contains many of the same operators as you know from RxPY.
Our goal is not to implement them all, but to provide the most essential
ones.
- concat — Concatenates two or more observables.
- choose — Filters and/or transforms the observable.
- choose_asnc — Asynchronously filters and/or transforms the observable.
- debounce — Throttles an observable.
- delay — delays the items within an observable.
- distinct_until_changed — an observable with continuously distinct values.
- filter — filters an observable.
- filteri — filters an observable with index.
- flat_map — transforms an observable into a stream of observables and flattens the resulting observable.
- flat_map_latest — transforms an observable into a stream of
observables and flattens the resulting observable by producing values
from the latest observable. - from_iterable — Create an observable from an (async) iterable.
- subscribe — Subscribes an observer to an observable. Returns a subscription.
- map — transforms an observable.
- mapi — transforms an observable with index.
- map_async — transforms an observable asynchronously.
- mapi_async — transforms an observable asynchronously with index.
- merge_inner — Merges an observable of observables.
- merge — Merge one observable with another observable.
- merge_seq — Merge a sequence of observables.
- run — Awaits the future returned by subscribe. Returns when the subscription closes.
- slice — Slices an observable.
- skip — Skip items from the start of the observable stream.
- skip_last — Skip items from the end of the observable stream.
- starfilter — Filters an observable with a predicate and spreads the arguments.
- starmap — Transforms and async observable and spreads the arguments to the mapper.
- switch_latest — Merges the latest stream in an observable of streams.
- take — Take a number of items from the start of the observable stream.
- take_last — Take a number of items from the end of the observable stream.
- unit — Converts a value or future to an observable.
- with_latest_from — Combines two observables into one.
Functional or object-oriented, reactive or interactive
With aioreactive you can choose to program functionally with plain old
functions, or object-oriented with classes and methods. Aioreactive
supports both method chaining or forward pipe programming styles.
Pipe forward programming style
AsyncObservable may compose operators using forward pipelining with
the pipe operator provided by the amazing
Expression library. This works
by having the operators partially applied with their arguments before
being given the source stream as the last curried argument.
ys = pipe(xs, filter(predicate), map(mapper), flat_map(request))
Longer pipelines may break lines as for binary operators:
import aioreactve as rx async def main(): stream = rx.AsyncSubject() obv = rx.AsyncIteratorObserver() xs = pipe( stream, rx.map(lambda x: x["term"]), rx.filter(lambda text: len(text) > 2), rx.debounce(0.75), rx.distinct_until_changed(), rx.map(search_wikipedia), rx.switch_latest(), ) async with xs.subscribe_async(obv) as ys async for value in obv: print(value)
AsyncObservable also supports slicing using the Python slice notation.
@pytest.mark.asyncio async def test_slice_special(): xs = rx.from_iterable([1, 2, 3, 4, 5]) values = [] async def asend(value): values.append(value) ys = xs[1:-1] result = await run(ys, AsyncAnonymousObserver(asend)) assert result == 4 assert values == [2, 3, 4]
Fluent and chained programming style
An alternative to pipelining is to use the classic and fluent method
chaining as we know from ReactiveX.
An AsyncObservable created from class methods such as
AsyncRx.from_iterable() returns a AsyncChainedObservable.
where we may use methods such as .filter() and .map().
from aioreactive import AsyncRx @pytest.mark.asyncio async def test_observable_simple_pipe(): xs = AsyncRx.from_iterable([1, 2, 3]) result = [] async def mapper(value): await asyncio.sleep(0.1) return value * 10 async def predicate(value): await asyncio.sleep(0.1) return value > 1 ys = xs.filter(predicate).map(mapper) async def on_next(value): result.append(value) subscription = await ys.subscribe_async(AsyncAnonymousObserver(on_next)) await subsubscription assert result == [20, 30]
Virtual time testing
Aioreactive also provides a virtual time event loop
(VirtualTimeEventLoop) that enables you to write asyncio unit-tests
that run in virtual time. Virtual time means that time is emulated, so
tests run as quickly as possible even if they sleep or awaits long-lived
operations. A test using virtual time still gives the same result as it
would have done if it had been run in real-time.
For example the following test still gives the correct result even if it
takes 0 seconds to run:
@pytest.fixture() def event_loop(): loop = VirtualTimeEventLoop() yield loop loop.close() @pytest.mark.asyncio async def test_call_later(): result = [] def action(value): result.append(value) loop = asyncio.get_event_loop() loop.call_later(10, partial(action, 1)) loop.call_later(1, partial(action, 2)) loop.call_later(5, partial(action, 3)) await asyncio.sleep(10) assert result == [2, 3, 1]
The aioreactive testing module provides a test AsyncSubject that may
delay sending values, and a test AsyncTestObserver that records all
events. These two classes helps you with testing in virtual time.
@pytest.fixture() def event_loop(): loop = VirtualTimeEventLoop() yield loop loop.close() @pytest.mark.asyncio async def test_delay_done(): xs = AsyncSubject() # Test stream async def mapper(value): return value * 10 ys = delay(0.5, xs) lis = AsyncTestObserver() # Test AsyncAnonymousObserver sub = await subscribe_async(ys, lis) await xs.asend_later(0, 10) await xs.asend_later(1, 20) await xs.aclose_later(1) await sub assert lis.values == [ (0.5, OnNext(10)), (1.5, OnNext(20)), (2.5, OnCompleted) ]
Why not use AsyncIterable for everything?
AsyncIterable and AsyncObservable are closely related (in fact they
are duals). AsyncIterable is an async iterable (pull) world, while
AsyncObservable is an async reactive (push) based world. There are
many operations such as map() and filter() that may be simpler to
implement using AsyncIterable, but once we start to include time, then
AsyncObservable really starts to shine. Operators such as delay()
makes much more sense for AsyncObservable than for AsyncIterable.
However, aioreactive makes it easy for you to flip-around to async
iterable just before you need to consume the stream, thus giving you the
best of both worlds.
Will aioreactive replace RxPY?
Aioreactive will not replace RxPY.
RxPY is an implementation of Observable. Aioreactive is an
implementation of AsyncObservable.
Rx and RxPY has hundreds of different query operators, and we currently
have no plans to implementing all of them for aioreactive.
Many ideas from aioreactive have already been ported back into «classic» RxPY.
References
Aioreactive was inspired by:
- AsyncRx — Aioreactive is a direct port of AsyncRx from F#.
- Expression — Functional programming for Python.
- Is it really Pythonic to continue using LINQ operators instead of plain old functions?
- Reactive Extensions (Rx) and RxPY.
- Dart Streams
- Underscore.js.
- itertools and functools.
- dbrattli/OSlash
- kriskowal/q.
License
The MIT License (MIT)
Copyright (c) 2016 Børge Lanes, Dag Brattli.
aioreactive’s People
aioreactive’s Issues
Is there an operator to connect an Observable to an Observer, and if not, which one would you prefer?
Hi Dag,
I think the pipe syntax for chaining operators is really cool, and am missing something similar to complete the chain.
That is, I find myself writing code like xs = source | op.map( f1) | op.map( f2) | ... and then having to complete it with a clunky
async def f():
subscription = await subscribe(xs, AnonymousAsyncObserver(sink_fun))
when I’d much rather just say something like
f = xs > sink_fun
where the > operator means we’re coupling the source with an Observer that can consume it, returning a regular coroutine.
Is there something like that in aioreactive, and if not, do you have any immediate plans to write it?
If the answer to both of the above is ‘no’ , I’ll have a go at writing one this week and would be grateful for any hints 
Thanks a lot!
What is the difference between AsyncObservable and AsyncStream in aioreactive?
Just trying to wrap my head about these concepts.
rx.delay frequently destroys tasks
Hi, i’m unsure if this is the intended behaviour, but rx.delay causes asyncio to frequently throw the following error.
if i’m not mistaken, rx.delay(x: seconds) is supposed to be an operator that delays the parent for x seconds.
this is how i think rx.delay should behave:
def setup(): return pipe( rx.interval(0, 2), rx.flat_map(many), rx.subscribe_async(observer), ) async def observer(x): print(x)
async def delayed(x, y): await asyncio.sleep(y * 0.1) return (x, y) def many(x: int): return rx.merge_seq( [ pipe( delayed(x, 1), rx.from_async, ), pipe( delayed(x, 2), rx.from_async, ), ] )
however, when we use rx.delay instead of calling asyncio.sleep
async def delayed(x, y): return (x, y) def many(x: int): return rx.merge_seq( [ pipe( delayed(x, 1), rx.from_async, rx.delay(0.1), ), pipe( delayed(x, 2), rx.from_async, rx.delay(0.2), ), ] )
the exceptions are thrown
(0, 1)
(0, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-17' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(1, 1)
(1, 2)
(2, 1)
(2, 2)
(3, 1)
(3, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-43' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-44' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-56' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-57' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(4, 1)
(4, 2)
Concat with async iterables
I’m trying to adapt the concat operator to implement the catch_exception op, but just noticed that the concat fails with an async iterable. I’m using the following observable:
async def asynciter():
for i in range(5):
await asyncio.sleep(1)
yield i
xs = from_async_iterable(asynciter())
I have a fork with a working catch_exception and retry here, as well as the tests mentioned: [email protected]6219f9a
Observable from an async_generator?
How can I create an AsyncObservable from an async_generator using aioreactive?
async def records():
for i in range(0, 3):
yield i
source = rx.from_(records())
source.subscribe(
on_next=lambda x: logger.info(f"on_next: {x=}"),
on_error=lambda x: logger.error(f"error: {x=}"),
on_completed=lambda: logger.info("completed")
)
results in an Exception: TypeError("'async_generator' object is not iterable")
AttributeError: ‘NoneType’ object has no attribute ‘subscribe_async’ in aioreactive/combine.py», line 99, in update
The complete code is extended , but this is a summary.
This exception does not represent a real problem because my application finishes executing all the functions correctly, and it only happens with the first «asend» of the subject incoming_data.
type_data = Dict[str, Any] incoming_data: rx.AsyncObservable['type_data'] = rx.AsyncSubject() async def webhook(request): payload = await request.json() await incoming_data.asend(payload) return web.Response(status=200) async def on_startup(app): rs: RequestSender = RequestSender() mb: MessageBuilder = MessageBuilder() obs_request_sender = pipe(incoming_data, rx.map(lambda x_d: mb.adapt_message(ticket=x_d[1]["ticket"], model_message=x_d[1]["model_message"], destiny='guazuapp', data=x_d[1])), rx.flat_map_async(lambda x_d: rs.producer(messages_to_send=x_d[1]['messages_to_send']))) await obs_request_sender.subscribe_async(rx.AsyncAnonymousObserver(on_next_obs_request_sender, a_throw )) async def on_next_obs_request_sender(payload): print(payload) async def a_throw(ex: Exception) -> None: print(ex) async def aio_app(): app = web.Application() app.on_startup.append(on_startup) app.add_routes([web.post('/middleware_receptor', webhook)]) return app def main(): port = os.environ.get("PORT", 5055) web.run_app(aio_app(), host="localhost", port=int(port)) if __name__ == "__main__": main()
class RequestSender: n_workers:int = 3 queue = asyncio.Queue() @log_function_runtime async def producer(self, messages_to_send: List[Dict[str, str]]): for dict_message in messages_to_send: self.queue.put_nowait(dict_message) # Create (n_workers:int) worker tasks to process the queue concurrently. tasks = [] session = aiohttp.ClientSession() for i in range(self.n_workers): task = asyncio.create_task(self.request_worker(session)) tasks.append(task) # Wait until the queue is fully processed. await self.queue.join() # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) await session.close() @log_function_runtime async def request_worker(self, session): while True: dict_message:Dict[str, str] = await self.queue.get() destiny:str = dict_message.get("destiny") body_message = dict_message.pop(destiny) function_name= destiny await self.request_post(function_name, body_message, session, dict_message) self.queue.task_done() async def request_post(self, function_name: str, body_message: str, session, dict_message:Dict[str, Any]): async with session.post(url=url, headers=headers, data=body_message) as response_post: response = await response_post.text() print(f"response: {response}") return rx.single(response)
This is all the trace I have
Task exception was never retrieved
future: <Task finished name=’Task-4′ coro=<start_immediate..runner() done, defined at /usr/local/lib/python3.10/site-packages/expression/core/aiotools.py:86> exception=AttributeError(«‘NoneType’ object has no attribute ‘subscribe_async'»)>
Traceback (most recent call last):
File «/usr/local/lib/python3.10/site-packages/expression/core/aiotools.py», line 87, in runner
return await computation
File «/usr/local/lib/python3.10/site-packages/aioreactive/combine.py», line 146, in worker
await message_loop(initial_model)
File «/usr/local/lib/python3.10/site-packages/aioreactive/combine.py», line 141, in message_loop
model = await update(msg, model)
File «/usr/local/lib/python3.10/site-packages/aioreactive/combine.py», line 99, in update
inner = await xs.subscribe_async(obv(model.key))
AttributeError: ‘NoneType’ object has no attribute ‘subscribe_async’
Package Version
aiohttp 3.8.1
aiohttp-jinja2 1.5
aioreactive 0.16.0
aiosignal 1.2.0
async-timeout 4.0.2
attrs 22.1.0
autopep8 1.6.0
certifi 2022.6.15
charset-normalizer 2.1.0
expression 2.0.1
frozenlist 1.3.0
greenlet 1.1.2
idna 3.3
Jinja2 3.1.2
MarkupSafe 2.1.1
multidict 6.0.2
mysql-connector-python 8.0.29
pip 22.0.4
protobuf 4.21.4
pycodestyle 2.9.1
requests 2.28.1
ring 0.9.1
Rx 3.2.0
setuptools 58.1.0
six 1.16.0
SQLAlchemy 1.4.39
toml 0.10.2
typing_extensions 4.1.1
urllib3 1.26.11
watchdog 2.1.9
wheel 0.37.1
wirerope 0.4.5
yarl 1.7.2
Latest release?
pypi shows 0.16.0 as the latest release (Mar 22, 2022)
github shows 0.17.0 as the latest tag (Mar 23, 2032)
last master branch commit was Jul 14, 2022
which version should I be using?
buffer operator?
Hi, I’ve been implementing a state management library (http://github.com/nardi/pobx) using RxPY, and am now trying to build an async version using this library. However, in my RxPY implementation I make use of the buffer operator to propagate a bunch of values all at once. Are there any plans to implement this operator here too, or should I try to get my own version running? FWIW, in RxPY buffer is in turn implemented using the window operator, which is also not available here, so it might be a bit of work.
from_iterable broken, or is it my mistake?
I’ve written a merge_sorted function using aioreactive idiom (it returns an async iterable), and it works fine with async for (see attached notebook).
However, when I try to convert it into a source using from_iterable, that doesn’t work at all (also in attached notebook). Is it a bug in from_iterable or am I doing it wrong?
Thanks a lot!
aioreactive merge_sorted.zip
aiohttp streaming POST
So I was playing around with doing http requests, but cannot seem to get this to work:
import logging as log import aiohttp import json import asyncio from collections import deque import re from aioreactive.core import FuncSink, start from aioreactive.producer import Producer, AsyncStream from aioreactive.producer import op from malefico.core import recordio as extract def decode(msg): return json.loads(msg.decode("ascii")) async def subscribe(test): headers = {'content-type': 'application/json'} url = 'http://localhost:5050/api/v1/scheduler' d = { "type": "SUBSCRIBE", "subscribe": { "framework_info": { "user": "username", "name": "Example HTTP Framework" } } } async with aiohttp.ClientSession(headers=headers) as session: async with session.post(url, data=json.dumps( d), timeout=100000) as resp: return Producer.from_iterable(resp.content) async def main(): url = 'http://localhost:5050/api/v1/scheduler' d = { "type": "SUBSCRIBE", "subscribe": { "framework_info": { "user": "username", "name": "Example HTTP Framework" } } } stream = AsyncStream() headers = {'content-type': 'application/json'} xs = (stream | op.flat_map(subscribe) | extract | op.map(decode) ) async with start(xs) as ys: await stream.asend(1) async for value in ys: print(value) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.set_debug(True) asyncio.ensure_future(main(), loop=loop) loop.run_forever() loop.close()
Weird thing is if i move the POST call out of the flat_map say to the actual subscribe call and move Producer.from_iterable(req.content) to another flat_map it starts working.
Any ideas?
Also really great library fun to play with and please let me know if I am using it in a completely wrong way.
Error example autocomplete , web.Application instance initialized with different loop
summarized Traceback.
…
File «/home/denis/Documents/env_aioreactive/lib/python3.9/site-packages/aiohttp/web.py», line 321, in _run_app
await runner.setup()
File «/home/denis/Documents/env_aioreactive/lib/python3.9/site-packages/aiohttp/web_runner.py», line 279, in setup
self._server = await self._make_server()
File «/home/denis/Documents/env_aioreactive/lib/python3.9/site-packages/aiohttp/web_runner.py», line 373, in _make_server
self._app._set_loop(loop)
File «/home/denis/Documents/env_aioreactive/lib/python3.9/site-packages/aiohttp/web_app.py», line 223, in _set_loop
raise RuntimeError(
RuntimeError: web.Application instance initialized with different loop
Process finished with exit code 1
Packages Versions
aiohttp 3.8.1
aiohttp-jinja2 1.5
aioreactive 0.16.0
aiosignal 1.2.0
async-timeout 4.0.2
attrs 22.1.0
charset-normalizer 2.1.0
expression 2.2.0
frozenlist 1.3.0
idna 3.3
Jinja2 3.1.2
MarkupSafe 2.1.1
multidict 6.0.2
pip 21.3.1
setuptools 60.2.0
typing_extensions 4.3.0
wheel 0.37.1
yarl 1.7.2
Errors on examples/autocomplete
Hi.
I tried to run Autocomplete’s sample.
However, it did not work well with the following error.
(I fixed the import part before running.)
WebSocket opened
<CoroWrapper WebSocketResponse.send_str() running at /.../python-3.6.4/lib/python3.6/site-packages/aiohttp/web_ws.py:247, created at /lib/python3.6/asyncio/coroutines.py:85> was never yielded from
Coroutine object created at (most recent call last, truncated to 10 last lines):
File "/.../python-3.6.4/lib/python3.6/site-packages/aioreactive/core/bases.py", line 40, in asend
await self.asend_core(value)
File "/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/.../python-3.6.4/lib/python3.6/site-packages/aioreactive/core/streams.py", line 41, in asend_core
await self._observer.asend(value)
File "/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/.../python-3.6.4/lib/python3.6/site-packages/aioreactive/core/bases.py", line 40, in asend
await self.asend_core(value)
File "/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "/.../python-3.6.4/lib/python3.6/site-packages/aioreactive/core/observers.py", line 98, in asend_core
await self._send(value)
File "/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
return self.gen.send(None)
File "autocomplete.py", line 58, in asend
ws.send_str(value)
File "/lib/python3.6/asyncio/coroutines.py", line 85, in debug_wrapper
return CoroWrapper(gen, None)
I want a sample to pass coroutine to map, so I want to know how to do it normally.
from_iterable crashes with streams of tuples
It seems that debug code is not able to format tuples and namedtuples properly. A snippet that reproduces the problem:
import asyncio from aioreactive.core import subscribe, AsyncAnonymousObserver from aioreactive.core import AsyncObservable from collections import namedtuple async def main(): Foo = namedtuple("Foo", ["name", "value"]) foos = [Foo("one", 1), Foo("two", 2)] xs = AsyncObservable.from_iterable(foos) async def mysink(value): print(value) await subscribe(xs, AsyncAnonymousObserver(mysink)) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
AsyncAnonymousObserver exception was never retrieved
future: <AsyncAnonymousObserver finished
exception=TypeError('not all arguments converted during string formatting',)>
Traceback (most recent call last):
File ".../lib/python3.6/site-packages/aioreactive/operators/from_iterable.py", line 32, in worker
log.debug("sync_worker. asending: %s" % value)
TypeError: not all arguments converted during string formatting
Iterable AsyncMultiStream?
Hi! I’m wondering why AsyncMultiStream doesn’t inherit from AsyncStreamIterable as AsyncSingleStream does? To me it looks pretty reasonable to iterate hot observables because each observer already actually gets individual AsyncSingleStream during subscription.
How can I install this?
Hi there!
This lib looks neat! I’m really excited to give it a try!
Is there any info on how to install this?
I think it would be cool if there was an Install or Getting Started section in README.md. 
Stream Fork
I was wondering if there is a nice way to «fork» a stream, meaning to use the filter operator to get a number of streams from one stream, based on a certain condition for example.
Also please let me know if issues are the wrong place to ask these questions.
Is this project still supported, or has functionality been migrated to RXPY 3.0?
from_iterable does not handle exceptions in iterables
import asyncio from aioreactive.core import AsyncObservable, run async def generator(): # Also fails with sync generators for i in range(10): if i > 2: print("Let's raise") raise ValueError(i) # stream hangs here await asyncio.sleep(.1) print('Yield', i) yield i async def main(): iterable = generator() observer = AsyncObservable.from_iterable(iterable) await run(observer, timeout=None) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
So the flow is simply hangs after the exception is thrown in the generator
Retry operator
Hi,
I was wondering I you could give me a tip on how to implement a retry operator.I tried doing something similar to RXpy, but could not figure it out
Thanks
My first AsyncObservable
First off, thanks you putting this library together. I’ve been playing around with it and it’s been real slick so far.
In light of that, I think I’m not understanding something. I’m trying to create an Observable and I’m wondering if I’m missing something. Here’s what I’m doing right now:
class MyFirstObservable(AsyncObservable):
def __init__(self, delay):
self.observers = []
async def __asubscribe__(self, observer):
self.observers += [observer]
Now, when Observers subscribe, I will add them to my list. But it feels like maintaining a list of who is subscribed should be built in? When I have events to pass out, I will iterate through my observers and give them events to check out, but I’m just wondering what the intent was in not aggregating observers by default?
README unclear
See screenshot: Readme suggests to use subscribe_async but actually drops the _async part in the example. Is this intended?
Python 3.11 support?
pip install using python 3.11 returns the following error:
ERROR: Ignored the following versions that require a different python version: 0.16.0 Requires-Python >=3.9,<3.11
ERROR: Could not find a version that satisfies the requirement aioreactive==0.17.0 (from versions: 0.2.0, 0.2.1, 0.3.0, 0.5.0, 0.6.0, 0.7.0, 0.8.0, 0.9.0, 0.10.0, 0.11.0, 0.12.0, 0.13.0, 0.14.0, 0.15.0)
ERROR: No matching distribution found for aioreactive==0.17.0
Implementing reduce
Hello,
Is there a reduce operator or not yet?
If not, let me know and I will try to implement it.
documentation for creating observable streams
I’ve got a functions:
def get_producer(): tool = Tool() async def start(listener): def on_event(event): listener(event) tool.on_scan_result = on_event tool.start_scan() await tool.transport.drain() async def stop(): tool.stop_scanning() await tool.transport.drain() return { "start": start, "stop": stop } # my old api. return AsyncObservable( # what goes here? )
And I’d like to be able to turn it into an observable stream, that automatically starts/stops scanning when the obserables are added/removed but I’m not sure how.
Cannot run filter example
"""Example to show how to split a stream into two substreams.""" import asyncio from aioreactive.core import subscribe, AsyncAnonymousObserver from aioreactive.core import AsyncObservable from aioreactive.operators import pipe as op async def main(): xs = AsyncObservable.from_iterable(range(10)) # Split into odds and evens odds = xs | op.filter(lambda x: x % 2 == 1) evens = xs | op.filter(lambda x: x % 2 == 0) async def mysink(value): print(value) await subscribe(odds, AsyncAnonymousObserver(mysink)) await subscribe(evens, AsyncAnonymousObserver(mysink)) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
upon running:
AttributeError: module ‘aioreactive.operators.pipe’ has no attribute ‘filter’
split.py example fails with InvalidStateError
- Create a new Python 3.9 environment.
- pip install aioreactive
- Download and run split.py from examples/streams
Expected result: see values being printed out.
Actual result:
Exception in callback MailboxProcessor.__process_events()
handle: <Handle MailboxProcessor.__process_events()>
Traceback (most recent call last):
File "C:UsersIan.condaenvsaioreactivelibasyncioevents.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:UsersIan.condaenvsaioreactivelibsite-packagesexpressioncoremailbox.py", line 152, in __process_events
cont(msg)
File "C:UsersIan.condaenvsaioreactivelibsite-packagesexpressioncoreaiotools.py", line 45, in done
future.set_result(value)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback MailboxProcessor.__process_events()
handle: <Handle MailboxProcessor.__process_events()>
Traceback (most recent call last):
File "C:UsersIan.condaenvsaioreactivelibasyncioevents.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:UsersIan.condaenvsaioreactivelibsite-packagesexpressioncoremailbox.py", line 152, in __process_events
cont(msg)
File "C:UsersIan.condaenvsaioreactivelibsite-packagesexpressioncoreaiotools.py", line 45, in done
future.set_result(value)
asyncio.exceptions.InvalidStateError: invalid state
Is this project abandoned?
I would rather use this library than RxPY, so if you’re not planning to develop it anymore I would be interested in continuing it 
Errors with adispose()
<CoroWrapper Merge.Sink.cancel() running at /usr/local/lib/python3.6/site-packages/aioreactive/operators/merge.py:36, created at /usr/local/lib/python3.6/asyncio/coroutines.py:84> was never yielded from
Coroutine object created at (most recent call last):
...
File "/usr/local/lib/python3.6/site-packages/aioreactive/core/disposables.py", line 24, in adispose
await disposable.adispose()
File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
return self.gen.send(None)
File "/usr/local/lib/python3.6/site-packages/aioreactive/core/disposables.py", line 14, in adispose
await self._dispose()
File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
return self.gen.send(None)
File "/usr/local/lib/python3.6/site-packages/aioreactive/core/streams.py", line 66, in adispose
self.cancel()
File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 84, in debug_wrapper
return CoroWrapper(gen, None)
Are there any more radical interface changes coming such as those of 27 days ago?
Dag,
the wave of commits 27 days ago aligned the API more with RxPY and introduced quite a number of breaking API changes (subscribe instead of start, etc).
Could you please tell whether you plan any more major changes like that in the near future, and when do you plan to push this last wave of changes onto PyPI?
Thanks a lot!


