1. 概述

如何在平台中增加一个队列发送与消费实现呢,本文我以错误日志发送为例,介绍下添加一个队列的过程。

2. 业务实现

日志是通过各个微服务进行发送,由系统微服务统一进行消费队列。

3. 具体实现

3.1 编写消息发送和接收类

  • 消息发送类
package com.redxun.web.mq;


import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 错误日志发送消息。
 */
public interface ErrLogOutput {

        /**
         * MessageModel的消息收发。
         */
        String OUTPUT = "errLogOut";

        @Output(OUTPUT)
        MessageChannel logOutput();


}

这个类放到项目下

因为这个类是要在每一个微服务下面使用的。

  • 消息接收类

因为消息消费只是由系统微服务进行处理,因此这个只需要放到 jpaas-system下。

package com.redxun.system.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 错误日志记录
 */
public interface ErrLogInput {

    String INPUT_FILED = "errLogInput";

    @Input(INPUT_FILED)
    SubscribableChannel errLogInput();
}
  • 日志消费类
@Service
@Slf4j
public class ErrLogConsumer {

    @Resource
    SysErrorLogMapper errorLogMapper;

    @StreamListener(ErrLogInput.INPUT_FILED)
    public void handErrLog(SysErrorLogDto dto) {
        SysErrorLog errorLog=new SysErrorLog();
        BeanUtil.copyProperties(dto,errorLog);
        String id= IdGenerator.getIdStr();
        errorLog.setId(id);
        errorLogMapper.insert(errorLog);
    }
}

3.2 消息队列配置

  • 消息发送配置

因为消息发送每一个微服务都需要使用,我们把队列发送,配置到 nacos-config-dev.properties

配置如下:

# 错误日志输出队列
spring.cloud.stream.bindings.errLogOut.destination= errlog
spring.cloud.stream.bindings.errLogOut.group= errlog-group
  • 消息接收配置

消息接收只有jpaas-system微服务 使用,那么我们需要在 jpaas-system-dev.properties 做配置。

# 错误日志消费队列
spring.cloud.stream.bindings.errLogInput.destination= errlog
spring.cloud.stream.bindings.errLogInput.group= errlog-group-consumer

3.3 微服务修改

  • 系统微服务修改
@EnableBinding({SysInputOutput.class, LogOutput.class, LogInput.class,
            ApiLogInput.class, ApiLogOutput.class,
            ErrLogOutput.class, ErrLogInput.class})
public class JpaasSystemApplication {

使用发送和消费。

ErrLogOutput.class, ErrLogInput.class

  • 其他的微服务修改
@EnableBinding({UserInputOutput.class,LogOutput.class, 
            ApiLogOutput.class, ErrLogOutput.class})
public class JpaasUserApplication {

增加日志输出 ErrLogOutput.class

文档更新时间: 2021-07-19 12:33   作者:zyg