Spring Cloud Stream消息驱动 ,在没有绑定器这个概念的情况下,我们的SpringBoot应用要 直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性.通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

1. 简介

1.1 工作流程

1.2 常用API和注解

2. 消息驱动生产者

新建一个cloud-stream-rabbitmq-provider8801 项目作为消息提供者

2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>live.yremp.springcloud</artifactId>
        <groupId>live.yremp</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>
</project>

2.2 配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 39.105.173.178
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone:  http://localhost:7001/eureka

2.3 主启动类

package live.yremp.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

2.3 业务代码

service层的接口

package live.yremp.springcloud.service;

public interface IMessageProvider {
    public String send();
}

以及接口的实现类

package live.yremp.springcloud.service.impl;

import live.yremp.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class)// 定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;// 消息发送管道

    @Override
    public String send() {
        String serial= UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("-------------------serial:"+serial);
        return null;
    }
}

SendMessageController

package live.yremp.springcloud.controller;

import live.yremp.springcloud.service.impl.MessageProviderImpl;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
@RestController
public class SendMessageController {
    @Resource
    MessageProviderImpl messageProvider;

    @GetMapping(value ="/send")
    public String send(){
        return messageProvider.send();
    }
}

2.4 服务测试

先去RabbitMQ的Web页面查看Exchange,可以看到配置文件中配置的studyExchange

访问几次 http://localhost:8801/send接口,测试控制台会有相应输出如下图:

3. 消息驱动之消费者

消息发送之后消费者从对相应的通道获取消息,新建cloud-stream-rabbitmq-consumer8802 作为消费者

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>live.yremp.springcloud</artifactId>
        <groupId>live.yremp</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>
</project>

3.2 配置文件

这里相相对于 cloud-stream-rabbitmq-provider8801主要修改了bindings中的output 修改为input

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 39.105.173.178
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone:  http://localhost:7001/eureka

3.3 主启动类

package live.yremp.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

3.4 业务代码

这里只有一个监听的Controller

package live.yremp.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
@RestController
@EnableBinding(Sink.class)

public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1收到的消息:"+message.getPayload()+","+"port="+serverPort);
    }
}

当有消息提供者向studyExchange发送消息时会被监听到,并且打印出来

3.5 测试

仍然去访问几次 消息驱动提供者 loud-stream-rabbitmq-provider8801 中的 http://localhost:8801/send 接口

loud-stream-rabbitmq-provider8801 的后台输出

loud-stream-rabbitmq-consumer8802 的后台输出

都能够成功的监听到相应的消息

4. 重复消费

4.1 重复消费问题

按照cloud-stream-rabbitmq-consumer8802新建一个 cloud-stream-rabbitmq-consumer8803,这两个服务几乎一模一样,下面具体的代码不展示了,新建完成并启动 cloud-stream-rabbitmq-consumer8803 项目

先去Eureka看看服务注册情况

三个服务都注册成功了,然后再次访问几次 http://localhost:8801/send 分别查看两个消费者服务获取情况:

8002
8003

可以看到两个服务都得到相同的消息,但是这就造成了重复消费问题,一个订单肯定直会交由一个 消费者者服务处理,不能两个服务都去处理,如何接解决这个问题就需要用到RabbitMQ的分组。

4.2 分组

默认情况下每个服务都是不同的组,这样就会把每个消息都消费一次,就造成了上面的重复消费问题,需要为微服务设置为同一个组

cloud-stream-rabbitmq-consumer8802配置修改为如下,新增group属性

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 39.105.173.178
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: Yremp-A

eureka:
  client:
    service-url:
      defaultZone:  http://localhost:7001/eureka

cloud-stream-rabbitmq-consumer8803配置修改为如下,新增group属性

server:
  port: 8803
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 39.105.173.178
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: Yremp-A
eureka:
  client:
    service-url:
      defaultZone:  http://localhost:7001/eureka

此时修改胡重启项目,测试访问 http://localhost:8801/send 三次,查看三个项目后台输出,下面是

cloud-stream-rabbitmq-provider8801
cloud-stream-rabbitmq-consumer8802
cloud-stream-rabbitmq-consumer8803

总共三个请求,被两个消费者服务获取到并且不是重复消费

5. 持久化问题

接着上面的两个,如果删除 cloud-stream-rabbitmq-consumer8802 配置中的group: Yremp-A属性,此时访问 cloud-stream-rabbitmq-provider8801的 http://localhost: 8801/send 接口四次 ,然后重启 cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 ,可以发现 cloud-stream-rabbitmq-consumer8802后台没有任何输出,但是在 cloud-stream-rabbitmq-consumer8803 后台仍然会获取到上面发送的4条消息。

在 cloud-stream-rabbitmq-consumer8803 启动过程中仍然可以获取到发送的信息,这就是分组的作用,可以实现消息的持久化。

标签云

ajax AOP Bootstrap cdn Chevereto CSS Docker Editormd GC Hexo IDEA IPA JavaScript jsDeliver JS樱花特效 JVM Linux markdown Maven MyBatis MyBatis-plus MySQL Pictures Sakura SEO shadowrocket Spring Boot Spring Cloud Spring Cloud Alibaba SpringMVC SSR Thymeleaf V2ray Vue Web WebSocket Wechat Social WordPress Yoast SEO 代理 分页 图床 小幸运 苹果iOS国外账号 苹果IOS账号

Spring Cloud Stream消息驱动
Spring Cloud Stream消息驱动
本文最后更新于2020年7月25日,已超过 4 个月没更新!