如何在多线程应用程序中使用 aiopg 池?

Posted

技术标签:

【中文标题】如何在多线程应用程序中使用 aiopg 池?【英文标题】:How to use aiopg pool in multi-threaded application? 【发布时间】:2015-08-17 12:51:52 【问题描述】:

我有一个 python 3.4.3、postgreSQL 9.4、aiopg-0.7.0。多线程应用程序的示例取自此站点。如何使用游泳池?执行select时线程挂起。

import time
import asyncio
import aiopg
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
   def __init__(self, start_event):
       Thread.__init__(self)
       self.loop = None
       self.tid = None
       self.event = start_event

   def run(self):
       self.loop = asyncio.new_event_loop()
       asyncio.set_event_loop(self.loop)
       self.tid = current_thread()
       self.loop.call_soon(self.event.set)
       self.loop.run_forever()

   def stop(self):
       self.loop.call_soon_threadsafe(self.loop.stop)

   def add_task(self, coro):
       """this method should return a task object, that I
         can cancel, not a handle"""
      def _async_add(func, fut):
          try:
              ret = func()
              fut.set_result(ret)
          except Exception as e:
              fut.set_exception(e)

       f = functools.partial(asyncio.async, coro, loop=self.loop)
       if current_thread() == self.tid:
           return f() # We can call directly if we're not going between threads.
       else:
           # We're in a non-event loop thread so we use a Future
           # to get the task from the event loop thread once
           # it's ready.
           fut = Future()
           self.loop.call_soon_threadsafe(_async_add, f, fut)
           return fut.result()

   def cancel_task(self, task):
       self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test(pool, name_task):
    while True:
        print(name_task, 'running')
        with (yield from pool.cursor()) as cur:
            print(name_task, " select. ")
            yield from cur.execute("SELECT count(*) FROM test")
            count = yield from cur.fetchone()
            print(name_task, ' Result: ', count)
        yield from asyncio.sleep(3)

@asyncio.coroutine
def connect_db():
    dsn = 'dbname=%s user=%s password=%s host=%s' % ('testdb', 'user', 'passw', '127.0.0.1')
    pool = yield from aiopg.create_pool(dsn)
    print('create pool type =', type(pool))
    # future.set_result(pool)
    return (pool)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
loop_db = asyncio.get_event_loop()
pool = loop_db.run_until_complete(connect_db())
time.sleep(2)
t = b.add_task(test(pool, 'Task1'))  # This is a real task
t = b.add_task(test(pool, 'Task2'))

while True:
    time.sleep(10)

b.stop()

'yield from cur.execute("SELECT count(*) FROM test")'中不返回结果

【问题讨论】:

可能有帮助:pylover.dobisel.com/posts/aiopg-aiopg_sa-and-aiopg8000 【参考方案1】:

长话短说:您不能从不同的事件循环共享 aiopg 池对象。

每个aiopg.Pool 都与事件循环耦合。如果您没有明确指定loop 参数,则它取自asyncio.get_event_loop() 调用。

因此,在您的示例中,您有一个从主线程耦合到事件循环的池。

当您从单独的线程执行数据库查询时,您试图通过执行线程的循环而不是主循环来完成它。它不起作用。

【讨论】:

以上是关于如何在多线程应用程序中使用 aiopg 池?的主要内容,如果未能解决你的问题,请参考以下文章

线程池

多线程开发,先学会线程池吧

Java线程池的使用及工作原理

python进程线程协程以及几种自定义线程池

如何优雅的使用线程池

如何在多线程或基于 wcf 服务的应用程序中正确使用事件?