Sending Logback Logs to Kafka from Spring Boot
This article shows how to ship Logback log output to Kafka from a Spring Boot application by implementing a custom Appender.
pom.xml
Configure the required Maven dependencies in pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo</groupId>
<artifactId>log2kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>log2kafka</name>
<description>Demo project for send log to kafka</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Logback dependency -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Changing the Kafka version in the pom is not recommended; it easily leads to all kinds of errors.
KafkaUtil.java
Create a Kafka utility class that configures and creates the Producer:
package com.demo.log2kafka.util;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaUtil {
public static Producer<String, String> createProducer(
String bootstrapServers, String batchSize, String lingerMs,
String compressionType, String retries, String maxRequestSize) {
// Fall back to defaults when a setting resolves to *_IS_UNDEFINED
// (Logback substitutes "<name>_IS_UNDEFINED" for variables it cannot resolve)
if (bootstrapServers == null || bootstrapServers.contains("IS_UNDEFINED")) {
bootstrapServers = "localhost:9092";
}
if (batchSize.contains("IS_UNDEFINED")) {
batchSize = "50000";
}
if (lingerMs.contains("IS_UNDEFINED")) {
lingerMs = "60000";
}
if (retries.contains("IS_UNDEFINED")) {
retries = "3";
}
if (maxRequestSize.contains("IS_UNDEFINED")) {
maxRequestSize = "5242880";
}
Properties properties = new Properties();
// Kafka broker addresses; separate cluster nodes with commas
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// acks values:
// 0: the broker returns no acknowledgement; delivery is not guaranteed and retries never fire
// 1: the partition leader acknowledges the write to its own log, but replication is not guaranteed (recommended)
// all: the leader waits until all in-sync replicas have the record before acknowledging
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.valueOf(retries));
// Batching: a send is triggered once batch.size is reached (supported since 0.10)
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(batchSize));
// Even if batch.size has not been reached, buffered records are sent after this many milliseconds
properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(lingerMs));
// Producer buffer memory (bytes)
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
if (!compressionType.contains("IS_UNDEFINED")) {
// Compression codec
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
}
// Maximum size of a single request (bytes)
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(maxRequestSize));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<String, String>(properties);
}
}
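Before wiring the factory into an appender, it can be smoke-tested in isolation. The sketch below is illustrative only; the demo class, broker address, and topic name are placeholders, and the *_IS_UNDEFINED arguments deliberately trigger the default values above.
package com.demo.log2kafka.util;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaUtilDemo {
    public static void main(String[] args) {
        // Arguments containing "IS_UNDEFINED" fall back to the defaults in createProducer
        Producer<String, String> producer = KafkaUtil.createProducer(
                "localhost:9092",
                "batchSize_IS_UNDEFINED",
                "lingerMs_IS_UNDEFINED",
                "compressionType_IS_UNDEFINED",
                "retries_IS_UNDEFINED",
                "maxRequestSize_IS_UNDEFINED");
        producer.send(new ProducerRecord<>("test-topic", "hello from KafkaUtil"));
        // close() flushes any buffered records before shutting down
        producer.close();
    }
}
KafkaAppender.java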
package com.demo.log2kafka.appender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import com.demo.log2kafka.util.KafkaUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class KafkaAppender extends ConsoleAppender<ILoggingEvent> {
public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);
// Marker for the appender's own log output, so append() can recognize and skip it (avoids recursion)
private static final Marker KAFKA = MarkerFactory.getMarker("KAFKA");
private String bootstrapServers;
private String topic;
private String batchSize;
private String lingerMs;
private String compressionType;
private String retries;
private String maxRequestSize;
private String isSend;
private Producer<String, String> producer;
@Override
public String toString() {
return "KafkaAppender{" +
"bootstrapServers='" + bootstrapServers + '\'' +
", topic='" + topic + '\'' +
", batchSize='" + batchSize + '\'' +
", lingerMs='" + lingerMs + '\'' +
", compressionType='" + compressionType + '\'' +
", retries='" + retries + '\'' +
", maxRequestSize='" + maxRequestSize + '\'' +
", isSend='" + isSend + '\'' +
", producer=" + producer +
'}';
}
@Override
public void start() {
super.start();
if ("true".equals(this.isSend)) {
if (producer == null) {
producer = KafkaUtil.createProducer(this.bootstrapServers, this.batchSize,
this.lingerMs, this.compressionType, this.retries, this.maxRequestSize);
}
}
}
@Override
public void stop() {
super.stop();
if ("true".equals(this.isSend)) {
this.producer.close();
}
LOGGER.info(Markers.KAFKA, "Stopping kafkaAppender...");
}
@Override
protected void append(ILoggingEvent eventObject) {
byte[] byteArray;
String log;
// Encode the event with the configured encoder (the pattern from logback.xml)
byteArray = this.encoder.encode(eventObject);
log = new String(byteArray);
ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, log);
// Forward only unmarked events; the appender's own KAFKA-marked logs are skipped to prevent recursion
if (eventObject.getMarker() == null && "true".equals(this.isSend)) {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
LOGGER.error(KAFKA, "Send log to kafka failed: [{}]", log);
}
}
});
}
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getBatchSize() {
return batchSize;
}
public void setBatchSize(String batchSize) {
this.batchSize = batchSize;
}
public String getLingerMs() {
return lingerMs;
}
public void setLingerMs(String lingerMs) {
this.lingerMs = lingerMs;
}
public String getCompressionType() {
return compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getRetries() {
return retries;
}
public void setRetries(String retries) {
this.retries = retries;
}
public String getMaxRequestSize() {
return maxRequestSize;
}
public void setMaxRequestSize(String maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}
public Producer<String, String> getProducer() {
return producer;
}
public void setProducer(Producer<String, String> producer) {
this.producer = producer;
}
public String getIsSend() {
return isSend;
}
public void setIsSend(String isSend) {
this.isSend = isSend;
}
}
To send logs to Kafka in the configured pattern, KafkaAppender extends ConsoleAppender directly and reuses its encoder handling.
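To exercise the appender end to end, a minimal controller sketch can emit some log lines. The class and endpoint below are hypothetical; they sit under com.demo.log2kafka so that the logger configured in logback.xml (next section) picks them up.
package com.demo.log2kafka.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogDemoController {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogDemoController.class);

    // Each GET /log call produces one log event, which the KAFKA appender forwards to the topic
    @GetMapping("/log")
    public String log() {
        LOGGER.info("Hello from log2kafka");
        return "sent";
    }
}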
logback.xml
Note: Spring Boot recommends naming this file logback-spring.xml so that the <springProperty> extension is applied cleanly; when Logback loads a plain logback.xml before Spring starts, unresolved variables take the form <name>_IS_UNDEFINED, which is exactly what KafkaUtil guards against.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="./logs"/>
<springProperty scope="context" name="springAppName"
source="spring.application.name"/>
<!-- Read the Kafka settings from the application configuration -->
<springProperty scope="context" name="isSend"
source="log.config.kafka.isSend" defaultValue="false"/>
<springProperty scope="context" name="bootstrapServers"
source="log.config.kafka.bootstrapServers" defaultValue="localhost:9092"/>
<springProperty scope="context" name="topic"
source="log.config.kafka.topic" defaultValue="test-topic"/>
<springProperty scope="context" name="batchSize"
source="log.config.kafka.batchSize" defaultValue="1"/>
<springProperty scope="context" name="lingerMs"
source="log.config.kafka.lingerMs" defaultValue="1000"/>
<springProperty scope="context" name="compressionType"
source="log.config.kafka.compressionType" defaultValue="gzip"/>
<springProperty scope="context" name="retries"
source="log.config.kafka.retries" defaultValue="3"/>
<springProperty scope="context" name="maxRequestSize"
source="log.config.kafka.maxRequestSize" defaultValue="5242880"/>
<!-- Adjust as needed -->
<property name="APP_NAME" value="${springAppName}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
{
"timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}",
"app": "${APP_NAME}",
"logLevel": "%level",
"message": "%message"
}\n
</pattern>
</encoder>
</appender>
<appender name="KAFKA" class="com.demo.log2kafka.appender.KafkaAppender" >
<!-- The encoder is mandatory; it defines the log format -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
{
"timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}",
"app": "${APP_NAME}",
"logLevel": "%level",
"message": "%message"
}\n
</pattern>
</encoder>
<bootstrapServers>${bootstrapServers}</bootstrapServers>
<topic>${topic}</topic>
<batchSize>${batchSize}</batchSize>
<lingerMs>${lingerMs}</lingerMs>
<compressionType>${compressionType}</compressionType>
<retries>${retries}</retries>
<maxRequestSize>${maxRequestSize}</maxRequestSize>
<isSend>${isSend}</isSend>
</appender>
<!-- When logging at DEBUG level, attach the Kafka appender to a named logger like this instead of to root -->
<logger name="com.demo.log2kafka" level="DEBUG">
<appender-ref ref="KAFKA"/>
</logger>
<!-- Root log level -->
<root level="INFO">
<!-- Console output -->
<appender-ref ref="STDOUT"/>
</root>
</configuration>
application.yml
spring:
  application:
    name: log2kafka
# Optional when logs are not sent to Kafka
log:
  config:
    kafka:
      # Whether to send logs to Kafka (true/false); required when the feature is used
      isSend: true
      # Kafka broker addresses; required when the feature is used
      bootstrapServers: 192.168.254.152:9092,192.168.254.156:9092
      # Topic the logs are sent to; required when the feature is used
      topic: test-topic
      # Batch size; a send is triggered once this many records accumulate
      batchSize: 5
      # Flush interval in milliseconds; records are sent when it elapses even if the batch is not full
      lingerMs: 1000
      # Compression type
      # compressionType: gzip
      # Number of retries
      # retries: 3
      # Maximum request size; 5 MB here
      # maxRequestSize: 5242880
server:
  port: 9090
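To verify that log lines actually reach the topic, a small standalone consumer can be run against the same brokers. The sketch below is a demo under stated assumptions: the group id is arbitrary, and the broker list and topic mirror the application.yml above.
package com.demo.log2kafka.util;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class LogConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.254.152:9092,192.168.254.156:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log2kafka-verify"); // arbitrary demo group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                // poll(long) is the 0.10.x API; each record value is one JSON log line
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}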
Summary
The above reflects the author's personal experience; hopefully it serves as a useful reference.