kafka生產(chǎn)實踐(詳解)
1.引言
最近接觸到一個APP流量分析的項目,類似于友盟。涉及到幾個C端(客戶端)高并發(fā)的接口,這幾個接口主要用于C端數(shù)據(jù)的提交。在沒有任何緩沖的情況下,一個接口涉及到5張表的提交。壓測的結(jié)果很不理想,主要瓶頸就在與RDS的交互。
一臺雙核,16G機子,單實例,jdbc最大連接數(shù)100,吞吐量竟然只有50TPS。
能想到的改造方案就是引入一層緩沖,讓C端接口不與RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq對分布式的支持比較一般,我們的數(shù)據(jù)體量也比較大,所以我們借鑒了友盟,引入了kafka,Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),起初在不做任何kafka優(yōu)化的時候,簡單地將C端提交的數(shù)據(jù)直接send到單節(jié)點kafka,就這樣,我們的吞吐量達到了100TPS.還是有點小驚喜的。
最近一段時間研究了一下kafka,對一些參數(shù)進行調(diào)整,目前接口的吞吐量已經(jīng)達到220TPS,寫這篇文章主要想記錄一下自己優(yōu)化和部署經(jīng)歷。
2.kafka簡介

kafka的結(jié)構(gòu)圖
這張圖很好的詮釋了kafka的結(jié)構(gòu),但是遺漏了一點,就是group的概念,我這里補充一下,一個組可以包含多個consumer對多個topic進行消費,但是不同組的消費都是獨立的。
也就是說同一個topic的同一條消息可以被不同組的consumer消費。
我這里的主要的優(yōu)化途徑就是將kafka集群化,多partition化,使其并發(fā)度更高。
集群化都很好理解,那什么是多partition?
partition是topic的一個概念,即對topic進行分組,不同partition之間的消費相互獨立,并且有序。并且一個partiton只能被一個消費者消費,所以咯,假如topic只有一個partition的話,那么消費者實例不能大于一個,那實例再多也沒用,受限于kafka的partition。
上面都是講消費,其實send操作也是一樣的,要保證有序必然要等上一個發(fā)送ack之后,下一個發(fā)送才能進行,如果只有一個partition,那send之后的ack的等待時間必然會阻塞下面一次send,設(shè)計多個partition之后,可以同時往多個partition發(fā)送消息,自然吞吐量也就上去。
3.kafka集群的搭建以及參數(shù)配置
集群搭建
準備兩臺機子,然后去官網(wǎng)(http://kafka.apache.org/downloads)下載一個包。通過scp到服務(wù)器上,解壓進入config目錄,編輯server.config.
第一臺機子配置(172.18.240.36):
broker.id=0 每臺服務(wù)器的broker.id都不能相同 #hostname host.name=172.18.240.36 #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設(shè)置zookeeper的連接端口 zookeeper.connect=172.18.240.36:4001 #默認partition數(shù) num.partitions=2
第二臺機子配置(172.18.240.62):
broker.id=1 每臺服務(wù)器的broker.id都不能相同
#hostname host.name=172.18.240.62 #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設(shè)置zookeeper的連接端口 zookeeper.connect=172.18.240.36:4001 #默認partition數(shù) num.partitions=2
新增或者修改成以上配置。
對了,在此之前請先安裝zookeeper,如果你用的是zookeeper集群的話,zookeeper.connect可以填寫多個,中間用逗號隔開。
然后啟動
nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
測試一下:
在第一臺機子上開啟一個producer
./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test
在第二臺機子上開啟一個consumer
./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning
第一臺機子發(fā)送一條消息

第二臺機子立馬收到消息

這樣kafka的集群部署就完成了。就下來我們來看看,java的客戶端代碼如何編寫。
4.kafka客戶端代碼示例
我這里的工程是建立在spring boot 之下的,僅供參考。
在 application.yml下添加如下配置:
kafka: consumer: default: server: 172.18.240.36:9092,172.18.240.62:9092 enableAutoCommit: false autoCommitIntervalMs: 100 sessionTimeoutMs: 15000 groupId: data_analysis_group autoOffsetReset: latest producer: default: server: 172.18.240.36:9092,172.18.240.62:9092 retries: 0 batchSize: 4096 lingerMs: 1 bufferMemory: 40960
添加兩個配置類
package com.dtdream.analysis.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import java.util.HashMap;
import java.util.Map;
@ConfigurationProperties(
prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
private static String autoCommitIntervalMs;
private static String sessionTimeoutMs;
private static Class keyDeserializerClass = StringDeserializer.class;
private static Class valueDeserializerClass = StringDeserializer.class;
private static String groupId = "test-group";
private static String autoOffsetReset = "latest";
private static String server;
private static boolean enableAutoCommit;
public static String getServer() {
return server;
}
public static void setServer(String server) {
KafkaConsumerConfig.server = server;
}
public static boolean isEnableAutoCommit() {
return enableAutoCommit;
}
public static void setEnableAutoCommit(boolean enableAutoCommit) {
KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
}
public static String getAutoCommitIntervalMs() {
return autoCommitIntervalMs;
}
public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
}
public static String getSessionTimeoutMs() {
return sessionTimeoutMs;
}
public static void setSessionTimeoutMs(String sessionTimeoutMs) {
KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
}
public static Class getKeyDeserializerClass() {
return keyDeserializerClass;
}
public static void setKeyDeserializerClass(Class keyDeserializerClass) {
KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
}
public static Class getValueDeserializerClass() {
return valueDeserializerClass;
}
public static void setValueDeserializerClass(Class valueDeserializerClass) {
KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
}
public static String getGroupId() {
return groupId;
}
public static void setGroupId(String groupId) {
KafkaConsumerConfig.groupId = groupId;
}
public static String getAutoOffsetReset() {
return autoOffsetReset;
}
public static void setAutoOffsetReset(String autoOffsetReset) {
KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
log.debug("partition is {},key is {},topic is {}",
consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
return false;
}
});
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
/* @Bean
public Listener listener() {
return new Listener();
}*/
}
package com.dtdream.analysis.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: chenqimiao
* Date: 2017/7/24
* Time: 9:43
* To change this template use File | Settings | File Templates.
*/
@ConfigurationProperties(
prefix = "kafka.producer.default",
ignoreInvalidFields = true
)//注入一些屬性域
@EnableKafka
@Configuration//使得@Bean注解生效
public class KafkaProducerConfig {
private static String server;
private static Integer retries;
private static Integer batchSize;
private static Integer lingerMs;
private static Integer bufferMemory;
private static Class keySerializerClass = StringSerializer.class;
private static Class valueSerializerClass = StringSerializer.class;
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
return props;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public static String getServer() {
return server;
}
public static void setServer(String server) {
KafkaProducerConfig.server = server;
}
public static Integer getRetries() {
return retries;
}
public static void setRetries(Integer retries) {
KafkaProducerConfig.retries = retries;
}
public static Integer getBatchSize() {
return batchSize;
}
public static void setBatchSize(Integer batchSize) {
KafkaProducerConfig.batchSize = batchSize;
}
public static Integer getLingerMs() {
return lingerMs;
}
public static void setLingerMs(Integer lingerMs) {
KafkaProducerConfig.lingerMs = lingerMs;
}
public static Integer getBufferMemory() {
return bufferMemory;
}
public static void setBufferMemory(Integer bufferMemory) {
KafkaProducerConfig.bufferMemory = bufferMemory;
}
public static Class getKeySerializerClass() {
return keySerializerClass;
}
public static void setKeySerializerClass(Class keySerializerClass) {
KafkaProducerConfig.keySerializerClass = keySerializerClass;
}
public static Class getValueSerializerClass() {
return valueSerializerClass;
}
public static void setValueSerializerClass(Class valueSerializerClass) {
KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
}
@Bean(name = "kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
利用kafkaTemplate即可完成發(fā)送。
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping(
value = "/openApp",
method = RequestMethod.POST,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
)
@ResponseBody
public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {
logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));
String ip = (String) request.getAttribute("ip");
activeLogPushBo.setIp(ip);
activeLogPushBo.setDate(new Date());
//ResultDTO resultDTO = dataCollectionService.collectOpenInfo(activeLogPushBo);
kafkaTemplate.send("data_collection_open",JSONObject.toJSONString(activeLogPushBo));
// logger.info("openApp: resultDTO {} ,dateTime {}", resultDTO.toJSONString(),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));
return new ResultDTO().success();
}
kafkaTemplate的send方法會更根據(jù)你指定的key進行hash,再對partition數(shù)進行去模,最后決定發(fā)送到那一個分區(qū),假如沒有指定key,那send方法對分區(qū)的選擇是隨機。具體怎么隨機的話,這里就不展開講了,有興趣的同學可以自己看源碼,我們可以交流交流。
接著配置一個監(jiān)聽器
package com.dtdream.analysis.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.Optional;
@Component
public class Listener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = {"test-topic"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("message is {} ", message);
}
}
}
@KafkaListener其實可以具體指定消費哪個分區(qū),如果不指定的話,并且只有一個消費者實例,那么這個實例會消費所有的分區(qū)的消息。
消費者的數(shù)量是一定要少于partition的數(shù)量的,不然沒有任何意義。會出現(xiàn)消費者過剩的情況。
消費者數(shù)量和partition數(shù)量的多與少,會動態(tài)影響消費節(jié)點所消費的partition數(shù)目,最終會在整個集群中達到一種動態(tài)平衡。
5.總結(jié)
理論上只要cpu核心數(shù)無限,那么partition數(shù)也可以無上限,與此同時消費者節(jié)點和生產(chǎn)者節(jié)點也可以無上限,最終會使單個topic的并發(fā)無上限。單機的cpu的核心數(shù)總是會達到一個上限,kafka作為分布式系統(tǒng),可以很好利用集群的運算能力,進行動態(tài)擴展,在DT時代,應(yīng)該會慢慢成為主流吧。
以上這篇kafka生產(chǎn)實踐(詳解)就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring?Security實現(xiàn)統(tǒng)一登錄與權(quán)限控制的示例代碼
這篇文章主要介紹了Spring?Security實現(xiàn)統(tǒng)一登錄與權(quán)限控制,本文通過示例代碼重點看一下統(tǒng)一認證中心和業(yè)務(wù)網(wǎng)關(guān)的建設(shè),需要的朋友可以參考下2022-03-03
心動嗎?正大光明的免費使用IntelliJ IDEA商業(yè)版
這篇文章主要介紹了正大光明的免費使用IntelliJ IDEA商業(yè)版,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2020-02-02
Springcloud seata nacos環(huán)境搭建過程圖解
這篇文章主要介紹了Springcloud seata nacos環(huán)境搭建過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03
教你快速搭建sona服務(wù)及idea使用sona的方法
Sonar 是一個用于代碼質(zhì)量管理的開放平臺。通過插件機制,Sonar 可以集成不同的測試工具,代碼分析工具,以及持續(xù)集成工具,本文給大家分享搭建sona服務(wù)及idea使用sona的方法,感興趣的朋友一起看看吧2021-06-06
java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別
這篇文章主要為大家介紹了java高并發(fā)ScheduledThreadPoolExecutor與Timer區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10
SpringBoot整合RabbitMQ處理死信隊列和延遲隊列
這篇文章將通過示例為大家詳細介紹SpringBoot整合RabbitMQ時如何處理死信隊列和延遲隊列,文中的示例代碼講解詳細,需要的可以參考一下2022-05-05
SpringBoot實現(xiàn)列表數(shù)據(jù)導出為Excel文件
這篇文章主要為大家詳細介紹了在Spring?Boot框架中如何將列表數(shù)據(jù)導出為Excel文件,文中的示例代碼講解詳細,感興趣的小伙伴可以了解下2024-02-02
SpringBoot3結(jié)合Vue3實現(xiàn)用戶登錄功能
最近項目需求搭建一個結(jié)合Vue.js前端框架和Spring Boot后端框架的登錄系統(tǒng),本文主要介紹了SpringBoot3結(jié)合Vue3實現(xiàn)用戶登錄功能,具有一定的參考價值,感興趣的可以了解一下2024-03-03

