如何将 PySpark / AWS Glue 中 RDD 的所有行加入/连接/合并成一条长线?

Posted

技术标签:

【中文标题】如何将 PySpark / AWS Glue 中 RDD 的所有行加入/连接/合并成一条长线?【英文标题】:How to join / concatenate / merge all rows of an RDD in PySpark / AWS Glue into one single long line? 【发布时间】:2021-09-28 17:54:52 【问题描述】:

我有一个协议需要接收许多(读取数百万)条记录。该协议要求所有数据都是单行馈送(InfluxDB/QuestDB)。使用 InfluxDB 客户端目前不是一个选项,所以我需要通过套接字来执行此操作。

我的 ETL 流程即将结束,我现在只需要获取我创建的最终 RDD 并获取所有这些行并将它们转换为一行,但不知道如何执行此操作(并且如何正确地做到这一点!)

我目前在 PySpark / AWS Glue 中:

def write_to_quest(df, measurement, table, timestamp_field, args):
    HOST = args['questdb_host']
    PORT = int(args['questdb_port'])
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        sock.connect((HOST, PORT))
        rows = df.rdd.map(lambda row: row.asDict(True))
        new_rdd = rows.map(lambda row: 
                                 _row_to_line_protocol(row, measurement, table, timestamp_field)).glom()

        #transform new_rdd to single_line_rdd here

        sock.sendall((single_line_rdd).encode())

    except socket.error as e:
        print("Got error: %s" % (e))

调用者:

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

allDaily = glueContext.create_dynamic_frame.from_catalog(database=args['db_name'],
                                                         table_name="daily",
                                                         transformation_ctx="allDaily",
                                                         push_down_predicate="(date_str='20040302' and meter_id='NEM1206106')"
                                                         # for faster testing
                                                         )

# TODO: Handle entire DF as a single payload
df = allDaily.toDF()
tdf = df.withColumn('reading_date_time', F.to_timestamp(df['reading_date_time'], '%Y-%m-%dT%H:%M:%S.%f'))
tdf = tdf.drop(*["ingestion_date", "period_start", "period_end", "quality_method",
                 "event", "import_reactive_total", "export_reactive_total"])

write_to_quest(df=tdf, measurement="meter_id", table="daily", timestamp_field="reading_date_time", args=args)

new_rdd 的形状是一组字符串列表:

RDD[
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
x200
]

我如何得到这个,所以我有一个单行,所有内容都由 '\n'(换行符)连接?

例如:

'string here,to,join\nanother string,to,join\n....'

到目前为止,我已经尝试了几种 foreach 的组合,例如:

foreach(lambda x: ("\n".join(x)))

但绝对无济于事,我还担心它的可扩展性 - 例如,我很确定在数百万行上使用 .collect() 会杀死事情

那么解决这最后一步的最佳方法是什么?

接受答案后编辑

我实施的 Werner 回答的具体解决方案是这样的(我删除 Glob 以每行获取一个列表项,然后删除空格(因为 Influx / Quest 对空格敏感)

def write_to_quest(df, measurement, table, timestamp_field, args):
    """
    Open a socket and write the row directly into Quest
    :param df_row:
    :param measurement:
    :param table:
    :param timestamp_field:
    :param args:
    :return:
    """
    HOST = args['questdb_host']
    PORT = int(args['questdb_port'])
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        sock.connect((HOST, PORT))
        rows = df.rdd.map(lambda row: row.asDict(True))
        new_rdd = rows.map(lambda row:
                            _row_to_line_protocol(row, measurement, table, timestamp_field))
        result = new_rdd.map(lambda r: "".join(r) + "\n") \
            .aggregate("", lambda a, b: a + b, lambda a, b: a + b)
        
        sock.sendall((result.encode()))


    except socket.error as e:
        print("Got error: %s" % (e))

    sock.close()

【问题讨论】:

【参考方案1】:

rdd 的每一行可以使用map 映射成每行一个字符串,然后map 调用的结果可以将aggregated 转换成一个大字符串:

result = rdd.map(lambda r: " ".join(r) + "\n")\
    .aggregate("", lambda a,b: a+b, lambda a,b: a+b)

如果目标是拥有一个大字符串,则至少在最后一步必须将所有数据移动到一个位置。在此处使用aggregate优于收集所有行并在驱动程序上连接字符串,因为aggregate 可以在大多数情况下分布式并行处理。然而,在单个节点上仍然需要为整个最终字符串提供足够的内存。

【讨论】:

这就像一个绝对的梦想 - 我已经基于此更新了我的问题,并使用适用于上述特定用例的解决方案 - 谢谢:)

以上是关于如何将 PySpark / AWS Glue 中 RDD 的所有行加入/连接/合并成一条长线?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

如何在 AWS Glue pyspark 脚本中合并两个节点

AWS Glue PySpark 替换 NULL

如何在 AWS Glue PySpark 中运行并行线程?

如何从aws glue pyspark作业中的嵌套数组中提取数据

Python/Pyspark 迭代代码(用于 AWS Glue ETL 作业)