SpringCloud集成Kafka


1.下载zookeeper和kafka。

2.解压到服务器相关的目录。

3.单节点配置:

  • 在zookeeper的config目录下复制zoo_sample.cfg重命名为zoo.cfg
  • 修改zoo.cfg,修改dataDir目录为数据文件目录。这一步可以省略,使用默认目录。
  • 启动zookeeper,在zookeeper的bin目录下使用命令:./zkServer.sh start  查看启动状态:./zkServer.sh status   停服务:./zkServer.sh stop
  • 在kafka的config目录下修改server.properties。修改listeners、advertised.listeners、zookeeper.connect三个参数


  • 如果是内网则不需要添加advertised.listeners参数,如果内外网隔离的需要外网访问,则advertised.listeners是外网地址和端口。并且需要修改host.name为0.0.0.0

4.多节点配置

  • 集群部署需要有两台或者两台以上服务器,也可以使用伪集群部署,即同一台服务器,两个或者两个以上端口。集群部署按照单节点,新增两个或以上节点(文件目录复制多次)

5.代码的生产和消费

  • 配置文件配置
    • yml 
    • spring:
        kafka:
          bootstrap-servers: 47.104.66.255:9092
          producer:
            buffer-memory: 40960
            retries: 0
            batch-size: 4096
            linger: 1
          consumer:
            group-id: test
            enable-auto-commit: true
            auto-commit-interval: 1000
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    • 生产者初始化
    • @Configuration
      @EnableKafka
      public class KafkaProducerConfiguration {

          // Comma-separated broker list, bound from spring.kafka.bootstrap-servers.
          @Value("${spring.kafka.bootstrap-servers}")
          private String servers;

          // Number of retries on transient send failures.
          @Value("${spring.kafka.producer.retries}")
          private int retries;

          // Maximum batch size in bytes per partition.
          @Value("${spring.kafka.producer.batch-size}")
          private int batchSize;

          // Time (ms) the producer waits to accumulate a batch before sending.
          @Value("${spring.kafka.producer.linger}")
          private int linger;

          // Total memory (bytes) available for buffering unsent records.
          @Value("${spring.kafka.producer.buffer-memory}")
          private int bufferMemory;

          /**
           * Template for publishing String-keyed, String-valued messages.
           */
          @Bean
          public KafkaTemplate<String, String> kafkaTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }

          /**
           * Producer factory wired with the config map below.
           */
          public ProducerFactory<String, String> producerFactory() {
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }

          /**
           * Raw producer properties assembled from the injected values.
           */
          public Map<String, Object> producerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
              props.put(ProducerConfig.RETRIES_CONFIG, retries);
              props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
              props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
              props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return props;
          }
      }
    • 生产者生产
    • public class KafkaProducerController {

          @Autowired
          KafkaProducerService kafkaProducerService;

          /**
           * Send a Kafka message to the given topic.
           *
           * @param topic target topic name
           * @param msg   message payload
           * @return wrapped service result string
           */
          @ApiOperation(value = "发送消息到指定topic", httpMethod = "GET")
          @GetMapping("/sendMessageToTopic")
          public ApiResult sendMessageToTopic(@RequestParam(value = "topic") String topic, @RequestParam(value = "msg") String msg) {
              String str = kafkaProducerService.sendMsgToTopic(topic, msg);
              return new ApiResult<>(str);
          }
      }
    • public class KafkaProducerServiceImpl implements KafkaProducerService {

      @Autowired
      private KafkaTemplate, String> kafkaTemplate;

      @Override
      public String sendMsgToTopic(String topic, String msg) {
      if(StringUtils.isEmpty(msg)){
      log.error("kafka消息推送异常,topic为:{}",topic);
      return "kafka消息推送异常!消息内容为空。";
      }
      kafkaTemplate.send(topic, msg).addCallback(
      success ->{
      int partition = success.getRecordMetadata().partition();
      long offset = success.getRecordMetadata().offset();
      log.info("kafka消息推送成功 topic:" + topic + " partition:" + partition + " offset:" + offset);
      },
      failure ->{
      String message1 = failure.getMessage();
      log.error("kafka消息推送异常,topic为:{},异常原因为:{}",topic,message1);
      }
      );
      return "kafka消息推送成功!";
      }
      }
    • 消费者接收
    • public class KafkaConsumer {

          /**
           * Listens on the "thsoft" topic and logs every payload received.
           *
           * @param message message body delivered by the listener container
           */
          @KafkaListener(topics = "thsoft")
          public void taskStateConsumer(String message) {
              log.info("----kafka推送receive,topic为thsoft:{}----", message);
          }
      }