SpringCloud文章系列
- SpringCloud
- SpringCloud-注册中心
- SpringCloud-配置中心
- SpringCloud-链路跟踪
- SpringCloud-消息总线
- SpringCloud-API网关
- 【当前文章】SpringCloud-异步消息
- SpringCloud-同步调用
- SpringCloud-断路降级
- SpringCloud-监控管理
- SpringCloud-番外篇-临时任务
- SpringCloud-番外篇-文档生成
- SpringCloud-番外篇-源码解析
接入说明
- 目前采用最新的1.xRelease版本:1.5.15.RELEASE
- SpringCloud项目可以在start.spring.io下载,不过更方便的应该是通过idea新建项目,建立Spring Initializr项目
- 我一般习惯将具体实现服务用具体的服务名,而公共组件用service-xxx来命名,当然还有一些比较固定名字的公共组件
- 基于配置中心,请先按照配置中心章节,配置公共配置
- 依赖链路追踪章节
Stream-Kafka
0. 搭建kafka
详见中间件kafka章节
1. 新建项目
新建artifactId为account-service的服务
2. 导入依赖
依赖说明
- actuator用于暴露监控的接口
- eureka是Eureka客户端,注册到注册中心
- config是Config客户端,从配置中心拉取配置
- sleuth + zipkin是链路追踪客户端,用于监控链路调用
- bus-kafka是基于kafka的bus消息总线客户端,用于接收消息总线异步事件
- stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
1 | <dependency> |
3. 编写启动类
打开AccountServiceApplication,在class上加入@EnableBinding(Source.class)注解
4. 修改配置
1 | server.port=8020 |
stream.bindings.output.destination主要是配置kafka发送端的topic
5. 编写调用代码
1 |
|
暴露一个/accounts接口,提交POST方法请求,传递包含mob字段的json数据,会将mob字段作为kafka消息体发送
其中Source直接注入即可,Source是Stream自带的接口类,默认Source中的配置关联到”output”中,刚才配置了output的destination也就是topic是accounts
6. 启动main
7. 验证
可以打开一个控制台,通过kafka-console-consumer.sh/bat去监听accounts的topic的kafka
通过暴露/accounts接口post提交信息,通过kafka-console-consumer.sh/bat控台可以看到消息结构
stream会自动封装一个content-type类型,用于标识数据类型
客户端使用
1. 添加依赖
1 | <dependency> |
stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
这个配置与发送端是一样的
2. 开启注解
BusinessServiceApplication添加@EnableBinding(Sink.class)
Sink用于接收kafka,当然如果是多个kafkaTopic这里可以设置多个,接口都是继承了Sink接口,也可以在一个接口中声明多个SubscribableChannel与注解如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public interface CustomSink {
String INPUT = "input3";
(INPUT)
SubscribableChannel input();
String INPUT0 = "input0";
String INPUT1 = "input1";
String INPUT2 = "input2";
(INPUT0)
SubscribableChannel input0();
(INPUT1)
SubscribableChannel input1();
(INPUT2)
SubscribableChannel input2();
}
3. 修改配置
1 | spring.cloud.stream.bindings.input.destination=accounts |
这里是接收端,所以是input
headerMode用于配置是stream的格式还是原始数据
content-type用于配置数据解析类型
注意:如果通过kafka的kafka-console-producer.bat模拟发送kafka的时候,消费端会报如下错误
could not convert messages from Kafka异常
原因就是因为发送的内容不是stream发送出来的,stream有其固定的格式,至少要有content-type属性
如果想不用stream格式发送也能解析需要在消费方增加如下配置,进行raw源数据的解析1
2spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.content-type=text/plain
4. 接收消息代码
1 |
|
@StreamListener注解用于标识处理消息的方法
疑问
Stream是怎么保存链路追踪的数据的?
如果不引入sleuth + zipkin依赖,保存的消息格式只是带有content-type字段,而如果引入了这2个依赖就会在Stream消息格式里加入额外4个字段
- traceId
- spanId
消息体会变大,不过带来的好处是消息消费的时候依然可以记录链路信息,这样在链路图上就可以看到所有消息者消费的情况了
支持kafka的消息组及消息分区
在生产端需要增加分区规则与个数的配置1
2spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.producer.partitionCount=2
partitionKeyExpression可以使用SpEL去解析具体传输数据体中的某个key值去作为分区key
partitionCount指定分区的总个数
在消费端也需要开启分区配置1
2
3spring.cloud.stream.bindings.input.comsumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
指定总分区个数与当然消费的分区
而消费端如果要指定消费组只需要指定消费组的名称即可开启1
spring.cloud.stream.bindings.input.group=comsumer1
不加消费组每个订阅者都收到同样的消息,offset独立维护
消费组能保证所有组内消费同一topic中的消息只有一个组内的服务消费,串性消费,offset维护在公共区域,Stream默认存储在zookeeper中
在kafka中的zookeeper中存储个位置:/consumers/组名/offsets/主题/分区移动的位置
其他注册中心
其他还有RabbitMQ的实现
快速接入
略…