如何将 pyspark、石墨烯和烧瓶结合在一起?

Posted

技术标签:

【中文标题】如何将 pyspark、石墨烯和烧瓶结合在一起?【英文标题】:How can i combine pyspark, graphene and flask together? 【发布时间】:2019-09-03 20:50:36 【问题描述】:

在我的代码中,我使用pyspark 进行数据操作,python graphene 用于围绕我的数据构建graphql 服务器,flask 用于服务graphql api。 但我面临一些问题,我不知道它发生的原因。

基本上,代码不会计算出正确的结果,我假设总体上可能存在一些并发问题。

这是我的代码的简化:

import graphene
from flask import Flask
from flask_graphql import GraphQLView

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

app = Flask(__name__)

spark = SparkSession.builder \
    .master("local") \
    .appName("Sencivil") \
    .config("spark.driver.allowMultipleContexts", "true") \
    .getOrCreate()

df = spark.read.format("csv")\
    .option("sep", ";")\
    .option("header", "true")\
    .load("./people.csv")\
    .cache()


class Birthdate(graphene.ObjectType):
    boys = graphene.Int()
    girls = graphene.Int()

    def __init__(self, bd):
        self.bd = bd

    def resolve_boys(self, info):
        boys = df.where(df.gender == 1).groupby("birthdate").count()
        return boys.count()

    def resolve_girls(self, info):
        girls = df.where(df.gender == 2).groupby("birthdate").count()
        return girls.count()


class Person(graphene.ObjectType):
    born = graphene.List(Birthdate)

    def resolve_born(self, info):
        bds = [Birthdate(row.asDict()["birthdate"])
               for row in df.select("birthdate").collect()]
        return bds


class Query(graphene.ObjectType):
    people = graphene.Field(Person)

    def resolve_people(self, info):
        return Person()


schema = graphene.Schema(query=Query)

app.add_url_rule(
    "/graphql", view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))


if __name__ == "__main__":
    app.run(debug=True)

people.csv

birthdate;gender
13-01-1987;1
02-09-1986;2
13-01-1987;1
02-09-1986;2
12-04-1998;1

问题在于Birthdate 类的resolve_boysresolve_girls 方法,我希望男孩得到2 个,女孩得到1 个,正如我们可以直接从pyspark shell 看到的那样:

>>> boys = df.where(df.gender == 1).groupby("birthdate").count()
>>> boys.show()
+----------+-----+                                                              
| birthdate|count|
+----------+-----+
|13-01-1987|    2|
|12-04-1998|    1|
+----------+-----+

>>> girls = df.where(df.gender == 2).groupby("birthdate").count()
>>> girls.show()
+----------+-----+                                                              
| birthdate|count|
+----------+-----+
|02-09-1986|    2|
+----------+-----+

但我只是从 api 获得 0:

那么如何解决这个问题呢?

如果我将读取 csv 文件的代码更改为像这样在本地创建数据框

# df = spark.read.format("csv")\
#     .option("sep", ";")\
#     .option("header", "true")\
#     .load("./people.csv")\
#     .cache()

df = spark.createDataFrame(
    ['birthdate': "13-01-1987", "gender": 1,
     'birthdate': "02-09-1986", "gender": 2, 
     'birthdate': "13-01-1987", "gender": 1,
     'birthdate': "02-09-1986", "gender": 2, 
     'birthdate': "12-04-1998", "gender": 1]
)

我得到了正确答案:

所以在读取 csv 文件时会出现问题。但是为什么会这样呢?

【问题讨论】:

【参考方案1】:

您的问题解决了吗?正如您指出的那样,数据框似乎没有正确加载,但我相信这是由.load("./people.csv")\ 这行引起的,它没有指向您的 csv 文件。

以防万一,您可以将代码包装在 try-except 块中以打印出数据帧或错误响应吗?

干杯!

【讨论】:

以上是关于如何将 pyspark、石墨烯和烧瓶结合在一起?的主要内容,如果未能解决你的问题,请参考以下文章

将 graphql 集成到烧瓶蓝图中

使用石墨烯和 SQLalchemy 为 GraphQL 设置权限很热门

石墨烯和 SQLAlchemy:派生自基础对象类型

如何在石墨烯烧瓶中进行多个查询过滤器?

石墨或grafana可以用来监控pyspark指标吗?

将 json 模型字段与 django 石墨烯一起使用