Java连接开启Kerberos认证的Kafka

概要描述

本文描述如何通过Java客户端连接开启了Kerberos认证的Kafka。

详细说明

maven项目添加依赖

在kafka的pod中获取kafka-clients-xxxx.jar与federation-utils-xxxx.jar

kubectl cp <POD_NAME>:/usr/lib/kafka/libs/kafka-clients-xxxx.jar ./kafka-clients-xxxx.jar

kubectl cp <POD_NAME>:/usr/lib/kafka/libs/federation-utils-xxxx.jar ./federation-utils-xxxx.jar

将上述两个jar包获取到本机,并放在项目根目录的lib文件夹中,并在pom.xml中添加依赖

<dependency>
    <!-- groupId、artifactId、version 可随意填写 -->
    <groupId>io.transwarp</groupId>
    <artifactId>kafka-client</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/kafka-clients-0.10.2.0-transwarp-6.2.2.jar</systemPath>
</dependency>
<dependency>
    <!-- groupId、artifactId、version 可随意填写 -->
    <groupId>io.transwarp</groupId>
    <artifactId>guardian</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/federation-utils-guardian-3.1.3.jar</systemPath>
</dependency>

创建生产者示例

package io.transwarp.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;


public class TestKafkaProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
        //krb5.conf从集群服务器中获取到本机,路径填本机上该文件的绝对路径
        Properties properties = new Properties();
        String topic = "demoxltest";//此处为topic名称
        properties.put("bootstrap.servers", "ip:port");//填写kafka的ip与端口
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.kerberos.service.name","kafka");
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab="C:/Users/Transwarp/Desktop/xxll.keytab" principal="xxll@TDH";");
        // keytab去guardian页面下载,用户需要有kafka的admin权限,principal使用"klist -ket xxx.keytab"查看,路径同样为本机上keytab文件的绝对路径。
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++){
            String s = UUID.randomUUID().toString() +" " + i + " Test Date: " + new Date();
            Future<RecordMetadata>  future = producer.send(new ProducerRecord<>(topic,s ));
            future.get();
            System.out.println("Success producer :" + s);
            Thread.sleep(500);
        }
    }
}

执行成功结果如下图所示

创建消费者示例

package io.transwarp.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

//import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;


public class TestKafkaConsumer {


    public static void main(String[] args) {

        System.setProperty("java.security.krb5.conf", "C:/Users/Transwarp/Desktop/kafka-test/krb5.conf");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "ip:port");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.kerberos.service.name","kafka");
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true debug=true storeKey=true keyTab="C:/Users/Transwarp/Desktop/xxll.keytab" principal="xxll@TDH";");
        properties.put("group.id", "test-consumer-group");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        consumer.subscribe(Collections.singleton("demoxltest"));
        boolean flag = true;

        while(flag) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            System.out.println("Get record size : " + records.count());
            for (ConsumerRecord<String, String> record : records){
                // 循环打印消息记录
                //处理消息
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                consumer.commitAsync();
            }
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        consumer.close();
    }
}

执行成功结果如下图所示

阅读剩余
THE END