对 Apache 梁中的单个列执行转换
Posted
技术标签:
【中文标题】对 Apache 梁中的单个列执行转换【英文标题】:Perform a transformation on a single column in Apache beam 【发布时间】:2021-09-13 00:26:22 【问题描述】:我有一个已加载到 Google Cloud Storage 的 CSV,我正在创建一个 Dataflow 管道,它将读取和处理 CSV,然后按单列执行列表计数。
如何隔离单列。假设列是 id、city、sports_team。我想计算一个城市出现了多少次。
我的起始代码是这样的:
# Python's regular expression library
import re
# Beam and interactive Beam imports
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
class SplitRecords(beam.DoFn):
"""Spilt the element into records, return rideable_type record."""
def process(self, element):
records = element.split(",")
return [records[1]]
p = beam.Pipeline(InteractiveRunner())
lines = p | 'read in file' >> beam.io.ReadFromText("gs://ny-springml-data/AB_NYC_2019.csv", skip_header_lines=1)
records = lines | beam.ParDo(SplitRecords())
groups = (records | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
groups | beam.io.WriteToText('TEST2.txt')
我收到一个 IndexError: list index out of range.... 我对这一切都非常陌生,因此感谢您的帮助。
【问题讨论】:
【参考方案1】:您的 CVS 文件中可能有一些意外的行,例如一个空白的。你可以做类似的事情
if len(records) < 2:
raise ValueError("Bad line: %r" % element)
else:
yield records[1]
以获得更好的错误消息。我还建议考虑使用 Beam Dataframes 来完成此类任务。
【讨论】:
以上是关于对 Apache 梁中的单个列执行转换的主要内容,如果未能解决你的问题,请参考以下文章
遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框
在单个查询中将VARCHAR(max)列转换为大写,然后转换为XML