Spring Cloud Stream使用细节
上篇文章我们看了Spring Cloud Stream的基本使用,小伙伴们对Spring Cloud Stream应该也有了一个基本的了解,但是上篇文章中的消息我们是从RabbitMQ的web管理页面发来的,如果我们想要从代码中发送消息呢?本文我们就来看看Spring Cloud Stream的一些使用细节。
自定义消息通道
上篇文章我们提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。
还是在上文的基础上,首先我们定义一个接口叫做MySink,如下:
public interface MySink {
String INPUT = "mychannel";
@Input(INPUT)
SubscribableChannel input();
}
这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:
public interface MySource {
@Output(MySink.INPUT)
MessageChannel output();
}
@Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。
最后我们定义一个消息接收类,如下:
@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
@StreamListener(MySink.INPUT)
public void receive(Object playload) {
logger.info("Received:" + playload);
}
}
OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来一个单元测试测试一下,如下:
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {
@Autowired
private MySource mySource;
@Test
public void contextLoads() {
mySource.output().send(MessageBuilder.withPayload("hello 123").build());
}
}
运行单元测试,我们可以看到如下日志,表示消息发送成功了:
如果想要发送对象也可以直接发送,不用进行对象转换,如下:
发送:
Book book = new Book(1l, "三国演义", "罗贯中");
mySource.output().send(MessageBuilder.withPayload(book).build());
接收:
@StreamListener(MySink.INPUT)
public void receive(Book playload) {
logger.info("Received:" + playload);
}
如果我们想要在接收成功后给一个回执,也是OK的,如下:
@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定义回执发送的消息通道
public String receive(Book playload) {
logger.info("Received:" + playload);
return "receive msg :" + playload;
}
方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:
@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
System.out.println("msg:"+msg);
}
当然要记得Source类也要在@EnableBinding注解中进行绑定。此时运行结果如下:
消费组
由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组和主题,如下:
spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1
这里我们设置该工程都属于g1消费组,输入通道的主题名则为dest1。这里配置完成之后,我们在消息发送方做如下配置:
spring.cloud.stream.bindings.mychannel.destination=dest1
也配置消息主题名为dest1(如果发送和接收就在同一个应用中,则这里可以不配置)。OK,此时我们将我们的项目启动两个实例,注意两个实例的端口不一样,此时如果我们再发送消息,则只会被两个实例中的一个接收到,另外一个应用则接收不到,但是到底是两个实例中的哪一个接收,则是不确定的。
消息分区
有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):
在消费者上添加如下配置:
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0
关于这个配置我说三点:
1.第一行表示开启消息分区 2.第二行表示当前消息者的总的实例个数 3.第三行表示当前实例的索引,从0开始,当我们启动多个实例时,需要在启动时在命令行配置索引
然后在消息生产者上添加如下配置:
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2
第一行配置设置了分区键的表达式规则,第二行则设置了消息分区数量。
OK,此时我们再次启动多个消费者实例,然后重复发送多条消息,这些消息都将被同一个消费者处理掉。
Spring Cloud Stream使用细节我们就先说到这里,有问题欢迎留言讨论。
参考资料:
1.《Spring Cloud微服务实战》
- java 快速求素数
- 狄斯奎诺(dijkstra 模板)
- HDUOJ---汉洛塔IX
- 小错误系列
- HDUOJ-----4510 小Q系列故事——为什么时光不能倒流
- HDUOJ----4509湫湫系列故事——减肥记II
- HDUOJ-----4506小明系列故事——师兄帮帮忙
- HDUOJ---4503 湫湫系列故事——植树节
- HDUOJ----4504 威威猫系列故事——篮球梦
- 推荐一款MySQL优化工具
- HDUOJ----4502吉哥系列故事——临时工计划
- HDUOJ----4004The Frog's Games(二分+简单贪心)
- HDUOJ----4006The kth great number(最小堆...)
- HDUOJ----4501小明系列故事——买年货(三维背包)
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- R语言中的岭回归、套索回归、主成分回归:线性模型选择和正则化
- R语言基于树的方法:决策树,随机森林,套袋Bagging,增强树
- R语言无监督学习:PCA主成分分析可视化
- 如何用r语言制作交互可视化报告图表
- R语言大数据分析纽约市的311万条投诉统计可视化与时间序列分析
- R语言动态可视化:制作历史全球平均温度的累积动态折线图动画gif视频图
- R语言里的非线性模型:多项式回归、局部样条、平滑样条、广义加性模型分析
- 使用R语言进行机制检测的隐马尔可夫模型HMM
- 【Kubernetes】Octant再探...
- 聊聊claudb的SlaveReplication
- 深度学习trick--labelsmooth
- Java锁的那些事儿
- React Hooks踩坑分享
- Python 自动化,Helium 凭什么取代 Selenium?
- Explain详解与索引最佳实践