使用多线程 + 多处理的 Python 日志记录
Posted
技术标签:
【中文标题】使用多线程 + 多处理的 Python 日志记录【英文标题】:Python logging with multithreading + multiprocessing 【发布时间】:2022-01-03 05:26:11 【问题描述】:请花时间阅读完整的问题以了解确切的问题。谢谢。
我有一个运行程序/驱动程序,它监听 Kafka 主题并在收到有关该主题的新消息时使用 ThreadPoolExecuter
调度任务(如下所示):
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
bootstrap_servers=[f"KAFKA_SERVER_HOST:KAFKA_SERVER_PORT"],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset='latest',
max_poll_records=1,
max_poll_interval_ms=300000)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for message in consumer:
futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))
中间有一堆代码,但这段代码在这里并不重要,所以我跳过了。
现在,SOME_FUNCTION 来自另一个导入的 python 脚本(事实上,在后期阶段有一个导入层次结构)。重要的是,在这些脚本中的某个时刻,我调用了Multiprocessing
池,因为我需要对数据(SIMD - 单指令多数据)进行并行处理并使用 apply_async 函数来执行此操作。
for loop_message_chunk in loop_message_chunks:
res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))
现在,我有 2 个版本的 runner/driver 程序:
基于Kafka(如上图)
此版本生成启动多处理的线程听 Kafka -> 启动线程 -> 启动多处理
基于 REST(使用烧瓶通过 REST 调用实现相同的任务)
此版本不启动任何线程并立即调用多处理监听 REST 端点 -> 启动多处理
您为什么要问 2 个跑步者/驱动程序脚本? - 这个微服务将被多个团队使用,有些团队想要基于同步 REST,而有些团队想要一个基于 KAFKA 的实时异步系统
当我从并行函数(上面示例中的self.one_matching.match
)进行日志记录时,它在通过 REST 版本调用时有效,但在使用 KAFKA 版本调用时无效(基本上当多处理由线程启动时 - 它不起作用)。
还要注意,只有并行函数的日志记录不起作用。从运行器到调用 apply_async 的脚本的层次结构中的其余脚本(包括从线程内调用的脚本)成功记录。
其他细节:
我使用 yaml 文件配置记录器 我在运行脚本本身中为 KAFKA 或 REST 版本配置记录器 我在运行器脚本之后调用的所有其他脚本中执行logging.getLogger
,以让特定记录器记录到不同的文件
记录器配置(值替换为泛型,因为我无法获取确切名称):
version: 1
formatters:
simple:
format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
custom1:
format: '%(asctime)s | %(filename)s :: %(message)s'
time-message:
format: '%(asctime)s | %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
handler1:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile1.log
handler2:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: custom1
level: INFO
filename: logs/logfile2.log
handler3:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile3.log
handler4:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile4.log
handler5:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile5.log
loggers:
logger1:
level: DEBUG
handlers: [console, handler1]
propagate: no
logger2:
level: DEBUG
handlers: [console, handler5]
propagate: no
logger3:
level: INFO
handlers: [handler2]
propagate: no
logger4:
level: DEBUG
handlers: [console, handler3]
propagate: no
logger5:
level: DEBUG
handlers: [console, handler4]
propagate: no
kafka:
level: WARNING
handlers: [console]
propogate: no
root:
level: INFO
handlers: [console]
propogate: no
【问题讨论】:
我不知道我能回答为什么日志不能从一个线程启动的进程中工作,因为我希望它能够正常工作(大部分时间),并且然后有时会出现死锁(回复:6721)。我认为你可以摆脱线程但是使用aiokafka 在主(唯一)线程中创建一个 ProcessPoolExecutor,并根据需要从事件循环向它提交任务:docs.python.org/3/library/… 如果你想保持SOME_FUNCTION
不变(每次调用创建它自己的池而不是回调到全局ProcessPoolExecutor),它仍然应该以相同的方式工作。我只是在想,不继续创建和销毁单独的独立池可能会减少总开销。
似乎最简单的方法是使用带有logrotate的syslog,否则你需要在单独的进程中使用QueueListener和QueueHandler之类的东西,或者使用flask logger和你的kafka logger来登录不同的文件。
您难道不知道普通的日志记录不能很好地用于多处理吗?如果子进程是fork
ed,它可能会起作用,但如果它们是spawn
ed,则不会。 QueueHandler 可能还不够,你需要 SocketHandler 来确定。您可以阅读此线程以了解更多信息***.com/questions/64335940/…
【参考方案1】:
可能的答案:摆脱线程并改用 asyncio
示例伪代码结构(由theseexamples拼凑而成)
#pseudocode example structure: probably has bugs...
from aiokafka import AIOKafkaConsumer
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
async def SOME_FUNCTION_CO(executor, **kwargs):
res_list = []
for loop_message_chunk in loop_message_chunks:
res_list.append(executor.submit(self.one_matching.match, hash_set, loop_message_chunk, fields))
#call concurrent.futures.wait on res_list later, and cancel unneeded futures (regarding one of your prior questions)
return res_list
async def consume():
consumer = AIOKafkaConsumer(
'my_topic', 'my_other_topic',
bootstrap_servers='localhost:9092',
group_id="my-group")
# Get cluster layout and join group `my-group`
await consumer.start()
#Global executor:
#I would also suggest using a "spawn" context unless you really need the
#performance of "fork".
ctx = multiprocessing.get_context("spawn")
tasks = [] #similar to futures in your example (Task subclasses asyncio.Future which is similar to concurrent.futures.Future as well)
with ProcessPoolExecutor(mp_context=ctx) as executor:
try:
# Consume messages
async for msg in consumer:
tasks.append(asyncio.create_task(SOME_FUNCTION_CO(executor, **kwargs)))
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
if __name__ == "__main__":
asyncio.run(consume())
我一直在反复讨论我认为在这个例子中我应该如何表示SOME_FUNCTION
,但这里的关键点是在msg in consumer
的循环中,您正在安排任务最终完成 。如果这些任务中的任何一个需要很长时间,它可能会阻塞主循环(它也在运行async for msg in consumer
行)。反而;这些可能需要很长时间的任务中的任何一个都应该快速返回某种类型的未来,这样您就可以在结果准备好后轻松访问它。
【讨论】:
【参考方案2】:首先,我没有使用完全相同的堆栈。我正在使用 fastaapi 和 Redis pubsub,现在为烧瓶和 Kafka 复制它会很乏味。我认为原则上它应该以同样的方式工作。至少它可能会指出您的代码中存在一些错误配置。另外,我正在对记录器配置进行硬编码。
很抱歉粘贴了很多代码,但我想提供一个完整的工作示例,也许我在你的描述中遗漏了一些东西,你没有提供一个最小的工作示例。
我有四个文件:
app.py (fastapi application)
config.py (setup config variables and logger)
redis_ps (redis consumer/listener)
utils (processing function (some_function), redis publish function)
和redis容器
docker pull redis
运行
docker run --restart unless-stopped --publish 6379:6379 --name redis -d redis
python3 app.py (will run server and pubsub listener)
python3 utils.py (will publish message over pubsub)
curl -X 'POST' \
'http://0.0.0.0:5000/sync' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '[[2,4],[6, 8]]'
输出
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,688] DEBUG in utils: Run some_function, caller: pubsub
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 1, result 1
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 3, result 9
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 5, result 25
[2021-12-08 17:54:32,698] DEBUG in utils: caller: pubsub, Processing 7, result 49
[2021-12-08 17:54:39,519] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,520] DEBUG in utils: Run some_function, caller: rest api
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 8, result 64
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 6, result 36
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 2, result 4
[2021-12-08 17:54:39,531] DEBUG in utils: caller: rest api, Processing 4, result 16
源代码
app.py
from concurrent import futures
from typing import List
import uvicorn
from fastapi import FastAPI, APIRouter
from redis_ps import PubSubWorkerThreadListen
from utils import some_function
router = APIRouter()
@router.post("/sync")
def sync_process(data: List[List[int]]):
with futures.ThreadPoolExecutor(max_workers=2) as executor:
future_all = [executor.submit(some_function, loop_message_chunks=d, caller="rest api") for d in data]
return [future.result() for future in future_all]
def create_app():
app = FastAPI(title="app", openapi_url="/openapi.json", docs_url="/")
app.include_router(router)
thread = PubSubWorkerThreadListen()
thread.start()
return app
if __name__ == "__main__":
_app = create_app()
uvicorn.run(_app, host="0.0.0.0", port=5000, debug=True, log_level="debug")
config.py
import sys
import logging
COMPONENT_NAME = "test_logger"
REDIS_URL = "redis://localhost:6379"
def setup_logger(logger_name: str, log_level=logging.DEBUG, fmt: logging.Formatter = None):
fmt = fmt or logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.name = "h_console"
handler.setFormatter(fmt)
handler.setLevel(log_level)
logger_ = logging.getLogger(logger_name)
logger_.addHandler(handler)
logger_.setLevel(log_level)
return logger_
setup_logger(COMPONENT_NAME)
redis.ps
import json
import logging
import threading
import time
from concurrent import futures
from typing import Dict, List, Union
import redis
from config import COMPONENT_NAME, REDIS_URL
from utils import some_function
logger = logging.getLogger(COMPONENT_NAME)
class PubSubWorkerThreadListen(threading.Thread):
def __init__(self):
super().__init__()
self._running = threading.Event()
@staticmethod
def connect_pubsub() -> redis.client.PubSub:
while True:
try:
r = redis.Redis.from_url(REDIS_URL)
p = r.pubsub()
p.psubscribe(["*:*:*"])
logger.info("Connected to Redis")
return p
except Exception:
time.sleep(0.1)
def run(self):
if self._running.is_set():
return
self._running.set()
while self._running.is_set():
p = self.connect_pubsub()
try:
listen(p)
except Exception as e:
logger.error(f"Failed to process Redis message or failed to connect: e")
time.sleep(0.1)
def stop(self):
self._running.clear()
def get_data(msg) -> Union[Dict, List]:
data = msg.get("data")
if isinstance(data, int):
# the first message has 'data': 1
return []
try:
return json.loads(data)
except Exception as e:
logger.warning("Failed to parse data in the message (%s) with error %s", msg, e)
return []
def listen(p_):
logger.debug("Start listening")
while True:
for msg_ in p_.listen():
data = get_data(msg_)
if data:
with futures.ThreadPoolExecutor(max_workers=2) as executor:
future_all = [executor.submit(some_function, loop_message_chunks=d, caller="pubsub") for d in data]
[future.result() for future in future_all]
utils.py
import json
import logging
from multiprocessing import Pool
from typing import List
import redis
from config import COMPONENT_NAME, REDIS_URL
logger = logging.getLogger(COMPONENT_NAME)
def one_matching(v, caller: str = ""):
logger.debug(f"caller: caller, Processing v, result v*v")
return v * v
def some_function(loop_message_chunks: List[int], caller: str):
logger.debug(f"Run some_function, caller: caller")
with Pool(2) as pool:
v = [pool.apply_async(one_matching, args=(i, caller)) for i in loop_message_chunks]
res_list = [res.get(timeout=1) for res in v]
return res_list
def publish():
data = [[1, 3], [5, 7]]
r_ = redis.Redis.from_url(REDIS_URL)
logger.debug("Published message %s %s", "test", data)
r_.publish("test:test:test", json.dumps(data).encode())
if __name__ == "__main__":
publish()
【讨论】:
以上是关于使用多线程 + 多处理的 Python 日志记录的主要内容,如果未能解决你的问题,请参考以下文章
Python-logging详解(彩色日志扩展,多进程安全等)
Python-logging详解(彩色日志扩展,多进程安全等)
在 Windows 中使用多处理进行 Python 日志记录
python多处理日志记录:带有RotatingFileHandler的QueueHandler“文件被另一个进程使用”错误