Kafka安装(极简版)
Kafka简介
kafka是一个高吞吐量的分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力,通常用于大数据及流处理平台。消息队列里都有生产者/消费者的概念,生产者往队列里写消息,而消费者则是从队列里获取消息。一般在架构设计中起到解耦、削峰、异步处理的作用。
kafka对外使用topic的概念,生产者往topic里写消息,消费者则从topic里读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。在单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。
kafka的总体数据流是这样的:
大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。可以看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的。
kafka需要依赖zookeeper存储一些元信息,而kafka也自带了zookeeper。其中broker、topics、partitions的一些元信息用zookeeper来存储,监控和路由啥的也都会用到zookeeper。
kafka名词解释:
- producer:生产者。
- consumer:消费者。
- topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
- partitions:每个Topics划分为一个或者多个partition,并且partition中的每条消息都被标记了一个sequential id ,也就是offset,并且存储的数据是可配置存储时间的
- broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker,消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
在kafka中每个消息(也叫作record记录,也被称为消息)通常是由一个key,一个value和时间戳构成。
kafka有四个核心的API:
- 应用程序使用Producer API发布消息到1个或多个topic中。
- 应用程序使用Consumer API来订阅一个或多个topic,并处理产生的消息。
- 应用程序使用Streams API充当一个流处理器,从1个或多个topic消费输入流,并产生一个输出流到1个或多个topic,有效地将输入流转换到输出流。
- Connector API允许构建或运行可重复使用的生产者或消费者,将topic链接到现有的应用程序或数据系统。
kafka就先介绍到这,网络上有很多相关的理论文章,所以这里不过多赘述了,也可以直接查看官方文档。官方文档地址如下:
单实例安装
本小节我们来在CentOS7上安装Kafka,由于kafka是由Scala和Java语言编写的,所以前提需要准备好java运行环境,我这里java环境是1.8的,由于jdk的安装配置都比较简单,这里就不演示jdk的安装过程了,直接安装Kafka。
到官网上复制下载地址,使用wget命令进行下载并解压:
[root@study-01 ~]# cd /usr/local/src/
[root@study-01 /usr/local/src]# wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
[root@study-01 /usr/local/src]# tar -zxvf kafka_2.11-2.0.0.tgz
[root@study-01 /usr/local/src]# mv kafka_2.11-2.0.0 /usr/local/kafka
[root@study-01 /usr/local/src]# cd !$
没有特殊要求的话,我们使用默认的kafka配置即可。若你希望kafka能够被外部机器访问,则需要配置一下你机器的外网ip地址及Kafka监听端口,另外我们可能还需要配置broker的id和kafka日志文件的存储目录,如下:
[root@study-01 /usr/local/kafka]# vim ./config/server.properties
listeners=PLAINTEXT://192.168.190.129:9092
advertised.listeners=PLAINTEXT://192.168.190.129:9092
broker.id=1 # broker的id,必须是集群中唯一的
log.dirs=/tmp/kafka-logs # kafka日志文件的存储目录
[root@study-01 /usr/local/kafka]#
现在我们就可以使用kafka了,由于kafka依赖zookeeper,所以我们在启动kafka前需要先启动kafka自带的zookeeper服务:
[root@study-01 /usr/local/kafka]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > zookeeper.out &
zookeeper服务启动成功后,启动kafka:
[root@study-01 /usr/local/kafka]# nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.out &
两个服务都启动成功后,监听的端口如下:
[root@study-01 ~]# netstat -lntp |grep java
tcp6 0 0 :::38031 :::* LISTEN 3629/java
tcp6 0 0 :::33620 :::* LISTEN 3945/java
tcp6 0 0 :::9092 :::* LISTEN 3945/java
tcp6 0 0 :::2181 :::* LISTEN 3629/java
[root@study-01 ~]#
接下来我们测试一下kafka是否正常可用,首先创建一个topic,命令如下:
[root@study-01 /usr/local/kafka]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello
Created topic "hello".
[root@study-01 /usr/local/kafka]#
创建topic成功后,使用如下命令测试是否能正常获取topic列表:
[root@study-01 /usr/local/kafka]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
hello
[root@study-01 /usr/local/kafka]#
现在可以确定topic创建成功了,然后我们来启动producer,测试往一个topic上发送消息:
[root@study-01 /usr/local/kafka]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello>hello world
>hello kafka
>
接着启动consumer,测试从一个topic上消费消息:
[root@study-01 /usr/local/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning
hello world
hello kafka
通过以上测试,可以看到,kafka能够正常的创建topic进行发送/接收消息,那么就代表我们安装成功了。
- Netty-整合kryo高性能数据传输
- 40个重要的HTML 5面试问题及答案
- js调用原生API--陀螺仪和加速器
- OpenDaylight开发-DataStoreChange监听器三种类型
- express模拟接口
- spring boot开发的日志系统
- elasticsearch 5.0.1安装analysis-ik分词器
- Spring Cloud中Feign如何统一设置验证token
- laravel+react+webpack+babel+gulp的配置
- OpenvSwitch系列之浅析main函数
- 没有公式如何看懂EM算法?
- Google用来处理海量文本去重的simhash算法原理及实现
- Open vSwitch系列之openflow版本兼容
- R预设配色系统及自定义色板
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- MySQL 切换数据库、用户卡死:“You can turn off this feature to get a quicker startup with -A“处理方法
- MySQL 数据库mysqlbinlog使用问题:unknown variable ‘default-character-set=utf8‘.解决方法
- Python 技术篇-pip安装提示:‘pip‘ 不是内部或外部命令,也不是可运行的程序或批处理文件,问题解决方法
- Jaskson精讲第6篇-自定义JsonSerialize与Deserialize实现数据类型转换
- let和var和const
- Jupyter 编写python代码实现代码自动补齐功能设置实例演示
- 第37期:从头学二叉搜索树(面试常考)
- Jupyter 工具的安装与使用方法,jupyter运行python代码演示,好用的python编辑器推荐!
- Nginx相关配置与操作
- Python 技术篇-全局与当前socket超时连接时间设置方法实例演示,查看socket超时连接时间
- 给 JDK 报了一个 P4 的 Bug,结果居然……
- Python 套接字-判断socket服务端有没有关闭的方法实例演示,查看socket运行状态
- docker安装logstash
- Rook Operator 源码分析(1) - osd 启动的流程
- Python 技术篇-利用pyqt5库监听剪切板变动,clipboard.dataChanged.connect()剪切板监听