是否可以通过 spark 直接将 Writestream 用于 API

Posted

技术标签:

【中文标题】是否可以通过 spark 直接将 Writestream 用于 API【英文标题】:Is it possible to use Writestream directly to an API via spark 【发布时间】:2020-02-07 10:54:20 【问题描述】:

我在 Databricks 上构建代码以实时读取增量表(读取流),然后我需要将此流数据发布到 API。 在我阅读的所有论文中,writestream 仅用于创建文件(.csv、.avro、.parquet 等)或发送到事件中心。是否可以使用 writestream 发布到 API!?

我的代码:

from pyspark.sql.functions import unix_timestamp, round, col
import json
import pandas as pd
from pyspark.sql.functions import lit
import requests

#tried with foreach_batch but it doens't work
def foreach_batch_function(df,epochId):
    r2 = requests.post('https://demo.api.com/index.php/api/v5/smsrequest/', data=str(df), verify=False)
    r2.json()
    pass

rs = spark.readStream.format("delta").option('path','/mnt/gen2/raw/mytable').load()
df = rs.select(round('id_cliente_fat').alias('id_cliente_fat'),'fone_fat','nome_fat',unix_timestamp('dt_nasc_fat','YYYY-MM-DD').cast('timestamp').cast('date').alias('birth_date'),'email_fat')

df2 = df.selectExpr('id_cliente_fat as identifier_code','fone_fat as phone_number','nome_fat as name','birth_date','email_fat as email')

data = 'authentication':'username':'user','password':'pass'
r = requests.post('https://demo.api.com/index.php/api/v5/login/', data=json.dumps(data), verify=False).json()

df3 = df2.withColumn("steps", lit("[1,2,4,7]")).withColumn("place_id", lit(164)).withColumn("token", lit(r["authentication"]["token"]))

df4 = df3.select(to_json(struct(struct("token").alias("authentication"), struct("identifier_code", "phone_number", "name", "birth_date", "email","steps","place_id").alias("smsrequest").alias("smsrequest"))).alias(""))

df4.writeStream.foreachBatch(foreach_batch_function).start() 

【问题讨论】:

【参考方案1】:

你需要用.collect()方法把数据拿给驱动(数据量大不建议使用)。

试试这样的:

def foreach_batch_function(df,epochId):

    # Create a Json with kews the name of the columns and values the values of the df
    json_data = map(lambda row: row.asDict(), df.collect())

    r2 = requests.post('https://demo.api.com/index.php/api/v5/smsrequest/', data=json_data, verify=False)
    r2.json()
    pass

【讨论】:

以上是关于是否可以通过 spark 直接将 Writestream 用于 API的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将 Option 与 spark UDF 一起使用

使用 spark-sql cli 将 csv 数据直接加载到 parquet 表中

是否可以直接在外部数据库表上执行删除和更新操作,比如在 ORACLE 中,使用 Spark

将数据从我的 spark 代码发送到 redshift

是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?

是否可以使用 spark 的 jdbc 驱动程序将 apache spark 与 jasper 集成?