1. 概述
如何在平台中增加一个队列发送与消费实现呢,本文我以错误日志发送为例,介绍下添加一个队列的过程。
2. 业务实现
日志是通过各个微服务进行发送,由系统微服务统一进行消费队列。
3. 具体实现
3.1 编写消息发送和接收类
- 消息发送类
package com.redxun.web.mq;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 错误日志发送消息。
*/
public interface ErrLogOutput {
/**
* MessageModel的消息收发。
*/
String OUTPUT = "errLogOut";
@Output(OUTPUT)
MessageChannel logOutput();
}
这个类放到项目下
因为这个类是要在每一个微服务下面使用的。
- 消息接收类
因为消息消费只是由系统微服务进行处理,因此这个只需要放到 jpaas-system下。
package com.redxun.system.mq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 错误日志记录
*/
public interface ErrLogInput {
String INPUT_FILED = "errLogInput";
@Input(INPUT_FILED)
SubscribableChannel errLogInput();
}
- 日志消费类
@Service
@Slf4j
public class ErrLogConsumer {
@Resource
SysErrorLogMapper errorLogMapper;
@StreamListener(ErrLogInput.INPUT_FILED)
public void handErrLog(SysErrorLogDto dto) {
SysErrorLog errorLog=new SysErrorLog();
BeanUtil.copyProperties(dto,errorLog);
String id= IdGenerator.getIdStr();
errorLog.setId(id);
errorLogMapper.insert(errorLog);
}
}
3.2 消息队列配置
- 消息发送配置
因为消息发送每一个微服务都需要使用,我们把队列发送,配置到 nacos-config-dev.properties
配置如下:
# 错误日志输出队列
spring.cloud.stream.bindings.errLogOut.destination= errlog
spring.cloud.stream.bindings.errLogOut.group= errlog-group
- 消息接收配置
消息接收只有jpaas-system微服务 使用,那么我们需要在 jpaas-system-dev.properties
做配置。
# 错误日志消费队列
spring.cloud.stream.bindings.errLogInput.destination= errlog
spring.cloud.stream.bindings.errLogInput.group= errlog-group-consumer
3.3 微服务修改
- 系统微服务修改
@EnableBinding({SysInputOutput.class, LogOutput.class, LogInput.class,
ApiLogInput.class, ApiLogOutput.class,
ErrLogOutput.class, ErrLogInput.class})
public class JpaasSystemApplication {
使用发送和消费。
ErrLogOutput.class, ErrLogInput.class
- 其他的微服务修改
@EnableBinding({UserInputOutput.class,LogOutput.class,
ApiLogOutput.class, ErrLogOutput.class})
public class JpaasUserApplication {
增加日志输出 ErrLogOutput.class
文档更新时间: 2021-07-19 12:33 作者:zyg