对 Apache 梁中的单个列执行转换

Posted

技术标签:

【中文标题】对 Apache 梁中的单个列执行转换【英文标题】:Perform a transformation on a single column in Apache beam 【发布时间】:2021-09-13 00:26:22 【问题描述】:

我有一个已加载到 Google Cloud Storage 的 CSV,我正在创建一个 Dataflow 管道,它将读取和处理 CSV,然后按单列执行列表计数。

如何隔离单列。假设列是 id、city、sports_team。我想计算一个城市出现了多少次。

我的起始代码是这样的:

# Python's regular expression library
import re

# Beam and interactive Beam imports
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(",")
        return [records[1]]

p = beam.Pipeline(InteractiveRunner())
lines = p | 'read in file' >> beam.io.ReadFromText("gs://ny-springml-data/AB_NYC_2019.csv", skip_header_lines=1)
records = lines | beam.ParDo(SplitRecords())
groups = (records | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
groups | beam.io.WriteToText('TEST2.txt')

我收到一个 IndexError: list index out of range.... 我对这一切都非常陌生,因此感谢您的帮助。

【问题讨论】:

【参考方案1】:

您的 CVS 文件中可能有一些意外的行,例如一个空白的。你可以做类似的事情

if len(records) < 2:
  raise ValueError("Bad line: %r" % element)
else:
  yield records[1]

以获得更好的错误消息。我还建议考虑使用 Beam Dataframes 来完成此类任务。

【讨论】:

以上是关于对 Apache 梁中的单个列执行转换的主要内容,如果未能解决你的问题,请参考以下文章

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框

在单个查询中将VARCHAR(max)列转换为大写,然后转换为XML

如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?

apache spark中的sortbykey

SQL中PIVOT 行列转换

根据 Angular 中的标题对表格中单列中的数据进行排序