# Multi-threaded requests and multi-threaded consumption with Kafka in Spring Boot
#### 1. Requirement
The task: module A sends data collected from certain channels to Kafka, and module B consumes it from Kafka. The topic was set up as "r2p5", i.e. replication factor 2 with 5 partitions, and to maximize consumption throughput the consumer code uses five threads.
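As background, a topic like this can be created up front with Kafka's AdminClient. The sketch below is only illustrative; the topic name "topicName" and the broker address are placeholders rather than the real project values, and replication factor 2 of course requires at least two brokers.
```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions so that 5 consumer threads can each own one partition, replication factor 2
            NewTopic topic = new NewTopic("topicName", 5, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```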
The producer code developed so far:
```xml
<!-- pom.xml: spring-kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- the version is inherited from the parent POM and omitted here -->
</dependency>
```
```java
// Producer: publishes a message to Kafka on every HTTP request
@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/test")
    public void test() {
        kafkaTemplate.send("topicName", "msg");
    }
}
```
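One thing worth noting: KafkaTemplate.send() is asynchronous, so to rule out producer-side loss (as is done later in section 2) a callback can be attached to the returned future. This is only a sketch and assumes spring-kafka 2.x, where send() returns a ListenableFuture (3.x returns a CompletableFuture instead).
```java
// Sketch only: attach a callback to the asynchronous send so producer-side failures become visible.
// Assumes spring-kafka 2.x; needs org.springframework.util.concurrent.ListenableFuture
// and org.springframework.kafka.support.SendResult.
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topicName", "msg");
future.addCallback(
        result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + ", offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));
```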
The consumer code is below (the configuration class was copied from a web search, since the real code lives on an internal network and retyping it by hand is tedious):
1) Configuration class
```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    static final String BOOTSTRAP_SERVERS = "10.28.18.103:6667";

    /** Consumer configuration. */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println("KafkaConsumer consumerConfigs " + JSON.toJSONString(props));
        return props;
    }

    /** Consumer factory. */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Listener container factory: 5 concurrent consumers, matching the 5 partitions. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(5);
        // the listener method below receives a List<ConsumerRecord>, so batch mode is required
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // the listener also takes an Acknowledgment, so the container must use manual ack
        // (in older spring-kafka versions the AckMode enum lives on AbstractMessageListenerContainer)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```
2) Consumer Java code
```java
@Controller
public class ConsumerController {

    // containerFactory must name the KafkaListenerContainerFactory bean defined above
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            id = "#{'${spring.kafka.consumer.group-id}'}",
            topics = "#{'${spring.kafka.topic}'}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // process the batch of records here
        ack.acknowledge(); // manual commit once the batch has been handled
    }
}
```
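For the placeholders in id and topics to resolve, application.properties needs entries along these lines (the key names are inferred from the SpEL expressions above; the values are only examples):
```properties
# assumed configuration; values are examples, not the real project settings
spring.kafka.consumer.group-id=group1
spring.kafka.topic=topicName
```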
#### 2. Problem
Sending batches from a single Postman thread works fine, but sending batches from five JMeter threads causes messages to be lost on the consumer side. Testing ruled out the producer dropping data, and the ZooKeeper instance is a long-standing company test server, so it is unlikely to be the culprit. Digging further: the Kafka consumer itself is not thread-safe, so the consumer side needs special handling. I hit a few problems while adapting multi-threaded consumer code from the web, recorded below.
Reference: [springboot集成kafka多线程定时消费 (CSDN blog by weixin_40510917)](https://blog.csdn.net/weixin_40510917/article/details/119703827)
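To make "not thread-safe" concrete: if several threads call poll() on the same KafkaConsumer instance, the client detects it and throws a ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"). A minimal sketch of that anti-pattern, reusing the initConfig() shown below:
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;

public class SharedConsumerAntiPattern {
    public static void main(String[] args) {
        // one consumer instance shared by five threads: Kafka itself rejects this with
        // java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        KafkaConsumer<String, String> shared = new KafkaConsumer<>(ConsumerHandler.initConfig());
        shared.subscribe(Collections.singleton("test001"));
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                while (true) {
                    shared.poll(Duration.ofMillis(100)); // concurrent poll() on the same instance
                }
            }).start();
        }
    }
}
```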
The second version of the consumer handler class:
```java
package com.example.demo_kafka;

import com.netintech.kafka.impl.OneWork;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ConsumerHandler {

    // the Kafka consumer instance (only ever touched from the thread running execute())
    private KafkaConsumer<Object, Object> consumer;

    // worker pool that does the actual record processing
    private ExecutorService executors;

    // Kafka consumer properties
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the two settings below are the ones I added; without them the error in section 3 appears
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        return props;
    }

    // initialize the Kafka connection
    @PostConstruct
    public void initKafkaConfig() {
        Properties properties = initConfig();
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("test001"));
    }

    /**
     * Consume Kafka data with multiple worker threads.
     * @param workerNum not used yet; the pool size is hard-coded below
     */
    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            if (consumer != null) {
                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord record : consumerRecords) {
                        // hand the record to the pool, then commit the offset
                        executors.submit(new OneWork(record));
                        commitOffset();
                    }
                }
            } else {
                // recreate the consumer if it has gone away for some reason
                Properties props = initConfig();
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton("test001"));
            }
        }
    }

    // commit asynchronously, falling back to a synchronous commit on failure
    private void commitOffset() {
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            consumer.commitSync();
        }
    }
}
```
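One thing to be aware of in execute(): commitAsync() is called as soon as a record has been handed to the pool, i.e. before the worker has processed it, so a crash in between can still lose data even though the offset is committed. A possible variant (my sketch, not the original code) is to commit only after every task from a poll batch has completed:
```java
// Sketch, not the original code: a variant of the poll-loop body that commits only after
// every record from the batch has actually been processed.
// Assumes it lives inside ConsumerHandler (same consumer/executors fields) and additionally
// imports java.util.List, java.util.ArrayList and java.util.concurrent.Future.
private void pollProcessCommit() throws Exception {
    ConsumerRecords<Object, Object> batch = consumer.poll(Duration.ofMillis(100));
    List<Future<?>> pending = new ArrayList<>();
    for (ConsumerRecord record : batch) {
        pending.add(executors.submit(new OneWork(record)));
    }
    for (Future<?> f : pending) {
        f.get(); // block until this record's worker has finished
    }
    if (!batch.isEmpty()) {
        consumer.commitSync(); // committed offsets now only cover fully processed records
    }
}
```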
```java
package com.netintech.kafka.impl;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that processes a single Kafka record on a pool thread.
 */
public class OneWork implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);

    private final ConsumerRecord<String, String> consumerRecord;

    public OneWork(ConsumerRecord record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        try {
            // business logic for this record goes here
            // todo
        } catch (Exception e) {
            LOG.error("error while processing record: " + e.getMessage(), e);
        }
    }
}
```
The actual invocation uses a scheduled task:
```java
@Controller
@EnableScheduling
public class ConController {

    @Autowired
    private ConsumerHandler consumers;

    // note: the attribute is "cron", not "corn"
    @Scheduled(cron = "${work.start:0/1 * * * * ?}")
    public void listen() {
        // the argument 5 is not actually used; the pool size is hard-coded in ConsumerHandler,
        // but it could be made configurable if needed
        consumers.execute(5);
    }
}
```
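Since execute() never returns from its while (true) loop, the cron trigger effectively only starts consumption once; with Spring's default single-threaded scheduler, later ticks simply never fire while the first call is still running. A small guard (my sketch; the AtomicBoolean is not in the original code) makes that intent explicit:
```java
// Sketch: ensure the scheduler only ever enters the never-returning execute() once.
// The started flag is my addition; needs java.util.concurrent.atomic.AtomicBoolean.
private final AtomicBoolean started = new AtomicBoolean(false);

@Scheduled(cron = "${work.start:0/1 * * * * ?}")
public void listen() {
    if (started.compareAndSet(false, true)) {
        consumers.execute(5); // runs its while(true) loop forever on the scheduler thread
    }
}
```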
#### 3. Error (before adding the two parameters)
```
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with
a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
```
#### 4. Summary
With the two parameters added, the logs show no errors and no data is lost, but whether the records actually make it into the database still needs to be verified. I also do not yet understand the root cause of the error, or why raising these two timeout values fixes it; that remains to be investigated.