Writing from Kafka to HDFS
Posted by tangsonghuai
Problems encountered

(1) Thread safety: since this runs on a single node, putting a lock around the shared writer in the code is enough for now; a multi-threaded version will be written up later.
(2) The consumer writes to HDFS through a stream, and deciding when to close that stream is the hard part. Instead of closing after every write, fsDataOutputStream.hsync() is used to flush the data while keeping the stream open.
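To illustrate the flush-without-close idea on its own, here is a minimal sketch (the class name is illustrative; the HDFS URI and the path /d1/tangsonghuai are taken from the consumer below and the file is assumed to already exist):

package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

// Illustrative sketch, not part of the original demo
public class HsyncSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.15.140:9000/"), new Configuration());
        FSDataOutputStream out = fs.append(new Path("/d1/tangsonghuai")); // file must already exist
        for (int i = 0; i < 3; i++) {
            out.write(("record " + i + "\n").getBytes());
            // hsync() pushes the buffered bytes out to the DataNodes and asks them to
            // persist to disk, so the stream can stay open across many writes
            out.hsync();
        }
        out.close(); // close only when the consumer really shuts down
    }
}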
Producer
package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int i = 1;

        // Send business messages; the data could equally come from a file,
        // an in-memory database, or a socket
        while (i < 50) {
            Thread.sleep(100);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
        // Flush any buffered records and release resources before exiting
        producer.close();
    }
}
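One caveat: with acks set to 0 the producer never waits for an acknowledgment, so send failures go completely unnoticed. As a hedged sketch (the class name is made up for this post), a Callback can be passed to send() to at least log the result; broker address and topic are reused from above:

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Illustrative sketch, not part of the original demo
public class ProducerWithCallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "1"); // wait for the leader so the callback gets real metadata
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("tangsonghuai", "key:1", "value:1"),
                (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("send failed: " + exception.getMessage());
                    } else {
                        System.out.println("sent to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });
        producer.close(); // close() flushes pending records and waits for the callbacks
    }
}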
Consumer
package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws IOException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));
        int i = 0;
        String uri = "hdfs://192.168.15.140:9000/";
        Configuration configuration = new Configuration();
        // On a single-node cluster there is no spare DataNode to swap in on pipeline failure
        configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");

        FileSystem fs = FileSystem.get(URI.create(uri), configuration);
        final String pathString = "/d1/tangsonghuai";
        // append() requires the target file to already exist on HDFS
        final FSDataOutputStream fsDataOutputStream = fs.append(new Path(pathString));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

                fsDataOutputStream.write((record.offset() + "," + record.key() + "," + record.value() + "\n").getBytes());
                // Force the written bytes out to the DataNodes without closing the stream
                fsDataOutputStream.hsync();
                i++;
                if (i == 70) {
                    fsDataOutputStream.close();
                    consumer.close();
                    return; // stop polling once the stream and the consumer are closed
                }
            }
        }
    }
}
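Another pitfall in the consumer: fs.append() throws FileNotFoundException if /d1/tangsonghuai does not exist yet, so the file has to be created first (for example with hdfs dfs -touchz /d1/tangsonghuai). A hedged alternative is a small create-or-append helper; the class and method names below are illustrative, not from the original post:

package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

// Illustrative sketch, not part of the original demo
public class HdfsAppendHelper {

    // Open the path for appending, creating the file first if it does not exist yet
    public static FSDataOutputStream createOrAppend(FileSystem fs, Path path) throws IOException {
        if (fs.exists(path)) {
            return fs.append(path);
        }
        return fs.create(path);
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.15.140:9000/"), conf);
        FSDataOutputStream out = createOrAppend(fs, new Path("/d1/tangsonghuai"));
        out.write("test line\n".getBytes());
        out.hsync();
        out.close();
    }
}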
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuliugen.kafka</groupId>
    <artifactId>kafka.demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

</project>
The above is the main content on writing from Kafka to HDFS. If it did not solve your problem, the following articles may help:
How to use Flume to collect Kafka data and write it to HDFS in a Kerberos environment
Kafka HDFS Sink Connector with Protobuf not writing