SpringCloud文章系列
- SpringCloud
- SpringCloud-注册中心
- SpringCloud-配置中心
- SpringCloud-链路跟踪
- SpringCloud-消息总线
- SpringCloud-API网关
- 【当前文章】SpringCloud-异步消息
- SpringCloud-同步调用
- SpringCloud-断路降级
- SpringCloud-监控管理
- SpringCloud-番外篇-临时任务
- SpringCloud-番外篇-文档生成
- SpringCloud-番外篇-源码解析
接入说明
- 目前采用最新的1.xRelease版本:1.5.15.RELEASE
- SpringCloud项目可以在start.spring.io下载,不过更方便的应该是通过idea新建项目,建立Spring Initializr项目
- 我一般习惯将具体实现服务用具体的服务名,而公共组件用service-xxx来命名,当然还有一些比较固定名字的公共组件
- 基于配置中心,请先按照配置中心章节,配置公共配置
- 依赖链路追踪章节
Stream-Kafka
0. 搭建kafka
详见中间件kafka章节
1. 新建项目
新建artifactId为account-service的服务
2. 导入依赖
依赖说明
- actuator用于暴露监控的接口
- eureka是Eureka客户端,注册到注册中心
- config是Config客户端,从配置中心拉取配置
- sleuth + zipkin是链路追踪客户端,用于监控链路调用
- bus-kafka是基于kafka的bus消息总线客户端,用于接收消息总线异步事件
- stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
|
|
3. 编写启动类
打开AccountServiceApplication,在class上加入@EnableBinding(Source.class)注解
4. 修改配置
|
|
stream.bindings.output.destination主要是配置kafka发送端的topic
5. 编写调用代码
|
|
暴露一个/accounts接口,提交POST方法请求,传递包含mob字段的json数据,会将mob字段作为kafka消息体发送
其中Source直接注入即可,Source是Stream自带的接口类,默认Source中的配置关联到”output”中,刚才配置了output的destination也就是topic是accounts
6. 启动main
7. 验证
可以打开一个控制台,通过kafka-console-consumer.sh/bat去监听accounts的topic的kafka
通过暴露/accounts接口post提交信息,通过kafka-console-consumer.sh/bat控台可以看到消息结构
stream会自动封装一个content-type类型,用于标识数据类型
客户端使用
1. 添加依赖
|
|
stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
这个配置与发送端是一样的
2. 开启注解
BusinessServiceApplication添加@EnableBinding(Sink.class)
Sink用于接收kafka,当然如果是多个kafkaTopic这里可以设置多个,接口都是继承了Sink接口
3. 修改配置
|
|
这里是接收端,所以是input
headerMode用于配置是stream的格式还是原始数据
content-type用于配置数据解析类型
4. 接收消息代码
|
|
@StreamListener注解用于标识处理消息的方法
疑问
Stream是怎么保存链路追踪的数据的?
如果不引入sleuth + zipkin依赖,保存的消息格式只是带有content-type字段,而如果引入了这2个依赖就会在Stream消息格式里加入额外4个字段
- traceId
- spanId
消息体会变大,不过带来的好处是消息消费的时候依然可以记录链路信息,这样在链路图上就可以看到所有消息者消费的情况了
其他注册中心
其他还有RabbitMQ的实现
快速接入
略…