Java Kafka分區(qū)發(fā)送及消費實戰(zhàn)
前言
Kafka是現在非常熱門的分布式消息隊列,常用于微服務間異步通信,業(yè)務解耦等場景。kafka的性能非常強大,但是單個微服務吞吐性能是有上限的,我們就會用到分布式微服務,多消費者多生產者進行數據處理,保證性能同時也能根據業(yè)務量進行橫向拓展,對于同一個微服務的多個實例,輸入輸出的topic是同一個,這時候我們就可以利用Kafka分區(qū)消費來解決這個問題。
業(yè)務場景
我們開發(fā)的是一個物聯網系統,大量設備接入到平臺實時發(fā)送數據,有秒級數據和分鐘級別數據等等,處理流程包含接入、處理、存儲,這三個模塊間就是使用kafka進行數據流轉,數據處理模塊中包含多個微服務,單條數據會經歷多次處理,部分業(yè)務耗時較長,導致在高頻率接收到數據時候單體服務無法達到吞吐平衡,于是對于這些服務進行了分布式部署,多個實例進行消費處理。
業(yè)務實現
不指定分區(qū)
我們在給kafka發(fā)送消息時候,如果不指定分區(qū),是不需要手動創(chuàng)建topic的,發(fā)送時沒有topic,kafka會自動創(chuàng)建一個分區(qū)為1的topic,如下:
@Service public class ProductService { @Autowired private KafkaTemplate kafkaTemplate; public void send(String msg, String topic) { kafkaTemplate.send(topic, msg); } }
指定分區(qū)
topic分區(qū)初始化及配置
指定分區(qū)發(fā)送時候,如果未配置topic分區(qū)數,指定>0的分區(qū),會提示分區(qū)不存在,這時候我們就需要提前創(chuàng)建好topic及分區(qū)
手動創(chuàng)建,服務啟動前,使用kafka tool手動創(chuàng)建topic 不推薦 x
自動創(chuàng)建,服務啟動時,使用KafkaClient創(chuàng)建 推薦 √
/** * 初始化多分區(qū)的topic 基于springboot2 */ @Component public void TopicInitRunner implements ApplicationRunner { @Autowired private AdminClient adminClient; @Override public void run(ApplicationArguments args) throws Exception { // 通過配置文件讀取自定義配置的topic名及分區(qū)數 省略... // Key topic V 分區(qū)數 Map<String, Integer> topicPartitionMap = new HashMap<>(); for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) { createTopic(e.getKey(), e.getValue()); } } public void createTopic(String topic, int partition) { NewTopic newTopic = new NewTopic(topic, partition); adminClient.createTopics(Lists.newArrayList(newTopic)); } } /** * 配置類參考 基于springboot2 * 如果只進行普通的單消息發(fā)送 無需添加此配置到項目中 */ @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Bean public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); } @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> props = Maps.newHashMap(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); return new KafkaAdmin(props); } }
生產者分區(qū)發(fā)送方案
上面講到如何初始化分區(qū)topic,這時候我們的kafka環(huán)境已經準備好了,我們先使用TopicInitRunner為我們創(chuàng)建一個名稱為 partition-topic 分區(qū)數為三,現在講一講如何均勻的講消息發(fā)送的每個分區(qū)上,如何保證多消費者實例是負載均衡的,具體方案如下:
- 1.因為每條消息都是設備上傳的,都會有設備id,先給每個設備生成一個自增號,這樣1000個設備,每個設備就會有0到999的自增號,放到緩存中,每次根據消息中的設備id獲取到該設備的自增號
- 2.使用自增號對分區(qū)數進行取模操作,代碼實現如下:
public class ProductService { /** * data為需要發(fā)送的數據 */ public void partitionSend(String topic, int partition, JSONObject data) { // 獲取設備id String deviceId = data.getString("deviceId"); // 獲取自增數 如果是新設備會創(chuàng)建一個并放入緩存中 int inc = getDeviceInc(deviceId); // 如果分區(qū)數為3 設備自增id為1 取模結果為1 就是發(fā)送到1分區(qū) 這樣1000個設備就可以保證每個分區(qū)發(fā)送數據量是1000 / 3 int targetPartition = Math.floorMod(inc, partition); // 分區(qū)發(fā)送時候 需要指定一個唯一k 可以使用uuid或者百度提供的雪花算法獲取id 字符串即可 kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString()); } }
消費者
我們講到消費者使用分布式部署,一個微服務有多個實例,我們只需要按照服務監(jiān)聽的topic分區(qū)數創(chuàng)建對應數目的服務實例即可,這樣kafka就會自動分配對應分區(qū)的數據到每個實例。
我們采取批量消費,進一步提高服務吞吐性能,消費及配置代碼如下,配置文件參考springbootkafka配置即可,主要設計kafka服務配置,消費及生產配置,比較核心的是
@Component public class DataListener { @Autowired private MongoTemplate mongoTemplate; /** * 站點報文監(jiān)聽消費 * * @param records */ @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory") public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) { } /** * 消費者配置 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 批量消費配置 */ @Bean public KafkaListenerContainerFactory batchConsumerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setBatchListener(true); return factory; } }
到此這篇關于Java Kafka分區(qū)發(fā)送及消費實戰(zhàn)的文章就介紹到這了,更多相關Kafka分區(qū)發(fā)送及消費內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java網絡編程中向指定URL發(fā)送GET POST請求示例
這篇文章主要介紹了java向指定URL發(fā)送GET POST請求示例,學習JAVA網絡編程一定會用到的,大家參考使用吧2013-11-11