如何在多线程应用程序中使用 aiopg 池?
Posted
技术标签:
【中文标题】如何在多线程应用程序中使用 aiopg 池?【英文标题】:How to use aiopg pool in multi-threaded application? 【发布时间】:2015-08-17 12:51:52 【问题描述】:我有一个 python 3.4.3、postgreSQL 9.4、aiopg-0.7.0。多线程应用程序的示例取自此站点。如何使用游泳池?执行select时线程挂起。
import time
import asyncio
import aiopg
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it's ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test(pool, name_task):
while True:
print(name_task, 'running')
with (yield from pool.cursor()) as cur:
print(name_task, " select. ")
yield from cur.execute("SELECT count(*) FROM test")
count = yield from cur.fetchone()
print(name_task, ' Result: ', count)
yield from asyncio.sleep(3)
@asyncio.coroutine
def connect_db():
dsn = 'dbname=%s user=%s password=%s host=%s' % ('testdb', 'user', 'passw', '127.0.0.1')
pool = yield from aiopg.create_pool(dsn)
print('create pool type =', type(pool))
# future.set_result(pool)
return (pool)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
loop_db = asyncio.get_event_loop()
pool = loop_db.run_until_complete(connect_db())
time.sleep(2)
t = b.add_task(test(pool, 'Task1')) # This is a real task
t = b.add_task(test(pool, 'Task2'))
while True:
time.sleep(10)
b.stop()
'yield from cur.execute("SELECT count(*) FROM test")'中不返回结果
【问题讨论】:
可能有帮助:pylover.dobisel.com/posts/aiopg-aiopg_sa-and-aiopg8000 【参考方案1】:长话短说:您不能从不同的事件循环共享 aiopg 池对象。
每个aiopg.Pool
都与事件循环耦合。如果您没有明确指定loop
参数,则它取自asyncio.get_event_loop()
调用。
因此,在您的示例中,您有一个从主线程耦合到事件循环的池。
当您从单独的线程执行数据库查询时,您试图通过执行线程的循环而不是主循环来完成它。它不起作用。
【讨论】:
以上是关于如何在多线程应用程序中使用 aiopg 池?的主要内容,如果未能解决你的问题,请参考以下文章