How to convert ".txt" to ".parquet" with SparkSQL in Spark 2.1.0?


Posted: 2017-07-05 03:43:49

Question:

I tested it with the spark-shell command, following https://spark.apache.org/docs/latest/sql-programming-guide.html:


scala> case class IP(country: String) extends Serializable
17/07/05 11:20:09 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.50.3:42868 in memory (size: 33.1 KB, free: 93.3 MB)
17/07/05 11:20:09 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.50.3:40888 in memory (size: 33.1 KB, free: 93.3 MB)
17/07/05 11:20:09 INFO ContextCleaner: Cleaned accumulator 0
17/07/05 11:20:09 INFO ContextCleaner: Cleaned accumulator 1
defined class IP

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> val df = spark.sparkContext.textFile("/test/guchao/ip.txt").map(x => x.split("\\|", -1)).map(x => IP(x(0))).toDF()
17/07/05 11:20:36 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 216.5 KB, free 92.9 MB)
17/07/05 11:20:36 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.8 KB, free 92.8 MB)
17/07/05 11:20:36 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.50.3:42868 (size: 20.8 KB, free: 93.3 MB)
17/07/05 11:20:36 INFO SparkContext: Created broadcast 2 from textFile at <console>:33
df: org.apache.spark.sql.DataFrame = [country: string]

scala> df.write.mode(SaveMode.Overwrite).save("/test/guchao/ip.parquet")
17/07/05 11:20:44 INFO ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
17/07/05 11:20:44 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
17/07/05 11:20:44 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
17/07/05 11:20:44 INFO CodeGenerator: Code generated in 88.405717 ms
17/07/05 11:20:44 INFO FileInputFormat: Total input paths to process : 1
17/07/05 11:20:44 INFO SparkContext: Starting job: save at <console>:36
17/07/05 11:20:44 INFO DAGScheduler: Got job 1 (save at <console>:36) with 2 output partitions
17/07/05 11:20:44 INFO DAGScheduler: Final stage: ResultStage 1 (save at <console>:36)
17/07/05 11:20:44 INFO DAGScheduler: Parents of final stage: List()
17/07/05 11:20:44 INFO DAGScheduler: Missing parents: List()
17/07/05 11:20:44 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[12] at save at <console>:36), which has no missing parents
17/07/05 11:20:44 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 77.3 KB, free 92.8 MB)
17/07/05 11:20:44 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 29.3 KB, free 92.7 MB)
17/07/05 11:20:44 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.50.3:42868 (size: 29.3 KB, free: 93.2 MB)
17/07/05 11:20:44 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:996
17/07/05 11:20:44 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[12] at save at <console>:36)
17/07/05 11:20:44 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
17/07/05 11:20:44 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 192.168.50.3, executor 0, partition 0, ANY, 6027 bytes)
17/07/05 11:20:44 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.50.3:40888 (size: 29.3 KB, free: 93.3 MB)
17/07/05 11:20:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.50.3:40888 (size: 20.8 KB, free: 93.2 MB)
17/07/05 11:20:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 192.168.50.3, executor 0, partition 1, ANY, 6027 bytes)
17/07/05 11:20:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 679 ms on 192.168.50.3 (executor 0) (1/2)
17/07/05 11:20:46 INFO DAGScheduler: ResultStage 1 (save at <console>:36) finished in 1.476 s
17/07/05 11:20:46 INFO DAGScheduler: Job 1 finished: save at <console>:36, took 1.597097 s
17/07/05 11:20:46 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 804 ms on 192.168.50.3 (executor 0) (2/2)
17/07/05 11:20:46 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/07/05 11:20:46 INFO FileFormatWriter: Job null committed.

But the result was:

[root@master ~]# hdfs dfs -ls -h /test/guchao
17/07/05 11:20:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - root supergroup      0 2017-07-05 11:20 /test/guchao/ip.parquet
-rw-r--r--   1 root supergroup 23.9 M 2017-07-05 10:05 /test/guchao/ip.txt

Why is the size of this "ip.parquet" 0? I don't understand and am confused.

Thanks!

Comments:

You're doing more than just converting text to Parquet: you're also splitting and mapping. Maybe the DataFrame is empty?

Answer 1:

hdfs dfs -ls -h <path> shows the size of files; directories are shown as 0.

df.write.mode(SaveMode.Overwrite).save("/test/guchao/ip.parquet")

This creates /test/guchao/ip.parquet as a directory, with the part files inside it, which is why it is listed with size 0.

hadoop fs -ls /test/guchao/ip.parquet 

This should show the actual size of the output files.

If you want the total size of the directory, you can use:

hadoop fs -du -s /test/guchao/ip.parquet
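As an extra sanity check from Spark itself (a minimal sketch, assuming the same spark-shell session and the path from the question), you can read the Parquet directory back and confirm the rows are there:

scala> val parquetDF = spark.read.parquet("/test/guchao/ip.parquet")
scala> parquetDF.count()   // should match the number of lines in ip.txt
scala> parquetDF.show(5)   // peek at a few rows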

Hope this helps!

Comments:

It does take a while sometimes, don't worry :)

Answer 2:

/test/guchao/ip.parquet is a directory. Look inside it and you should find something like part-00000, which is the file you're looking for.

hadoop fs -ls /test/guchao/ip.parquet
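If you would rather end up with a single part file, one common approach (a sketch, not from the original answer; the output path ip_single.parquet is just an example name) is to coalesce to one partition before writing. This is fine for small data like this, but avoid it for large datasets since it forces all output through a single task:

scala> import org.apache.spark.sql.SaveMode
scala> // coalesce(1) keeps one partition, so only a single part file is produced
scala> df.coalesce(1).write.mode(SaveMode.Overwrite).parquet("/test/guchao/ip_single.parquet")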

Comments:
