SparkStreaming 删选含有error的行
Posted 靖-Drei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming 删选含有error的行相关的知识,希望对你有一定的参考价值。
筛选流数据中所有含error的行
package com.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
object PrintError
def main(args: Array[String])
val conf = new SparkConf()
// conf.setMaster("local")
conf.setAppName("print Error")
//从SparkConf创建StreamingContext并指定1秒钟的批处理大小
val ssc = new StreamingContext(conf, Seconds(10))
//链接到本机机器7777端口上后,使用收到的数据创建DStream
// val lines = ssc.socketTextStream("172.171.51.131", 7777)
val lines = ssc.socketTextStream("172.171.51.131", 7777)
//从DStream中筛选出包含字符串“error”的行
val errorLines = lines.filter(_.contains("error"))
//打印所有含“error”的行
errorLines.print()
//启动流计算环境StreamingContext并等待它“完成”
ssc.start()
//等待作业完成
ssc.awaitTermination()
输入:
输出:
以上是关于SparkStreaming 删选含有error的行的主要内容,如果未能解决你的问题,请参考以下文章