如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]
Posted
技术标签:
【中文标题】如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]【英文标题】:How to convert streaming DataFrame into regular batch DataFrame? [closed] 【发布时间】:2017-10-31 10:37:08 【问题描述】:我想将流数据帧转换为普通数据帧以进行大量操作: count.distinct 用于实时分析的复杂查询。
如果您对将流数据帧转换为 spark 中的普通数据帧有任何想法,请提出建议。
【问题讨论】:
【参考方案1】:我认为最好的办法是编写自定义流Sink 并在addBatch
中的每批访问DataFrame
。
Sink
相当短,所以在这里引用 scaladoc 和代码。
/**
* An interface for systems that can collect the results of a streaming query. In order to preserve
* exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
* batch.
*/
trait Sink
/**
* Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
* this method is called more than once with the same batchId (which will happen in the case of
* failures), then `data` should only be added once.
*
* Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
* Otherwise, you may get a wrong result.
*
* Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
* after data is consumed by sink successfully.
*/
def addBatch(batchId: Long, data: DataFrame): Unit
另请阅读StreamSinkProvider。
【讨论】:
【参考方案2】:将您的流式数据帧保存到本地或 kafka。并从本地或 kafka 以批处理模式读取。
【讨论】:
来自本地?那会是什么样子?愿意指出“来源”吗?以上是关于如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]的主要内容,如果未能解决你的问题,请参考以下文章
Pandas:将 DataFrame 转换为每个单元格的均值和标准差
使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出
如何将 Dask.DataFrame 转换为 pd.DataFrame?