SpringBoot整合InfluxDB的詳細(xì)過(guò)程
一、簡(jiǎn)單介紹InfluxDB是什么?
InfluxDB是一個(gè)由InfluxData開(kāi)發(fā)的開(kāi)源時(shí)序型數(shù)據(jù)。它由Go寫(xiě)成,著力于高性能地查詢(xún)與存儲(chǔ)時(shí)序型數(shù)據(jù)。InfluxDB被廣泛應(yīng)用于存儲(chǔ)系統(tǒng)的監(jiān)控?cái)?shù)據(jù),IoT行業(yè)的實(shí)時(shí)數(shù)據(jù)等場(chǎng)景。
1、主要特點(diǎn)
時(shí)間序列數(shù)據(jù)存儲(chǔ)
專(zhuān)門(mén)設(shè)計(jì)用于高效處理按時(shí)間順序產(chǎn)生的數(shù)據(jù),如傳感器數(shù)據(jù)、日志數(shù)據(jù)、指標(biāo)數(shù)據(jù)等。時(shí)間戳是 InfluxDB 中數(shù)據(jù)的關(guān)鍵組成部分,確保數(shù)據(jù)的時(shí)間順序性。
可以存儲(chǔ)大量的時(shí)間序列數(shù)據(jù),并提供高效的查詢(xún)和存儲(chǔ)機(jī)制,以滿(mǎn)足對(duì)實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)的處理需求。
高性能
針對(duì)時(shí)間序列數(shù)據(jù)的特點(diǎn)進(jìn)行了優(yōu)化,能夠快速寫(xiě)入和查詢(xún)大規(guī)模的數(shù)據(jù)。它采用了高效的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)和索引機(jī)制,使得數(shù)據(jù)的讀寫(xiě)操作非常迅速。
支持高并發(fā)的寫(xiě)入和查詢(xún),可以滿(mǎn)足大規(guī)模數(shù)據(jù)采集和實(shí)時(shí)監(jiān)控系統(tǒng)的需求。
靈活的數(shù)據(jù)模型
InfluxDB 使用一種靈活的數(shù)據(jù)模型,包括測(cè)量(measurement)、標(biāo)簽(tag)和字段(field)。
測(cè)量類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)中的表,用于存儲(chǔ)具有相同數(shù)據(jù)結(jié)構(gòu)的時(shí)間序列數(shù)據(jù)。標(biāo)簽用于對(duì)數(shù)據(jù)進(jìn)行分類(lèi)和索引,方便快速查詢(xún)。字段則存儲(chǔ)實(shí)際的測(cè)量值,可以是數(shù)值、字符串或布爾值等。
強(qiáng)大的查詢(xún)語(yǔ)言
InfluxDB 提供了一種功能強(qiáng)大的查詢(xún)語(yǔ)言 InfluxQL,用于查詢(xún)和分析時(shí)間序列數(shù)據(jù)。
InfluxQL 支持各種聚合函數(shù)、時(shí)間范圍查詢(xún)、過(guò)濾條件等,可以方便地進(jìn)行數(shù)據(jù)分析和可視化。它還支持連續(xù)查詢(xún)(Continuous Queries)和存儲(chǔ)策略(Retention Policies),可以自動(dòng)對(duì)數(shù)據(jù)進(jìn)行聚合和清理,以提高查詢(xún)性能和節(jié)省存儲(chǔ)空間。
2、應(yīng)用場(chǎng)景
物聯(lián)網(wǎng)(IoT)
在物聯(lián)網(wǎng)應(yīng)用中,大量的傳感器設(shè)備會(huì)不斷產(chǎn)生時(shí)間序列數(shù)據(jù),如溫度、濕度、壓力等。InfluxDB 可以高效地存儲(chǔ)和查詢(xún)這些數(shù)據(jù),為物聯(lián)網(wǎng)數(shù)據(jù)分析和監(jiān)控提供支持。
可以實(shí)時(shí)監(jiān)測(cè)設(shè)備狀態(tài)、分析設(shè)備性能、預(yù)測(cè)設(shè)備故障等。
系統(tǒng)監(jiān)控
用于監(jiān)控服務(wù)器、網(wǎng)絡(luò)設(shè)備、應(yīng)用程序等的性能指標(biāo)。例如,可以收集 CPU 使用率、內(nèi)存使用率、網(wǎng)絡(luò)流量等數(shù)據(jù),并使用 InfluxDB 進(jìn)行存儲(chǔ)和分析。
通過(guò)實(shí)時(shí)監(jiān)控和歷史數(shù)據(jù)分析,可以及時(shí)發(fā)現(xiàn)系統(tǒng)性能問(wèn)題,進(jìn)行故障排除和優(yōu)化。
金融交易數(shù)據(jù)分析
在金融領(lǐng)域,時(shí)間序列數(shù)據(jù)非常重要,如股票價(jià)格、匯率、交易量等。InfluxDB 可以用于存儲(chǔ)和分析這些金融數(shù)據(jù),為交易決策和風(fēng)險(xiǎn)評(píng)估提供支持。
可以進(jìn)行實(shí)時(shí)行情分析、歷史數(shù)據(jù)回溯、交易策略評(píng)估等。
日志分析
可以將日志數(shù)據(jù)以時(shí)間序列的形式存儲(chǔ)在 InfluxDB 中,方便進(jìn)行日志分析和故障排查。
通過(guò)查詢(xún)特定時(shí)間范圍內(nèi)的日志數(shù)據(jù),可以快速定位問(wèn)題發(fā)生的時(shí)間和原因。
總之,InfluxDB 是一個(gè)功能強(qiáng)大的時(shí)間序列數(shù)據(jù)庫(kù),適用于各種需要處理時(shí)間序列數(shù)據(jù)的場(chǎng)景。它的高性能、靈活的數(shù)據(jù)模型和強(qiáng)大的查詢(xún)語(yǔ)言使得它成為了許多企業(yè)和開(kāi)發(fā)者的首選數(shù)據(jù)庫(kù)之一。
想要更深入了解,請(qǐng):點(diǎn)擊這里
二、使用步驟
1、集成原生的InfluxDB
依賴(lài):
<!-- InfluxDB 原生依賴(lài) --> <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.22</version> </dependency>
配置:
#--------- # Influxdb #--------- influxdb: url: http://127.0.0.1:8086 username: admin password: admin database: test retention: autogen //數(shù)據(jù)保留策略
InfluxDB數(shù)據(jù)庫(kù)操作類(lèi):
package com.geesun.influxdb; import cn.hutool.core.collection.CollUtil; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.*; import org.influxdb.dto.Point.Builder; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import plus.ojbk.influxdb.autoconfigure.properties.InfluxdbProperties; import javax.annotation.Resource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * InfluxDB數(shù)據(jù)庫(kù)操作類(lèi) */ @Service public class InfluxDbCommand { @Resource private InfluxDB influxDB; @Resource private InfluxdbProperties config; @Value("${influxdb.retention}") private String retentionPolicy; /** * 測(cè)試連接是否正常 * * @return true 正常 */ public boolean ping() { boolean isConnected = false; Pong pong; try { pong = influxDB.ping(); if (pong != null) { isConnected = true; } } catch (Exception e) { e.printStackTrace(); } return isConnected; } /** * 切換數(shù)據(jù)庫(kù) */ public void setDB(String dbName) { influxDB.setDatabase(dbName); } /** * 關(guān)閉數(shù)據(jù)庫(kù) */ public void close() { influxDB.close(); } /** * 創(chuàng)建自定義保留策略 * * @param policyName 策略名 * @param days 保存天數(shù) * @param replication 保存副本數(shù)量 * @param isDefault 是否設(shè)為默認(rèn)保留策略 */ public void createRetentionPolicy(String policyName, int days, int replication, Boolean isDefault) { String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %sd REPLICATION %s ", policyName, config.getDatabase(), days, replication); if (Boolean.TRUE.equals(isDefault)) { sql = sql + " DEFAULT"; } query(sql); } /** * 切換策略 * * @param policyName 策略名 */ public void updRetentionPolicy(String policyName) { String sql = "ALTER RETENTION POLICY \"" + policyName + "\" ON \"" + config.getDatabase() + "\" DEFAULT"; query(sql); this.retentionPolicy = policyName; } /** * 創(chuàng)建默認(rèn)的保留策略 * <p> * 策略名:hour,保存天數(shù):30天,保存副本數(shù)量:1,設(shè)為默認(rèn)保留策略 */ public void createDefaultRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT" , "hour", config.getDatabase(), "30d", 1); this.query(command); } /*********************************增刪查**************************************************/ /** * 查詢(xún) * * @param command 查詢(xún)語(yǔ)句 * @return */ public QueryResult query(String command) { return influxDB.query(new Query(command, config.getDatabase())); } /** * 插入 * * @param measurement 表 * @param tags 標(biāo)簽 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != time) { builder.time(time, timeUnit); } influxDB.write(config.getDatabase(), retentionPolicy, builder.build()); } /** * 插入 * * @param measurement 表 * @param tags 標(biāo)簽 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) { insert(measurement, tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 刪除 * * @param command 刪除語(yǔ)句 * @return 返回錯(cuò)誤信息 */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, config.getDatabase())); return result.getError(); } /** * 構(gòu)建Point * * @param measurement 表 * @param time 時(shí)間 * @param timeUnit 時(shí)間單位 * @param tags tags * @param fields * @return */ public Point pointBuilder(String measurement, long time, TimeUnit timeUnit, Map<String, String> tags , Map<String, Object> fields) { return Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build(); } /** * 批量寫(xiě)入測(cè)點(diǎn) * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints) { influxDB.write(batchPoints); } /** * 批量寫(xiě)入數(shù)據(jù) * * @param database 數(shù)據(jù)庫(kù) * @param retentionPolicy 保存策略 * @param consistency 一致性 * @param records 要保存的數(shù)據(jù)(調(diào)用BatchPoints.lineProtocol()可得到一條record) */ public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency , TimeUnit timeUnit, final List<String> records) { influxDB.write(database, retentionPolicy, consistency, timeUnit, records); } /** * 查詢(xún)-把查詢(xún)出的結(jié)果集轉(zhuǎn)換成對(duì)應(yīng)的實(shí)體對(duì)象,聚合成list * @param command : sql語(yǔ)句 */ public List<Map<String, Object>> queryWrapper(String command) { List<Map<String, Object>> list = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(command)); List<QueryResult.Result> resultList = queryResult.getResults(); for (QueryResult.Result result : resultList) { List<QueryResult.Series> seriesList = result.getSeries(); if (CollUtil.isEmpty(seriesList)) { return list; } for (QueryResult.Series series : seriesList) { List<String> columns = series.getColumns(); List<List<Object>> values = series.getValues(); if (CollUtil.isEmpty(values)) { continue; } values.forEach(value -> { Map<String, Object> map = new HashMap<>(); for (int i = 0; i < columns.size(); i++) { map.put(columns.get(i), value.get(i)); } list.add(map); }); } } return list; } }
2、集成封裝的InfluxDBTemplate
依賴(lài):
<dependency> <groupId>plus.ojbk</groupId> <artifactId>influxdb-spring-boot-starter</artifactId> <version>1.0.2</version> </dependency>
配置:
#--------- # Influxdb #--------- influxdb: url: http://127.0.0.1:8086 username: admin password: admin database: test retention: autogen //數(shù)據(jù)保留策略
實(shí)體,對(duì)標(biāo)influxDB的表:
package io.springboot.influxdb.entity; import lombok.Data; import org.influxdb.annotation.Column; import org.influxdb.annotation.Measurement; import plus.ojbk.influxdb.annotation.Count; import java.math.BigDecimal; import java.time.LocalDateTime; /** * @version 1.0 * @since 2021/6/17 18:26 */ @Data @Measurement(name = "device") public class Device { /** * 設(shè)備編號(hào) */ @Column(name="device_no", tag = true) //tag 可以理解為influxdb的索引 private String deviceNo; /** * 數(shù)據(jù)值 */ @Count("value") @Column(name="value") private BigDecimal value; /** * 電壓 */ @Column(name="voltage") private Float voltage; /** * 狀態(tài) */ @Column(name="state") private Boolean state; /** * 上報(bào)時(shí)間 */ @Column(name="time") private LocalDateTime time; }
測(cè)試:
package io.springboot.influxdb; import com.alibaba.fastjson.JSON; import io.springboot.influxdb.entity.Device; import org.influxdb.dto.QueryResult; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import plus.ojbk.influxdb.core.Delete; import plus.ojbk.influxdb.core.InfluxdbTemplate; import plus.ojbk.influxdb.core.Op; import plus.ojbk.influxdb.core.Order; import plus.ojbk.influxdb.core.Query; import plus.ojbk.influxdb.core.model.DeleteModel; import plus.ojbk.influxdb.core.model.QueryModel; import java.math.BigDecimal; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; @SpringBootTest class InfluxdbDemoApplicationTests { @Autowired private InfluxdbTemplate influxdbTemplate; private String measurement = "device"; @Test void getCount() { QueryModel countModel = new QueryModel(); ///countModel.setMeasurement(measurement); countModel.setMeasurement(InfluxdbUtils.getMeasurement(Device.class)); countModel.setStart(LocalDateTime.now().plusHours(-2L)); countModel.setEnd(LocalDateTime.now()); //countModel.setSelect(Query.count("voltage")); //只能count field字段 countModel.setSelect(Query.count(InfluxdbUtils.getCountField(Device.class))); countModel.setWhere(Op.where(countModel)); //獲得總條數(shù) long count = influxdbTemplate.count(Query.build(countModel)); System.err.println(count); } @Test void getData() { QueryModel model = new QueryModel(); model.setCurrent(1L); //當(dāng)前頁(yè) model.setSize(10L); //每頁(yè)大小 //model.setMeasurement(measurement); model.setMeasurement(InfluxdbUtils.getMeasurement(Device.class)); model.setStart(LocalDateTime.now().plusHours(-2L)); //開(kāi)始時(shí)間 model.setEnd(LocalDateTime.now()); //結(jié)束時(shí)間 model.setUseTimeZone(true); //時(shí)區(qū) model.setOrder(Order.DESC); //排序 //where 條件中額外參數(shù)可放入model.setMap(); model.setWhere(Op.where(model)); //理解為where條件 //分頁(yè)數(shù)據(jù) List<Device> deviceList = influxdbTemplate.selectList(Query.build(model), Device.class); System.err.println(JSON.toJSONString(deviceList)); } @Test void insert() { List<Device> deviceList = new ArrayList<>(); for (int i = 0; i < 10; i++) { Device device = new Device(); device.setDeviceNo("device-" + i); device.setValue(new BigDecimal(12.548)); device.setState(true); device.setVoltage(3.5F); deviceList.add(device); } influxdbTemplate.insert(deviceList); } @Test void delete() { Map<String, Object> map = new TreeMap<>(); map.put("device_no", "device-1"); DeleteModel model = new DeleteModel(); model.setMap(map); //model.setStart(LocalDateTime.now().plusHours(-10L)); //model.setEnd(LocalDateTime.now()); model.setMeasurement(measurement); model.setWhere(Op.where(model)); influxdbTemplate.delete(Delete.build(model)); } void other(){ influxdbTemplate.execute("自己寫(xiě)sql"); } }
相較于原版,它封裝了自有的Util以及Template等,對(duì)于原版Point的time列類(lèi)型問(wèn)題,它對(duì)number和long 型轉(zhuǎn)換成了LocalDateTime類(lèi)型,并且封裝了更多的方法(具體自行拓展)。
注:原生的influxDB和spring自帶的可一起使用。
到此這篇關(guān)于SpringBoot整合InfluxDB的文章就介紹到這了,更多相關(guān)SpringBoot整合InfluxDB內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java精確抽取網(wǎng)頁(yè)發(fā)布時(shí)間
這篇文章主要為大家詳細(xì)介紹了Java精確抽取網(wǎng)頁(yè)發(fā)布時(shí)間的相關(guān)資料,盡量做到精確無(wú)誤,感興趣的小伙伴們可以參考一下2016-06-06淺析Java中線(xiàn)程的創(chuàng)建和啟動(dòng)
這篇文章運(yùn)用實(shí)例代碼介紹了Java中線(xiàn)程的創(chuàng)建和啟動(dòng),非常詳細(xì),有需要的朋友們可以參考借鑒,下面一起來(lái)看看。2016-08-08Java Yml格式轉(zhuǎn)換為Properties問(wèn)題
本文介紹了作者編寫(xiě)一個(gè)Java工具類(lèi)來(lái)解決在線(xiàn)YAML到Properties轉(zhuǎn)換時(shí)屬性?xún)?nèi)容遺漏的問(wèn)題,通過(guò)遍歷YAML文件的樹(shù)結(jié)構(gòu),作者成功實(shí)現(xiàn)了屬性的完整轉(zhuǎn)換,總結(jié)指出,該工具類(lèi)適用于多種數(shù)據(jù)類(lèi)型,并且代碼簡(jiǎn)潔易懂2024-12-12使用java代碼代替xml實(shí)現(xiàn)SSM教程
這篇文章主要介紹了使用java代碼代替xml實(shí)現(xiàn)SSM教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12關(guān)于MyBatis中SqlSessionFactory和SqlSession簡(jiǎn)解
這篇文章主要介紹了MyBatis中SqlSessionFactory和SqlSession簡(jiǎn)解,具有很好的參考價(jià)值,希望大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Springboot 整合 Java DL4J 實(shí)現(xiàn)文物保護(hù)系統(tǒng)的詳細(xì)過(guò)程
在數(shù)字化時(shí)代,文物保護(hù)尤為關(guān)鍵,本文介紹如何利用SpringBoot和Deeplearning4j構(gòu)建一個(gè)圖像識(shí)別的文物保護(hù)系統(tǒng),系統(tǒng)采用卷積神經(jīng)網(wǎng)絡(luò)(CNN),能夠識(shí)別文物的損壞情況,本文介紹Springboot 整合 Java DL4J 實(shí)現(xiàn)文物保護(hù)系統(tǒng),感興趣的朋友一起看看吧2024-10-10Java的String類(lèi)中的startsWith方法和endsWith方法示例詳解
大家應(yīng)該都知道startsWith()方法用于檢測(cè)字符串是否以指定的前綴開(kāi)始,endsWith()方法用于測(cè)試字符串是否以指定的后綴結(jié)束,本文就Java的String類(lèi)中的startsWith方法和endsWith方法給大家詳細(xì)講解,感興趣的朋友一起看看吧2023-11-11使用idea搭建一個(gè)spring mvc項(xiàng)目的圖文教程
這篇文章主要介紹了使用idea直接創(chuàng)建一個(gè)spring mvc項(xiàng)目的圖文教程,本文通過(guò)圖文并茂的方式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03