如何使用Java连接Kerberos的Kafka
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
1.文档编写目的
Kafka从0.8版本以后出了新的API接口,用于异步方式发送消息,性能优于旧的API,本篇文章主要使用新的API接口进行测试。继上一篇文章如何通过Cloudera Manager为Kafka启用Kerberos及使用,本篇文章主要讲述如何使用Java连接Kerberos的Kafka集群生产和消费消息。
- 内容概述
1.环境准备
2.创建Java工程
3.编写生产消息代码
4.编写消费消息代码
5.测试
- 测试环境
1.RedHat7.2
2.CM和CDH版本为5.11.2
3.Kafka2.2.0-0.10.2
- 前置条件
1.Intellij已安装且正常运行
2.Maven环境正常
2.环境准备
1.创建topic,test3有3个replication,3个partition
ec2-user@ip-172-31-22-86~$ kafka-topics --create --zookeeper ip-172-31-22-86.ap-southeast-1.compute.internal:2181 --replication-factor 3 --partitions 3 --topic test3
2.krb5.conf配置(直接使用CDH集群的Kerberos配置)
Configuration snippets may beplaced in this directory as well
includedir /etc/krb5.conf.d/
logging
default = FILE:/var/log/krb5libs.log
kdc =FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
libdefaults
dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = false
default_realm = CLOUDERA.COM
#default_ccache_name = KEYRING:persistent:%{uid}
realms
CLOUDERA.COM = {
kdc =ip-172-31-22-86.ap-southeast-1.compute.internal
admin_server = ip-172-31-22-86.ap-southeast-1.compute.internal
}
domain_realm
.ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM
ip-172-31-22-86.ap-southeast-1.compute.internal= CLOUDERA.COM
3.Kerberos的keytab文件
使用kadmin为Kerberos账号生成keytab,fayson.keytab文件生成在当前目录下。
ec2-user@ip-172-31-22-86~$ sudo kadmin.local
Authenticating as principal hdfs/admin@CLOUDERA.COM with password.
kadmin.local: xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM
...
kadmin.local: exit
ec2-user@ip-172-31-22-86~$
4.jaas-cache.conf配置文件
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/Volumes/Transcend/keytab/fayson.keytab"
principal="fayson@CLOUDERA.COM";
};
5.在当前开发环境下配置集群的主机信息到hosts文件
在/etc/hosts文件中添加
提示:Fayson使用的AWS环境,所以使用公网IP和hostname对应。如果你的开发环境可以直连Hadoop集群,可以直接配置Hadoop内网IP和hostname对应即可。
3.创建Java工程
1.使用Intellij创建Java Maven工程
2.在pom.xml配置文件中增加Kafka API的Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
4.编写生产消息代码
package com.cloudera;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by fayson on 2017/10/24.
*/
public class MyProducer {
public static String TOPIC_NAME = "test3";
public static void main(String[] args){
System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
// System.setProperty("sun.security.krb5.debug","true");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
Producer<String,String> producer = new KafkaProducer<String,String>(props);
for (int i = 0; i < 10; i++) {
String key = "key-"+ i;
String message = "Message-"+ i;
ProducerRecord record= new ProducerRecord<String, String>(TOPIC_NAME, key, message);
producer.send(record);
System.out.println(key + "----"+ message);
}
producer.close();
}
}
5.编写消费消息代码
package com.cloudera;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
/**
* Created by fayson on 2017/10/24.
*/
public class MyConsumer {
private static String TOPIC_NAME = "test3";
public static void main(String[] args){
System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
TopicPartition partition0= new TopicPartition(TOPIC_NAME, 0);
TopicPartition partition1= new TopicPartition(TOPIC_NAME, 1);
TopicPartition partition2= new TopicPartition(TOPIC_NAME, 2);
consumer.assign(Arrays.asList(partition0,partition1, partition2));
ConsumerRecords<String,String> records = null;
while (true){
try {
Thread.sleep(10000l);
System.out.println();
records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Receivedmessage: (" + record.key() + "," + record.value() + ") at offset " + record.offset());
}
} **catch** (**InterruptedException** e){
e.printStackTrace();
}
}
}
}
6.代码测试
1.执行消费程序,消费topic为test3的所有partition消息
启动成功,等待消费test3的消息
2.执行生产消息程序,向test3的topic生产消息
向test3的topic发送的消息
3.查看消费程序读取到的消息
7.总结
在开发环境下通过Java代码直接连接到已启用Kerberos的Kafka集群时,则需要将krb5.conf和jaas.conf配置加载到程序运行环境中。至于使用Kerberos密码的方式Fayson也不会。
测试使用的topic有3个partiton,如果没有将所有的broker列表配置到bootstrap.servers中,会导致部分消息丢失。
参考文档:
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
http://kafka.apache.org/documentation/#api
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操
- javascript实例:逐条记录停顿的走马灯
- Python标准库05 存储对象 (pickle包,cPickle包)
- macOS平台下虚拟摄像头的研发总结
- 网页优化系列三:使用压缩后置viewstate
- 网页优化系列三:使用压缩后置viewstate
- macOS下利用dSYM文件将crash文件中的内存地址转换为可读符号
- 微信小程序的大动作
- Python标准库04 文件管理 (部分os包,shutil包)
- 手把手教你Dojo入门
- location的hash部分和使用window.onhashchange实现ajax请求内容时使用浏览器后退和前进功能
- 协议森林01 邮差与邮局 (网络协议概观)
- Mac OS平台下应用程序安装包制作工具Packages的使用介绍
- 协议森林02 小喇叭开始广播 (以太网与WiFi协议)
- 信号与频谱
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 理解装饰器是怎么使用的
- 第十一节:Activiti6.0——定时器开始事件、消息开始事件和错误开始事件介绍
- linux centos 安装mailx邮件服务器并测试发送一封邮件
- 深入分析Vue-Router原理,彻底看穿前端路由
- linux LVM 一键分区脚本自动扩容
- 再谈构造函数、原型、原型链之间的关系
- Java ConcurrentHashMap 高并发安全实现原理解析
- 第十二节:Activiti6.0——四种边界事件:定时器、错误、信号、补偿
- parted 磁盘分区-挂载-删除-shell脚本进行磁盘分区
- Ubuntu18.04——安装MySQL
- 八种 Vue 组件间通讯方式合集
- Sharding-JDBC 实现分库分表
- fastjson——使用 aop 打印入参,报错:getOutputStream() has already been called for this response
- webpack从零搭建开发环境
- 博客——使用 Redis 实现博客编辑的自动保存草稿功能