SpringCloud-消息总线

SpringCloud文章系列

  1. SpringCloud
  2. SpringCloud-注册中心
  3. SpringCloud-配置中心
  4. SpringCloud-链路跟踪
  5. 【当前文章】SpringCloud-消息总线
  6. SpringCloud-API网关
  7. SpringCloud-异步消息
  8. SpringCloud-同步调用
  9. SpringCloud-断路降级
  10. SpringCloud-监控管理
  11. SpringCloud-番外篇-临时任务
  12. SpringCloud-番外篇-文档生成
  13. SpringCloud-番外篇-源码解析

接入说明

  1. 目前采用最新的1.xRelease版本:1.5.15.RELEASE
  2. SpringCloud项目可以在start.spring.io下载,不过更方便的应该是通过idea新建项目,建立Spring Initializr项目
  3. 我一般习惯将具体实现服务用具体的服务名,而公共组件用service-xxx来命名,当然还有一些比较固定名字的公共组件
  4. 基于配置中心,请先按照配置中心章节,配置公共配置
  5. 依赖链路追踪章节

Bus

1. 新建项目

新建artifactId为service-bus的服务

2. 导入依赖

依赖说明

  1. actuator用于暴露监控的接口
  2. eureka是Eureka客户端,注册到注册中心
  3. config是Config客户端,从配置中心拉取配置
  4. sleuth + zipkin是链路追踪客户端,用于监控链路调用
  5. bus-kafka是基于kafka的bus消息总线客户端,用于接收消息总线异步事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

3. 编写启动类

不需要添加任何注解,bus依赖进来后扩展了SpringBoot的endpoint

4. 修改配置

1
2
3
4
5
6
7
8
9
server.port=8901
spring.application.name=service-bus

## private
spring.cloud.bus.trace.enabled=true

#public
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092

bus.trace.enabled可以开启bus操作的链路跟踪,每次客户端处理都是异步接收消息去处理的,所以可以观察到每个客户端接收消息时间与ack的情况
由于bus依赖于异步消息Stream,这里采用kafka,所以需要配置下kafka的zk与brokers的地址(这里可以看下之前kafka搭建的文章)

5. 启动main

6. 验证

访问 http://localhost:8761/ 看到service-bus已经注册了一台
使用curl或者Postman用POST方式访问 http://localhost:8901/bus/refresh 可以通过/trace看到每台机器ack的情况
如果开启了链路追踪,也可以通过链路追踪看到每次bus请求的情况

客户端使用

1. 添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

2. 开启注解

无注解,引入依赖既开启

3. 修改配置

1
2
3
#kafka
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092

依赖kafka,与服务端统一kafka

扩展

  1. 通过bus调用的命令不加参数是全局的,可以通过增加destination参数,如/bus/refresh?destination=business-service:**,只会请求business-service的所有服务
  2. 如果访问出现401,则配置需要加上management.security.enabled=false
  3. 很多BUS功能单一,一般主要与配置中心配合,所以往往BUS的功能可以直接集成在配置中心中,方便管理

其他消息总线

除了kafka的支持,还可以通过rabbitMQ

快速接入

略…

------ 本文结束 ------

版权声明

dawell's Notes by Dawell is licensed under a Creative Commons BY-NC-ND 4.0 International License.
Dawell创作并维护的dawell's Notes博客采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于dawell's Notes 博客( http://dawell.cc ),版权所有,侵权必究。

坚持原创技术分享,您的支持将鼓励我继续创作!