關(guān)于kafka發(fā)送消息的三種方式總結(jié)
kafka發(fā)送消息的方式
package com.zl.kafkademo;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Properties;
/**
* @Auther: le
* @Date: 2019/4/23 22:05
* @Description:
*/
public class MyProducer implements Job {
private static KafkaProducer<String,String> producer;
static {
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(properties);
}
/**
* 第一種直接發(fā)送,不管結(jié)果
*/
private static void sendMessageForgetResult(){
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","Forget_result"
);
producer.send(record);
producer.close();
}
/**
* 第二種同步發(fā)送,等待執(zhí)行結(jié)果
* @return
* @throws Exception
*/
private static RecordMetadata sendMessageSync() throws Exception{
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","sync"
);
RecordMetadata result = producer.send(record).get();
System.out.println(result.topic());
System.out.println(result.partition());
System.out.println(result.offset());
return result;
}
/**
* 第三種執(zhí)行回調(diào)函數(shù)
*/
private static void sendMessageCallback(){
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","callback"
);
producer.send(record,new MyProducerCallback());
}
//定時(shí)任務(wù)
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
sendMessageSync();
}catch (Exception e){
System.out.println("error:"+e);
}
}
private static class MyProducerCallback implements Callback{
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e !=null){
e.printStackTrace();
return;
}
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
System.out.println("Coming in MyProducerCallback");
}
}
public static void main(String[] args){
//sendMessageForgetResult();
//sendMessageCallback();
JobDetail job = JobBuilder.newJob(MyProducer.class).build();
Trigger trigger = TriggerBuilder.newTrigger()
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.scheduleJob(job,trigger);
scheduler.start();
}catch (SchedulerException e){
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}需要引入文件
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>測(cè)試方法
MAC下操作指令
1、創(chuàng)建主題:
./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
2、運(yùn)行上述程序,執(zhí)行定時(shí)任務(wù)
3、查看消費(fèi)情況
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning
windows操作指令
1、進(jìn)入 D:\zookeeper-3.4.14\bin 打開(kāi)新的cmd,輸入“zkServer“,運(yùn)行Zookeeper
2、進(jìn)入 D:\kafka_2.11-0.11.0.0 運(yùn)行cmd
.\bin\windows\kafka-server-start.bat .\config\server.properties
3、 創(chuàng)建主題
進(jìn)入D:\kafka_2.11-0.11.0.0運(yùn)行cmd,輸入:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已創(chuàng)建主題:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
查看指定主題的詳細(xì)信息:
.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
查看主題消費(fèi)詳情:
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器詳解
這篇文章主要為大家介紹了SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
SpringBoot可視化接口開(kāi)發(fā)工具magic-api的簡(jiǎn)單使用教程
作為Java后端開(kāi)發(fā),平時(shí)開(kāi)發(fā)API接口的時(shí)候經(jīng)常需要定義Controller、Service、Dao、Mapper、XML、VO等Java對(duì)象。有沒(méi)有什么辦法可以讓我們不寫(xiě)這些代碼,直接操作數(shù)據(jù)庫(kù)生成API接口呢?今天給大家推薦一款工具magic-api,來(lái)幫我們實(shí)現(xiàn)這個(gè)小目標(biāo)!2021-06-06
Java基于Socket實(shí)現(xiàn)多人聊天室
這篇文章主要為大家詳細(xì)介紹了Java基于Socket實(shí)現(xiàn)多人聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-09-09
Java編程實(shí)現(xiàn)多線程TCP服務(wù)器完整實(shí)例
這篇文章主要介紹了Java編程實(shí)現(xiàn)多線程TCP服務(wù)器完整實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Java?詳細(xì)分析四個(gè)經(jīng)典鏈表面試題
兄弟們,編程,當(dāng)我們學(xué)習(xí)完數(shù)據(jù)結(jié)構(gòu)的時(shí)候,你就會(huì)有一種豁然開(kāi)朗的感覺(jué)。算是真正的入了編程的門(mén),所以打好數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ)是特別特別重要的2022-03-03
Java實(shí)現(xiàn)中文字符串與unicode互轉(zhuǎn)工具類(lèi)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)中文字符串與unicode互轉(zhuǎn)的工具類(lèi),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-04-04
Java使用HttpUtils實(shí)現(xiàn)發(fā)送HTTP請(qǐng)求
這篇文章主要介紹了Java使用HttpUtils實(shí)現(xiàn)發(fā)送HTTP請(qǐng)求,HTTP請(qǐng)求,在日常開(kāi)發(fā)中,還是比較常見(jiàn)的,今天給大家分享HttpUtils如何使用,需要的朋友可以參考下2023-05-05
Java synchronized輕量級(jí)鎖的核心原理詳解
這篇文章主要為大家詳細(xì)介紹了Java synchronized輕量級(jí)鎖的核心原理,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-03-03

