SpringCloud Bus消息总线
SpringCloud Bus消息总线
Spring Cloud Bus简介
https://spring.io/projects/spring-cloud-bus
概述
SpringCloud Bus是将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。目前支持RabbitMQ和Kafka。
管理和传播分布式系统间的消息,像一个分布式执行器,用于广播状态更改,事件推送等,也可以作为微服务间的通信通道。
SpringCloud Bus可以配合SpringCloud Config实现配置的动态刷新。
什么是总线
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称之为消息总线。
基本原理
ConfigClient 实例都监听MQ中同一个topic【默认是SpringCloudBus】。当一个服务刷新数据的时候,它会把这个消息放入Topic中,这样其他监听同一Topic的服务就能够得到通知,然后去更新自身的配置。
安装RabbitMQ
windows安装RabbitMQ
rabbitMQ是Erlang语言开发的所以先下载Erlang
https://www.erlang.org/downloads
配置环境变量
下载RabbitMQ
https://www.rabbitmq.com/download.html
双击安装
安装完成后,开始安装RabbitMQ-Plugins插件
cd D:\software\RabbitMQ\rabbitmq_server-3.8.8\sbin
rabbitmq-plugins enable rabbitmq_management
Docker安装RabbitMQ
$ docker pull rabbitmq:3-management # management带web界面管理
$ docker images # 查看image ID
$ docker run -d --name myrabbit -p 5672:5672 -p 15672:15672 cc86ffa2f398 #启动 最后跟着 image ID
$ systemctl status firewalld #查看防火墙的状态【(running)意思是打开,我们需要设置开放的端口】
$ firewall-cmd --list-ports #查看防火墙开放的端口
$ firewall-cmd --zone=public --add-port=15672/tcp --permanent # 开放15672,15672是Web管理界面的端口
$ firewall-cmd --zone=public --add-port=5672/tcp --permanent # 开放5672,5672是MQ访问的端口
$ firewall-cmd --reload # 使修改生效
访问15672端口即可进入RabbitMQ的web管理界面,账号密码默认都是guest。
动态刷新全局广播前置准备
新建模块,引入依赖
org.springframework.cloud
spring-cloud-starter-bus-amqp
org.springframework.cloud
spring-cloud-starter-config
org.springframework.cloud
spring-cloud-starter-netflix-eureka-client
配置bootstrap.yml
server:
port: 3366
spring:
application:
name: config-client
cloud:
#Config客户端配置
config:
label: master #分支名称
name: config #配置文件名称
profile: dev #读取后缀名称
uri: http://localhost:3344 #配置中心地址
#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#服务注册到eureka地址
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
编写主启动类
@EnableEurekaClient
@SpringBootApplication
public class ConfigClientMain3366 {
public static void main(String[] args) {
SpringApplication.run(ConfigClientMain3366.class, args);
}
}
编写接口
@RestController
@RefreshScope
public class ConfigClientController {
@Value("${server.port}")
private String serverPort;
@Value("${config.info}")
private String configInfo;
@GetMapping("/configInfo")
public String configInfo() {
return "serverPort: " + serverPort + "\t\n\n configInfo: " + configInfo;
}
}
设计思想
设计思想主要是以下两种:
一、利用消息总线触发一个客户端/bus/refresh
,而刷新所有客户端的配置。
二、利用消息总线触发一个服务端ConfigServer的/bus/refresh
,而刷新所有客户端的配置。
相比之下,图二的架构【通知服务端ConfigServer】更加合理,图一不合理的原因如下:
- 打破了微服务的职责单一性,微服务本身为业务模块,不应该担任刷新配置的职责。
- 破坏了微服务各节点的对等性。
- 存在局限,如微服务迁移时,网络地址时常发生变化,这时如果希望自动刷新,会作更多的修改。
动态刷新全局广播
完成注册中心、配置中心、服务
配置中心和服务端模块都添加消息总线支持
org.springframework.cloud
spring-cloud-starter-bus-amqp
所有模块配置yml
#rabbitmq相关配置
rabbitmq:
host: [your hostname]
port: 5672
username: guest
password: guest
ConfigServer配置yml
#rabbitmq相关配置,暴露bus刷新配置的端点
management:
endpoints: #暴露bus刷新配置的端点
web:
exposure:
include: 'bus-refresh'
ConfigClient配置yml
# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
测试
依次启动7001,3344,3355,3366模块,进行测试,依次访问:
- http://localhost:3344/master/config-dev.yml
- http://localhost:3355/configInfo
- http://localhost:3366/configInfo
此时改变配置信息的版本号,向ConfigServer发送一次POST请求:
$ curl -X POST "http://localhost:3344/actuator/bus-refresh"
再次测试3355和3366config client,已经成功实现一次通知,处处更新。
RabbitMQ查看通知
ConfigClient 实例都监听RabbitMQ中同一个topic【默认是SpringCloudBus】。当一个服务刷新数据的时候,它会把这个消息放入Topic中,这样其他监听同一Topic的服务就能够得到通知,然后去更新自身的配置。
动态刷新定点通知
通过Spring Cloud Bus + RabbitMQ实现了一处通知,全局广播,处处更新。通知其中某个client定制化更新,发送定制请求。
通过通知的url指定实例的destination:http://localhost:3344/actuator/bus-refresh/{destination}
,此时bus/refresh
通知会通过destination参数类指定需要更新配置的服务或实例。
比如,只通知3355可以发送下面这个请求:
$ curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
- config-client为spring.application.name。
- 3355为对应的端口号。