SpringCloud Bus消息总线


SpringCloud Bus消息总线

Spring Cloud Bus简介

https://spring.io/projects/spring-cloud-bus

概述

SpringCloud Bus是将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。目前支持RabbitMQ和Kafka。

管理和传播分布式系统间的消息,像一个分布式执行器,用于广播状态更改,事件推送等,也可以作为微服务间的通信通道。

SpringCloud Bus可以配合SpringCloud Config实现配置的动态刷新。

什么是总线

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称之为消息总线。

基本原理

ConfigClient 实例都监听MQ中同一个topic【默认是SpringCloudBus】。当一个服务刷新数据的时候,它会把这个消息放入Topic中,这样其他监听同一Topic的服务就能够得到通知,然后去更新自身的配置。

安装RabbitMQ

windows安装RabbitMQ

rabbitMQ是Erlang语言开发的所以先下载Erlang

https://www.erlang.org/downloads

配置环境变量

下载RabbitMQ

https://www.rabbitmq.com/download.html

双击安装

安装完成后,开始安装RabbitMQ-Plugins插件

cd D:\software\RabbitMQ\rabbitmq_server-3.8.8\sbin

rabbitmq-plugins enable rabbitmq_management

Docker安装RabbitMQ

$ docker pull rabbitmq:3-management # management带web界面管理
$ docker images  # 查看image ID

$ docker run -d --name myrabbit -p 5672:5672 -p 15672:15672 cc86ffa2f398 #启动  最后跟着 image ID

$ systemctl status firewalld #查看防火墙的状态【(running)意思是打开,我们需要设置开放的端口】
$ firewall-cmd --list-ports #查看防火墙开放的端口
$ firewall-cmd --zone=public --add-port=15672/tcp --permanent # 开放15672,15672是Web管理界面的端口
$ firewall-cmd --zone=public --add-port=5672/tcp --permanent # 开放5672,5672是MQ访问的端口
$ firewall-cmd --reload # 使修改生效

访问15672端口即可进入RabbitMQ的web管理界面,账号密码默认都是guest。

动态刷新全局广播前置准备

新建模块,引入依赖



    org.springframework.cloud
    spring-cloud-starter-bus-amqp


    org.springframework.cloud
    spring-cloud-starter-config
	

    org.springframework.cloud
    spring-cloud-starter-netflix-eureka-client

配置bootstrap.yml

server:
  port: 3366

spring:
  application:
    name: config-client
  cloud:
    #Config客户端配置
    config:
      label: master #分支名称
      name: config #配置文件名称
      profile: dev #读取后缀名称   
      uri: http://localhost:3344 #配置中心地址

  #rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

# 暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"

编写主启动类

@EnableEurekaClient
@SpringBootApplication
public class ConfigClientMain3366 {
    public static void main(String[] args) {
        SpringApplication.run(ConfigClientMain3366.class, args);
    }
}

编写接口

@RestController
@RefreshScope
public class ConfigClientController {
    @Value("${server.port}")
    private String serverPort;

    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String configInfo() {
        return "serverPort: " + serverPort + "\t\n\n configInfo: " + configInfo;
    }
}

设计思想

设计思想主要是以下两种:

一、利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置。

二、利用消息总线触发一个服务端ConfigServer的/bus/refresh,而刷新所有客户端的配置。

相比之下,图二的架构【通知服务端ConfigServer】更加合理,图一不合理的原因如下:

  1. 打破了微服务的职责单一性,微服务本身为业务模块,不应该担任刷新配置的职责。
  2. 破坏了微服务各节点的对等性。
  3. 存在局限,如微服务迁移时,网络地址时常发生变化,这时如果希望自动刷新,会作更多的修改。

动态刷新全局广播

完成注册中心、配置中心、服务

配置中心和服务端模块都添加消息总线支持



    org.springframework.cloud
    spring-cloud-starter-bus-amqp

所有模块配置yml

#rabbitmq相关配置
  rabbitmq:
    host: [your hostname]
    port: 5672
    username: guest
    password: guest

ConfigServer配置yml

#rabbitmq相关配置,暴露bus刷新配置的端点
management:
  endpoints: #暴露bus刷新配置的端点
    web:
      exposure:
        include: 'bus-refresh'

ConfigClient配置yml

# 暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"

测试

依次启动7001,3344,3355,3366模块,进行测试,依次访问:

  • http://localhost:3344/master/config-dev.yml
  • http://localhost:3355/configInfo
  • http://localhost:3366/configInfo

此时改变配置信息的版本号,向ConfigServer发送一次POST请求:

$ curl -X POST "http://localhost:3344/actuator/bus-refresh"

再次测试3355和3366config client,已经成功实现一次通知,处处更新。

RabbitMQ查看通知

ConfigClient 实例都监听RabbitMQ中同一个topic【默认是SpringCloudBus】。当一个服务刷新数据的时候,它会把这个消息放入Topic中,这样其他监听同一Topic的服务就能够得到通知,然后去更新自身的配置。

动态刷新定点通知

通过Spring Cloud Bus + RabbitMQ实现了一处通知,全局广播,处处更新。通知其中某个client定制化更新,发送定制请求。

通过通知的url指定实例的destination:http://localhost:3344/actuator/bus-refresh/{destination},此时bus/refresh通知会通过destination参数类指定需要更新配置的服务或实例。

比如,只通知3355可以发送下面这个请求:

$ curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
  • config-client为spring.application.name。
  • 3355为对应的端口号。

相关