我是不是正确使用了python池?

Posted

技术标签:

【中文标题】我是不是正确使用了python池?【英文标题】:Am I using python pooling properly?我是否正确使用了python池? 【发布时间】:2021-10-03 07:21:18 【问题描述】:

我有一个非常简单的 Python 脚本,它从列表(6K+ 长)中读取一个股票代码,并获取一些数据来标记交易日期间的异常交易量。

如果我只是循环遍历代码文件中的每一行,则需要数小时才能运行。

基于谷歌搜索,我找到了这种多处理的粗略示例,并决定尝试实现它。

当我运行脚本时,它运行得更快,但也导致了一些我无法弄清楚的非常奇怪的问题。有时我会收到一个 redis 断路器错误,或者有时它会在代码文件的末尾附近停止并挂起。

有什么想法吗?

import yfinance as yf
import multiprocessing
import time
import logging

file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w')


def main():
    read_ticker_file()


def read_ticker_file():
    file1 = open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r')
    lines = file1.readlines()

    count = 0

    ticker_arr = []

    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        ticker_arr.append(line)

    return ticker_arr


def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = ":.2%".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            write_to_file(unusual_volume)
    except Exception as e:
        print(e)


def write_to_file(data):
    file.writelines(data + "\n")


if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()

    pool = multiprocessing.Pool(processes=20)
    pool.map(get_historical_data, inputs)
    pool.close()
    pool.join()
    # end = time.time()
    # print(start - end)

【问题讨论】:

我假设你的程序的瓶颈是下载?您是否总是了解您提到的问题?是否有可能您发送了太多查询并被目标服务器阻止? @André 雅虎没有太多支持或文档可以肯定地说,但我不这么认为。我没有看到任何特定于服务器的错误。 我的另外两个问题呢?下载是瓶颈吗?运行代码时是否总是出现这些问题? @André 如果我让它运行足够长的时间最终我总是得到 Redis CircuitBreaker 错误。据我所知,这与雅虎 api 无关。下载可能是瓶颈,是的 每当您使用multiprocessing 标记问​​题时,您也应该使用平台标记问题,例如windowslinux。我是否正确推断您在 Windows 下运行?如果是这样,池中的每个进程(全部 20 个)都将执行自己的 file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') 实例。因此,您将有 20 个进程打开以进行输出并并行写入同一个文件。这不是一件好事。如果是Linux,那么池中的每个进程都会继承已经打开的文件描述符,情况也好不了多少。 【参考方案1】:

正如我在上面的评论中提到的,我不相信您正确地处理了对unusual.txt 的输出。以下至少应该通过让您的工作函数将记录或None 返回到主进程进行写入来纠正该问题。我正在使用方法imap 而不是map,这样我就可以在返回值时懒惰地处理它们。它们现在也将按照它们出现在输入文件中的符号顺序排列。如果输入文件有大量符号,我们不应该使用默认的chunksize参数,所以我提供了一个函数来计算一个合适的值。

import yfinance as yf
import multiprocessing
import time

def read_ticker_file():
    with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as f:
        return [line.strip() for line in f]

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = ":.2%".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            return unusual_volume
        else:
            return None
    except Exception as e:
        print(e)
        return None

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    chunksize = compute_chunksize(len(inputs), 20)
    results = pool.imap(get_historical_data, inputs, chunskize=chunksize)
    with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
        for result in results:
            if result:
                print(result, file=f)
    # end = time.time()
    # print(start - end)

另一种方法

再说一次,除了你写给 unusual.txt 的问题之外,这不一定能解决你的问题,上面的代码也应该处理。但这是我编写解决方案并从那里开始工作的方式:

我在这里“一筹莫展”,不知道 tickers.txt 文件有多大,也不知道 yfinance 包有多大。但似乎相当明显的是,对 yf.download 的调用和写入 unusual.txt 的文件,我已经在我上面的评论中指出,我认为处理不正确,是我/ O 绑定的“进程”不能由多线程池处理。目前尚不清楚剩下的内容,即 current_volumeaverage_volume 的计算和比较是 CPU 密集型的,足以证明使用多处理来执行这些计算的开销是合理的。

下面将完成所有下载和计算的单个函数get_historical_data 拆分为两个函数load_historical_data_and_processprocess_data。同时创建了一个大型多线程池和多处理池。 tickers.txt 中的每个符号都使用带有函数imap 的多线程池调用工作函数load_historical_data_and_process,这是map 的“更懒惰”版本。即在文件较大的情况下,不必将所有符号读入内存,先建立map所需的列表;可以使用生成器函数。即使文件很小,使用imap 也没有真正的缺点。 load_historical_data_and_process 将完成所有必要的下载。为了进行计算,它将使用通过阻塞方法apply 传递给它的多线程池来调用工作函数process_data。通过直接调用函数process_data而不是使用多处理池来获得替代时间会很有趣。当然,在这种情况下,由于争用全局解释器锁,在执行process_data 时跨线程实现的并发性非常少。但是,根据process_data 的执行涉及多少实际 CPU(我无法知道),您不必跨进程边界传递参数和结果所节省的 CPU 可能会发生偏移。

import yfinance as yf
from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time

def get_symbols():
    with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as file1:
        for line in file1:
            yield line.strip()

def load_historical_data_and_process(multiprocessing_pool, symbol):
    """ What I believe is I/O-intensive and so this runs in a multithreading pool: """
    try:
        historical = yf.download(symbol, period="max", interval="1d")
        yahoo_ticker = yf.Ticker(symbol)
        current_volume = yahoo_ticker.info['volume']
        # To call directly:
        #return process_data(symbol, historical, current_volume)
        return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
    except Exception as e:
        print(e)
        return None


def process_data(symbol, historical, current_volume):
    """ What I believe may warrant running in a multiprocessing pool: """
    average_volume_arr = historical['Volume']
    sum_volume = 0
    for volume in average_volume_arr:
        sum_volume += volume
    average_volume = sum_volume / len(average_volume_arr)
    if current_volume > average_volume:
        volume_over_average = (current_volume - average_volume) / average_volume
        volume_over_average = ":.2%".format(volume_over_average)
        unusual_volume_record = (symbol + " - " + str(volume_over_average))
        print(unusual_volume_record, flush=True)
        return unusual_volume_record
    else:
        return None

if __name__ == '__main__':
    # start = time.time()
    # or some suitable thread pool size:
    with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
        # pass multiprocessing pool to thread pool worker get_historical_data for CPU-intensive processing
        worker = partial(load_historical_data_and_process, multiprocessing_pool)
        results = thread_pool.imap(worker, get_symbols())
        with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
            for result in results:
                if result:
                    print(result, file=f)
    # end = time.time()
    # print(start - end)

【讨论】:

以上是关于我是不是正确使用了python池?的主要内容,如果未能解决你的问题,请参考以下文章

并发编程系列之如何正确使用线程池?

并发编程系列之如何正确使用线程池?

Python从入门到精通(二十二)Python线程池的正确使用姿势

Java线程池异常处理的正确姿势

正确使用@Async,避免踩坑

使用线程池多线程爬取链接,检验链接正确性