
Integrating Kafka into a Plain Java Project

 Updated: 2024-11-28 11:05:03   Author: 西柚感覺日了狗
This article describes how to configure and use Kafka in a project that uses neither Spring Cloud nor Spring Boot, and provides a simple Kafka configuration reader class that can flexibly read properties from different configuration files and supply default values.

Suppose the following requirement: our business system must exchange messages with a service platform through Kafka and consume its services asynchronously, but the system architecture may be dated. It is not the Spring Cloud stack, not Spring Boot, just a plain Java project or a Java web project.

Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.4.1</version>
</dependency>
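
Note that the kafka_2.11 artifact (Kafka core, Scala 2.11 build) is used here only for kafka.utils.ShutdownableThread, which the consumer class below extends. It also pulls in kafka-clients 2.4.1 transitively, which does not match the 3.1.0 declared above; if the build reports classpath conflicts, aligning both dependencies on one version is the safer choice.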

Kafka configuration reader class

This class is not actually used later in the article (the configuration values are filled in directly, to keep things simple). For production use, though, it is worth having: it can read properties from several configuration files and fall back to a default value when a key is missing.

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Kafka configuration reader.
 *
 * @author zzy
 */
public class KafkaProperties {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    private static Properties serverProps = new Properties();

    private static Properties clientProps = new Properties();

    private static Properties producerProps = new Properties();

    private static Properties consumerProps = new Properties();

    private static KafkaProperties instance = null;

    private KafkaProperties() {

        String filePath = System.getProperty("user.dir") + File.separator
                + "kafkaConf" + File.separator;

        // Each file is optional: a missing file simply leaves its Properties empty.
        loadIfExists(filePath + "producer.properties", producerProps);
        loadIfExists(filePath + "consumer.properties", consumerProps);
        loadIfExists(filePath + "server.properties", serverProps);
        loadIfExists(filePath + "client.properties", clientProps);

    }

    /**
     * Loads one properties file into the given Properties object if it exists.
     * try-with-resources ensures every stream is closed.
     */
    private static void loadIfExists(String path, Properties props) {
        File file = new File(path);
        if (!file.exists()) {
            return;
        }
        try (FileInputStream fis = new FileInputStream(file)) {
            props.load(fis);
        } catch (IOException e) {
            LOG.error("init kafka props error: " + e.getMessage(), e);
        }
    }

    /**
     * Returns the lazily initialized singleton.
     */
    public static synchronized KafkaProperties getInstance() {
        if (instance == null) {
            instance = new KafkaProperties();
        }

        return instance;
    }


    /**
     * Returns the value for the given key, falling back to the supplied
     * default when the key is not found in any configuration file.
     */
    public String getValue(String key, String defaultValue) {
        if (StringUtils.isEmpty(key)) {
            LOG.error("key is null or empty");
            return defaultValue;
        }

        String value = getPropsValue(key);
        if (value == null) {
            LOG.warn("kafka property getValue return null, the key is " + key);
            value = defaultValue;
        }
        LOG.info("kafka property getValue, key:" + key + ", value:" + value);

        return value;
    }


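    /** Looks the key up in server, producer, consumer, then client properties, in that order. */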
    private String getPropsValue(String key) {
        String value = serverProps.getProperty(key);

        if (value == null) {
            value = producerProps.getProperty(key);
        }

        if (value == null) {
            value = consumerProps.getProperty(key);
        }

        if (value == null) {
            value = clientProps.getProperty(key);
        }

        return value;
    }
}
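
For illustration, a minimal usage sketch; the key name and default value below are made-up examples, not anything the class requires:

KafkaProperties kafkaProps = KafkaProperties.getInstance();
// Falls back to "localhost:9092" when no config file under kafkaConf/ defines the key.
String servers = kafkaProps.getValue("bootstrap.servers", "localhost:9092");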

Producer

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * kafka producer
 * @author zzy
 */
public class KafkaProFormal {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class);

    private Properties properties = new Properties();

    private final String bootstrapServers = "bootstrap.servers";
    private final String clientId = "client.id";
    private final String keySerializer = "key.serializer";
    private final String valueSerializer = "value.serializer";
    //private final String securityProtocol = "security.protocol";
    //private final String saslKerberosServiceName = "sasl.kerberos.service.name";
    //private final String kerberosDomainName = "kerberos.domain.name";
    private final String maxRequestSize = "max.request.size";

    private KafkaProducer<String, String> producer;

    private volatile static KafkaProFormal kafkaProFormal;

    private KafkaProFormal(String servers) {
        properties.put(bootstrapServers, servers);
        properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(properties);
    }

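    /** Double-checked-locking singleton; the servers argument takes effect only on the first call. */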
    public static KafkaProFormal getInstance(String servers) {
        if(kafkaProFormal == null) {
            synchronized(KafkaProFormal.class) {
                if(kafkaProFormal == null) {
                    kafkaProFormal = new KafkaProFormal(servers);
                }
            }
        }

        return kafkaProFormal;
    }

    public void sendStringWithCallBack(String topic, String message, boolean asyncFlag) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        long startTime = System.currentTimeMillis();
        if (asyncFlag) {
            // asynchronous send: returns immediately, the callback reports the result
            producer.send(record, new KafkaCallBack(startTime, message));
        } else {
            // synchronous send: get() blocks until the broker acknowledges the record
            try {
                producer.send(record, new KafkaCallBack(startTime, message)).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("InterruptedException occurred.", e);
            } catch (ExecutionException e) {
                LOG.error("ExecutionException occurred.", e);
            }
        }
    }
}

class KafkaCallBack implements Callback {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCallBack.class);

    private final long startTime;

    private final String message;

    KafkaCallBack(long startTime, String message) {
        this.startTime = startTime;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;

        if (metadata != null) {
            LOG.info("Record(" + message + ") sent to partition(" + metadata.partition()
                    + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms.");
        } else {
            LOG.error("metadata is null. Record(" + message + ")", exception);
        }
    }
}
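
For illustration, a short sketch of both send modes; the server list, topic name, and message are placeholders:

KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092,2.2.2.2:9092");
// asynchronous: returns immediately, KafkaCallBack logs the outcome
producer.sendStringWithCallBack("demo_topic", "hello kafka", true);
// synchronous: blocks until the broker acknowledges the record
producer.sendStringWithCallBack("demo_topic", "hello kafka", false);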

Consumer

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * kafka consumer
 * @author zzy
 */
public abstract class KafkaConFormal extends ShutdownableThread {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class);

    private Set<String> topics = new HashSet<>();

    private final String bootstrapServers = "bootstrap.servers";
    private final String groupId = "group.id";
    private final String keyDeserializer = "key.deserializer";
    private final String valueDeserializer = "value.deserializer";
    private final String enableAutoCommit = "enable.auto.commit";
    private final String autoCommitIntervalMs = "auto.commit.interval.ms";
    private final String sessionTimeoutMs = "session.timeout.ms";

    private KafkaConsumer<String, String> consumer;

    public KafkaConFormal(String topic) {
        super("KafkaConsumerExample", false);

        topics.add(topic);

        Properties props = new Properties();
        props.put(bootstrapServers, "your servers");
        props.put(groupId, "TestGroup");
        props.put(enableAutoCommit, "true");
        props.put(autoCommitIntervalMs, "1000");
        props.put(sessionTimeoutMs, "30000");
        props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Called repeatedly by ShutdownableThread's run loop: subscribe and handle the polled messages.
     */
    @Override
    public void doWork() {
        consumer.subscribe(topics);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

        dealRecords(records);
    }

    /**
     * Implemented when instantiating the consumer to process the polled records.
     * @param records records
     */
    public abstract void dealRecords(ConsumerRecords<String, String> records);

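    /** Replaces the topic set; takes effect on the next doWork() pass, where subscribe() runs. */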
    public void setTopics(Set<String> topics) {
        this.topics = topics;
    }
}

Usage

KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092,2.2.2.2:9092");

KafkaConFormal consumer = new KafkaConFormal("consume_topic") {
    @Override
    public void dealRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            // forward every consumed message to the target topic asynchronously
            producer.sendStringWithCallBack("target_topic", record.value(), true);
        }
    }
};

consumer.start();
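
To stop consuming gracefully, ShutdownableThread also provides a shutdown() method (initiateShutdown() followed by awaitShutdown()). A minimal sketch using a plain JVM shutdown hook; adapt it to your container's lifecycle:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // ends the doWork() loop and waits for the consumer thread to exit
    consumer.shutdown();
}));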

Summary

The above is my personal experience. I hope it can serve as a reference, and I hope you will continue to support 腳本之家.
