欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于kafka發(fā)送消息的三種方式總結(jié)

 更新時間:2023年04月06日 09:44:18   作者:大佬喝可樂丶  
這篇文章主要介紹了關(guān)于kafka發(fā)送消息的三種方式總結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

kafka發(fā)送消息的方式

package com.zl.kafkademo;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
 
import java.util.Properties;
 
/**
 * @Auther: le
 * @Date: 2019/4/23 22:05
 * @Description:
 */
public class MyProducer implements Job {
    private static KafkaProducer<String,String> producer;
 
    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }
 
    /**
     * 第一種直接發(fā)送,不管結(jié)果
     */
    private static void sendMessageForgetResult(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","Forget_result"
        );
        producer.send(record);
        producer.close();
    }
 
    /**
     * 第二種同步發(fā)送,等待執(zhí)行結(jié)果
     * @return
     * @throws Exception
     */
    private static RecordMetadata sendMessageSync() throws Exception{
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","sync"
        );
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());
        System.out.println(result.partition());
        System.out.println(result.offset());
        return result;
    }
 
    /**
     * 第三種執(zhí)行回調(diào)函數(shù)
     */
    private static void sendMessageCallback(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","callback"
        );
        producer.send(record,new MyProducerCallback());
    }
 
    //定時任務(wù)
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            sendMessageSync();
        }catch (Exception e){
            System.out.println("error:"+e);
        }
 
    }
 
    private static class MyProducerCallback implements Callback{
 
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e !=null){
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }
 
 
    public static void main(String[] args){
        //sendMessageForgetResult();
        //sendMessageCallback();
        JobDetail job = JobBuilder.newJob(MyProducer.class).build();
 
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
 
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.scheduleJob(job,trigger);
            scheduler.start();
        }catch (SchedulerException e){
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
 
 
}

需要引入文件

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.0</version>
        </dependency>

測試方法

MAC下操作指令

1、創(chuàng)建主題:

./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

2、運行上述程序,執(zhí)行定時任務(wù)

3、查看消費情況

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning

windows操作指令

 1、進(jìn)入  D:\zookeeper-3.4.14\bin   打開新的cmd,輸入“zkServer“,運行Zookeeper

 2、進(jìn)入 D:\kafka_2.11-0.11.0.0 運行cmd

.\bin\windows\kafka-server-start.bat .\config\server.properties

3、 創(chuàng)建主題

進(jìn)入D:\kafka_2.11-0.11.0.0運行cmd,輸入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已創(chuàng)建主題:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

查看指定主題的詳細(xì)信息:

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

查看主題消費詳情:

.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java基礎(chǔ)之內(nèi)存泄漏與溢出詳解

    Java基礎(chǔ)之內(nèi)存泄漏與溢出詳解

    今天帶大家來了解一下Java內(nèi)存泄漏與溢出的知識,文中有非常詳細(xì)的介紹,對正在學(xué)習(xí)Java基礎(chǔ)的各位小伙伴呢很有幫助喲,需要的朋友可以參考下
    2021-05-05
  • SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器詳解

    SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器詳解

    這篇文章主要為大家介紹了SpringMVC視圖轉(zhuǎn)發(fā)重定向區(qū)別及控制器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • SpringBoot可視化接口開發(fā)工具magic-api的簡單使用教程

    SpringBoot可視化接口開發(fā)工具magic-api的簡單使用教程

    作為Java后端開發(fā),平時開發(fā)API接口的時候經(jīng)常需要定義Controller、Service、Dao、Mapper、XML、VO等Java對象。有沒有什么辦法可以讓我們不寫這些代碼,直接操作數(shù)據(jù)庫生成API接口呢?今天給大家推薦一款工具magic-api,來幫我們實現(xiàn)這個小目標(biāo)!
    2021-06-06
  • Java基于Socket實現(xiàn)多人聊天室

    Java基于Socket實現(xiàn)多人聊天室

    這篇文章主要為大家詳細(xì)介紹了Java基于Socket實現(xiàn)多人聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • Java編程實現(xiàn)多線程TCP服務(wù)器完整實例

    Java編程實現(xiàn)多線程TCP服務(wù)器完整實例

    這篇文章主要介紹了Java編程實現(xiàn)多線程TCP服務(wù)器完整實例,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • Java?詳細(xì)分析四個經(jīng)典鏈表面試題

    Java?詳細(xì)分析四個經(jīng)典鏈表面試題

    兄弟們,編程,當(dāng)我們學(xué)習(xí)完數(shù)據(jù)結(jié)構(gòu)的時候,你就會有一種豁然開朗的感覺。算是真正的入了編程的門,所以打好數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ)是特別特別重要的
    2022-03-03
  • Java實現(xiàn)中文字符串與unicode互轉(zhuǎn)工具類

    Java實現(xiàn)中文字符串與unicode互轉(zhuǎn)工具類

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)中文字符串與unicode互轉(zhuǎn)的工具類,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-04-04
  • Java使用HttpUtils實現(xiàn)發(fā)送HTTP請求

    Java使用HttpUtils實現(xiàn)發(fā)送HTTP請求

    這篇文章主要介紹了Java使用HttpUtils實現(xiàn)發(fā)送HTTP請求,HTTP請求,在日常開發(fā)中,還是比較常見的,今天給大家分享HttpUtils如何使用,需要的朋友可以參考下
    2023-05-05
  • Java synchronized輕量級鎖的核心原理詳解

    Java synchronized輕量級鎖的核心原理詳解

    這篇文章主要為大家詳細(xì)介紹了Java synchronized輕量級鎖的核心原理,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • 深入淺出講解Java集合之Map接口

    深入淺出講解Java集合之Map接口

    這篇文章主要介紹了深入淺出講解Java集合之Map接口,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09

最新評論