Integrating Kafka with Spring Boot
Published: 2019-05-26


This article shows how to integrate Kafka into a Spring Boot project to send and receive messages.

1. Dependencies (pom.xml)

We will skip the standard Spring Boot dependencies; the only Kafka-specific dependency needed is the spring-kafka integration package:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>

Before the code, here is the relevant application.properties configuration:

#============== kafka ===================
kafka.consumer.zookeeper.connect=10.93.21.21:2181
kafka.consumer.servers=10.93.21.21:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=10.93.21.21:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

2. Configuration: Kafka producer

1) Use @Configuration and @EnableKafka to declare the config class and enable KafkaTemplate support.

2) Inject the Kafka settings from application.properties with @Value.

3) Expose the required beans with @Bean.

package com.kangaroo.sentinel.collect.configuration;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

To try out our producer, write a Controller that sends message to topic=test with key=key:

package com.kangaroo.sentinel.collect.controller;

import com.kangaroo.sentinel.common.response.Response;
import com.kangaroo.sentinel.common.response.ResultCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
@RequestMapping("/kafka")
public class CollectController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
        try {
            String message = request.getParameter("message");
            logger.info("Kafka message = {}", message);
            kafkaTemplate.send("test", "key", message);
            logger.info("Message sent to Kafka.");
            return new Response(ResultCode.SUCCESS, "Message sent to Kafka", null);
        } catch (Exception e) {
            logger.error("Failed to send message to Kafka", e);
            return new Response(ResultCode.EXCEPTION, "Failed to send message to Kafka", null);
        }
    }
}
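Note that kafkaTemplate.send() is asynchronous: it returns as soon as the record is queued, so the "sent" log above only means the send was initiated. Here is a minimal sketch of attaching a callback to confirm delivery, assuming Java 8 lambdas and the String/String KafkaTemplate configured above (the callback body is my own illustration, not part of the original article):

// Inside sendKafka(), instead of fire-and-forget. Requires:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("test", "key", message);
future.addCallback(
        result -> logger.info("delivered, partition={}, offset={}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> logger.error("delivery failed", ex));

This way a send failure is still reported, even though send() itself returned immediately.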

3. Configuration: Kafka consumer

1) Use @Configuration and @EnableKafka to declare the config class and enable @KafkaListener support.

2) Inject the Kafka settings from application.properties with @Value.

3) Expose the required beans with @Bean.

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

new Listener() registers a bean that handles the data read from Kafka. A minimal demo implementation of Listener follows; it simply reads each record and logs its key and value.

The topics attribute of @KafkaListener names the Kafka topic to consume; the topic name is determined by the message producer, i.e., by kafkaTemplate when it sends the message.

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka key: " + record.key());
        logger.info("kafka value: " + record.value().toString());
    }
}
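The configuration above relies on auto-commit (kafka.consumer.enable.auto.commit=true), so offsets are committed on a timer whether or not processing succeeded. If at-least-once processing matters, here is a variant sketch with manual acknowledgment; this is my own variation, not part of the original setup, and it assumes auto-commit is switched off and the container is put into manual-ack mode:

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Prerequisites (hypothetical changes to the config above):
//   kafka.consumer.enable.auto.commit=false
//   factory.getContainerProperties().setAckMode(
//       AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
public class AckListener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        // Process the record first...
        logger.info("kafka value: " + record.value());
        // ...then commit its offset explicitly, only after success.
        ack.acknowledge();
    }
}

With this variant, a record whose processing throws an exception is not acknowledged, so it can be redelivered instead of being silently skipped.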

Tips:

1) Installing and configuring Kafka itself is not covered here. When you configure Kafka, bind it to a real network IP rather than localhost or 127.0.0.1.

2) It is best not to deploy Kafka on the ZooKeeper instance bundled with Kafka, as this can lead to connectivity problems.

3) Notice that the consumer is given the Kafka server address rather than the ZooKeeper address. Only the old consumer API went through ZooKeeper; with the new consumer API (Kafka 0.9+) used by spring-kafka, consumers connect to the brokers directly and offsets are stored in Kafka itself, so bootstrap.servers is all that is needed.

4) In the listener configuration, GROUP_ID_CONFIG sets the consumer group name. If several listeners belong to the same group, only one of them receives each message; a sketch follows.
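To make tip 4 concrete: listeners in the same group divide a topic's partitions among themselves (queue semantics), while listeners in different groups each receive every message (pub-sub). With the single container factory above, the group is fixed by kafka.consumer.group.id; the hypothetical sketch below instead uses the per-listener groupId attribute, which to my knowledge requires spring-kafka 1.3 or later:

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class GroupDemoListener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // groupA and groupB are made-up names. Every message on "test" reaches
    // both methods because they sit in different consumer groups; two
    // listeners sharing one group would split the messages between them.
    @KafkaListener(topics = "test", groupId = "groupA")
    public void listenA(ConsumerRecord<?, ?> record) {
        logger.info("groupA received: {}", record.value());
    }

    @KafkaListener(topics = "test", groupId = "groupB")
    public void listenB(ConsumerRecord<?, ?> record) {
        logger.info("groupB received: {}", record.value());
    }
}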

