Accessing Kerberos-Authenticated Kafka from Java

Posted by 之奇一昂


This post shows how to consume from a Kerberos-authenticated Kafka cluster in Java: the Maven dependency, a minimal consumer, and the JVM options that point the client at the Kerberos configuration.

Maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ht</groupId>
    <artifactId>kafkatest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
    </dependencies>
</project>

Java code:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

/**
 * @author sunzq
 * @since 2017/8/29
 */
public class Application {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(GROUP_ID_CONFIG, "test08291103");
//      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
        props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kerberos-secured listener: SASL over plaintext. The Kerberos and JAAS
        // configuration files are supplied via JVM system properties (see the
        // launch arguments below).
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // topic name: test9
        consumer.subscribe(Collections.singleton("test9"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
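Note: depending on how the brokers are configured, the client may also need to be told the Kerberos service name of the broker principals (typically kafka; port 6667 in the bootstrap list suggests an HDP-style cluster). A minimal sketch of the extra property, assuming broker principals of the form kafka/<host>@<REALM>; it can equivalently be set as serviceName in the JAAS file sketched further below:

// assumption: broker principals look like kafka/<host>@<REALM>
props.put("sasl.kerberos.service.name", "kafka");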

JVM launch arguments:

-Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf
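For reference, a complete launch command might look like the following; the jar name follows from the artifactId/version in the pom above, while the classpath entry for kafka-clients and its dependencies is a placeholder you need to fill in yourself:

java -Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf ^
     -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf ^
     -cp kafkatest-1.0-SNAPSHOT.jar;<kafka-clients-and-dependencies> Application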

 

On Windows, remember to escape the backslashes in these paths (use \\).
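The post does not show the contents of kafka_jaas.conf. As a rough sketch, a client-side JAAS file for a keytab login usually looks like the following; the keytab path and principal are placeholders, not values from the post:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="c:\\app\\conf\\user.keytab"
    principal="user@EXAMPLE.COM"
    serviceName="kafka";
};

krb5.conf is the ordinary Kerberos client configuration (default realm and KDC addresses) and is normally copied from the cluster nodes; the principal used here must also be authorized to read the topic if ACLs are enabled.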
