Pyspark:spark-submit 不像 CLI 那样工作

Posted

技术标签:

【中文标题】Pyspark:spark-submit 不像 CLI 那样工作【英文标题】:Pyspark: spark-submit not working like CLI 【发布时间】:2018-05-22 14:59:30 【问题描述】:

我有一个 pyspark 可以从 TSV 文件加载数据并将其保存为 parquet 文件以及将其保存为持久 SQL 表。

当我通过 pyspark CLI 逐行运行它时,它的工作方式与预期完全一样。当我使用 spark-submit 作为应用程序运行它时,它运行时没有任何错误,但我得到奇怪的结果: 1. 数据被覆盖而不是附加。 2. 当我对它运行 SQL 查询时,即使 parquet 文件有几 GB 大小(我所期望的),我也没有返回任何数据。有什么建议吗?

代码:

from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
                StructField('subscriberId', StringType(), True),
                StructField('sourceIPv4Address', StringType(), True),
                StructField('destinationIPv4Address', StringType(), True),
                StructField('service',StringType(), True),
                StructField('baseService',StringType(), True),
                StructField('serverHostname', StringType(), True),
                StructField('rat', StringType(), True),
                StructField('userAgent', StringType(), True),
                StructField('accessPoint', StringType(), True),
                StructField('station', StringType(), True),
                StructField('device', StringType(), True),
                StructField('contentCategories', StringType(), True),
                StructField('incomingOctets', LongType(), True),
                StructField('outgoingOctets', LongType(), True),
                StructField('incomingShapingDrops', IntegerType(), True),
                StructField('outgoingShapingDrops', IntegerType(), True),
                StructField('qoeIncomingInternal', DoubleType(), True),
                StructField('qoeIncomingExternal', DoubleType(), True),
                StructField('qoeOutgoingInternal', DoubleType(), True),
                StructField('qoeOutgoingExternal', DoubleType(), True),
                StructField('incomingShapingLatency', DoubleType(), True),
                StructField('outgoingShapingLatency', DoubleType(), True),
                StructField('internalRtt', DoubleType(), True),
                StructField('externalRtt', DoubleType(), True),
                StructField('HttpUrl',StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)

【问题讨论】:

当您运行 spark-submit 时,您将执行委托给您可能缺少依赖项的集群(Spark?YARN?)。另请参阅此讨论 请参阅***.com/questions/36910014/… 看起来像 Spark can access Hive table from pyspark but not from spark-submit 的副本 尝试在代码末尾插入 time.sleep(600),进入 Spark UI 并查看日志。 @user8371915 是的,这是同一个问题!虽然在我的情况下它没有抛出任何错误,所以很难理解根本原因是什么。谢谢! 【参考方案1】:

正如@user8371915 建议的那样:

Spark can access Hive table from pyspark but not from spark-submit

我需要更换

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

这解决了这个问题。

【讨论】:

以上是关于Pyspark:spark-submit 不像 CLI 那样工作的主要内容,如果未能解决你的问题,请参考以下文章

为啥此 python 代码在 pyspark 中有效,但在 spark-submit 中无效?

spark-submit 适用于 Python 程序,但 pyspark 不起作用

没有 spark-submit 的 Exec pyspark 独立脚本

在 pyspark shell 中工作的过滤器不是 spark-submit

使用 Spark-Submit 在 kubernetes 上安装 PySpark 软件包:找不到常春藤缓存文件错误

如何在 Pyspark 中运行 Python 脚本