如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?

Posted

技术标签:

【中文标题】如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?【英文标题】:How to get data from SocketTCP to save to dataframe in Spark Scala? 【发布时间】:2021-09-04 18:17:10 【问题描述】:

我尝试从 Socket TCP 获取数据以附加到我收到的数据帧 数据并将它们执行到 Seq() 但是当我使用 forEach 附加 他们到数据框有问题这是我的代码:

object CustomReceiver 
  def main(args: Array[String]): Unit = 
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("CustomReceiver")
      .getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    import spark.implicits._

   /*formatdata line data from Socket:  number1, 20210621090303, RadiusMessage, Stop, 84602496347, v241.66.85.130 */

    val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
    linesData1.flatMap(_.split(" ").map(_.trim))
    linesData1.foreachRDD  rdd =>
      rdd.foreach line => 

        val  arrraLine = line.split(",").toList
       // oke arrayLine data : List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
        val testRDD = Seq(arrraLine).map(x =>(x(0), x(1), x(2), x(3), x(4)))
        // oke TestRDD : testRDD :List((number1,20210621090303,RadiusMessage,Stop,84602496347))

          val testDF = testRDD.toDF("cot1","cot2","cot3","cot4","cot5")
         // has an Problem
          testDF.show()
      
      
    
       ssc.start()
        ssc.awaitTermination()
      
    

这是我运行时的问题

java.lang.NullPointerException 在 org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231) 在 Cl.CustomReceiver$.$anonfun$main$4(CustomeReceiver.scala:52) 在 Cl.CustomReceiver$.$anonfun$main$4$adapted(CustomeReceiver.scala:45) 在 scala.collection.Iterator.foreach(Ite​​rator.scala:943) 在 scala.collection.Iterator.foreach$(Iterator.scala:943) 在 org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:25) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012) 在 org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:131) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

【问题讨论】:

【参考方案1】:

您尝试做的事情不是很值得推荐。如果您想使用 Dataframes,请使用 Spark Structured Streaming。您正在尝试在 foreach RDD 操作中创建 DF。如果您使用旧的 Spark Streaming 版本,请使用 RDD。

您可以在“foreachRDD”中创建一个 DF,为每个 mini-batch 创建一个 RDD,但这不是一个好主意。如果您进行测试,您将看到许多用于创建新 DF 的 spark 阶段,并且对于每个 mini-batch...使用结构化流,您可以直接创建数据帧。

【讨论】:

以上是关于如何从 SocketTCP 获取数据以保存到 Spark Scala 中的数据帧?的主要内容,如果未能解决你的问题,请参考以下文章

从 php url 获取数据以在 react native 中保存到本地 sqlite

如何在 jmeter 中关闭 JDBC 连接

如何从 MySQL 中获取 sum(Net_weight) 以查询结果并使用 ID 保存到 MSSQL

从数据库获取数据值以保存到会话存储?

如何从 url 获取 json 数据并将其保存到 const 变量 [TypeScript]

如何从GPS获取数据并发送到服务器以及如何保存到数据库