使用 Pyspark 从 REST API 获取数据到 Spark Dataframe
Posted
技术标签:
【中文标题】使用 Pyspark 从 REST API 获取数据到 Spark Dataframe【英文标题】:Fetching data from REST API to Spark Dataframe using Pyspark 【发布时间】:2020-06-24 13:00:07 【问题描述】:我正在构建一个数据管道,它以 json 格式使用来自 RESTApi 的数据并推送到 Spark Dataframe。火花版本:2.4.4
但出现错误
df = SQLContext.jsonRDD(rdd)
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'
代码:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json
spark = SparkSession \
.builder \
.appName("DataCleansing") \
.getOrCreate()
def convert_single_object_per_line(json_list):
json_string = ""
for line in json_list:
json_string += json.dumps(line) + "\n"
return json_string
def parse_dataframe(json_data):
r = convert_single_object_per_line(json_data)
mylist = []
for line in r.splitlines():
mylist.append(line)
rdd = spark.sparkContext.parallelize(mylist)
df = SQLContext.jsonRDD(rdd)
return df
url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)
如果有任何其他更好的方法来查询 RestApi 并使用 Pyspark 将数据带到 Spark Dataframe。
我不确定我是否遗漏了什么。
【问题讨论】:
spark 有 rest api 数据源,检查一下 - github.com/sourav-mazumder/Data-Science-Extensions/tree/master/… 您是否使用“REST API 数据源”来实现并行请求? 【参考方案1】:检查 Spark Rest API 数据源。这个库的一个优点是它将使用多个执行器来获取数据 rest api 并为您创建数据框。
在您的代码中,您将所有数据提取到驱动程序并创建 DataFrame,如果您有非常大的数据,它可能会因堆空间而失败。
uri = "https://mylink"
options = 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'
// Now we create the Dataframe which contains the result from the call to the API
df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()
【讨论】:
评论不用于扩展讨论;这个对话是moved to chat。 太棒了。现在谈话在哪里?以上是关于使用 Pyspark 从 REST API 获取数据到 Spark Dataframe的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 httppost/rest-api 从 keycloak 获取用户列表
使用 Jquery 从 REST API nodeJS 获取数据 [重复]