SpringBoot實(shí)現(xiàn)Logback輸出日志到Kafka方式
SpringBoot Logback輸出日志到Kafka
本文通過(guò)在SpringBoot應(yīng)用中創(chuàng)建一個(gè)自定義的Appender從而實(shí)現(xiàn)Logback輸出日志到Kafka。
pom.xml
pom.xml中配置相關(guān)的maven依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.demo</groupId> <artifactId>log2kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <name>log2kafka</name> <description>Demo project for send log to kafka</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.60</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!-- logback插件 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <!-- kafka插件 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
不建議修改pom文件中的kafka版本,容易造成各種各樣的錯(cuò)誤。
KafkaUtil.java
創(chuàng)建一個(gè)kafka工具類,用于配置生成Producer
package com.demo.log2kafka.util; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; public class KafkaUtil { public static Producer<String, String> createProducer( String bootstrapServers, String batchSize, String lingerMs, String compressionType, String retries, String maxRequestSize) { // 當(dāng)配置項(xiàng)為IS_UNDEFINED時(shí),使用默認(rèn)值 if (bootstrapServers == null) { bootstrapServers = "localhost:9092"; } if (batchSize.contains("IS_UNDEFINED")) { batchSize = "50000"; } if (lingerMs.contains("IS_UNDEFINED")) { lingerMs = "60000"; } if (retries.contains("IS_UNDEFINED")) { retries = "3"; } if (maxRequestSize.contains("IS_UNDEFINED")) { maxRequestSize = "5242880"; } Properties properties = new Properties(); // kafka地址,集群用逗號(hào)分隔開(kāi) properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // acks取值: // 0: kafka不返回確認(rèn)信息,不保證record是否被收到,因?yàn)闆](méi)有返回所以重試機(jī)制不會(huì)生效 // 1: partition leader確認(rèn)record寫(xiě)入到日志中,但不保證信息是否被正確復(fù)制(建議設(shè)為該值) // all: leader會(huì)等待所有信息被同步后返回確認(rèn)信息 properties.put(ProducerConfig.ACKS_CONFIG, "1"); properties.put(ProducerConfig.RETRIES_CONFIG, Integer.valueOf(retries)); // 批量發(fā)送,當(dāng)達(dá)到batch size最大值觸發(fā)發(fā)送機(jī)制(10.0后支持批量發(fā)送) properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(batchSize)); // 該配置是指在batch.size數(shù)量未達(dá)到時(shí),指定時(shí)間內(nèi)也會(huì)推送數(shù)據(jù) properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(lingerMs)); // 配置緩存 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); if (!compressionType.contains("IS_UNDEFINED")) { // 指定壓縮算法 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); } // 每個(gè)請(qǐng)求的消息大小 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(maxRequestSize)); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<String, String>(properties); } }
KafkaAppender.java
package com.demo.log2kafka.appender; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.ConsoleAppender; import com.demo.log2kafka.util.KafkaUtil; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaAppender extends ConsoleAppender<ILoggingEvent> { public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class); private String bootstrapServers; private String topic; private String batchSize; private String lingerMs; private String compressionType; private String retries; private String maxRequestSize; private String isSend; private Producer<String, String> producer; @Override public String toString() { return "KafkaAppender{" + "bootstrapServers='" + bootstrapServers + '\'' + ", topic='" + topic + '\'' + ", batchSize='" + batchSize + '\'' + ", lingerMs='" + lingerMs + '\'' + ", compressionType='" + compressionType + '\'' + ", retries='" + retries + '\'' + ", maxRequestSize='" + maxRequestSize + '\'' + ", isSend='" + isSend + '\'' + ", producer=" + producer + '}'; } @Override public void start() { super.start(); if ("true".equals(this.isSend)) { if (producer == null) { producer = KafkaUtil.createProducer(this.bootstrapServers, this.batchSize, this.lingerMs, this.compressionType, this.retries, this.maxRequestSize); } } } @Override public void stop() { super.stop(); if ("true".equals(this.isSend)) { this.producer.close(); } LOGGER.info(Markers.KAFKA, "Stopping kafkaAppender..."); } @Override protected void append(ILoggingEvent eventObject) { byte[] byteArray; String log; // 對(duì)日志格式進(jìn)行解碼 byteArray = this.encoder.encode(eventObject); log = new String(byteArray); ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, log); if (eventObject.getMarker() == null && "true".equals(this.isSend)) { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { LOGGER.error(Markers.KAFKA, "Send log to kafka failed: [{}]", log); } } }); } } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getBatchSize() { return batchSize; } public void setBatchSize(String batchSize) { this.batchSize = batchSize; } public String getLingerMs() { return lingerMs; } public void setLingerMs(String lingerMs) { this.lingerMs = lingerMs; } public String getCompressionType() { return compressionType; } public void setCompressionType(String compressionType) { this.compressionType = compressionType; } public String getRetries() { return retries; } public void setRetries(String retries) { this.retries = retries; } public String getMaxRequestSize() { return maxRequestSize; } public void setMaxRequestSize(String maxRequestSize) { this.maxRequestSize = maxRequestSize; } public Producer<String, String> getProducer() { return producer; } public void setProducer(Producer<String, String> producer) { this.producer = producer; } public String getIsSend() { return isSend; } public void setIsSend(String isSend) { this.isSend = isSend; } }
為了實(shí)現(xiàn)根據(jù)指定格式發(fā)送Kafka日志,直接繼承了ConsoleAppender
.
logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="./logs"/> <springProperty scope="context" name="springAppName" source="spring.application.name"/> <!-- 讀取配置文件中kafka的信息 --> <springProperty scope="context" name="isSend" source="log.config.kafka.isSend" defalutValue="false"/> <springProperty scope="context" name="bootstrapServers" source="log.config.kafka.bootstrapServers" defalutValue="localhost:9002"/> <springProperty scope="context" name="topic" source="log.config.kafka.topic" defalutValue="test-topic"/> <springProperty scope="context" name="batchSize" source="log.config.kafka.batchSize" defalutValue="1"/> <springProperty scope="context" name="lingerMs" source="log.config.kafka.lingerMs" defalutValue="1000"/> <springProperty scope="context" name="compressionType" source="log.config.kafka.compressionType" defalutValue="gzip"/> <springProperty scope="context" name="retries" source="log.config.kafka.retries" defalutValue="3"/> <springProperty scope="context" name="maxRequestSize" source="log.config.kafka.maxRequestSize" defalutValue="5242880"/> <!-- 根據(jù)需要自行配置 --> <property name="APP_NAME" value="${springAppName}"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern> </encoder> </appender> <appender name="KAFKA" class="com.demo.log2kafka.appender.KafkaAppender" > <!-- encoder必須配置, 日志格式 --> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern> </encoder> <bootstrapServers>${bootstrapServers}</bootstrapServers> <topic>${topic}</topic> <batchSize>${batchSize}</batchSize> <lingerMs>${lingerMs}</lingerMs> <compressionType>${compressionType}</compressionType> <retries>${retries}</retries> <maxRequestSize>${maxRequestSize}</maxRequestSize> <isSend>${isSend}</isSend> </appender> <!-- 使用logback-kafka-appender 當(dāng)日志級(jí)別配為debug時(shí),請(qǐng)使用該配置,不要使用root --> <logger name="com.demo.log2kafka" level="DEBUG"> <appender-ref ref="KAFKA"/> </logger> <!-- 日志輸出級(jí)別 --> <root level="INFO"> <!-- 用于控制臺(tái)輸出 --> <appender-ref ref="STDOUT"/> </root> </configuration>
application.yml
spring: application: name: log2kafka # 不使用時(shí)可以不配置 log: config: kafka: # 是否將日志發(fā)送至kafka,true或false,使用時(shí)必須配置 isSend: true # kafka的地址,使用時(shí)必須配置 bootstrapServers: 192.168.254.152:9092,192.168.254.156:9092 # 日志發(fā)往的topic,使用時(shí)必須配置 topic: test-topic # # 批量上傳數(shù)目,達(dá)到該數(shù)目后發(fā)送 batchSize: 5 # # 間隔時(shí)間后發(fā)送,即使未達(dá)到批量上傳最大數(shù),間隔時(shí)間到了也會(huì)發(fā)送,單位為毫秒 lingerMs: 1000 # # 數(shù)據(jù)壓縮類型 # compressionType: gzip # # 重試次數(shù) # retries: 3 # # 最大消息大小,此處設(shè)為5M # maxRequestSize: 5242880 server: port: 9090
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java多態(tài)中的向上轉(zhuǎn)型與向下轉(zhuǎn)型淺析
多態(tài)是指不同類的對(duì)象在調(diào)用同一個(gè)方法是所呈現(xiàn)出的多種不同行為,下面這篇文章主要給大家介紹了關(guān)于Java多態(tài)中向上轉(zhuǎn)型與向下轉(zhuǎn)型的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02Java使用嵌套循環(huán)模擬ATM機(jī)取款業(yè)務(wù)操作示例
這篇文章主要介紹了Java使用嵌套循環(huán)模擬ATM機(jī)取款業(yè)務(wù)操作,結(jié)合實(shí)例形式分析了Java模擬ATM機(jī)取款業(yè)務(wù)操作的相關(guān)流程控制、數(shù)值判斷等操作技巧,需要的朋友可以參考下2019-11-11mybatis 插件: 打印 sql 及其執(zhí)行時(shí)間實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇mybatis 插件: 打印 sql 及其執(zhí)行時(shí)間實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06Springmvc DispatcherServlet原理及用法解析
這篇文章主要介紹了Springmvc DispatcherServlet原理及用法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Java使用jni清屏功能的實(shí)現(xiàn)(只針對(duì)cmd)
JNI是Java Native Interface的縮寫(xiě),它提供了若干的API實(shí)現(xiàn)了Java和其他語(yǔ)言的通信(主要是C&C++)。這篇文章主要介紹了Java使用jni清屏功能的實(shí)現(xiàn)(只針對(duì)cmd) ,感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-05-05Spring Boot中使用Activiti的方法教程(二)
工作流(Workflow),就是“業(yè)務(wù)過(guò)程的部分或整體在計(jì)算機(jī)應(yīng)用環(huán)境下的自動(dòng)化”,下面這篇文章主要給大家介紹了關(guān)于Spring Boot中使用Activiti的相關(guān)資料,需要的朋友可以參考下2018-08-08IDEA連接Mysql數(shù)據(jù)庫(kù)的詳細(xì)圖文教程
項(xiàng)目開(kāi)發(fā)時(shí)使用Intellij IDEA連接本地?cái)?shù)據(jù)庫(kù),將數(shù)據(jù)庫(kù)可視化,還可對(duì)數(shù)據(jù)庫(kù)表直接進(jìn)行增刪改查操作,方便快捷又清晰,下面這篇文章主要給大家介紹了關(guān)于IDEA連接Mysql數(shù)據(jù)庫(kù)的詳細(xì)圖文教程,需要的朋友可以參考下2023-03-03Java使用PreparedStatement接口及ResultSet結(jié)果集的方法示例
這篇文章主要介紹了Java使用PreparedStatement接口及ResultSet結(jié)果集的方法,結(jié)合實(shí)例形式分析了PreparedStatement接口及ResultSet結(jié)果集的相關(guān)使用方法與操作注意事項(xiàng),需要的朋友可以參考下2018-07-07