请教一个关于使用spark 读取kafka只能读取一个分区数据的问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了请教一个关于使用spark 读取kafka只能读取一个分区数据的问题相关的知识,希望对你有一定的参考价值。
参考技术A 我先写了一个kafka的生产者程序,然后写了一个kafka的消费者程序,一切正常。生产者程序生成5条数据,消费者能够读取到5条数据。然后我将kafka的消费者程序替换成使用spark的读取kafka的程序,重复多次发现每次都是读取1号分区的数据,而其余的0号和2号2个分区的数据都没有读到。请哪位大侠出手帮助一下。
我使用了三台虚拟机slave122,slave123,slave124作为kafka集群和zk集群;然后生产者和消费者程序以及spark消费者程序都是在myeclipse上完成。
软件版本为:kafka_2.11-0.10.1.0,spark-streaming-kafka-0-10_2.11-2.1.0,zookeeper-3.4.9
spark消费者程序主要代码如下:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "slave124:9092,slave122:9092,slave123:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "ssgroup");
kafkaParams.put("auto.offset.reset", "earliest"); //update mykafka,"earliest" from the beginning,"latest" from the rear of topic
kafkaParams.put("enable.auto.commit", "true"); //messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics
kafkaParams.put("auto.commit.interval.ms", "5000");
// Create a local StreamingContext with two working thread and batch interval of 2 second
SparkConf conf = new SparkConf();
//conf被set后,返回新的SparkConf实例,所以多个set必须连续,不能拆开。
conf.setMaster("local[1]").setAppName("streaming word count").setJars(new String[]"D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\MyFirstHadoop.jar");;
try
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
Collection<String> topics = new HashSet<>(Arrays.asList("order"));
JavaInputDStream<ConsumerRecord<String, String>> oJInputStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> pairs = oJInputStream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>()
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record)
try
BufferedWriter oBWriter = new BufferedWriter(new FileWriter("D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\mysparkstream\\MyFirstHadoop.out",true));
String strLog = "^^^^^^^^^^^ " + System.currentTimeMillis() / 1000 + " mapToPair:topic:" + record.topic() + ",key:" + record.key() + ",value:" + record.value() + ",partition id:" + record.partition() + ",offset:" + record.offset() + ".\n";
System.out.println(strLog);
oBWriter.write(strLog);
oBWriter.close();
catch (IOException e)
// TODO Auto-generated catch block
e.printStackTrace();
return new Tuple2<>(record.key(), record.value());
);
pairs.print();
jssc.start(); //start here in fact
jssc.awaitTermination();
jssc.close();
catch(Exception e)
// TODO Auto-generated catch block
System.out.println("Exception:throw one exception");
e.printStackTrace();
Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题
我正在尝试准备Spark流媒体应用程序(Spark 2.1,Kafka 0.10)
我需要从Kafka主题“输入”中读取数据,找到正确的数据并将结果写入主题“输出”
我可以从Kafka基于KafkaUtils.createDirectStream方法读取数据。
我将RDD转换为json并准备过滤器:
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
val PeopleDf=spark.read.schema(schema1).json(rdd)
import spark.implicits._
PeopleDf.show()
val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2))
PeopleDfFilter.show()
}
我可以从Kafka加载数据并“按原样”写入Kafka使用KafkaProducer:
messages.foreachRDD( rdd => {
rdd.foreachPartition( partition => {
val kafkaTopic = "output"
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
partition.foreach{ record: ConsumerRecord[String, String] => {
System.out.print("########################" + record.value())
val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
producer.send(messageResult)
}}
producer.close()
})
})
但我无法整合这两个动作>在json中找到适当的值并将结果写入Kafka:用JSON格式编写PeopleDfFilter来“输出”Kafka主题。
我在Kafka中有很多输入消息,这就是我想使用foreachPartition创建Kafka生成器的原因。
非常感谢您的任何建议。
答案
这个过程非常简单,为什么不一直使用结构化流媒体呢?
import org.apache.spark.sql.functions.from_json
spark
// Read the data
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", inservers)
.option("subscribe", intopic)
.load()
// Transform / filter
.select(from_json($"value".cast("string"), schema).alias("value"))
.filter(...) // Add the condition
.select(to_json($"value").alias("value")
// Write back
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", outservers)
.option("subscribe", outtopic)
.start()
另一答案
尝试使用Structured Streaming。即使您使用Spark 2.1,您也可以实现自己的Kafka ForeachWriter,如下所示:
import java.util.Properties
import kafkashaded.org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter
class KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)] {
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer",
classOf[org.apache.kafka.common.serialization.StringSerializer].toString)
kafkaProperties.put("value.serializer",
classOf[org.apache.kafka.common.serialization.StringSerializer].toString)
val results = new scala.collection.mutable.HashMap[String, String]
var producer: KafkaProducer[String, String] = _
def open(partitionId: Long,version: Long): Boolean = {
producer = new KafkaProducer(kafkaProperties)
true
}
def process(value: (String, String)): Unit = {
producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
}
def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
用法:
val topic = "<topic2>"
val brokers = "<server:ip>"
val writer = new KafkaSink(topic, brokers)
val query =
streamingSelectDF
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("25 seconds"))
.start()
以上是关于请教一个关于使用spark 读取kafka只能读取一个分区数据的问题的主要内容,如果未能解决你的问题,请参考以下文章
从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题
从Apache Kafka到Apache Spark安全地读取数据