关于kafka数据实时落地至hdfs
Posted ZH519080
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于kafka数据实时落地至hdfs相关的知识,希望对你有一定的参考价值。
关于kafka数据实时落地至hdfs
好久没有写博客了!
下面介绍如何使用 Spark Streaming + Kafka 将数据实时落地至 HDFS 目录:
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import com.alibaba.fastjson.{JSON, JSONArray}
import com.ipinyou.cdp.common.SparkBase
import com.ipinyou.cdp.util.KafkaUtil
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
import org.slf4j.{Logger, LoggerFactory}
/**
 * Lands change-notice messages from Kafka onto HDFS in real time.
 *
 * Each streaming batch is converted to a single-column DataFrame (`result`) and appended
 * as parquet under `/tmp/logs/rt_change_notice`. The job is meant to be used by testers.
 */
object RTChangeNotice2Hdfs extends SparkBase {

  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /** Configuration from which the Kafka topic and consumer group are read. */
  private val conf = ConfigFactory.load("rm_cdm.conf")

  /** Brokers the data is fetched from. */
  val brokers: String = conf.getString("kafka_brokers_iframe")

  /** Pattern used for the `timestamp` field; built once instead of once per record. */
  private val timestampFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /**
   * json4s formats used when serializing a record. Deliberately not implicit and passed
   * explicitly, so it cannot interfere with implicits inherited from `SparkBase`.
   */
  private val jsonFormats: Formats = Serialization.formats(NoTypeHints)

  // Alternative version of rt_change_notice, kept commented out for reference. It differs
  // from the active method in two places: it filters out empty per-message results before
  // building the DataFrame, and it inserts into the table cdp_temp.rt_change_notice
  // instead of saving to a path.
  //
  //  /** Reads data from Kafka and lands it on HDFS in real time. */
  //  def rt_change_notice(spark: SparkSession, ssc: StreamingContext) = {
  //    logger.info("brokers: " + brokers + " group_id: " + conf.getString("idmapping_notice_iframe.group") + " topic: " + conf.getString("idmapping_notice_iframe.topic"))
  //    val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtil.createStream(ssc,
  //      brokers, conf.getString("idmapping_notice_iframe.group"),
  //      conf.getString("idmapping_notice_iframe.topic").split(","))
  //    import spark.implicits._
  //    recordDStream.foreachRDD(rdd => {
  //      val offRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  //      rdd.map(_.value()).map(line => {
  //        /** Timestamp carried by the message. */
  //        val message_time = JSON.parseObject(line).getString("message_timestamp")
  //        /** Unique id carried by the message. */
  //        val msgid = JSON.parseObject(line).getString("message_id")
  //        val data: JSONArray = JSON.parseObject(line).getJSONArray("data")
  //        val rest_json = for (i <- 0 until data.size()) yield {
  //          val opType = data.getJSONObject(i).getString("op_type")
  //          val cdmid = data.getJSONObject(i).getJSONObject("data").getString("cdmid")
  //          val result_json: String = get_result(message_time, msgid, opType, cdmid)
  //          result_json
  //        }
  //        rest_json
  //      }).filter(ele => ele.size > 0).toDF("result").coalesce(1).write.format("parquet")
  //        .mode(SaveMode.Append).insertInto("cdp_temp.rt_change_notice")
  //      recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offRanges)
  //    })
  //    ssc.start()
  //    ssc.awaitTermination()
  //  }

  /**
   * Reads data from Kafka and lands it on HDFS in real time.
   *
   * For every batch, each message is turned into a sequence of JSON strings (one per entry
   * of its `data` array), the batch is appended as parquet to `/tmp/logs/rt_change_notice`,
   * and only then are the batch's offsets committed back to Kafka. Empty results are not
   * filtered out. The method starts `ssc` and blocks until the streaming context terminates.
   *
   * @param spark session whose implicits are used to build the DataFrame
   * @param ssc   streaming context the Kafka stream is created on
   */
  def rt_change_notice(spark: SparkSession, ssc: StreamingContext): Unit = {
    val group = conf.getString("idmapping_notice_iframe.group")
    val topics = conf.getString("idmapping_notice_iframe.topic")
    logger.info(s"brokers: $brokers group_id: $group topic: $topics")

    val recordDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtil.createStream(ssc, brokers, group, topics.split(","))

    import spark.implicits._
    recordDStream.foreachRDD { rdd =>
      // Capture the offset ranges before any transformation of the RDD.
      val offRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(_.value()).map(parseNotice _)
        .toDF("result").coalesce(1).write.format("parquet")
        .mode(SaveMode.Append).save("/tmp/logs/rt_change_notice")
      // Commit only after the write above has returned.
      recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Converts one raw Kafka message into the JSON strings to be stored, one per entry of
   * the message's `data` array.
   *
   * The message is parsed once. A message that parses to `null` (for example a null
   * value) or that has no `data` array yields an empty sequence instead of failing the
   * batch with a `NullPointerException`.
   */
  private def parseNotice(line: String): scala.collection.immutable.IndexedSeq[String] = {
    val records = for {
      message <- Option(JSON.parseObject(line))
      entries <- Option(message.getJSONArray("data"))
    } yield {
      // Timestamp and unique id carried by the message itself.
      val messageTime = message.getString("message_timestamp")
      val messageId = message.getString("message_id")
      (0 until entries.size()).map { i =>
        val entry = entries.getJSONObject(i)
        val opType = entry.getString("op_type")
        val cdmid = entry.getJSONObject("data").getString("cdmid")
        get_result(messageTime, messageId, opType, cdmid)
      }
    }
    records.getOrElse(Vector.empty[String])
  }

  /**
   * Assembles one record as a JSON string.
   *
   * @param messge_time timestamp taken from the upstream Kafka message (parameter name
   *                    kept as-is for compatibility with existing callers)
   * @param message_id  unique id of the message
   * @param opType      operation type of the idmapping data
   * @param cdmid       the cdmid value
   * @return a JSON object with the keys `msgid`, `msgtimestamp`, `timestamp`,
   *         `profileType`, `cdmid` and `op_type`
   */
  def get_result(messge_time: String, message_id: String, opType: String, cdmid: String): String = {
    // Current time, formatted as "yyyy-MM-dd HH:mm:ss".
    val timestamp = timestampFormat.format(ZonedDateTime.now)
    val profileType = "CUSTOMER"
    val data_map = Map(
      "msgid" -> message_id,
      "msgtimestamp" -> messge_time,
      "timestamp" -> timestamp,
      "profileType" -> profileType,
      "cdmid" -> cdmid,
      "op_type" -> opType
    )
    Serialization.write(data_map)(jsonFormats)
  }

  /** Entry point: builds the Spark session and a 10-second streaming context, then runs the job. */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka_rt_2hdfs")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
      .config("mergeSchema", "true")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .config("spark.streaming.backpressure.enabled", "true")
      .getOrCreate()
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(10))
    // Stop the session even when the streaming job ends with an exception.
    try rt_change_notice(spark, ssc)
    finally spark.stop()
  }
}
其中,代码中被注释掉的 rt_change_notice 方法会先过滤掉空结果,因此空文件不会落地;未被注释的 rt_change_notice 方法不做过滤,落地时会产生空文件。(这个程序是供测试人员使用的,需要保留所有落地目录,包括空文件,所以这里启用的是未被注释的版本。)
该程序将 Spark Streaming、Kafka 与 DataFrame 相结合,借助 SaveMode.Append 以追加的方式把数据实时存入 HDFS 中。
以上是关于关于kafka数据实时落地至hdfs的主要内容,如果未能解决你的问题,请参考以下文章