springCould集成kafka
1.下载zookeeper和kafka。
2.解压到服务器相关的目录。
3.单节点配置:
- 在zookeeper的config目录下复制zoo_sample.cfg重命名为zoo.cfg
- 修改zoo.cfg,修改dataDir目录为数据文件目录。这一步可以省略,使用默认目录。
- 启动zookeeper,在zookeeper的bin目录下使用命令:./zkServer.sh start 查看启动状态:./zkServer.sh status 停服务:./zkServer.sh stop
- 在kafka的config目录下修改server.properties。修改listeners、advertised.listeners、zookeeper.connect三个参数
如果是内网则不需要添加advertised.listeners参数,如果内外网隔离的需要外网访问,则advertised.listeners是外网地址和端口。并且需要修改host.name为0.0.0.0
4.多节点配置
- 集群部署需要有两台或者两台以上服务器,也可以使用伪集群部署,即同一台服务器,两个或者两个以上端口。集群部署按照单节点,新增两个或以上节点(文件目录复制多次)
5.代码的生产和消费
- 配置文件配置
- yml
-
kafka:
bootstrap-servers: 47.104.66.255:9092
producer:
buffer-memory: 40960
retries: 0
batch-size: 4096
linger: 1
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - 生产者初始化
-
@Configuration
@EnableKafka
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.producer.retries}")
private int retries;
@Value("${spring.kafka.producer.batch-size}")
private int batchSize;
@Value("${spring.kafka.producer.linger}")
private int linger;
@Value("${spring.kafka.producer.buffer-memory}")
private int bufferMemory;
@Bean
public KafkaTemplate, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
public ProducerFactory, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map, Object> producerConfigs() {
Map, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
} - 生产者生产
-
public class KafkaProducerController {
@Autowired
KafkaProducerService kafkaProducerService;
/**
* 发送kafka消息
*
* @param topic 消息的topic
* @param msg 消息内容
*/
@ApiOperation(value = "发送消息到指定topic",httpMethod = "Get")
@GetMapping("/sendMessageToTopic")
public ApiResultsendMessageToTopic(@RequestParam(value = "topic") String topic, @RequestParam(value = "msg") String msg) {
String str = kafkaProducerService.sendMsgToTopic(topic,msg);
return new ApiResult<>(str);
}
} -
public class KafkaProducerServiceImpl implements KafkaProducerService {
@Autowired
private KafkaTemplate, String> kafkaTemplate;
@Override
public String sendMsgToTopic(String topic, String msg) {
if(StringUtils.isEmpty(msg)){
log.error("kafka消息推送异常,topic为:{}",topic);
return "kafka消息推送异常!消息内容为空。";
}
kafkaTemplate.send(topic, msg).addCallback(
success ->{
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
log.info("kafka消息推送成功 topic:" + topic + " partition:" + partition + " offset:" + offset);
},
failure ->{
String message1 = failure.getMessage();
log.error("kafka消息推送异常,topic为:{},异常原因为:{}",topic,message1);
}
);
return "kafka消息推送成功!";
}
} - 消费者接收
-
public class KafkaConsumer {
/**
*
* @params [msg]
* @return void
* @throws
* @author chencong
* @description 监听接收thsoft topic的消息
* @date 2021/6/23 15:28
*/
@KafkaListener(topics = "thsoft")
public void taskStateConsumer(String msg) {
log.info("----kafka推送receive,topic为thsoft:{}----", msg);
}
}