RabbitMQ入门教程
MQ?
MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
主要用途:不同进程Process/线程Thread之间通信。
RabbitMQ
你可以注意到一句非常“狂”的话,RabbitMQ is the most widely deployed open source message broker.
确实是这样哈,目前市面上用的最多的就是RabbitMQ。
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman. The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.
RabbitMQ是一个消息代理(也称中间件):它接受和转发消息。你可以把它想象成邮局:当你把要邮寄的邮件放在邮筒里时,你可以确定送信先生或女士最终会将邮件发送给你的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员。
区别在于:RabbitMQ不会帮你处理里面的内容(官方原话的纸张是为了让读者更好的理解),而是帮你接受,存储和转发。
安装RabbitMQ
这里我推荐大家开启虚拟机,然后使用docker来安装RabbitMQ,不要用Windows版本。
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
启动和停止
docker start rabbitmq
docker stop rabbitmq
这里为什么有两个端口呢
15672:WEB界面的端口,启动RabbitMQ后,使用ip+15672
就可以访问了。
5672:通信端口(比如使用JAVA连接肯定是使用这个端口啦)
访问:你的ip:15672
,如果你是本机,localhost:15672
,如果你在虚拟机(Linux)不知道ip,输入命令ifconfig
即可查看
默认的管理员用户密码都为guest
JAVA(Hello World)
In this part of the tutorial we'll write two programs in Java; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Java API, concentrating on this very simple thing just to get started. It's a "Hello World" of messaging. In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.
我们将用Java编写两个程序
发送单个消息的producer(生产者)和接收消息并打印出消息的consumer(消费者)。官方文档说会掩盖JavaAPI的一些细节,专注于这个非常简单的事情,以便开始。这是一个"Hello World"的消息。但我是一个细节的人,所以我会处理得比官方细节。
在上图中,"P"是我们的生产者,"C"是我们的消费者。中间的框是队列-RabbitMQ代表消费者保留的消息缓冲区。
设计生产者
我们将新建一个Maven项目
引入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
创建生产者类,name->producer,注意,factory.setHost
填写的是你安装的IP地址
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/5/31 0:26
**/
public class producer {
//命名队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建到服务器的工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.123.128");
//创建连接
Connection connection = factory.newConnection();
//创建一个通道,这是大多数API用于完成工作的位置。
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "乐心湖好帅";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
运行程序,可以看到这个消息队列已经发送了过来。
这个1代表有一个消息还没给消费者接收到。我们点进去hello,
设计消费者
新建一个类,name->consumer
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author 乐心湖
* @date 2020/5/31 0:51
**/
public class consumer{
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//由于它将异步推送我们的消息,这里我们以对象的形式提供回调,该对象将缓冲消息,直到我们使用它们。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [√] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
官方这里有一段话
Why don't we use a try-with-resource statement to automatically close the channel and the connection? By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.
这里我们为什么不尝试使用关闭通道和连接呢,如果这样做,这个程序就会运行一遍就过去了,不能处在活跃状态,那如何接受消息呢。
换句话说,我们必须让通道和连接保持活跃,这样就能时刻监听到消息。
我们启动consumer
,就可以收到消息了。
我们将封装一个工具类,RabbitMQUtil
package com.xn2001.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author 乐心湖
* @date 2020/5/31 13:01
**/
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.111.129");
}
//定义连接对象的方法
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//定义一个关闭通道和连接的方法
public static void closeChannelAndConnection(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
把生产者和消费者重新写一下
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/5/31 0:26
**/
public class producer {
//命名队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
//创建到服务器的工厂
//ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("192.168.111.129");
//创建连接
//Connection connection = factory.newConnection();
//创建一个通道,这是大多数API用于完成工作的位置。
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "乐心湖好帅";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [√] Sent '" + message + "'");
RabbitMQUtil.closeChannelAndConnection(channel,connection);
}
}
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.xn2001.util.RabbitMQUtil;
/**
* @author 乐心湖
* @date 2020/5/31 0:51
**/
public class comsumr {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("192.168.111.129");
//Connection connection = factory.newConnection();
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [√] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
解释说明
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/6/1 15:52
**/
public class Test {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("192.168.111.111");
//若你的端口不是默认的5672,就需要设置,否则不写也可。
factory.setPort(5672);
//若用户不是使用guest,请往下看
//设置连接到哪一个虚拟机
factory.setVirtualHost("/ems");
//设置访问该虚拟机的用户和密码
factory.setUsername("ems");
factory.setPassword("123456");
//获取连接对象
Connection connection = factory.newConnection();
//创建通道连接
Channel channel = connection.createChannel();
/**
* 通道绑定消息队列
* @param queue 消息队列的名称,不存在时自动创建
* @param durable 队列是否持久化,持久化后当重新启动rabbitmq时队列依旧存在
* @param exclusive 是否独占队列,只允许一个在用,一般设置为false
* @param autoDelete 消费完消息后自动删除这个消息队列
* @param arguments
*/
channel.queueDeclare("hello",false,false,false,null);
/**
* 发布消息
* @param exchange 交换机名称
* @param routingKey 消息队列名称
* @param props 传递消息的额外设置,例如消息需要持久化,可以设置为MessageProperties.PERSISTENT_TEXT_PLAIN
* @param body 消息具体内容
*/
channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
//关闭
channel.close();
connection.close();
}
}
注意:上面我仅展示出了生产者这些常用的设置,消费者理论上几乎是一样的。
同时你需要关注的是,生产者和消费者的消息队列参数必须一致,这样才能匹配得上。
- 腾讯Bugly Unity3D Plugin使用指南
- 远丰集团旗下CMS疑有官方后门
- 前端黑魔法之远程控制地址栏
- 信息收集利器:ZoomEye
- go sync.Mutex 设计思想与演化过程 --转
- 漏洞预警 | Ubuntu 16.04版本存在本地提权漏洞(附EXP)
- 通过“震网三代”和Siemens PLC 0day漏洞,实现对工控系统的入侵实验
- 安卓端渗透工具DVHMA:自带漏洞的混合模式APP
- 小萝莉说Crash(二): Unrecognized selector xxx 之 ForwardInvocation
- 5分钟教程:如何通过UART获得root权限
- 源码级剖析PHP 7.2.x GD拒绝服务漏洞
- 美女程序媛发福利,读懂ANR的trace文件So easy
- Openshift高阶探索实验
- 卡卡卡!小萝莉告诉你开发iOS应用如何避免卡顿
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- R语言如何解决线性混合模型中畸形拟合(Singular fit)的问题
- Android如何在Gradle中更改APK文件名详解
- 面试中常见的 C 语言与 C++ 区别的问题
- Linux系统实现ansible自动化安装配置httpd的方法
- 常用Linux发行版镜像源配置小结
- Linux如何处理文件已删除但空间不释放的问题
- 解析linux或android添加文件系统的属性接口的方法
- linux查看软件的安装位置简单方法
- 使用 bind 设置 DNS 服务器的方法
- Linux jdk安装及环境变量配置教程(jdk-8u144-linux-x64.tar.gz)
- centos6.6 下 安装 php7 + nginx环境的方法
- 如何优雅地删除 Linux 中的垃圾文件的方法
- Ubuntu18.04 安装 Anaconda3的教程详解
- VScode Remote SSH通过远程编辑与调试代码
- Ubuntu18.04下安装配置SSH服务的方法步骤