KafkaStreaming.scala file
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaManagerAdd
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import com.mongodb.casbah.{MongoClient, MongoClientURI}

import scala.collection.mutable.ArrayBuffer

/**
 * Created by zty on 2017/12/20.
 */
object KafkaStreaming {

  // Writer that pushes each record to MongoDB, then Neo4j and Solr
  val journalArticleClass = new JournalArticleDataManagerAdd(
    MongoClient(MongoClientURI("mongodb://IP:27017"))("databaseName")("collectionName"))

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    // Kafka topic name(s), comma-separated
    val topicsJournalArticle = "JouArt"
    // Kafka brokers
    val brokers = "IP1:9092,IP2:9092,IP3:9092"
    // Spark configuration; cap the ingest rate per Kafka partition
    val sparkconf = new SparkConf()
      .setAppName("kafkastreaming")
      .set("spark.streaming.kafka.maxRatePerPartition", "5")
    val ssc = new StreamingContext(sparkconf, Seconds(30))

    val topicsJournalArticleSet = topicsJournalArticle.split(",").toSet
    val journalArticlekafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "journalArticledataManager",
      "fetch.message.max.bytes" -> "20971520",
      "auto.offset.reset" -> "smallest")

    val journalArticleManager = new KafkaManagerAdd(journalArticlekafkaParams)
    val jsonsjournalArticleLines = journalArticleManager
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, journalArticlekafkaParams, topicsJournalArticleSet)

    // Commit the consumed offsets back to ZooKeeper
    jsonsjournalArticleLines.foreachRDD(rdd => {
      journalArticleManager.updateZKOffsets(rdd)
    })

    // Keep only the message values (the JSON payload)
    val jsonsjournalArticle = jsonsjournalArticleLines.map(_._2)

    jsonsjournalArticle.foreachRDD(rdd => {
      // Collect the batch to the driver and hand it to the writer
      val arrayjournalArticle = ArrayBuffer[String]()
      rdd.collect().foreach(arrayjournalArticle += _)
      kafkaProducerSendJournalArticle(arrayjournalArticle)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def kafkaProducerSendJournalArticle(args: ArrayBuffer[String]): Unit = {
    if (args.nonEmpty) {
      args.foreach(line => {
        // Each Kafka message is expected to be a JSON array of objects
        scala.util.parsing.json.JSON.parseFull(line) match {
          case Some(maps: List[Any]) =>
            maps.foreach(langMap => {
              val listJson = langMap.asInstanceOf[Map[String, Any]]
              val jsonString = Json(DefaultFormats).write(listJson)
              if (jsonString.nonEmpty) {
                journalArticleClass.MongoDBJournalArticleAdd(jsonString)
                journalArticleClass.Neo4jSolrJournalArticleAdd()
              }
            })
          case _ => // not a JSON array; skip this message
        }
      })
    }
  }
}
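For context, the job above leans on several third-party libraries. The sketch below is a possible build.sbt dependency list inferred from the imports across these four files; the artifact coordinates are the usual ones for these projects, but every version number is an assumption (the Kafka 0.8 direct-stream API and KafkaCluster tie the code to Spark 1.x's spark-streaming-kafka artifact):

  libraryDependencies ++= Seq(
    "org.apache.spark"       %% "spark-streaming"          % "1.6.3" % "provided",
    "org.apache.spark"       %% "spark-streaming-kafka"    % "1.6.3",   // direct stream + private[spark] KafkaCluster
    "org.mongodb"            %% "casbah"                   % "3.1.1",   // MongoClient / MongoCollection
    "org.json4s"             %% "json4s-jackson"           % "3.2.11",  // Json(DefaultFormats).write
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",   // scala.util.parsing.json on Scala 2.11
    "com.alibaba"            %  "fastjson"                 % "1.2.44",  // com.alibaba.fastjson.JSON
    "org.neo4j.driver"       %  "neo4j-java-driver"        % "1.4.5",   // org.neo4j.driver.v1 bolt client
    "org.scalaj"             %% "scalaj-http"              % "2.3.0"    // HTTP posting to Solr
  )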
JournalArticleDataManagerAdd.scala file
import java.text.SimpleDateFormat
import java.util.Date

import com.mongodb.DBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI, MongoCollection}
import com.mongodb.util.JSON
import org.joda.time.DateTime
import org.neo4j.driver.v1.{AuthTokens, GraphDatabase, StatementResult}

/**
 * Created by zty on 2017/02/01.
 */
class JournalArticleDataManagerAdd(collectionString: MongoCollection) {

  // Connect to MongoDB
  def createDatabase(url: String, dbName: String, coll: String): MongoCollection = {
    MongoClient(MongoClientURI(url))(dbName)(coll)
  }

  // Shared state: set by MongoDBJournalArticleAdd and reused by Neo4jSolrJournalArticleAdd
  var jsonString = ""

  def MongoDBJournalArticleAdd(JsonString: String): Unit = {
    jsonString = JsonString
    try {
      val bson: DBObject = JSON.parse(jsonString).asInstanceOf[DBObject]
      collectionString.insert(bson)
    } catch {
      case ex: Throwable => println(ex)
    }
  }

  // Write the record to Neo4j and Solr
  def Neo4jSolrJournalArticleAdd(): Unit = {
    val driver = GraphDatabase.driver("bolt://IP:7687", AuthTokens.basic("neo4j", "******"))
    val session = driver.session
    try {
      // Parse the JSON string captured by MongoDBJournalArticleAdd
      val json = com.alibaba.fastjson.JSON.parseObject(jsonString)

      // ID
      val GUID = json.getJSONObject("_id").get("$oid")

      // Title
      val titleObject = json.getJSONObject("title")
      val titlevalue = titleObject.get("title")

      // Document language
      val language = titleObject.get("language")
      val languagevalue = if (language != null) language.toString else ""

      // Titles in other languages, joined with commas
      val title_alternative = json.getJSONArray("title_alternative")
      var title_alternativevalue = ""
      for (item <- title_alternative.toArray) {
        title_alternativevalue += com.alibaba.fastjson.JSON.parseObject(item.toString).get("title_alternative") + ","
      }
      title_alternativevalue = title_alternativevalue.stripSuffix(",")

      // First author id
      var first_id = json.get("first_contributer_id")
      first_id = if (first_id == null) "" else first_id.toString

      // Contributors: names, ids, organizations and organization ids
      val contributer_meta = json.getJSONArray("contributer_meta")
      var contributer_metavalue = ""
      var contributer_idvalue = ""
      var contributer_orgvalue = ""
      var contributer_org_idvalue = ""
      for (item <- contributer_meta.toArray) {
        val itemJson = com.alibaba.fastjson.JSON.parseObject(item.toString)
        contributer_metavalue += itemJson.get("contributer_name") + ","
        val contributer_id = itemJson.getJSONArray("contributer_URI").toArray
        if (contributer_id.length != 0) {
          val conid = com.alibaba.fastjson.JSON.parseObject(contributer_id(0).toString).get("contributer_URI").toString
          // Drop the fixed 7-character URI prefix to keep the bare id
          if (conid.length != 0) contributer_idvalue += conid.substring(7) + "','"
          else contributer_idvalue += "','"
        }
        val organization_list = itemJson.getJSONArray("organization_list")
        for (list <- organization_list.toArray) {
          contributer_orgvalue += com.alibaba.fastjson.JSON.parseObject(list.toString).get("name") + ","
          val contributer_org_id = com.alibaba.fastjson.JSON.parseObject(list.toString).getJSONArray("organization_URI").toArray
          if (contributer_org_id.length != 0) {
            val conorgid = contributer_org_id(0).toString
            // Drop the fixed 13-character URI prefix to keep the bare id
            if (conorgid.length != 0) contributer_org_idvalue += conorgid.substring(13) + "','"
            else contributer_org_idvalue += "','"
          }
        }
      }
      contributer_metavalue = contributer_metavalue.stripSuffix(",")
      // Trim the trailing ",'" and wrap as a quoted list, e.g. ['id1','id2']
      contributer_idvalue = if (contributer_idvalue != "") "['" + contributer_idvalue.dropRight(2) + "]" else "[]"
      contributer_orgvalue = contributer_orgvalue.stripSuffix(",")
      contributer_org_idvalue = if (contributer_org_idvalue != "") "['" + contributer_org_idvalue.dropRight(2) + "]" else "[]"

      // Abstract
      val abstractvalue = json.getJSONObject("abstractvalue").get("abstractvalue")
      val abstractvaluevalue = if (abstractvalue == null) "" else abstractvalue.toString

      // Abstracts in other languages
      val abstract_alternative = json.getJSONArray("abstract_alternative")
      var abstract_alternativevalue = ""
      for (item <- abstract_alternative.toArray) {
        abstract_alternativevalue += com.alibaba.fastjson.JSON.parseObject(item.toString).get("abstract_alternative") + ","
      }
      abstract_alternativevalue = abstract_alternativevalue.stripSuffix(",")

      // Subject container: CLC classification titles/codes and keywords
      val subject_list = json.getJSONObject("subject_meta").getJSONArray("subject_list")
      var CLCtitle = ""
      var CLCcode = ""
      var keywords = ""
      for (element <- subject_list.toArray) {
        val item = com.alibaba.fastjson.JSON.parseObject(element.toString)
        val source = item.get("source")
        val types = item.get("type")
        if (source == "CLC" || source == "clc") {
          CLCtitle += item.get("subject_title") + ","
          CLCcode += item.get("subject_code") + ","
        }
        if (types == "Keyword" || types == "keyword") keywords += item.get("subject_title") + ","
      }
      CLCtitle = CLCtitle.stripSuffix(",")
      CLCcode = CLCcode.stripSuffix(",")
      keywords = keywords.stripSuffix(",")

      // Funding projects
      val funding_list = json.getJSONObject("funding_list").getJSONArray("funding_meta")
      var funding_listvalue = ""
      var funding_list_idvalue = ""
      for (element <- funding_list.toArray) {
        val item = com.alibaba.fastjson.JSON.parseObject(element.toString)
        funding_listvalue += item.get("title") + ","
        funding_list_idvalue += item.get("_id") + "','"
      }
      funding_listvalue = funding_listvalue.stripSuffix(",")
      funding_list_idvalue = if (funding_list_idvalue != "") "['" + funding_list_idvalue.dropRight(2) + "]" else "[]"

      // Indexing / holding categories
      val holding_meta = json.getJSONArray("holding_meta").toArray
      var holding_metavalue = ""
      for (item <- holding_meta) {
        holding_metavalue += com.alibaba.fastjson.JSON.parseObject(item.toString).get("holding_code") + ","
      }
      holding_metavalue = holding_metavalue.stripSuffix(",")

      // Journal id (drop the fixed 8-character URI prefix)
      var journal_id = json.get("journal_URI")
      journal_id = if (journal_id == null) "[]" else "['" + journal_id.toString.substring(8) + "']"

      // Journal title
      var journal_title = json.get("journal_title")
      journal_title = if (journal_title == null) "" else journal_title.toString

      // Publication year
      var publication_year = json.get("publication_year")
      publication_year = if (publication_year == null) "" else publication_year.toString

      // Volume
      var volume = json.get("volume")
      volume = if (volume == null) "" else volume.toString

      // Issue
      var issue = json.get("issue")
      issue = if (issue == null) "" else issue.toString

      // Publication date, converted from the Mongo $date epoch millis
      val publication_datevalue = json.getJSONObject("publication_date")
      val dateTimep = new DateTime(publication_datevalue.get("$date").asInstanceOf[Number].longValue).toString("yyyy-MM-dd")
      var publication_date = "0001-01-01T00:00:00Z"
      if (dateTimep != null) publication_date = dateTimep + "T00:00:00Z"

      // Current timestamp in Solr Date format
      val now: Date = new Date()
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val date = dateFormat.format(now)
      val timeFormat: SimpleDateFormat = new SimpleDateFormat("HH:mm:ssZ")
      val time = timeFormat.format(now)
      val createTime = date + "T" + time + "Z"

      // Assemble the document string and store it in Solr
      val text = ""
      val data = "{'id':'" + GUID.toString +
        "','text':'" + text +
        "','title':'" + titlevalue.toString +
        "','title_alternative':'" + title_alternativevalue +
        "','first_contributer_id':'" + first_id.toString +
        "','contributer_id':" + contributer_idvalue +
        ",'contributer_name':'" + contributer_metavalue +
        "','contributer_org_id':" + contributer_org_idvalue +
        ",'contributer_org':'" + contributer_orgvalue +
        "','abstractvalue':'" + abstractvaluevalue +
        "','abstract_alternative':'" + abstract_alternativevalue +
        "','funding_list_id':" + funding_list_idvalue +
        ",'funding_list':'" + funding_listvalue +
        "','holding_code':'" + holding_metavalue +
        "','journal_id':" + journal_id.toString +
        ",'journal_title':'" + journal_title.toString +
        "','volume':'" + volume +
        "','issue':'" + issue +
        "','CLCcode':'" + CLCcode +
        "','CLCtitle':'" + CLCtitle +
        "','keywords':'" + keywords +
        "','language':'" + languagevalue +
        "','publication_year':'" + publication_year +
        "','publication_date':'" + publication_date +
        "','createTime':'" + createTime +
        "','updateTime':'" + createTime + "'}"
      val zty = new SolrAdd()
      zty.postToSolr("JournalArticle", data)

      // Store the node in Neo4j
      val script = "CREATE (:journalArticle {guid:'" + GUID +
        "',title:'" + titlevalue.toString +
        "',title_alternative:'" + title_alternativevalue +
        "',contributer_name:'" + contributer_metavalue +
        "',contributer_org:'" + contributer_orgvalue +
        "',abstractvalue:'" + abstractvaluevalue +
        "',abstract_alternative:'" + abstract_alternativevalue +
        "',funding_list:'" + funding_listvalue +
        "',holding_code:'" + holding_metavalue +
        "',journal_title:'" + journal_title.toString +
        "',volume:'" + volume +
        "',issue:'" + issue +
        "',CLCcode:'" + CLCcode +
        "',CLCtitle:'" + CLCtitle +
        "',keywords:'" + keywords +
        "',language:'" + languagevalue +
        "',publication_year:'" + publication_year +
        "',publication_date:'" + publication_date + "'})"
      val result: StatementResult = session.run(script)
      // Consume the result before the session is closed
      result.consume().counters().nodesCreated()
    } catch {
      case ex: Throwable => println(ex)
    } finally {
      // Always release the session and driver, even on failure
      session.close()
      driver.close()
    }
  }
}
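To make the field accesses above easier to follow, here is a hypothetical skeleton of the journal-article JSON this class expects, reconstructed purely from the paths the code reads. All values are invented, and the real URI prefixes are unknown: the placeholders "person:", "journal:" and "organization:" merely match the 7-, 8- and 13-character lengths that the substring calls strip.

  {
    "_id": { "$oid": "5a3a0c2e9f1b2c0001a1b2c3" },
    "title": { "title": "Sample article title", "language": "chi" },
    "title_alternative": [ { "title_alternative": "Title in another language" } ],
    "first_contributer_id": "P0000001",
    "contributer_meta": [ {
      "contributer_name": "Zhang San",
      "contributer_URI": [ { "contributer_URI": "person:P0000001" } ],
      "organization_list": [ {
        "name": "Some University",
        "organization_URI": [ "organization:O0000001" ]
      } ]
    } ],
    "abstractvalue": { "abstractvalue": "Sample abstract" },
    "abstract_alternative": [ { "abstract_alternative": "Abstract in another language" } ],
    "subject_meta": { "subject_list": [
      { "source": "CLC", "subject_title": "Computer science", "subject_code": "TP3" },
      { "type": "Keyword", "subject_title": "kafka" }
    ] },
    "funding_list": { "funding_meta": [ { "_id": "F0000001", "title": "Some foundation project" } ] },
    "holding_meta": [ { "holding_code": "EI" } ],
    "journal_URI": "journal:J0000001",
    "journal_title": "Sample Journal",
    "publication_year": "2017",
    "volume": "12",
    "issue": "3",
    "publication_date": { "$date": 1513728000000 }
  }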
KafkaManagerAdd.scala file
package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
 * Created by zty on 15-8-5.
 *
 * Lives in the org.apache.spark.streaming.kafka package so it can use the
 * private[spark] KafkaCluster API.
 */
class KafkaManagerAdd(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
   * Create a direct stream, reconciling the offsets stored in ZooKeeper first.
   *
   * @param ssc         the streaming context
   * @param kafkaParams Kafka consumer configuration
   * @param topics      topics to subscribe to
   * @tparam K  key type
   * @tparam V  value type
   * @tparam KD key decoder
   * @tparam VD value decoder
   * @return the input stream of (key, message) pairs
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams("group.id")
    // Before reading offsets from ZooKeeper, bring them in line with what the cluster actually holds
    setOrUpdateOffsets(topics, groupId)
    // Start consuming from the offsets stored in ZooKeeper
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft)
      throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
    val consumerOffsets = consumerOffsetsE.right.get
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, consumerOffsets,
      (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
  }

  /**
   * Before creating the stream, update the consumer offsets according to
   * how much has actually been consumed.
   */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { // the group has consumed this topic before
        /**
         * If the streaming job throws kafka.common.OffsetOutOfRangeException,
         * the offsets saved in ZooKeeper are stale: Kafka's log-retention policy
         * has already deleted the segments containing them. In that case compare
         * the ZooKeeper consumerOffsets with earliestLeaderOffsets; any
         * consumerOffsets smaller than earliestLeaderOffsets are stale and are
         * reset to earliestLeaderOffsets.
         */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get
        // Only some partitions may be stale, so update just those
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, resetting to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (offsets.nonEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // the group has never consumed this topic
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, offset) => (tp, offset.offset) }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
   * Update the consumed offsets in ZooKeeper.
   * @param rdd the batch just processed
   */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
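The intended call pattern is the one KafkaStreaming.scala already follows: build the stream through the manager so stale ZooKeeper offsets are reconciled before consumption starts, then commit offsets once each batch has been handled. A minimal sketch, assuming an existing StreamingContext ssc and placeholder broker, topic and group names:

  val params = Map(
    "metadata.broker.list" -> "broker1:9092",
    "group.id" -> "demo-group",
    "auto.offset.reset" -> "smallest")
  val manager = new KafkaManagerAdd(params)
  val stream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, params, Set("demo-topic"))
  stream.foreachRDD { rdd =>
    // process the batch first, then record the consumed range in ZooKeeper
    manager.updateZKOffsets(rdd)
  }

Committing after processing gives at-least-once delivery: if the job dies between processing and the offset update, the batch is replayed on restart.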
SolrAdd.scala file
import scalaj.http.Http

// Post data to Solr
// Author: zty
class SolrAdd {
  // dataType: the Solr collection name; jsonString: the document as a JSON string
  def postToSolr(dataType: String, jsonString: String): Unit = {
    // Wrap the document in an 'add' command; commitWithin asks Solr to commit within 1000 ms
    val data = "{'add':{'doc':" + jsonString + ",'boost':1.0,'overwrite':true,'commitWithin':1000}}"
    val result = Http("http://IP:8985/solr/" + dataType + "/update?wt=json")
      .postData(data)
      .header("Content-Type", "application/json")
      .asString
    println(result)
  }
}
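For illustration, a hypothetical call and the request body it produces. Note that the payload uses single quotes, which is not strict JSON; whether Solr accepts it depends on its lenient JSON parsing, so double quotes would be the safer choice.

  // Hypothetical usage: push one document into the "JournalArticle" collection
  val solr = new SolrAdd()
  solr.postToSolr("JournalArticle", "{'id':'demo-1','title':'Demo title'}")
  // Body posted to http://IP:8985/solr/JournalArticle/update?wt=json:
  // {'add':{'doc':{'id':'demo-1','title':'Demo title'},'boost':1.0,'overwrite':true,'commitWithin':1000}}

Here overwrite:true replaces any existing document with the same id, and commitWithin:1000 asks Solr to make the update searchable within one second.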