如何在 Flask 中摄取实时数据并对其进行预测 [重复]

Posted

技术标签:

【中文标题】如何在 Flask 中摄取实时数据并对其进行预测 [重复]【英文标题】:How to ingest live data and make predictions on them in Flask [duplicate] 【发布时间】:2022-01-23 03:37:20 【问题描述】:

我有一个烧瓶应用程序,当用户触发端点时,它会 ping 外部 API,获取实时数据,转换该信息,并通过线性回归模型运行它并将预测输出给用户。

我希望有一个内部函数不断从实时流中检索最新数据,对其进行转换,进行预测,然后每当用户使用请求 ping 一个单独的端点时,它只返回最最近做出的预测。

解决此问题的最佳方法是什么?目前我的设置与此类似:

from flask import Flask, make_response, jsonify, request
import numpy as np
import pandas as pd

app = Flask(__name__)

def get_data(name):
    response = requests.get('https://pokeapi.co/api/v2/pokemon/'.format(name))
    data = response.json()
    return data

def transform(name,data):
    df = pd.DataFrame(columns=['skill_name','url','name'])
    for val in range(len(data['abilities'])):
        row = pd.Series(data['abilities'][val]['ability'])
        row.index = ['skill_name','url']
        row['name'] = name

        df = df.append(row,ignore_index=True)
    return df

def make_prediction(transformed_data):
    pass
    #some sklearn model

@app.route('/getinfo', methods=['POST'])
def get_info():
    name = int(request.args.get('name'))
    raw_data = get_data(name)
    transformed_df = transform(name,data)
    return make_prediction(transformed_df)

如您所见,预测频率目前完全取决于 /getinfo 端点被 ping 的频率。

但现在想象一下,在一个假设的世界中,每隔几秒钟,关于特定口袋妖怪的信息就会不断变化,我想不断加载最新的数据,而不管外部用户的请求频率如何,以及用户何时发送一个请求,我手头上已经准备好最新的预测。有点像内部生成的预测流,放入堆栈对象,当收到请求时,最新的预测从堆栈顶部返回给用户。我该怎么做呢?我面临的主要问题是如何在 Flask 应用程序中同时运行两个函数 - 一个在无限的固定间隔循环上运行以获取新数据并生成预测,另一个获取最新预测并将其返回给用户只有当它被调用时。

【问题讨论】:

【参考方案1】:

在启动应用程序时,我会使用线程模块启动另一个线程,该线程模块设置在一个循环上,每隔 x 秒请求和处理一次新数据,然后使用新结果设置一个全局变量。

您可能需要将新结果保存到互斥锁中,具体取决于 Flask 是否对每个请求使用多处理模块。

然后当有人向 /get-info 发送请求时,您可以返回全局变量的值。

【讨论】:

以上是关于如何在 Flask 中摄取实时数据并对其进行预测 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

在 Flask 中使用 SQS 实时更新

如何在 sklearn 中编写自定义估算器并对其使用交叉验证?

Windows c++ waveInStart 如何访问记录的数据并对其进行管理?

如何在 PostgreSQL 中找出碎片索引并对其进行碎片整理?

Oracle 到 Hadoop 数据的实时摄取

如何按列值的计数进行分组并对其进行排序?