Chao's Blog Chao's Blog
首页
  • vue

    • vue路由
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • java
  • spring
  • springboot
  • springcloud
  • git
  • maven
  • nginx
  • tomcat
  • springmvc
  • jvm
  • 图数据库
  • mysql数据库
  • redis数据库
  • windows下docker安装nginx并挂载目录
  • linux命令
  • linux安装软件
  • linux脚本
  • idea
  • vscode
  • 归档
  • 综合项目

    • 若依项目
    • mall项目
  • java
  • mybatis
  • xxl-job
  • mybatis
GitHub (opens new window)

~chao

永远菜鸟,不断努力~
首页
  • vue

    • vue路由
  • 学习笔记

    • 《JavaScript教程》
    • 《JavaScript高级程序设计》
    • 《ES6 教程》
    • 《Vue》
    • 《React》
    • 《TypeScript 从零实现 axios》
    • 《Git》
    • TypeScript
    • JS设计模式总结
  • java
  • spring
  • springboot
  • springcloud
  • git
  • maven
  • nginx
  • tomcat
  • springmvc
  • jvm
  • 图数据库
  • mysql数据库
  • redis数据库
  • windows下docker安装nginx并挂载目录
  • linux命令
  • linux安装软件
  • linux脚本
  • idea
  • vscode
  • 归档
  • 综合项目

    • 若依项目
    • mall项目
  • java
  • mybatis
  • xxl-job
  • mybatis
