Kafka核心API——Stream API
Kafka Stream概念及初识高层架构图
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。
Kafka Stream的基本概念:
- Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib)
- 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境
- Kafka Stream通过state store可以实现高效的状态操作
- 支持原语Processor和高层抽象DSL
Kafka Stream的高层架构图:
- Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理
- 每个Task都会有自己的state store去记录状态
- 每个Thread里会有多个Task
Kafka Stream 核心概念
Kafka Stream关键词:
- 流和流处理器:流指的是数据流,流处理器指的是数据流到某个节点时对其进行处理的单元
- 流处理拓扑:一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置
- 源处理器及Sink处理器:源处理器指的是数据的源头,即第一个处理器,Sink处理器则反之,是最终产出结果的一个处理器
如下图所示:
Kafka Stream使用演示
下图是Kafka Stream完整的高层架构图:
从上图中可以看到,Consumer
对一组Partition
进行消费,这组Partition
可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer
输出到一组Partition
中,同样这组Partition
也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。
因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic
由于之前依赖的kafka-clients
包中没有Stream API,所以需要另外引入Stream的依赖包。在项目中添加如下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>
接下来以一个经典的词频统计为例,演示一下Stream API的使用。代码示例:
package com.zj.study.kafka.stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.List;
import java.util.Properties;
public class StreamSample {
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
/**
* 构建配置属性
*/
public static Properties getProperties() {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return properties;
}
public static KafkaStreams createKafkaStreams() {
Properties properties = getProperties();
// 构建流结构拓扑
StreamsBuilder builder = new StreamsBuilder();
// 构建wordCount这个Processor
wordCountStream(builder);
Topology topology = builder.build();
// 构建KafkaStreams
return new KafkaStreams(topology, properties);
}
/**
* 定义流计算过程
* 例子为词频统计
*/
public static void wordCountStream(StreamsBuilder builder) {
// 不断的从INPUT_TOPIC上获取新的数据,并追加到流上的一个抽象对象
KStream<String, String> source = builder.stream(INPUT_TOPIC);
// KTable是数据集的抽象对象
KTable<String, Long> count = source.flatMapValues(
// 以空格为分隔符将字符串进行拆分
v -> List.of(v.toLowerCase().split(" "))
// 按value进行分组统计
).groupBy((k, v) -> v).count();
KStream<String, Long> sink = count.toStream();
// 将统计结果输出到OUTPUT_TOPIC
sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
}
public static void main(String[] args) {
KafkaStreams streams = createKafkaStreams();
// 启动该Stream
streams.start();
}
}
KTable
与KStream
的关系与区别,如下图:
-
KTable
类似于一个时间片段,在一个时间片段内输入的数据就会update进去,以这样的形式来维护这张表 -
KStream
则没有update这个概念,而是不断的追加
运行以上代码,然后到服务器中使用kafka-console-producer.sh
脚本命令向input-topic
生产一些数据,如下:
[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic
>Hello World Java
>Hello World Kafka
>Hello Java Kafka
>Hello Java
然后再运行kafka-console-consumer.sh
脚本命令从output-topic
中消费数据,并进行打印。具体如下:
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
控制台输出的结果:
world 2
hello 3
java 2
kafka 2
hello 4
java 3
从输出结果中可以看到,Kafka Stream首先是对前三行语句进行了一次词频统计,所以前半段是:
world 2
hello 3
java 2
kafka 2
当最后一行输入之后,又再做了一次词频统计,并针对新的统计结果进行输出,其他没有变化的则不作输出,所以最后打印了:
hello 4
java 3
这也是KTable
和KStream
的一个体现,从测试的结果可以看出Kafka Stream是实时进行流计算的,并且每次只会针对有变化的内容进行输出。
foreach方法
在之前的例子中,我们是从某个Topic读取数据进行流处理后再输出到另一个Topic里。但在一些场景下,我们可能不希望将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。
在这种场景下,就可以利用到foreach
方法,该方法用于迭代流中的元素。我们可以在foreach
中将数据存入例如Map、List等容器,然后再批量写入到数据库或其他存储中间件即可。
foreach
方法使用示例:
public static void foreachStream(StreamsBuilder builder) {
KStream<String, String> source = builder.stream(INPUT_TOPIC);
source.flatMapValues(
v -> List.of(v.toLowerCase().split(" "))
).foreach((k, v) -> System.out.println(k + " : " + v));
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android自定义View实现简单炫酷的球体进度球实例代码
- 详解android 中文字体向上偏移解决方案
- Android自定义控件仿ios下拉回弹效果
- 当飞桨PaddleHub遇到微信小程序,AI也能指物作诗
- Android仿QQ好友详情页下拉顶部图片缩放效果
- 数组对象转为Map
- TensorFlow2.X使用图片制作简单的数据集训练模型
- Android高性能日志写入方案的实现
- MySQL 三万字精华总结 + 面试100 问,吊打面试官绰绰有余(收藏系列)
- 想后台运行没想到导致磁盘满了
- 来看一道"简单的"C语言面试题
- 这行代码的打印结果确实让人迷惑!
- 如何实现一个高效的启发式算法?(VRPTW篇)
- Ubuntu Server搭建Hyperledger Fabric 2.1学习环境
- 辛辛苦苦学会的 webpack dll 配置,可能已经过时了