我是不是正确使用了python池?
Posted
技术标签:
【中文标题】我是不是正确使用了python池?【英文标题】:Am I using python pooling properly?我是否正确使用了python池? 【发布时间】:2021-10-03 07:21:18 【问题描述】:我有一个非常简单的 Python 脚本,它从列表(6K+ 长)中读取一个股票代码,并获取一些数据来标记交易日期间的异常交易量。
如果我只是循环遍历代码文件中的每一行,则需要数小时才能运行。
基于谷歌搜索,我找到了这种多处理的粗略示例,并决定尝试实现它。
当我运行脚本时,它运行得更快,但也导致了一些我无法弄清楚的非常奇怪的问题。有时我会收到一个 redis 断路器错误,或者有时它会在代码文件的末尾附近停止并挂起。
有什么想法吗?
import yfinance as yf
import multiprocessing
import time
import logging
file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w')
def main():
read_ticker_file()
def read_ticker_file():
file1 = open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r')
lines = file1.readlines()
count = 0
ticker_arr = []
for line in lines:
count += 1
line = line.strip('\n')
line = line.strip()
ticker_arr.append(line)
return ticker_arr
def get_historical_data(symbol):
yahoo_ticker = yf.Ticker(symbol)
historical = yf.download(symbol, period="max", interval="1d")
average_volume_arr = historical['Volume']
try:
current_volume = yahoo_ticker.info['volume']
sum_volume = 0
for volume in average_volume_arr:
sum_volume += volume
average_volume = sum_volume / len(average_volume_arr)
if current_volume > average_volume:
volume_over_average = (current_volume - average_volume) / average_volume
volume_over_average = ":.2%".format(volume_over_average)
unusual_volume = (symbol + " - " + str(volume_over_average))
print(unusual_volume)
write_to_file(unusual_volume)
except Exception as e:
print(e)
def write_to_file(data):
file.writelines(data + "\n")
if __name__ == '__main__':
# start = time.time()
inputs = read_ticker_file()
pool = multiprocessing.Pool(processes=20)
pool.map(get_historical_data, inputs)
pool.close()
pool.join()
# end = time.time()
# print(start - end)
【问题讨论】:
我假设你的程序的瓶颈是下载?您是否总是了解您提到的问题?是否有可能您发送了太多查询并被目标服务器阻止? @André 雅虎没有太多支持或文档可以肯定地说,但我不这么认为。我没有看到任何特定于服务器的错误。 我的另外两个问题呢?下载是瓶颈吗?运行代码时是否总是出现这些问题? @André 如果我让它运行足够长的时间最终我总是得到 Redis CircuitBreaker 错误。据我所知,这与雅虎 api 无关。下载可能是瓶颈,是的 每当您使用multiprocessing
标记问题时,您也应该使用平台标记问题,例如windows
或linux
。我是否正确推断您在 Windows 下运行?如果是这样,池中的每个进程(全部 20 个)都将执行自己的 file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w')
实例。因此,您将有 20 个进程打开以进行输出并并行写入同一个文件。这不是一件好事。如果是Linux,那么池中的每个进程都会继承已经打开的文件描述符,情况也好不了多少。
【参考方案1】:
正如我在上面的评论中提到的,我不相信您正确地处理了对unusual.txt
的输出。以下至少应该通过让您的工作函数将记录或None
返回到主进程进行写入来纠正该问题。我正在使用方法imap
而不是map
,这样我就可以在返回值时懒惰地处理它们。它们现在也将按照它们出现在输入文件中的符号顺序排列。如果输入文件有大量符号,我们不应该使用默认的chunksize参数,所以我提供了一个函数来计算一个合适的值。
import yfinance as yf
import multiprocessing
import time
def read_ticker_file():
with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as f:
return [line.strip() for line in f]
def get_historical_data(symbol):
yahoo_ticker = yf.Ticker(symbol)
historical = yf.download(symbol, period="max", interval="1d")
average_volume_arr = historical['Volume']
try:
current_volume = yahoo_ticker.info['volume']
sum_volume = 0
for volume in average_volume_arr:
sum_volume += volume
average_volume = sum_volume / len(average_volume_arr)
if current_volume > average_volume:
volume_over_average = (current_volume - average_volume) / average_volume
volume_over_average = ":.2%".format(volume_over_average)
unusual_volume = (symbol + " - " + str(volume_over_average))
print(unusual_volume)
return unusual_volume
else:
return None
except Exception as e:
print(e)
return None
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, 4 * pool_size)
if remainder:
chunksize += 1
return chunksize
if __name__ == '__main__':
# start = time.time()
inputs = read_ticker_file()
pool = multiprocessing.Pool(processes=20)
chunksize = compute_chunksize(len(inputs), 20)
results = pool.imap(get_historical_data, inputs, chunskize=chunksize)
with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
for result in results:
if result:
print(result, file=f)
# end = time.time()
# print(start - end)
另一种方法
再说一次,除了你写给 unusual.txt 的问题之外,这不一定能解决你的问题,上面的代码也应该处理。但这是我编写解决方案并从那里开始工作的方式:
我在这里“一筹莫展”,不知道 tickers.txt 文件有多大,也不知道 yfinance
包有多大。但似乎相当明显的是,对 yf.download
的调用和写入 unusual.txt 的文件,我已经在我上面的评论中指出,我认为处理不正确,是我/ O 绑定的“进程”不能由多线程池处理。目前尚不清楚剩下的内容,即 current_volume
与 average_volume
的计算和比较是 CPU 密集型的,足以证明使用多处理来执行这些计算的开销是合理的。
下面将完成所有下载和计算的单个函数get_historical_data
拆分为两个函数load_historical_data_and_process
和process_data
。同时创建了一个大型多线程池和多处理池。 tickers.txt 中的每个符号都使用带有函数imap
的多线程池调用工作函数load_historical_data_and_process
,这是map
的“更懒惰”版本。即在文件较大的情况下,不必将所有符号读入内存,先建立map
所需的列表;可以使用生成器函数。即使文件很小,使用imap
也没有真正的缺点。 load_historical_data_and_process
将完成所有必要的下载。为了进行计算,它将使用通过阻塞方法apply
传递给它的多线程池来调用工作函数process_data
。通过直接调用函数process_data
而不是使用多处理池来获得替代时间会很有趣。当然,在这种情况下,由于争用全局解释器锁,在执行process_data
时跨线程实现的并发性非常少。但是,根据process_data
的执行涉及多少实际 CPU(我无法知道),您不必跨进程边界传递参数和结果所节省的 CPU 可能会发生偏移。
import yfinance as yf
from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time
def get_symbols():
with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as file1:
for line in file1:
yield line.strip()
def load_historical_data_and_process(multiprocessing_pool, symbol):
""" What I believe is I/O-intensive and so this runs in a multithreading pool: """
try:
historical = yf.download(symbol, period="max", interval="1d")
yahoo_ticker = yf.Ticker(symbol)
current_volume = yahoo_ticker.info['volume']
# To call directly:
#return process_data(symbol, historical, current_volume)
return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
except Exception as e:
print(e)
return None
def process_data(symbol, historical, current_volume):
""" What I believe may warrant running in a multiprocessing pool: """
average_volume_arr = historical['Volume']
sum_volume = 0
for volume in average_volume_arr:
sum_volume += volume
average_volume = sum_volume / len(average_volume_arr)
if current_volume > average_volume:
volume_over_average = (current_volume - average_volume) / average_volume
volume_over_average = ":.2%".format(volume_over_average)
unusual_volume_record = (symbol + " - " + str(volume_over_average))
print(unusual_volume_record, flush=True)
return unusual_volume_record
else:
return None
if __name__ == '__main__':
# start = time.time()
# or some suitable thread pool size:
with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
# pass multiprocessing pool to thread pool worker get_historical_data for CPU-intensive processing
worker = partial(load_historical_data_and_process, multiprocessing_pool)
results = thread_pool.imap(worker, get_symbols())
with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
for result in results:
if result:
print(result, file=f)
# end = time.time()
# print(start - end)
【讨论】:
以上是关于我是不是正确使用了python池?的主要内容,如果未能解决你的问题,请参考以下文章