SparkStreaming 删选含有error的行

Posted 靖-Drei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming 删选含有error的行相关的知识,希望对你有一定的参考价值。

筛选流数据中所有含error的行

package com.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object PrintError 
  def main(args: Array[String]) 
    val conf = new SparkConf()
 //   conf.setMaster("local")
    conf.setAppName("print Error")
    //从SparkConf创建StreamingContext并指定1秒钟的批处理大小
    val ssc = new StreamingContext(conf, Seconds(10))
    //链接到本机机器7777端口上后,使用收到的数据创建DStream
  //  val lines = ssc.socketTextStream("172.171.51.131", 7777)
    val lines = ssc.socketTextStream("172.171.51.131", 7777)
    //从DStream中筛选出包含字符串“error”的行
    val errorLines = lines.filter(_.contains("error"))
    //打印所有含“error”的行
    errorLines.print()
    //启动流计算环境StreamingContext并等待它“完成”
    ssc.start()
    //等待作业完成
    ssc.awaitTermination()

  

输入:

输出:


以上是关于SparkStreaming 删选含有error的行的主要内容,如果未能解决你的问题,请参考以下文章

python日志等级输出删选

删选器

删选两个数组相同元素或相同属性

关于下拉选择删选最基本一例(分享内容)

AFNetworking请求中含有中文时程序崩溃

PMD-Java 代码检查工具对 error 和 warning 的配置