SpringCloud-异步消息

SpringCloud文章系列

  1. SpringCloud
  2. SpringCloud-注册中心
  3. SpringCloud-配置中心
  4. SpringCloud-链路跟踪
  5. SpringCloud-消息总线
  6. SpringCloud-API网关
  7. 【当前文章】SpringCloud-异步消息
  8. SpringCloud-同步调用
  9. SpringCloud-断路降级
  10. SpringCloud-监控管理
  11. SpringCloud-番外篇-临时任务
  12. SpringCloud-番外篇-文档生成
  13. SpringCloud-番外篇-源码解析

接入说明

  1. 目前采用最新的1.xRelease版本:1.5.15.RELEASE
  2. SpringCloud项目可以在start.spring.io下载,不过更方便的应该是通过idea新建项目,建立Spring Initializr项目
  3. 我一般习惯将具体实现服务用具体的服务名,而公共组件用service-xxx来命名,当然还有一些比较固定名字的公共组件
  4. 基于配置中心,请先按照配置中心章节,配置公共配置
  5. 依赖链路追踪章节

Stream-Kafka

0. 搭建kafka

详见中间件kafka章节

1. 新建项目

新建artifactId为account-service的服务

2. 导入依赖

依赖说明

  1. actuator用于暴露监控的接口
  2. eureka是Eureka客户端,注册到注册中心
  3. config是Config客户端,从配置中心拉取配置
  4. sleuth + zipkin是链路追踪客户端,用于监控链路调用
  5. bus-kafka是基于kafka的bus消息总线客户端,用于接收消息总线异步事件
  6. stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

3. 编写启动类

打开AccountServiceApplication,在class上加入@EnableBinding(Source.class)注解

4. 修改配置

1
2
3
4
5
6
7
8
9
server.port=8020
spring.application.name=account-service

private
spring.cloud.stream.bindings.output.destination=accounts

#kafka
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092

stream.bindings.output.destination主要是配置kafka发送端的topic

5. 编写调用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
class MessageController {

@Autowired
private Source source;

@PostMapping(value = "/accounts")
public void write(@RequestBody Account account) {
source.output().send(MessageBuilder.withPayload(account.getMob()).build());
log.info("kafka send!");
}

}

暴露一个/accounts接口,提交POST方法请求,传递包含mob字段的json数据,会将mob字段作为kafka消息体发送
其中Source直接注入即可,Source是Stream自带的接口类,默认Source中的配置关联到”output”中,刚才配置了output的destination也就是topic是accounts

6. 启动main

7. 验证

可以打开一个控制台,通过kafka-console-consumer.sh/bat去监听accounts的topic的kafka
通过暴露/accounts接口post提交信息,通过kafka-console-consumer.sh/bat控台可以看到消息结构
stream会自动封装一个content-type类型,用于标识数据类型

客户端使用

1. 添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

stream + stream-binder-kafka + spring-kafka配置的是kafka的发送/消费端依赖
这个配置与发送端是一样的

2. 开启注解

BusinessServiceApplication添加@EnableBinding(Sink.class)

Sink用于接收kafka,当然如果是多个kafkaTopic这里可以设置多个,接口都是继承了Sink接口,也可以在一个接口中声明多个SubscribableChannel与注解如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface CustomSink {

String INPUT = "input3";

@Input(INPUT)
SubscribableChannel input();

String INPUT0 = "input0";

String INPUT1 = "input1";

String INPUT2 = "input2";

@Input(INPUT0)
SubscribableChannel input0();

@Input(INPUT1)
SubscribableChannel input1();

@Input(INPUT2)
SubscribableChannel input2();

}

3. 修改配置

1
2
spring.cloud.stream.bindings.input.destination=accounts
#spring.kafka.listener.poll-timeout=

这里是接收端,所以是input
headerMode用于配置是stream的格式还是原始数据
content-type用于配置数据解析类型

注意:如果通过kafka的kafka-console-producer.bat模拟发送kafka的时候,消费端会报如下错误
could not convert messages from Kafka异常
原因就是因为发送的内容不是stream发送出来的,stream有其固定的格式,至少要有content-type属性
如果想不用stream格式发送也能解析需要在消费方增加如下配置,进行raw源数据的解析

1
2
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.content-type=text/plain

4. 接收消息代码

1
2
3
4
5
6
7
8
@Component
class AccountConsumer {

@StreamListener(Sink.INPUT)
public void write(String name) {
log.info(name);
}
}

@StreamListener注解用于标识处理消息的方法

疑问

Stream是怎么保存链路追踪的数据的?

如果不引入sleuth + zipkin依赖,保存的消息格式只是带有content-type字段,而如果引入了这2个依赖就会在Stream消息格式里加入额外4个字段

  1. traceId
  2. spanId

消息体会变大,不过带来的好处是消息消费的时候依然可以记录链路信息,这样在链路图上就可以看到所有消息者消费的情况了

支持kafka的消息组及消息分区

在生产端需要增加分区规则与个数的配置

1
2
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.producer.partitionCount=2

partitionKeyExpression可以使用SpEL去解析具体传输数据体中的某个key值去作为分区key
partitionCount指定分区的总个数

在消费端也需要开启分区配置

1
2
3
spring.cloud.stream.bindings.input.comsumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

指定总分区个数与当然消费的分区

而消费端如果要指定消费组只需要指定消费组的名称即可开启

1
spring.cloud.stream.bindings.input.group=comsumer1

不加消费组每个订阅者都收到同样的消息,offset独立维护
消费组能保证所有组内消费同一topic中的消息只有一个组内的服务消费,串性消费,offset维护在公共区域,Stream默认存储在zookeeper中

在kafka中的zookeeper中存储个位置:/consumers/组名/offsets/主题/分区移动的位置

其他注册中心

其他还有RabbitMQ的实现

快速接入

略…

------ 本文结束 ------

版权声明

dawell's Notes by Dawell is licensed under a Creative Commons BY-NC-ND 4.0 International License.
Dawell创作并维护的dawell's Notes博客采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于dawell's Notes 博客( http://dawell.cc ),版权所有,侵权必究。

坚持原创技术分享,您的支持将鼓励我继续创作!