如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?
Posted
技术标签:
【中文标题】如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?【英文标题】:How to get data from SocketTCP to save to dataframe in Spark Scala? 【发布时间】:2021-09-04 18:17:10 【问题描述】:我尝试从 Socket TCP 获取数据以附加到我收到的数据帧 数据并将它们执行到 Seq() 但是当我使用 forEach 附加 他们到数据框有问题这是我的代码:
object CustomReceiver
def main(args: Array[String]): Unit =
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("CustomReceiver")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
import spark.implicits._
/*formatdata line data from Socket: number1, 20210621090303, RadiusMessage, Stop, 84602496347, v241.66.85.130 */
val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
linesData1.flatMap(_.split(" ").map(_.trim))
linesData1.foreachRDD rdd =>
rdd.foreach line =>
val arrraLine = line.split(",").toList
// oke arrayLine data : List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
val testRDD = Seq(arrraLine).map(x =>(x(0), x(1), x(2), x(3), x(4)))
// oke TestRDD : testRDD :List((number1,20210621090303,RadiusMessage,Stop,84602496347))
val testDF = testRDD.toDF("cot1","cot2","cot3","cot4","cot5")
// has an Problem
testDF.show()
ssc.start()
ssc.awaitTermination()
这是我运行时的问题
java.lang.NullPointerException 在 org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231) 在 Cl.CustomReceiver$.$anonfun$main$4(CustomeReceiver.scala:52) 在 Cl.CustomReceiver$.$anonfun$main$4$adapted(CustomeReceiver.scala:45) 在 scala.collection.Iterator.foreach(Iterator.scala:943) 在 scala.collection.Iterator.foreach$(Iterator.scala:943) 在 org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:25) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012) 在 org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:131) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)
【问题讨论】:
【参考方案1】:您尝试做的事情不是很值得推荐。如果您想使用 Dataframes,请使用 Spark Structured Streaming。您正在尝试在 foreach RDD 操作中创建 DF。如果您使用旧的 Spark Streaming 版本,请使用 RDD。
您可以在“foreachRDD”中创建一个 DF,为每个 mini-batch 创建一个 RDD,但这不是一个好主意。如果您进行测试,您将看到许多用于创建新 DF 的 spark 阶段,并且对于每个 mini-batch...使用结构化流,您可以直接创建数据帧。
【讨论】:
以上是关于如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?的主要内容,如果未能解决你的问题,请参考以下文章
从 php url 获取数据以在 react native 中保存到本地 sqlite
如何从 MySQL 中获取 sum(Net_weight) 以查询结果并使用 ID 保存到 MSSQL