1. 说明

本文介绍以下,在平台中如何使用消息队列收发消息。在平台中集成了spring cloud stream 来整合不同的消息队列来收发消息,他目前支持主流的消息队列,比如 KAFKA,ROCKETMQ,ROCKETMQ ,他的收发消息时接口统一的,因此,只需要调整配置就可以完成消息队列的切换。

2. 实现步骤

2.1 定义收发接口

现在以流程消息收发为例,写一些如何使用队列发送消息。

/**
 * 消息队列扩展。
 */
public interface BpmInputOutput {
    /**
     * MessageModel的消息收发。
     */
    String OUTPUT = "output";
    String INPUT = "input";

    /**
     * 流程异步事件消息收发。
     */
    String OUTPUT_EVENT = "eventOutput";
    String INPUT_EVENT = "eventInput";




    @Output(OUTPUT)
    MessageChannel output();
    @Input(INPUT)
    SubscribableChannel input();


    @Output(OUTPUT_EVENT)
    MessageChannel eventOutput();
    @Input(INPUT_EVENT)
    SubscribableChannel eventInput();


}

在nacos配置中 可以看到我们的配置。

jpaas-bpm-dev.properties

#流程消息配置
spring.cloud.stream.bindings.output.destination=bpmmessage
spring.cloud.stream.bindings.output.group= bpmmessage-group
spring.cloud.stream.bindings.input.destination= bpmmessage
spring.cloud.stream.bindings.input.group= bpmmessage-group-consumer

#异步事件配置
spring.cloud.stream.bindings.eventOutput.destination=event
spring.cloud.stream.bindings.eventOutput.group= event-group
spring.cloud.stream.bindings.eventInput.destination= event
spring.cloud.stream.bindings.eventInput.group= event-group-consumer

2.2 使用接口

比如BPM项目,需要增加注解 使用上面定义的接口。

@EnableBinding({BpmInputOutput.class})
public class JPaasBpmApplication {

2.3 发送消息

//注入消息收发接口
@Resource
BpmInputOutput inputOutput;

//发送消息,注意 output 是发送 messagemodel 消息
inputOutput.output().send(MessageBuilder.withPayload(messageModel).build());

2.4 接收消息

@StreamListener(BpmInputOutput.INPUT)
public void handMessageModel(MessageModel messageModel) {

}
文档更新时间: 2022-03-30 15:08   作者:zyg