普通java項(xiàng)目集成kafka方式
現(xiàn)在假設(shè)一種需求,我方業(yè)務(wù)系統(tǒng)要與某服務(wù)平臺通過kafka交互,異步獲取服務(wù),而系統(tǒng)架構(gòu)可能老舊,不是spring cloud桶,不是spring boot,只是java普通項(xiàng)目或者 java web項(xiàng)目
依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.4.1</version> </dependency>
Kafka配置讀取類
本文后邊沒用到,直接填配置了,簡單點(diǎn)
但如果生產(chǎn)需要,還是有這個(gè)類比較好,可以從不同配置中讀取,同時(shí)給個(gè)默認(rèn)值
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; /** * kafka配置讀取類 * * @author zzy */ public class KafkaProperties { private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); private static Properties serverProps = new Properties(); private static Properties clientProps = new Properties(); private static Properties producerProps = new Properties(); private static Properties consumerProps = new Properties(); private static KafkaProperties instance = null; private KafkaProperties() { String filePath = System.getProperty("user.dir") + File.separator + "kafkaConf" + File.separator; File file; FileInputStream fis = null; try { file = new File(filePath + "producer.properties"); if (file.exists()) { fis = new FileInputStream(filePath + "producer.properties"); producerProps.load(fis); } file = new File(filePath + "consumer.properties"); if (file.exists()) { fis = new FileInputStream(filePath + "consumer.properties"); consumerProps.load(fis); } file = new File(filePath + "server.properties"); if (file.exists()) { fis = new FileInputStream(filePath + "server.properties"); serverProps.load(fis); } file = new File(filePath + "client.properties"); if (file.exists()) { fis = new FileInputStream(filePath + "client.properties"); clientProps.load(fis); } } catch (Exception e) { LOG.error("init kafka props error." + e.getMessage()); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { LOG.error("close kafka properties fis error." + e); } } } } /** * 獲取懶漢式單例 */ public static synchronized KafkaProperties getInstance() { if (instance == null) { instance = new KafkaProperties(); } return instance; } /** * 獲取配置,獲取不到時(shí)使用參數(shù)的默認(rèn)配置 */ public String getValue(String key, String defaultValue) { String value; if (StringUtils.isEmpty(key)) { LOG.error("key is null or empty"); } value = getPropsValue(key); if (value == null) { LOG.warn("kafka property getValue return null, the key is " + key); value = defaultValue; } LOG.info("kafka property getValue, key:" + key + ", value:" + value); return value; } private String getPropsValue(String key) { String value = serverProps.getProperty(key); if (value == null) { value = producerProps.getProperty(key); } if (value == null) { value = consumerProps.getProperty(key); } if (value == null) { value = clientProps.getProperty(key); } return value; } }
producer
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * kafka producer * @author zzy */ public class KafkaProFormal { public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class); private Properties properties = new Properties(); private final String bootstrapServers = "bootstrap.servers"; private final String clientId = "client.id"; private final String keySerializer = "key.serializer"; private final String valueSerializer = "value.serializer"; //private final String securityProtocol = "security.protocol"; //private final String saslKerberosServiceName = "sasl.kerberos.service.name"; //private final String kerberosDomainName = "kerberos.domain.name"; private final String maxRequestSize = "max.request.size"; private KafkaProducer<String, String> producer; private volatile static KafkaProFormal kafkaProFormal; private KafkaProFormal(String servers) { properties.put(bootstrapServers, servers); properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(properties); } public static KafkaProFormal getInstance(String servers) { if(kafkaProFormal == null) { synchronized(KafkaProFormal.class) { if(kafkaProFormal == null) { kafkaProFormal = new KafkaProFormal(servers); } } } return kafkaProFormal; } public void sendStringWithCallBack(String topic, String message, boolean asyncFlag) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message); long startTime = System.currentTimeMillis(); if(asyncFlag) { //異步發(fā)送 producer.send(record, new KafkaCallBack(startTime, message)); } else { //同步發(fā)送 try { producer.send(record, new KafkaCallBack(startTime, message)).get(); } catch (InterruptedException e) { LOG.error("InterruptedException occured : {0}", e); } catch (ExecutionException e) { LOG.error("ExecutionException occured : {0}", e); } } } } class KafkaCallBack implements Callback { private static Logger LOG = LoggerFactory.getLogger(KafkaCallBack.class); private String key; private long startTime; private String message; KafkaCallBack(long startTime, String message) { this.startTime = startTime; this.message = message; } @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if(metadata != null) { LOG.info("Record(" + key + "," + message + ") sent to partition(" + metadata.partition() + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms."); } else { LOG.error("metadata is null." + "Record(" + key + "," + message + ")", exception); } } }
consumer
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Properties; import java.util.Set; /** * kafka consumer * @author zzy */ public abstract class KafkaConFormal extends ShutdownableThread { private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class); private Set<String> topics; private final String bootstrapServers = "bootstrap.servers"; private final String groupId = "group.id"; private final String keyDeserializer = "key.deserializer"; private final String valueDeserializer = "value.deserializer"; private final String enableAutoCommit = "enable.auto.commit"; private final String autoCommitIntervalMs = "auto.commit.interval.ms"; private final String sessionTimeoutMs = "session.timeout.ms"; private KafkaConsumer<String, String> consumer; public KafkaConFormal(String topic) { super("KafkaConsumerExample", false); topics.add(topic); Properties props = new Properties(); props.put(bootstrapServers, "your servers"); props.put(groupId, "TestGroup"); props.put(enableAutoCommit, "true"); props.put(autoCommitIntervalMs, "1000"); props.put(sessionTimeoutMs, "30000"); props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); } /** * subscribe and handle the msg */ @Override public void doWork() { consumer.subscribe(topics); ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); dealRecords(records); } /** * 實(shí)例化consumer時(shí),進(jìn)行對消費(fèi)信息的處理 * @param records records */ public abstract void dealRecords(ConsumerRecords<String, String> records); public void setTopics(Set<String> topics) { this.topics = topics; } }
使用
KafkaProFormal producer = KafkaProFormal.getInstance("kafka server1.1.1.1:9092,2.2.2.2:9092"); KafkaConFormal consumer = new KafkaConFormal("consume_topic") { @Override public void dealRecords(ConsumerRecords<String, String> records) { for (ConsumerRecord<String, String> record: records) { producer.sendStringWithCallBack("target_topic", record.value(), true); } } }; consumer.start();
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
將java程序打成jar包在cmd命令行下執(zhí)行的方法
這篇文章主要給大家介紹了關(guān)于將java程序打成jar包在cmd命令行下執(zhí)行的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-01-01關(guān)于synchronized的參數(shù)及其含義
這篇文章主要介紹了synchronized的參數(shù)及其含義詳解,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10SpringBoot項(xiàng)目修改訪問端口和訪問路徑的方法
這篇文章主要介紹了SpringBoot項(xiàng)目修改訪問端口和訪問路徑的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12JavaSE基礎(chǔ)之反射機(jī)制(反射Class)詳解
反射機(jī)制有什么用?通過java語言中的反射機(jī)制可以操作字節(jié)碼文件,可以讀和修改字節(jié)碼文件。所以本文將為大家講講反射機(jī)制的使用,需要的可以參考一下2022-09-09