如何将 pyspark、石墨烯和烧瓶结合在一起?
Posted
技术标签:
【中文标题】如何将 pyspark、石墨烯和烧瓶结合在一起?【英文标题】:How can i combine pyspark, graphene and flask together? 【发布时间】:2019-09-03 20:50:36 【问题描述】:在我的代码中,我使用pyspark
进行数据操作,python graphene
用于围绕我的数据构建graphql
服务器,flask
用于服务graphql api。
但我面临一些问题,我不知道它发生的原因。
基本上,代码不会计算出正确的结果,我假设总体上可能存在一些并发问题。
这是我的代码的简化:
import graphene
from flask import Flask
from flask_graphql import GraphQLView
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
app = Flask(__name__)
spark = SparkSession.builder \
.master("local") \
.appName("Sencivil") \
.config("spark.driver.allowMultipleContexts", "true") \
.getOrCreate()
df = spark.read.format("csv")\
.option("sep", ";")\
.option("header", "true")\
.load("./people.csv")\
.cache()
class Birthdate(graphene.ObjectType):
boys = graphene.Int()
girls = graphene.Int()
def __init__(self, bd):
self.bd = bd
def resolve_boys(self, info):
boys = df.where(df.gender == 1).groupby("birthdate").count()
return boys.count()
def resolve_girls(self, info):
girls = df.where(df.gender == 2).groupby("birthdate").count()
return girls.count()
class Person(graphene.ObjectType):
born = graphene.List(Birthdate)
def resolve_born(self, info):
bds = [Birthdate(row.asDict()["birthdate"])
for row in df.select("birthdate").collect()]
return bds
class Query(graphene.ObjectType):
people = graphene.Field(Person)
def resolve_people(self, info):
return Person()
schema = graphene.Schema(query=Query)
app.add_url_rule(
"/graphql", view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))
if __name__ == "__main__":
app.run(debug=True)
people.csv
birthdate;gender
13-01-1987;1
02-09-1986;2
13-01-1987;1
02-09-1986;2
12-04-1998;1
问题在于Birthdate
类的resolve_boys
和resolve_girls
方法,我希望男孩得到2 个,女孩得到1 个,正如我们可以直接从pyspark shell 看到的那样:
>>> boys = df.where(df.gender == 1).groupby("birthdate").count()
>>> boys.show()
+----------+-----+
| birthdate|count|
+----------+-----+
|13-01-1987| 2|
|12-04-1998| 1|
+----------+-----+
>>> girls = df.where(df.gender == 2).groupby("birthdate").count()
>>> girls.show()
+----------+-----+
| birthdate|count|
+----------+-----+
|02-09-1986| 2|
+----------+-----+
但我只是从 api 获得 0:
那么如何解决这个问题呢?
如果我将读取 csv 文件的代码更改为像这样在本地创建数据框
# df = spark.read.format("csv")\
# .option("sep", ";")\
# .option("header", "true")\
# .load("./people.csv")\
# .cache()
df = spark.createDataFrame(
['birthdate': "13-01-1987", "gender": 1,
'birthdate': "02-09-1986", "gender": 2,
'birthdate': "13-01-1987", "gender": 1,
'birthdate': "02-09-1986", "gender": 2,
'birthdate': "12-04-1998", "gender": 1]
)
我得到了正确答案:
所以在读取 csv 文件时会出现问题。但是为什么会这样呢?
【问题讨论】:
【参考方案1】:您的问题解决了吗?正如您指出的那样,数据框似乎没有正确加载,但我相信这是由.load("./people.csv")\
这行引起的,它没有指向您的 csv 文件。
以防万一,您可以将代码包装在 try-except 块中以打印出数据帧或错误响应吗?
干杯!
【讨论】:
以上是关于如何将 pyspark、石墨烯和烧瓶结合在一起?的主要内容,如果未能解决你的问题,请参考以下文章