1. 说明
本文介绍以下,在平台中如何使用消息队列收发消息。在平台中集成了spring cloud stream 来整合不同的消息队列来收发消息,他目前支持主流的消息队列,比如 KAFKA,ROCKETMQ,ROCKETMQ ,他的收发消息时接口统一的,因此,只需要调整配置就可以完成消息队列的切换。
2. 实现步骤
2.1 定义收发接口
现在以流程消息收发为例,写一些如何使用队列发送消息。
/**
* 消息队列扩展。
*/
public interface BpmInputOutput {
/**
* MessageModel的消息收发。
*/
String OUTPUT = "output";
String INPUT = "input";
/**
* 流程异步事件消息收发。
*/
String OUTPUT_EVENT = "eventOutput";
String INPUT_EVENT = "eventInput";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT_EVENT)
MessageChannel eventOutput();
@Input(INPUT_EVENT)
SubscribableChannel eventInput();
}
在nacos配置中 可以看到我们的配置。
jpaas-bpm-dev.properties
#流程消息配置
spring.cloud.stream.bindings.output.destination=bpmmessage
spring.cloud.stream.bindings.output.group= bpmmessage-group
spring.cloud.stream.bindings.input.destination= bpmmessage
spring.cloud.stream.bindings.input.group= bpmmessage-group-consumer
#异步事件配置
spring.cloud.stream.bindings.eventOutput.destination=event
spring.cloud.stream.bindings.eventOutput.group= event-group
spring.cloud.stream.bindings.eventInput.destination= event
spring.cloud.stream.bindings.eventInput.group= event-group-consumer
2.2 使用接口
比如BPM项目,需要增加注解 使用上面定义的接口。
@EnableBinding({BpmInputOutput.class})
public class JPaasBpmApplication {
2.3 发送消息
//注入消息收发接口
@Resource
BpmInputOutput inputOutput;
//发送消息,注意 output 是发送 messagemodel 消息
inputOutput.output().send(MessageBuilder.withPayload(messageModel).build());
2.4 接收消息
@StreamListener(BpmInputOutput.INPUT)
public void handMessageModel(MessageModel messageModel) {
}
文档更新时间: 2022-03-30 15:08 作者:zyg