如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]

Posted

技术标签:

【中文标题】如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]【英文标题】:How to convert streaming DataFrame into regular batch DataFrame? [closed] 【发布时间】:2017-10-31 10:37:08 【问题描述】:

我想将流数据帧转换为普通数据帧以进行大量操作: count.distinct 用于实时分析的复杂查询。

如果您对将流数据帧转换为 spark 中的普通数据帧有任何想法,请提出建议。

【问题讨论】:

【参考方案1】:

我认为最好的办法是编写自定义流Sink 并在addBatch 中的每批访问DataFrame

Sink 相当短,所以在这里引用 scaladoc 和代码。

/**
 * An interface for systems that can collect the results of a streaming query. In order to preserve
 * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
 * batch.
 */
trait Sink 

  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit

另请阅读StreamSinkProvider。

【讨论】:

【参考方案2】:

将您的流式数据帧保存到本地或 kafka。并从本地或 kafka 以批处理模式读取。

【讨论】:

来自本地?那会是什么样子?愿意指出“来源”吗?

以上是关于如何将流式 DataFrame 转换为常规批处理 DataFrame? [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

Pandas:将 DataFrame 转换为每个单元格的均值和标准差

使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出

如何将 Dask.DataFrame 转换为 pd.DataFrame?

将 Pandas DataFrame 的行转换为列标题,

将 SQL 查询转换为 Spark Dataframe 结构化数据处理

我们如何将 DataFrame 转换为 Bunch 数据类型? [关闭]