Java连接开启Kerberos认证的Kafka
概要描述
本文描述如何通过java连接开启Kerberos认证的Kafka
详细说明
maven项目添加依赖
在kafka的pod中获取kafka-clients-xxxx.jar与federation-utils-xxxx.jar
kubectl cp <POD_NAME>:/usr/lib/kafka/libs/kafka-client-xxxxxx.jar ./kafka-client-xxxxxx.jar
kubectl cp <POD_NAME>:/usr/lib/kafka/libs/federation-utils-xxxx.jar ./federation-utils-xxxx.jar
将上述两个jar包获取到本机,并放在项目根目录的lib文件夹中,并在pom.xml中添加依赖
<dependency>
<groupId>io.transwarp</groupId>//随便写
<artifactId>kafka-client</artifactId> //随便写
<version>1.0</version>// 随便写
<scope>system</scope>
<systemPath>${project.basedir}/lib/kafka-clients-0.10.2.0-transwarp-6.2.2.jar</systemPath>
</dependency>
<dependency>
<groupId>io.transwarp</groupId>//随便写
<artifactId>guardian</artifactId> //随便写
<version>1.0</version>// 随便写
<scope>system</scope>
<systemPath>${project.basedir}/lib/federation-utils-guardian-3.1.3.jar</systemPath>
</dependency>
创建生产者示例
package io.transwarp.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class TestKafkaProducer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
//krb5.conf从集群服务器中获取到本机,路径填本机上该文件的绝对路径
Properties properties = new Properties();
String topic = "demoxltest";//此处为topic名称
properties.put("bootstrap.servers", "ip:port");//填写kafka的ip与端口
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.kerberos.service.name","kafka");
properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab="C:/Users/Transwarp/Desktop/xxll.keytab" principal="xxll@TDH";");
// keytab去guardian页面下载,用户需要有kafka的admin权限,principal使用"klist -ket xxx.keytab"查看,路径同样为本机上keytab文件的绝对路径。
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++){
String s = UUID.randomUUID().toString() +" " + i + " Test Date: " + new Date();
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic,s ));
future.get();
System.out.println("Success producer :" + s);
Thread.sleep(500);
}
}
}
执行成功结果如下图所示
创建消费者示例
package io.transwarp.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
//import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class TestKafkaConsumer {
public static void main(String[] args) {
System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
Properties properties = new Properties();
properties.put("bootstrap.servers", "ip:port");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.kerberos.service.name","kafka");
properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab="C:/Users/Transwarp/Desktop/xxll.keytab" principal="xxll@TDH";");
properties.put("group.id", "test-consumer-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.subscribe(Collections.singleton("demoxltest"));
boolean flag = true;
while(flag) {
ConsumerRecords<String, String> records = consumer.poll(3000);
System.out.println("Get record size : " + records.count());
for (ConsumerRecord<String, String> record : records){
// 循环打印消息记录
//处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
consumer.commitAsync();
}
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.close();
}
}
执行成功结果如下图所示

阅读剩余
版权声明:
作者:admin
链接:https://9923456.xyz/320.html
文章版权归作者所有,未经允许请勿转载。
THE END