關(guān)于kafka發(fā)送消息的三種方式總結(jié)
kafka發(fā)送消息的方式
package com.zl.kafkademo; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import java.util.Properties; /** * @Auther: le * @Date: 2019/4/23 22:05 * @Description: */ public class MyProducer implements Job { private static KafkaProducer<String,String> producer; static { Properties properties = new Properties(); properties.put("bootstrap.servers","127.0.0.1:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(properties); } /** * 第一種直接發(fā)送,不管結(jié)果 */ private static void sendMessageForgetResult(){ ProducerRecord<String,String> record = new ProducerRecord<String,String>( "kafka-study","name","Forget_result" ); producer.send(record); producer.close(); } /** * 第二種同步發(fā)送,等待執(zhí)行結(jié)果 * @return * @throws Exception */ private static RecordMetadata sendMessageSync() throws Exception{ ProducerRecord<String,String> record = new ProducerRecord<String,String>( "kafka-study","name","sync" ); RecordMetadata result = producer.send(record).get(); System.out.println(result.topic()); System.out.println(result.partition()); System.out.println(result.offset()); return result; } /** * 第三種執(zhí)行回調(diào)函數(shù) */ private static void sendMessageCallback(){ ProducerRecord<String,String> record = new ProducerRecord<String,String>( "kafka-study","name","callback" ); producer.send(record,new MyProducerCallback()); } //定時任務(wù) @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { try { sendMessageSync(); }catch (Exception e){ System.out.println("error:"+e); } } private static class MyProducerCallback implements Callback{ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e !=null){ e.printStackTrace(); return; } System.out.println(recordMetadata.topic()); System.out.println(recordMetadata.partition()); System.out.println(recordMetadata.offset()); System.out.println("Coming in MyProducerCallback"); } } public static void main(String[] args){ //sendMessageForgetResult(); //sendMessageCallback(); JobDetail job = JobBuilder.newJob(MyProducer.class).build(); Trigger trigger = TriggerBuilder.newTrigger() .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build(); try { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.scheduleJob(job,trigger); scheduler.start(); }catch (SchedulerException e){ e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
需要引入文件
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.0</version> </dependency>
測試方法
MAC下操作指令
1、創(chuàng)建主題:
./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
2、運行上述程序,執(zhí)行定時任務(wù)
3、查看消費情況
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning
windows操作指令
1、進(jìn)入 D:\zookeeper-3.4.14\bin 打開新的cmd,輸入“zkServer“,運行Zookeeper
2、進(jìn)入 D:\kafka_2.11-0.11.0.0 運行cmd
.\bin\windows\kafka-server-start.bat .\config\server.properties
3、 創(chuàng)建主題
進(jìn)入D:\kafka_2.11-0.11.0.0運行cmd,輸入:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已創(chuàng)建主題:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
查看指定主題的詳細(xì)信息:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
查看主題消費詳情:
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器詳解
這篇文章主要為大家介紹了SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05SpringBoot可視化接口開發(fā)工具magic-api的簡單使用教程
作為Java后端開發(fā),平時開發(fā)API接口的時候經(jīng)常需要定義Controller、Service、Dao、Mapper、XML、VO等Java對象。有沒有什么辦法可以讓我們不寫這些代碼,直接操作數(shù)據(jù)庫生成API接口呢?今天給大家推薦一款工具magic-api,來幫我們實現(xiàn)這個小目標(biāo)!2021-06-06Java編程實現(xiàn)多線程TCP服務(wù)器完整實例
這篇文章主要介紹了Java編程實現(xiàn)多線程TCP服務(wù)器完整實例,具有一定借鑒價值,需要的朋友可以參考下2018-01-01Java實現(xiàn)中文字符串與unicode互轉(zhuǎn)工具類
這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)中文字符串與unicode互轉(zhuǎn)的工具類,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04Java使用HttpUtils實現(xiàn)發(fā)送HTTP請求
這篇文章主要介紹了Java使用HttpUtils實現(xiàn)發(fā)送HTTP請求,HTTP請求,在日常開發(fā)中,還是比較常見的,今天給大家分享HttpUtils如何使用,需要的朋友可以參考下2023-05-05