GitHub (opens new window)
  • XXL-JOB的使用(详细教程)
  • Kafka【入门】就这一篇!
  • kafka在springboot环境下多线程请求和多线程消费
    • 组件技术
    ~chao
    2023-02-24
    目录

    kafka在springboot环境下多线程请求和多线程消费

    # kafka在springboot环境下多线程请求和多线程消费

    ### kafka在springboot环境下多线程请求和多线程消费
    
    #### 1.需求描述:
    
    接到一个需求,A模块将某些渠道获取的数据发送到kafka,B模块从kafka消费数据,设置的主题是r2p5,即设置了5个分区,为了消费速度最大化,代码中设置了五个线程
    
    开发完生产者的代码如下:
    
    ```
    //pom.xml引入kafka配置包
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
       <!--此处的版本号依赖父版本,省略-->
    </dependency>
    
    //java代码
    @RestController
    public class TestController {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        @RequestMapping("/test")
        public void test(){
            kafkaTemplate.send("topicName","msg");
        }
    }
    
    ```
    
    消费者代码如下(配置类是百度的,因为代码在内网,单敲麻烦):
    
    1)配置类
    
    ```
    
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
     
        final static String list ="10.28.18.103:6667";
     
        /**
         * Description:获取配置
         * Date:        2017年7月11日
         * @author      shaqf
         */
        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = Maps.newHashMap();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            System.out.println("KafkaConsumer consumerConfigs "+ JSON.toJSONString(props));
            return props;
        }
        /** 获取工厂 */
        private ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory(consumerConfigs());
        }
        /** 获取实例 */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory1 = new ConcurrentKafkaListenerContainerFactory();
            factory1.setConsumerFactory(consumerFactory());
            factory1.setConcurrency(5);
            factory1.getContainerProperties().setPollTimeout(3000);
            System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JSON.toJSONString(factory1));
            return factory1;
        }
     
       
    }
    
    ```
    
    2) 消费者java代码
    
    ```
    @Controller
    public class ConsumerController {
        
        @KafkaListener(containerFactory="consumerFactory",id="#{'${spring.kafka.consumer.group-id}'}",topics = "#{'${spring.kafka.topic}'}")
        public void batchListener(List<ConsumerRecord<?,?>> records, Acknowledgment ack){
           //获取数据逻辑处理
        }
    }
    
    ```
    
    #### 2.问题描述
    
    用postman单个线程启用发送批量数据的情况下数据正常,但是使用jmeter启用五个线程发送批量数据的情况下会出现消费数据丢失的问题,经过验证排除了生产者丢数据的可能,而zookeeper是很久以前部署的公司测试环境服务器,理论上不会出问题,经过查证,kafka的消费者本身是线程不安全的,需要对消费者做下处理,个人在网上copy多线程代码时出现了一些问题,记录下:
    
    参考的博主地址[(2条消息) springboot集成kafka多线程定时消费_weixin_40510917的博客-CSDN博客_kafka定时消费](https://blog.csdn.net/weixin_40510917/article/details/119703827) 
    
    第二版的消费者处理类:
    
    ```java
    package com.example.demo_kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class ConsumerHandler {
        //kafka消费对象
        private KafkaConsumer<Object, Object> consumer;
        //线程池对象
        private ExecutorService executors;
        //kafka属性配置()
        public static Properties initConfig() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.16.1.240:9092");
            props.put("group.id", "test01");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //下面两个参数是我新加的,不加的情况下会报错,报错见第三小节
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,"60000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"60000");
            return props;
        }
        //初始化kafka连接
        @PostConstruct
        public void initKafkaConfig() {
            Properties properties = initConfig();
            consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton("test001"));
        }
    
        /**
         * 多线程消费kafka数据
         * @param workerNum
         */
        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(3, 5, 5000L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            while (true) {
                if(consumer!=null){
                     ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (!consumerRecords.isEmpty()) {
                    for (final ConsumerRecord record : consumerRecords) {
                        executors.submit(new Worker(record));
                        commitOffset();
                    }
                }
                }else{
                    Properties props =initConfig();
                    consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singleton("test001"));
                }
               
            }
    
        }
        private void commitOffset(){
            try{
                consumer.commitAsync();
                
            }catch(Exception E){
                consumer.commitSync();
            }
        }
    }
    
    
    ```
    
    ```
    package com.netintech.kafka.impl;
    import com.alibaba.fastjson.JSONObject;
    import com.netintech.kafka.bean.Test;
    import com.netintech.kafka.service.TestService;
    import com.netintech.kafka.task.SendVehicleInfo;
    import com.netintech.kafka.utils.SpringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.transaction.annotation.Transactional;
     
    /**
     * 多线程kafka消费类
     */
    public class OneWork implements Runnable {
     
          //日志类
          private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);
     
          private ConsumerRecord<String, String> consumerRecord;
     
          public OneWork(ConsumerRecord record) {
             this.consumerRecord = record;
          }
     
          @Override
          public void run() {
                try{
                      //执行自己的逻辑
                      //todo
                }catch (Exception e){
                      LOG.info("异常错误信息:"+e.getMessage());
                }
          }
    }
    ```
    实际调用使用的是定时器
    
    ```
    @Controller
    @EnableScheduling
    public class ConController{
        @Autowired
        private ConsumerHandler consumers;
        @Scheduled(corn="${work.start:0/1 * * * * ?}")
        public void listen(){
            consumer.execute(5);//这个5参数在实际中我并没有使用,而是在代码中写死,如果需要可以配置。
        }
    }
    ```
    
    #### 3.报错(没加两个参数之前)
    
    ```
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with
    a retriable exception.You should retry commiting the lastest consumed offsets.
    Caused by: org.apache.kafka.common.errors.DisconnectException:null
    ```
    
    #### 4.总结
    
    加上参数之后,从日志来看没有报错,数据也没有丢失,但是真正入库是否可行还有待验证,此外,报错的原因以及为什么加上两个参数就解决问题,我其实并不理解,有待考证
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    编辑 (opens new window)
    上次更新: 2024/01/26, 05:03:22
    Kafka【入门】就这一篇!

    ← Kafka【入门】就这一篇!

    最近更新
    01
    python使用生成器读取大文件-500g
    09-24
    02
    Windows环境下 Docker Desktop 安装 Nginx
    04-10
    03
    使用nginx部署多个前端项目(三种方式)
    04-10
    更多文章>
    Theme by Vdoing | Copyright © 2022-2024 chaos | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式