springboot 整合hbase的示例代碼
一、Springboot整合HBase數據庫
1、添加依賴
<!-- Spring Boot HBase dependencies -->
<!-- NOTE(review): spring-boot-starter-hbase declares no <version> here; confirm it is supplied by a parent POM or dependencyManagement section -->
<dependency>
    <groupId>com.spring4all</groupId>
    <artifactId>spring-boot-starter-hbase</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop-hbase</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
2、添加配置
通過Yaml方式配置
spring: hbase: zookeeper: quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org property: clientPort: 2181 data: hbase: quorum: XXX rootDir: XXX nodeParent: XXX zookeeper: znode: parent: /hbase
3、添加配置類
@Configuration public class HBaseConfig { @Bean public HBaseService getHbaseService() { //設(shè)置臨時的hadoop環(huán)境變量,之后程序會去這個目錄下的\bin目錄下找winutils.exe工具,windows連接hadoop時會用到 //System.setProperty("hadoop.home.dir", "D:\\Program Files\\Hadoop"); //執(zhí)行此步時,會去resources目錄下找相應(yīng)的配置文件,例如hbase-site.xml org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); return new HBaseService(conf); } }
4、工具類的方式實現HBASE操作
@Service public class HBaseService { private Admin admin = null; private Connection connection = null; public HBaseService(Configuration conf) { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); } //創(chuàng)建表 create <table>, {NAME => <column family>, VERSIONS => <VERSIONS>} public boolean creatTable(String tableName, List<String> columnFamily) { //列族column family List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size()); columnFamily.forEach(cf -> { cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder( Bytes.toBytes(cf)).build()); }); //表 table TableDescriptor tableDesc = TableDescriptorBuilder .newBuilder(TableName.valueOf(tableName)) .setColumnFamilies(cfDesc).build(); if (admin.tableExists(TableName.valueOf(tableName))) { log.debug("table Exists!"); } else { admin.createTable(tableDesc); log.debug("create table Success!"); } close(admin, null, null); return true; } public List<String> getAllTableNames() { List<String> result = new ArrayList<>(); TableName[] tableNames = admin.listTableNames(); for (TableName tableName : tableNames) { result.add(tableName.getNameAsString()); } close(admin, null, null); return result; } public Map<String, Map<String, String>> getResultScanner(String tableName) { Scan scan = new Scan(); return this.queryData(tableName, scan); } private Map<String, Map<String, String>> queryData(String tableName, Scan scan) { // <rowKey,對應(yīng)的行數(shù)據(jù)> Map<String, Map<String, String>> result = new HashMap<>(); ResultScanner rs = null; //獲取表 Table table = null; table = getTable(tableName); rs = table.getScanner(scan); for (Result r : rs) { // 每一行數(shù)據(jù) Map<String, String> columnMap = new HashMap<>(); String rowKey = null; // 行鍵,列族和列限定符一起確定一個單元(Cell) for (Cell cell : r.listCells()) { if (rowKey == null) { rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); } columnMap.put( //列限定符 Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength()), //列族 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } if (rowKey != null) { result.put(rowKey, columnMap); } } close(null, rs, table); return result; } public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) { Table table = null; table = getTable(tableName); putData(table, rowKey, tableName, familyName, columns, values); close(null, null, table); } private void putData(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) { //設(shè)置rowkey Put put = new Put(Bytes.toBytes(rowKey)); if (columns != null && values != null && columns.length == values.length) { for (int i = 0; i < columns.length; i++) { if (columns[i] != null && values[i] != null) { put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i])); } else { throw new NullPointerException(MessageFormat.format( "列名和列數(shù)據(jù)都不能為空,column:{0},value:{1}", columns[i], values[i])); } } } table.put(put); log.debug("putData add or update data Success,rowKey:" + rowKey); table.close(); } private Table getTable(String tableName) throws IOException { return connection.getTable(TableName.valueOf(tableName)); } private void close(Admin admin, ResultScanner rs, Table table) { if (admin != null) { try { admin.close(); } catch (IOException e) { log.error("關(guān)閉Admin失敗", e); } if (rs != null) { rs.close(); } if (table != null) { rs.close(); } if (table != null) { try { table.close(); } catch (IOException e) { log.error("關(guān)閉Table失敗", e); } } } } }
測試類
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest class HBaseApplicationTests { @Resource private HBaseService hbaseService; //測試創(chuàng)建表 @Test public void testCreateTable() { hbaseService.creatTable("test_base", Arrays.asList("a", "back")); } //測試加入數(shù)據(jù) @Test public void testPutData() { hbaseService.putData("test_base", "000001", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "mob_3", "0.9416", "0.0000", "12.2293", "null"}); hbaseService.putData("test_base", "000002", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "idno_prov", "0.9317", "0.0000", "9.8679", "null"}); hbaseService.putData("test_base", "000003", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "education", "0.8984", "0.0000", "25.5649", "null"}); } //測試遍歷全表 @Test public void testGetResultScanner() { Map<String, Map<String, String>> result2 = hbaseService.getResultScanner("test_base"); System.out.println("-----遍歷查詢?nèi)韮?nèi)容-----"); result2.forEach((k, value) -> { System.out.println(k + "--->" + value); }); } }
二、使用spring-data-hadoop-hbase
1、配置類
/**
 * Spring configuration that builds an {@code HbaseTemplate} from three externalised
 * ZooKeeper properties.
 */
@Configuration
public class HBaseConfiguration {

    /** Value of {@code hbase.zookeeper.quorum}. */
    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    /** Value of {@code hbase.zookeeper.property.clientPort}. */
    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    /** Value of {@code zookeeper.znode.parent}. */
    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

    /**
     * Creates the template on top of a new Hadoop configuration carrying the three
     * injected properties.
     *
     * @return the configured {@code HbaseTemplate}
     */
    @Bean
    public HbaseTemplate hbaseTemplate() {
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        hadoopConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        hadoopConf.set("hbase.zookeeper.property.clientPort", clientPort);
        hadoopConf.set("zookeeper.znode.parent", znodeParent);
        HbaseTemplate template = new HbaseTemplate(hadoopConf);
        return template;
    }
}
2、業務類中使用HbaseTemplate
這個是作為工具類
@Service @Slf4j public class HBaseService { @Autowired private HbaseTemplate hbaseTemplate; //查詢列簇 public List<Result> getRowKeyAndColumn(String tableName, String startRowkey, String stopRowkey, String column, String qualifier) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); if (StringUtils.isNotBlank(column)) { log.debug("{}", column); filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(column)))); } if (StringUtils.isNotBlank(qualifier)) { log.debug("{}", qualifier); filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier)))); } Scan scan = new Scan(); if (filterList.getFilters().size() > 0) { scan.setFilter(filterList); } scan.setStartRow(Bytes.toBytes(startRowkey)); scan.setStopRow(Bytes.toBytes(stopRowkey)); return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper); } public List<Result> getListRowkeyData(String tableName, List<String> rowKeys, String familyColumn, String column) { return rowKeys.stream().map(rk -> { if (StringUtils.isNotBlank(familyColumn)) { if (StringUtils.isNotBlank(column)) { return hbaseTemplate.get(tableName, rk, familyColumn, column, (rowMapper, rowNum) -> rowMapper); } else { return hbaseTemplate.get(tableName, rk, familyColumn, (rowMapper, rowNum) -> rowMapper); } } return hbaseTemplate.get(tableName, rk, (rowMapper, rowNum) -> rowMapper); }).collect(Collectors.toList()); } }
三、使用spring-boot-starter-hbase
## 下載spring-boot-starter-hbase代碼 git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git ## 安裝 cd spring-boot-starter-hbase mvn clean install
1、添加配置項
- spring.data.hbase.quorum 指定 HBase 的 zk 地址
- spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存儲的路徑
- spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
2、定義好DTO
/**
 * DTO describing one city record; accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class City {

    /** Identifier of the record. */
    private Long id;

    /** Age value of the record. */
    private Integer age;

    /** Name of the city. */
    private String cityName;
}
3、創建對應rowMapper
/**
 * Maps one HBase {@code Result} to a {@link City}, reading the {@code name} and
 * {@code age} qualifiers of column family {@code f}.
 */
public class CityRowMapper implements RowMapper<City> {

    // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes() which depends
    // on the platform default charset.
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("f");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE = Bytes.toBytes("age");

    /**
     * Converts a row into a {@link City}.
     *
     * @param result the HBase row
     * @param rowNum index of the row in the result set (unused)
     * @return a city whose {@code cityName}/{@code age} are {@code null} when the
     *         corresponding cell is absent
     * @throws Exception declared by the {@code RowMapper} contract
     */
    @Override
    public City mapRow(Result result, int rowNum) throws Exception {
        City dto = new City();
        dto.setCityName(Bytes.toString(result.getValue(COLUMN_FAMILY, NAME)));

        // getValue returns null for a missing cell; Bytes.toInt(null) would throw.
        byte[] rawAge = result.getValue(COLUMN_FAMILY, AGE);
        if (rawAge != null) {
            dto.setAge(Bytes.toInt(rawAge));
        }
        return dto;
    }
}
4、操作實現增改查
- HbaseTemplate.find 返回 HBase 映射的 City 列表
- HbaseTemplate.get 返回 row 對應(yīng)的 City 信息
- HbaseTemplate.saveOrUpdates 保存或者更新
如果 HbaseTemplate 操作不滿足需求,完全可以使用 hbaseTemplate 的 getConnection() 方法獲取連接,進而類似 HbaseTemplate 實現的邏輯,實現更複雜的需求查詢等功能。
@Service public class CityServiceImpl implements CityService { @Autowired private HbaseTemplate hbaseTemplate; //查詢 public List<City> query(String startRow, String stopRow) { Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow)); scan.setCaching(5000); List<City> dtos = this.hbaseTemplate.find("people_table", scan, new CityRowMapper()); return dtos; } //查詢 public City query(String row) { City dto = this.hbaseTemplate.get("people_table", row, new CityRowMapper()); return dto; } //新增或者更新 public void saveOrUpdate() { List<Mutation> saveOrUpdates = new ArrayList<Mutation>(); Put put = new Put(Bytes.toBytes("135xxxxxx")); put.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test")); saveOrUpdates.add(put); this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates); } }
四、Springboot整合Influxdb
中文文檔:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html
注意,項目建立在spring-boot-web基礎(chǔ)上
1、添加依賴
<!-- InfluxDB Java client -->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
</dependency>
2、添加配置
# InfluxDB connection settings, read through the spring.influx.* property keys
spring:
  influx:
    database: my_sensor1
    password: admin
    url: http://127.0.0.1:6086
    user: admin
3、編寫配置類
@Configuration public class InfluxdbConfig { @Value("${spring.influx.url}") private String influxDBUrl; @Value("${spring.influx.user}") private String userName; @Value("${spring.influx.password}") private String password; @Value("${spring.influx.database}") private String database; @Bean("influxDB") public InfluxDB influxdb(){ InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password); try { /** * 異步插入: * enableBatch這里第一個是point的個數(shù),第二個是時間,單位毫秒 * point的個數(shù)和時間是聯(lián)合使用的,如果滿100條或者60 * 1000毫秒 * 滿足任何一個條件就會發(fā)送一次寫的請求。 */ influxDB.setDatabase(database).enableBatch(100,1000 * 60, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } finally { //設(shè)置默認策略 influxDB.setRetentionPolicy("sensor_retention"); } //設(shè)置日志輸出級別 influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } }
4、InfluxDB原生API實現
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbDBTest { @Autowired private InfluxDB influxDB; //measurement private final String measurement = "sensor"; @Value("${spring.influx.database}") private String database; /** * 批量插入第一種方式 */ @Test public void insert(){ List<String> lines = new ArrayList<String>(); Point point = null; for(int i=0;i<50;i++){ point = Point.measurement(measurement) .tag("deviceId", "sensor" + i) .addField("temp", 3) .addField("voltage", 145+i) .addField("A1", "4i") .addField("A2", "4i").build(); lines.add(point.lineProtocol()); } //寫入 influxDB.write(lines); } /** * 批量插入第二種方式 */ @Test public void batchInsert(){ BatchPoints batchPoints = BatchPoints .database(database) .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); //遍歷sqlserver獲取數(shù)據(jù) for(int i=0;i<50;i++){ //創(chuàng)建單條數(shù)據(jù)對象——表名 Point point = Point.measurement(measurement) //tag屬性——只能存儲String類型 .tag("deviceId", "sensor" + i) .addField("temp", 3) .addField("voltage", 145+i) .addField("A1", "4i") .addField("A2", "4i").build(); //將單條數(shù)據(jù)存儲到集合中 batchPoints.point(point); } //批量插入 influxDB.write(batchPoints); } /** * 獲取數(shù)據(jù) */ @Test public void datas(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分頁查詢,因此可以設(shè)置分頁查詢條件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查詢條件暫且為空 // 此處查詢所有內(nèi)容,如果 String queryCmd = "SELECT * FROM " // 查詢指定設(shè)備下的日志信息 // 要指定從 RetentionPolicyName.measurement中查詢指定數(shù)據(jù),默認的策略可以不加; // + 策略name + "." + measurement + measurement // 添加查詢條件(注意查詢條件選擇tag值,選擇field數(shù)值會嚴重拖慢查詢速度) + queryCondition // 查詢結(jié)果需要按照時間排序 + " ORDER BY time DESC" // 添加分頁查詢條件 + pageQuery; QueryResult queryResult = influxDB.query(new Query(queryCmd, database)); System.out.println("query result => "+queryResult); } }
5、采用封裝工具類
1、創建實體類
/**
 * Entity mapped to measurement {@code sensor}. Each {@code @Column} name equals the
 * name of the field it annotates.
 */
@Data
@Measurement(name = "sensor")
public class Sensor {

    /** Device identifier, stored as a tag. */
    @Column(name = "deviceId", tag = true)
    private String deviceId;

    /** Field {@code temp}. */
    @Column(name = "temp")
    private float temp;

    /** Field {@code voltage}. */
    @Column(name = "voltage")
    private float voltage;

    /** Field {@code A1}. */
    @Column(name = "A1")
    private float A1;

    /** Field {@code A2}. */
    @Column(name = "A2")
    private float A2;

    /** Column {@code time}, held as a string. */
    @Column(name = "time")
    private String time;
}
2、創建工具類
@Component public class InfluxdbUtils { @Autowired private InfluxDB influxDB; @Value("${spring.influx.database}") private String database; /** * 新增單條記錄,利用java的反射機制進行新增操作 */ @SneakyThrows public void insertOne(Object obj){ //獲取度量 Class<?> clasz = obj.getClass(); Measurement measurement = clasz.getAnnotation(Measurement.class); //構(gòu)建 Point.Builder builder = Point.measurement(measurement.name()); // 獲取對象屬性 Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //設(shè)置屬性可操作 field.setAccessible(true); if(column.tag()){ //tag屬性只能存儲String類型 builder.tag(column.name(), field.get(obj).toString()); }else{ //設(shè)置field if(field.get(obj) != null){ builder.addField(column.name(), field.get(obj).toString()); } } } influxDB.write(builder.build()); } /** * 批量新增,方法一 */ @SneakyThrows public void insertBatchByRecords(List<?> records){ List<String> lines = new ArrayList<String>(); records.forEach(record->{ Class<?> clasz = record.getClass(); //獲取度量 Measurement measurement = clasz.getAnnotation(Measurement.class); //構(gòu)建 Point.Builder builder = Point.measurement(measurement.name()); Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //設(shè)置屬性可操作 field.setAccessible(true); if(column.tag()){ //tag屬性只能存儲String類型 builder.tag(column.name(), field.get(record).toString()); }else{ //設(shè)置field if(field.get(record) != null){ builder.addField(column.name(), field.get(record).toString()); } } } lines.add(builder.build().lineProtocol()); }); influxDB.write(lines); } /** * 批量新增,方法二 */ @SneakyThrows public void insertBatchByPoints(List<?> records){ BatchPoints batchPoints = BatchPoints.database(database) .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); records.forEach(record->{ Class<?> clasz = record.getClass(); //獲取度量 Measurement measurement = clasz.getAnnotation(Measurement.class); //構(gòu)建 
Point.Builder builder = Point.measurement(measurement.name()); Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //設(shè)置屬性可操作 field.setAccessible(true); if(column.tag()){ //tag屬性只能存儲String類型 builder.tag(column.name(), field.get(record).toString()); }else{ //設(shè)置field if(field.get(record) != null){ builder.addField(column.name(), field.get(record).toString()); } } } batchPoints.point(builder.build()); }); influxDB.write(batchPoints); } /** * 查詢,返回Map集合 * @param query 完整的查詢語句 */ public List<Object> fetchRecords(String query){ List<Object> results = new ArrayList<Object>(); QueryResult queryResult = influxDB.query(new Query(query, database)); queryResult.getResults().forEach(result->{ result.getSeries().forEach(serial->{ List<String> columns = serial.getColumns(); int fieldSize = columns.size(); serial.getValues().forEach(value->{ Map<String,Object> obj = new HashMap<String,Object>(); for(int i=0;i<fieldSize;i++){ obj.put(columns.get(i), value.get(i)); } results.add(obj); }); }); }); return results; } /** * 查詢,返回map集合 * @param fieldKeys 查詢的字段,不可為空;不可為單獨的tag * @param measurement 度量,不可為空; */ public List<Object> fetchRecords(String fieldKeys, String measurement){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); return this.fetchRecords(query.toString()); } /** * 查詢,返回map集合 * @param fieldKeys 查詢的字段,不可為空;不可為單獨的tag * @param measurement 度量,不可為空; */ public List<Object> fetchRecords(String fieldKeys, String measurement, String order){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); return this.fetchRecords(query.toString()); } /** * 查詢,返回map集合 * @param fieldKeys 查詢的字段,不可為空;不可為單獨的tag * @param measurement 度量,不可為空; */ public List<Object> fetchRecords(String fieldKeys, String measurement, String 
order, String limit){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); query.append(limit); return this.fetchRecords(query.toString()); } /** * 查詢,返回對象的list集合 */ @SneakyThrows public <T> List<T> fetchResults(String query, Class<?> clasz){ List results = new ArrayList<>(); QueryResult queryResult = influxDB.query(new Query(query, database)); queryResult.getResults().forEach(result->{ result.getSeries().forEach(serial->{ List<String> columns = serial.getColumns(); int fieldSize = columns.size(); serial.getValues().forEach(value->{ Object obj = null; obj = clasz.newInstance(); for(int i=0;i<fieldSize;i++){ String fieldName = columns.get(i); Field field = clasz.getDeclaredField(fieldName); field.setAccessible(true); Class<?> type = field.getType(); if(type == float.class){ field.set(obj, Float.valueOf(value.get(i).toString())); }else{ field.set(obj, value.get(i)); } } results.add(obj); }); }); }); return results; } /** * 查詢,返回對象的list集合 */ public <T> List<T> fetchResults(String fieldKeys, String measurement, Class<?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); return this.fetchResults(query.toString(), clasz); } /** * 查詢,返回對象的list集合 */ public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, Class<?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); return this.fetchResults(query.toString(), clasz); } /** * 查詢,返回對象的list集合 */ public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, String limit, Class<?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); query.append(limit); 
return this.fetchResults(query.toString(), clasz); } }
3、使用工具類的測試代碼
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbUtilTest { @Autowired private InfluxdbUtils influxdbUtils; /** * 插入單條記錄 */ @Test public void insert(){ Sensor sensor = new Sensor(); sensor.setA1(10); sensor.setA2(10); sensor.setDeviceId("0002"); sensor.setTemp(10L); sensor.setTime("2021-01-19"); sensor.setVoltage(10); influxdbUtils.insertOne(sensor); } /** * 批量插入第一種方式 */ @GetMapping("/index22") public void batchInsert(){ List<Sensor> sensorList = new ArrayList<Sensor>(); for(int i=0; i<50; i++){ Sensor sensor = new Sensor(); sensor.setA1(2); sensor.setA2(12); sensor.setTemp(9); sensor.setVoltage(12); sensor.setDeviceId("sensor4545-"+i); sensorList.add(sensor); } influxdbUtils.insertBatchByRecords(sensorList); } /** * 批量插入第二種方式 */ @GetMapping("/index23") public void batchInsert1(){ List<Sensor> sensorList = new ArrayList<Sensor>(); Sensor sensor = null; for(int i=0; i<50; i++){ sensor = new Sensor(); sensor.setA1(2); sensor.setA2(12); sensor.setTemp(9); sensor.setVoltage(12); sensor.setDeviceId("sensor4545-"+i); sensorList.add(sensor); } influxdbUtils.insertBatchByPoints(sensorList); } /** * 查詢數(shù)據(jù) */ @GetMapping("/datas2") public void datas(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分頁查詢,因此可以設(shè)置分頁查詢條件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查詢條件暫且為空 // 此處查詢所有內(nèi)容,如果 String queryCmd = "SELECT * FROM sensor" // 查詢指定設(shè)備下的日志信息 // 要指定從 RetentionPolicyName.measurement中查詢指定數(shù)據(jù),默認的策略可以不加; // + 策略name + "." 
+ measurement // 添加查詢條件(注意查詢條件選擇tag值,選擇field數(shù)值會嚴重拖慢查詢速度) + queryCondition // 查詢結(jié)果需要按照時間排序 + " ORDER BY time DESC" // 添加分頁查詢條件 + pageQuery; List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd); System.out.println("query result => {}"+sensorList ); } /** * 獲取數(shù)據(jù) */ @GetMapping("/datas21") public void datas1(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分頁查詢,因此可以設(shè)置分頁查詢條件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查詢條件暫且為空 // 此處查詢所有內(nèi)容,如果 String queryCmd = "SELECT * FROM sensor" // 查詢指定設(shè)備下的日志信息 // 要指定從 RetentionPolicyName.measurement中查詢指定數(shù)據(jù),默認的策略可以不加; // + 策略name + "." + measurement // 添加查詢條件(注意查詢條件選擇tag值,選擇field數(shù)值會嚴重拖慢查詢速度) + queryCondition // 查詢結(jié)果需要按照時間排序 + " ORDER BY time DESC" // 添加分頁查詢條件 + pageQuery; List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class); //List<Sensor> sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class); sensorList.forEach(sensor->{ System.out.println("query result => {}"+sensorList ); }); } }
6、采用封裝數據模型的方式
1、在Influxdb庫中創建存儲策略
CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
2、創建數據模型
@Data @Measurement(name = "m_order_payment", database = "db_order", retentionPolicy = "rp_order_payment") public class OrderPayment implements Serializable { // 統(tǒng)計批次 @Column(name = "batch_id", tag = true) private String batchId; // 哪個BU @Column(name = "bu_id", tag = true) private String buId; // BU 名稱 @Column(name = "bu_name") private String buName; // 總數(shù) @Column(name = "total_count", tag = true) private String totalCount; // 支付量 @Column(name = "pay_count", tag = true) private String payCount; // 金額 @Column(name = "total_money", tag = true) private String totalMoney; }
3、創建Mapper
/**
 * Project-level subclass of {@code InfluxDBMapper}; adds no behaviour of its own.
 */
public class InfluxMapper extends InfluxDBMapper {

    /**
     * @param influxDB client handed to the parent mapper
     */
    public InfluxMapper(InfluxDB influxDB) {
        super(influxDB);
    }
}
4、配置Mapper
/**
 * Spring configuration that registers the {@link InfluxMapper} bean.
 */
@Log4j2
@Configuration
public class InfluxAutoConfiguration {

    /**
     * @param influxDB the InfluxDB client bean
     * @return a mapper bound to that client
     */
    @Bean
    public InfluxMapper influxMapper(InfluxDB influxDB) {
        return new InfluxMapper(influxDB);
    }
}
5、測試CRUD
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbMapperTest { @Autowired private InfluxMapper influxMapper; @Test public void save(OrderPayment product) { influxMapper.save(product); } @Test public void queryAll() { List<OrderPayment> products = influxMapper.query(OrderPayment.class); System.out.println(products); } @Test public void queryByBu(String bu) { String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", bu); Query query = new Query(sql, "db_order"); List<OrderPayment> products = influxMapper.query(query, OrderPayment.class); System.out.println(products); } }
參考:https://blog.csdn.net/cpongo1/article/details/89550486
https://github.com/SpringForAll/spring-boot-starter-hbase
https://github.com/JeffLi1993/springboot-learning-example
到此這篇關於springboot 整合hbase的示例代碼的文章就介紹到這了,更多相關springboot 整合hbase內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持腳本之家!
相關(guān)文章
通過Spring Boot配置動態數據源訪問多個數據庫的實現代碼
這篇文章主要介紹了通過Spring Boot配置動態數據源訪問多個數據庫的實現代碼,需要的朋友可以參考下 2018-03-03