欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot2+es7使用RestHighLevelClient的示例代碼

 更新時(shí)間:2022年07月01日 11:38:11   作者:yzh_1346983557  
本文主要介紹了springboot2+es7使用RestHighLevelClient的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

由于spring和es的集成并不是特別友好,es的高低版本兼容問題、api更新頻率高等問題,所以我選擇是官網(wǎng)提供的原生Client(RestHighLevelClient),但又不想去關(guān)注es的配置類以及和spring的集成配置、jar包沖突等問題,所以使用spring-boot-starter-data-elasticsearch。

一、引入依賴jar

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
? ? ? ? </dependency>

二、application.properties配置

spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202
spring.elasticsearch.rest.connection-timeout=5s
spring.elasticsearch.rest.read-timeout=30s
logging.level.org.springframework.data.convert.CustomConversions=error

spring-boot-starter-data-elasticsearch中自動(dòng)裝配es的配置類:ElasticsearchRestClientAutoConfiguration、ElasticsearchRestClientProperties。

ElasticsearchRestClientAutoConfiguration:

@ConditionalOnClass({RestHighLevelClient.class})
@ConditionalOnMissingBean({RestClient.class})
@EnableConfigurationProperties({ElasticsearchRestClientProperties.class})
public class ElasticsearchRestClientAutoConfiguration {
?
? ? @Configuration(
? ? ? ? proxyBeanMethods = false
? ? )
? ? @ConditionalOnMissingBean({RestHighLevelClient.class})
? ? static class RestHighLevelClientConfiguration {
? ? ? ? RestHighLevelClientConfiguration() {
? ? ? ? }
?
? ? ? ? @Bean
? ? ? ? RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
? ? ? ? ? ? return new RestHighLevelClient(restClientBuilder);
? ? ? ? }
? ? }
?
? ? @Configuration(
? ? ? ? proxyBeanMethods = false
? ? )
? ? @ConditionalOnMissingBean({RestClientBuilder.class})
? ? static class RestClientBuilderConfiguration {
? ? ? ? RestClientBuilderConfiguration() {
? ? ? ? }
?
? ? ? ? @Bean
? ? ? ? RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
? ? ? ? ? ? return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties);
? ? ? ? }
?
? ? ? ? @Bean
? ? ? ? RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties, ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
? ? ? ? ? ? HttpHost[] hosts = (HttpHost[])properties.getUris().stream().map(this::createHttpHost).toArray((x$0) -> {
? ? ? ? ? ? ? ? return new HttpHost[x$0];
? ? ? ? ? ? });
? ? ? ? ? ? RestClientBuilder builder = RestClient.builder(hosts);
? ? ? ? ? ? builder.setHttpClientConfigCallback((httpClientBuilder) -> {
? ? ? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> {
? ? ? ? ? ? ? ? ? ? customizer.customize(httpClientBuilder);
? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? return httpClientBuilder;
? ? ? ? ? ? });
? ? ? ? ? ? builder.setRequestConfigCallback((requestConfigBuilder) -> {
? ? ? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> {
? ? ? ? ? ? ? ? ? ? customizer.customize(requestConfigBuilder);
? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? return requestConfigBuilder;
? ? ? ? ? ? });
? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> {
? ? ? ? ? ? ? ? customizer.customize(builder);
? ? ? ? ? ? });
? ? ? ? ? ? return builder;
? ? ? ? }
?
? ? ? ? private HttpHost createHttpHost(String uri) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? return this.createHttpHost(URI.create(uri));
? ? ? ? ? ? } catch (IllegalArgumentException var3) {
? ? ? ? ? ? ? ? return HttpHost.create(uri);
? ? ? ? ? ? }
? ? ? ? }
?
? ? ? ? private HttpHost createHttpHost(URI uri) {
? ? ? ? ? ? if (!StringUtils.hasLength(uri.getUserInfo())) {
? ? ? ? ? ? ? ? return HttpHost.create(uri.toString());
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? return HttpHost.create((new URI(uri.getScheme(), (String)null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString());
? ? ? ? ? ? ? ? } catch (URISyntaxException var3) {
? ? ? ? ? ? ? ? ? ? throw new IllegalStateException(var3);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
}

ElasticsearchRestClientProperties: 

@ConfigurationProperties(
? ? prefix = "spring.elasticsearch.rest"
)
public class ElasticsearchRestClientProperties {
? ? private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200"));
? ? private String username;
? ? private String password;
? ? private Duration connectionTimeout = Duration.ofSeconds(1L);
? ? private Duration readTimeout = Duration.ofSeconds(30L);
?
? ? public ElasticsearchRestClientProperties() {
? ? }
?
? ? public List<String> getUris() {
? ? ? ? return this.uris;
? ? }
?
? ? public void setUris(List<String> uris) {
? ? ? ? this.uris = uris;
? ? }
?
? ? public String getUsername() {
? ? ? ? return this.username;
? ? }
?
? ? public void setUsername(String username) {
? ? ? ? this.username = username;
? ? }
?
? ? public String getPassword() {
? ? ? ? return this.password;
? ? }
?
? ? public void setPassword(String password) {
? ? ? ? this.password = password;
? ? }
?
? ? public Duration getConnectionTimeout() {
? ? ? ? return this.connectionTimeout;
? ? }
?
? ? public void setConnectionTimeout(Duration connectionTimeout) {
? ? ? ? this.connectionTimeout = connectionTimeout;
? ? }
?
? ? public Duration getReadTimeout() {
? ? ? ? return this.readTimeout;
? ? }
?
? ? public void setReadTimeout(Duration readTimeout) {
? ? ? ? this.readTimeout = readTimeout;
? ? }
}

三、使用

ES基本操作持久層

/**
?* es持久層
?*
?* @author yangzihe
?* @date 2022/1/24
?*/
@Repository
@Slf4j
public class EsRepository {
?
? ? @Resource
? ? private RestHighLevelClient highLevelClient;
?
? ? /**
? ? ?* 判斷索引是否存在
? ? ?*/
? ? public boolean existIndex(String index) {
? ? ? ? try {
? ? ? ? ? ? return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es持久層異常!index={}", index, e);
? ? ? ? }
? ? ? ? return Boolean.FALSE;
? ? }
?
? ? /**
? ? ?* 創(chuàng)建索引
? ? ?*/
? ? public boolean createIndex(String index, Map<String, Object> columnMap) {
? ? ? ? if (existIndex(index)) {
? ? ? ? ? ? return Boolean.FALSE;
? ? ? ? }
?
? ? ? ? CreateIndexRequest request = new CreateIndexRequest(index);
? ? ? ? if (columnMap != null && columnMap.size() > 0) {
? ? ? ? ? ? Map<String, Object> source = new HashMap<>();
? ? ? ? ? ? source.put("properties", columnMap);
? ? ? ? ? ? request.mapping(source);
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? highLevelClient.indices().create(request, RequestOptions.DEFAULT);
? ? ? ? ? ? return Boolean.TRUE;
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es持久層異常!index={}, columnMap={}", index, columnMap, e);
? ? ? ? }
? ? ? ? return Boolean.FALSE;
? ? }
?
? ? /**
? ? ?* 刪除索引
? ? ?*/
? ? public boolean deleteIndex(String index) {
? ? ? ? try {
? ? ? ? ? ? if (existIndex(index)) {
? ? ? ? ? ? ? ? AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
? ? ? ? ? ? ? ? return response.isAcknowledged();
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error("es持久層異常!index={}", index, e);
? ? ? ? }
? ? ? ? return Boolean.FALSE;
? ? }
?
? ? /**
? ? ?* 數(shù)據(jù)新增
? ? ?*/
? ? public boolean insert(String index, String jsonString) {
? ? ? ? IndexRequest indexRequest = new IndexRequest(index);
?
? ? ? ? indexRequest.id(new Snowflake().nextIdStr());
? ? ? ? indexRequest.source(jsonString, XContentType.JSON);
?
? ? ? ? try {
? ? ? ? ? ? log.info("indexRequest={}", indexRequest);
? ? ? ? ? ? IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
? ? ? ? ? ? log.info("indexResponse={}", indexResponse);
? ? ? ? ? ? return Boolean.TRUE;
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es持久層異常!index={}, jsonString={}", index, jsonString, e);
? ? ? ? }
? ? ? ? return Boolean.FALSE;
? ? }
?
? ? /**
? ? ?* 數(shù)據(jù)更新,可以直接修改索引結(jié)構(gòu)
? ? ?*/
? ? public boolean update(String index, Map<String, Object> dataMap) {
? ? ? ? UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString());
? ? ? ? updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
? ? ? ? updateRequest.doc(dataMap);
? ? ? ? try {
? ? ? ? ? ? highLevelClient.update(updateRequest, RequestOptions.DEFAULT);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es持久層異常!index={}, dataMap={}", index, dataMap, e);
? ? ? ? ? ? return Boolean.FALSE;
? ? ? ? }
? ? ? ? return Boolean.TRUE;
? ? }
?
? ? /**
? ? ?* 刪除數(shù)據(jù)
? ? ?*/
? ? public boolean delete(String index, String id) {
? ? ? ? DeleteRequest deleteRequest = new DeleteRequest(index, id);
? ? ? ? try {
? ? ? ? ? ? highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es持久層異常!index={}, id={}", index, id, e);
? ? ? ? ? ? return Boolean.FALSE;
? ? ? ? }
? ? ? ? return Boolean.TRUE;
? ? }
?
}

ES查詢持久層

/**
?* es查詢持久層
?*
?* @author yangzihe
?* @date 2022/1/25
?*/
@Repository
@Slf4j
public class EsSearchRepository {
?
? ? @Resource
? ? private RestHighLevelClient highLevelClient;
?
? ? /**
? ? ?* 分頁查詢
? ? ?*
? ? ?* @param queryPO 分頁查詢對(duì)象
? ? ?*
? ? ?* @return 分頁查詢結(jié)果
? ? ?*/
? ? public EsQueryRespPO<Map<String, Object>> searchPage(EsQueryReqPO queryPO) {
? ? ? ? // 默認(rèn)分頁參數(shù)設(shè)置
? ? ? ? if (queryPO.getPageNum() == null) {
? ? ? ? ? ? queryPO.setPageNum(1);
? ? ? ? }
? ? ? ? if (queryPO.getPageSize() == null) {
? ? ? ? ? ? queryPO.setPageSize(10);
? ? ? ? }
?
? ? ? ? // 設(shè)置索引
? ? ? ? SearchRequest searchRequest = new SearchRequest(queryPO.getIndex());
?
? ? ? ? // 封裝查詢?cè)磳?duì)象
? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
? ? ? ? searchRequest.source(sourceBuilder);
?
? ? ? ? // 查詢條件
? ? ? ? sourceBuilder.query(queryPO.getQuery());
?
? ? ? ? // 排序字段
? ? ? ? if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) {
? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort());
? ? ? ? ? ? sourceBuilder.sort(order);
? ? ? ? }
?
? ? ? ? // 開始行數(shù),默認(rèn)0
? ? ? ? sourceBuilder.from((queryPO.getPageNum() - 1) * queryPO.getPageSize());
? ? ? ? // 頁大小,默認(rèn)10
? ? ? ? sourceBuilder.size(queryPO.getPageSize());
?
? ? ? ? // 查詢結(jié)果
? ? ? ? SearchResponse searchResponse = null;
? ? ? ? try {
? ? ? ? ? ? // log.info("es查詢請(qǐng)求對(duì)象:searchRequest={}", searchRequest);
? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source());
? ? ? ? ? ? // 執(zhí)行搜索
? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e);
? ? ? ? ? ? // 異常處理
? ? ? ? ? ? return EsQueryRespPO.error("es查詢,IO異常!");
? ? ? ? }
?
? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) {
? ? ? ? ? ? // 解析對(duì)象
? ? ? ? ? ? SearchHit[] hits = searchResponse.getHits().getHits();
? ? ? ? ? ? // 獲取source
? ? ? ? ? ? List<Map<String, Object>> sourceList = Arrays.stream(hits).map(SearchHit::getSourceAsMap).collect(Collectors.toList());
? ? ? ? ? ? long totalHits = searchResponse.getHits().getTotalHits().value;
? ? ? ? ? ? return EsQueryRespPO.success(sourceList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits);
? ? ? ? } else {
? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest);
? ? ? ? ? ? return EsQueryRespPO.error("es查詢返回的狀態(tài)碼異常!");
? ? ? ? }
? ? }
?
? ? /**
? ? ?* 聚合的分頁查詢
? ? ?*
? ? ?* @param queryPO 查詢請(qǐng)求對(duì)象
? ? ?*
? ? ?* @return 聚合分頁查詢結(jié)果
? ? ?*/
? ? public EsQueryRespPO<AggregationBucketPO> searchAggregation(EsQueryReqPO queryPO) {
? ? ? ? // 設(shè)置索引
? ? ? ? SearchRequest searchRequest = new SearchRequest(queryPO.getIndex());
?
? ? ? ? // 封裝查詢?cè)磳?duì)象
? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
? ? ? ? searchRequest.source(sourceBuilder);
?
? ? ? ? // 查詢條件
? ? ? ? sourceBuilder.query(queryPO.getQuery());
?
? ? ? ? // 排序字段
? ? ? ? if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) {
? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort());
? ? ? ? ? ? sourceBuilder.sort(order);
? ? ? ? }
?
? ? ? ? // 頁大小0,只返回聚合結(jié)果
? ? ? ? sourceBuilder.size(0);
?
? ? ? ? // 設(shè)置聚合查詢,可以設(shè)置多個(gè)聚合查詢條件,只要聚合查詢命名不同就行
? ? ? ? // 聚合分組條件, group by
? ? ? ? sourceBuilder.aggregation(queryPO.getTermsAggregation());
? ? ? ? // 聚合統(tǒng)計(jì)條件, count分組后的數(shù)據(jù),計(jì)算分組后的總大小
? ? ? ? sourceBuilder.aggregation(queryPO.getCardinalityAggregation());
?
? ? ? ? // 查詢結(jié)果
? ? ? ? SearchResponse searchResponse = null;
? ? ? ? try {
? ? ? ? ? ? // log.info("es查詢請(qǐng)求對(duì)象:searchRequest={}", searchRequest);
? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source());
? ? ? ? ? ? // 執(zhí)行搜索
? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e);
? ? ? ? ? ? return EsQueryRespPO.error("es查詢,IO異常!");
? ? ? ? }
?
? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) {
? ? ? ? ? ? // 解析對(duì)象
? ? ? ? ? ? Aggregations aggregations = searchResponse.getAggregations();
?
? ? ? ? ? ? long docTotal = searchResponse.getHits().getTotalHits().value;
?
? ? ? ? ? ? // 遍歷terms聚合結(jié)果
? ? ? ? ? ? Terms terms = aggregations.get(queryPO.getTermsAggregation().getName());
? ? ? ? ? ? List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> {
? ? ? ? ? ? ? ? String key = bucket.getKeyAsString();
? ? ? ? ? ? ? ? long docCount = bucket.getDocCount();
? ? ? ? ? ? ? ? return new AggregationBucketPO(key, docCount, docTotal);
? ? ? ? ? ? }).collect(Collectors.toList());
?
? ? ? ? ? ? // 總數(shù)量
? ? ? ? ? ? Cardinality cardinality = aggregations.get(queryPO.getCardinalityAggregation().getName());
? ? ? ? ? ? long totalHits = cardinality.getValue();
?
? ? ? ? ? ? return EsQueryRespPO.success(bucketList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits);
? ? ? ? } else {
? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest);
? ? ? ? ? ? return EsQueryRespPO.error("es查詢返回的狀態(tài)碼異常!");
? ? ? ? }
? ? }
}

其中,EsQueryReqPO、EsQueryRespPO、AggregationBucketPO等類如下:

/**
?* es查詢請(qǐng)求對(duì)象
?*/
@Data
public class EsQueryReqPO {
?
? ? /**
? ? ?* 索引名
? ? ?*/
? ? String[] index;
?
? ? /**
? ? ?* 查詢條件
? ? ?*/
? ? QueryBuilder query;
?
? ? /**
? ? ?* 排序字段
? ? ?*/
? ? String sortField;
?
? ? /**
? ? ?* 排序方式 SortOrder.ASC、SortOrder.DESC
? ? ?*/
? ? SortOrder sort;
?
? ? /**
? ? ?* 頁數(shù)
? ? ?*/
? ? private Integer pageNum;
?
? ? /**
? ? ?* 頁大小
? ? ?*/
? ? private Integer pageSize;
?
? ? /**
? ? ?* 聚合分組條件, group by
? ? ?*/
? ? private TermsAggregationBuilder termsAggregation;
?
? ? /**
? ? ?* 聚合統(tǒng)計(jì)條件, count分組后的數(shù)據(jù)
? ? ?*/
? ? private CardinalityAggregationBuilder cardinalityAggregation;
}
/**
?* es分頁響應(yīng)對(duì)象
?*
?* @author yangzihe
?* @date 2022/1/25
?*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EsQueryRespPO<T> {
?
? ? /**
? ? ?* 是否成功
? ? ?*/
? ? private Boolean success;
?
? ? /**
? ? ?* 信息
? ? ?*/
? ? private String message;
?
? ? /**
? ? ?* 頁數(shù)
? ? ?*/
? ? private Integer pageNum;
?
? ? /**
? ? ?* 頁大小
? ? ?*/
? ? private Integer pageSize;
?
? ? /**
? ? ?* 總大小
? ? ?*/
? ? private Long totalSize;
?
? ? /**
? ? ?* 數(shù)據(jù)
? ? ?*/
? ? private List<T> sourceList;
?
? ? public static <T> EsQueryRespPO<T> success(List<T> sourceList, Integer pageNum, Integer pageSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Long totalSize) {
? ? ? ? EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>();
? ? ? ? esQueryRespPO.setSuccess(true);
? ? ? ? esQueryRespPO.setSourceList(sourceList);
? ? ? ? esQueryRespPO.setPageNum(pageNum);
? ? ? ? esQueryRespPO.setPageSize(pageSize);
? ? ? ? esQueryRespPO.setTotalSize(totalSize);
? ? ? ? return esQueryRespPO;
? ? }
?
? ? public static EsQueryRespPO error() {
? ? ? ? EsQueryRespPO esQueryRespPO = new EsQueryRespPO();
? ? ? ? esQueryRespPO.setSuccess(false);
? ? ? ? esQueryRespPO.setMessage("es查詢異常");
? ? ? ? return esQueryRespPO;
? ? }
?
? ? public static EsQueryRespPO error(String message) {
? ? ? ? EsQueryRespPO esQueryRespPO = new EsQueryRespPO();
? ? ? ? esQueryRespPO.setSuccess(false);
? ? ? ? esQueryRespPO.setMessage(message);
? ? ? ? return esQueryRespPO;
? ? }
?
}
/**
?* 聚合桶對(duì)象
?*
?* @author yangzihe
?* @date 2022/1/26
?*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AggregationBucketPO {
?
? ? /**
? ? ?* 聚合Bucket的key名
? ? ?*/
? ? private String key;
?
? ? /**
? ? ?* 聚合Bucket的文檔數(shù)量
? ? ?*/
? ? private Long docCount;
?
? ? /**
? ? ?* 文檔總數(shù)量
? ? ?*/
? ? private Long docTotal;
?
}

ES多級(jí)(二級(jí))聚合分桶查詢

import com.yy.armor.manager.common.exception.EsException;
import com.yy.armor.manager.persist.es.po.AggregationBucketPO;
import com.yy.armor.manager.persist.es.po.EsMultiAggregationReqPO;
?
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
?
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.compress.utils.Lists;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.springframework.stereotype.Repository;
?
?
@Repository
@Slf4j
public class EsSearchRepository {
?
? ? @Resource
? ? private RestHighLevelClient highLevelClient;
?
?
? ? /**
? ? ?* 多級(jí)聚合查詢(二級(jí)聚合)
? ? ?*
? ? ?* @param reqPO 查詢請(qǐng)求對(duì)象
? ? ?*
? ? ?* @return 聚合查詢結(jié)果
? ? ?*/
? ? public List<AggregationBucketPO> searchMultiAggregation(EsMultiAggregationReqPO reqPO) {
? ? ? ? // 設(shè)置索引
? ? ? ? SearchRequest searchRequest = new SearchRequest(reqPO.getIndex());
?
? ? ? ? // 封裝查詢?cè)磳?duì)象
? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
? ? ? ? searchRequest.source(sourceBuilder);
?
? ? ? ? // 查詢條件
? ? ? ? sourceBuilder.query(reqPO.getQuery());
?
? ? ? ? // 排序字段
? ? ? ? if (StringUtils.isNotBlank(reqPO.getSortField()) && reqPO.getSort() != null) {
? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(reqPO.getSortField()).order(reqPO.getSort());
? ? ? ? ? ? sourceBuilder.sort(order);
? ? ? ? }
?
? ? ? ? // 頁大小0,只返回聚合結(jié)果
? ? ? ? sourceBuilder.size(0);
?
? ? ? ? // 聚合分桶。創(chuàng)建terms桶聚合,聚合名字=terms_by_XXX, 字段=XXX
? ? ? ? TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getField()).field(reqPO.getField());
? ? ? ? if (reqPO.getFieldSize() != null) {
? ? ? ? ? ? termsAggregationBuilder.size(reqPO.getFieldSize());
? ? ? ? }
? ? ? ? // 二級(jí)聚合分桶
? ? ? ? TermsAggregationBuilder subTermsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getSubField()).field(reqPO.getSubField());
? ? ? ? if (reqPO.getSubFieldSize() != null) {
? ? ? ? ? ? subTermsAggregationBuilder.size(reqPO.getSubFieldSize());
? ? ? ? }
? ? ? ? termsAggregationBuilder.subAggregation(subTermsAggregationBuilder);
?
? ? ? ? // 聚合分組條件
? ? ? ? sourceBuilder.aggregation(termsAggregationBuilder);
?
? ? ? ? // 查詢結(jié)果
? ? ? ? SearchResponse searchResponse = null;
? ? ? ? try {
? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source());
? ? ? ? ? ? // 執(zhí)行搜索
? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e);
? ? ? ? ? ? throw new EsException("es查詢,IO異常!");
? ? ? ? }
?
? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) {
? ? ? ? ? ? // 遍歷terms聚合結(jié)果
? ? ? ? ? ? Terms terms = searchResponse.getAggregations().get(termsAggregationBuilder.getName());
? ? ? ? ? ? List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> {
? ? ? ? ? ? ? ? // 一級(jí)聚合分桶的數(shù)據(jù)
? ? ? ? ? ? ? ? String key = bucket.getKeyAsString();
? ? ? ? ? ? ? ? long docCount = bucket.getDocCount();
? ? ? ? ? ? ? ? // 二級(jí)聚合分桶的數(shù)據(jù)
? ? ? ? ? ? ? ? Terms subTerms = bucket.getAggregations().get(subTermsAggregationBuilder.getName());
? ? ? ? ? ? ? ? List<AggregationBucketPO> subBucketList = convertTerms(subTerms);
? ? ? ? ? ? ? ? return new AggregationBucketPO(key, docCount, subBucketList);
? ? ? ? ? ? }).collect(Collectors.toList());
?
? ? ? ? ? ? return bucketList;
? ? ? ? } else {
? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest);
? ? ? ? ? ? throw new EsException("es查詢返回的狀態(tài)碼異常!");
? ? ? ? }
? ? }
?
? ? private List<AggregationBucketPO> convertTerms(Terms terms) {
? ? ? ? if (CollectionUtils.isEmpty(terms.getBuckets())) {
? ? ? ? ? ? return Lists.newArrayList();
? ? ? ? }
?
? ? ? ? return terms.getBuckets().stream().map(bucket -> {
? ? ? ? ? ? String key = bucket.getKeyAsString();
? ? ? ? ? ? long docCount = bucket.getDocCount();
? ? ? ? ? ? return new AggregationBucketPO(key, docCount);
? ? ? ? }).collect(Collectors.toList());
? ? }
}

其中,EsMultiAggregationReqPO、AggregationBucketPO類如下:

@Data
public class EsMultiAggregationReqPO {
? ??
? ? /**
? ? ?* 索引名
? ? ?*/
? ? String[] index;
?
? ? /**
? ? ?* 查詢條件
? ? ?*/
? ? QueryBuilder query;
?
? ? /**
? ? ?* 聚合分桶字段
? ? ?*/
? ? private String field;
?
? ? /**
? ? ?* 二級(jí)聚合分桶字段
? ? ?*/
? ? private String subField;
?
? ? /**
? ? ?* 聚合分桶大小,非必傳
? ? ?*/
? ? private Integer fieldSize;
?
? ? /**
? ? ?* 二級(jí)聚合分桶大小,非必傳
? ? ?*/
? ? private Integer subFieldSize;
?
? ? /**
? ? ?* 排序字段,非必傳
? ? ?*/
? ? String sortField;
?
? ? /**
? ? ?* 排序方式 SortOrder.ASC、SortOrder.DESC,非必傳
? ? ?*/
? ? SortOrder sort;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AggregationBucketPO {
    /**
     * 聚合Bucket的key名
     */
    private String key;
 
    /**
     * 聚合Bucket的文檔數(shù)量
     */
    private Long docCount;
 
    /**
     * 子桶集合
     */
    private List<AggregationBucketPO> subBucketList;
 
    public AggregationBucketPO(String key, Long docCount) {
        this.key = key;
        this.docCount = docCount;
    }
}

二級(jí)聚合分桶測(cè)試代碼 

? ? @PostConstruct
? ? private void init() {
? ? ? ? // 查詢對(duì)象的封裝
? ? ? ? EsMultiAggregationReqPO reqPO = new EsMultiAggregationReqPO();
?
? ? ? ? reqPO.setIndex(new String[]{"test_log"});
?
? ? ? ? List<Long> ids = Lists.newArrayList();
? ? ? ? ids.add(140L);
? ? ? ? ids.add(141L);
? ? ? ? ids.add(142L);
? ? ? ? QueryBuilder queryBuilder4 = QueryBuilders.termsQuery("eventId", ids);
? ? ? ? BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder4);
? ? ? ? reqPO.setQuery(queryBuilder);
?
? ? ? ? reqPO.setField("eventId");
? ? ? ? reqPO.setFieldSize(9999);
? ? ? ? reqPO.setSubField("riskFlag");
?
? ? ? ? // 執(zhí)行查詢
? ? ? ? List<AggregationBucketPO> esQueryRespPO = searchMultiAggregation(reqPO);
? ? ? ? System.out.println("esQueryRespPO=" + esQueryRespPO);
? ? }

其它

如果沒有用spring-boot-starter-data-elasticsearch來自動(dòng)注入es組件,那么需要自己做es client的注入,es配置類如下:

/**
 * @author yangzihe
 * @date 2022/1/25
 */
@Configuration
public class EsClientConfig {
 
    @Value("${spring.elasticsearch.rest.uris}")
    private List<String> uris;
 
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        List<HttpHost> httpHostList = uris.stream().map(HttpHost::create).collect(Collectors.toList());
        HttpHost[] httpHost = new HttpHost[uris.size()];
        httpHostList.toArray(httpHost);
        RestClientBuilder clientBuilder = RestClient.builder(httpHost);
        return new RestHighLevelClient(clientBuilder);
    }
 
}

Snowflake是hutool包里的,導(dǎo)包:

        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.14</version>
        </dependency>

聚合查詢的測(cè)試用例:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class EsTest {
 
    @Resource
    private EsSearchRepository esSearchRepository;
 
    @Test
    public void testSearchAggregation() {
        // 查詢對(duì)象的封裝
        EsQueryReqPO queryPO = new EsQueryReqPO();
        queryPO.setIndex(new String[]{"yzh1", "yzh2"});
        queryPO.setPageNum(1);
        queryPO.setPageSize(10);
 
        // 時(shí)間戳范圍
        QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp")
            .from(System.currentTimeMillis() - 1000)
            .to(System.currentTimeMillis());
        // 登錄標(biāo)識(shí)
        QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang");
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2);
        queryPO.setQuery(queryBuilder);
 
        // 根據(jù)userName分組。創(chuàng)建terms桶聚合,聚合名字=terms_by_userName, 字段=payload.userName.keyword
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
            .terms("terms_by_userName").field("payload.userName.keyword");
        termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum());
        termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null)
            .from((queryPO.getPageNum() - 1) * queryPO.getPageSize()).size(queryPO.getPageSize()));
        queryPO.setTermsAggregation(termsAggregationBuilder);
 
        // 根據(jù)userName聚合count統(tǒng)計(jì). cardinality名=count_userName, 字段=payload.userName.keyword
        CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders
            .cardinality("count_userName").field("payload.userName.keyword");
        queryPO.setCardinalityAggregation(cardinalityAggregationBuilder);
 
        // 執(zhí)行查詢
        EsQueryRespPO<AggregationBucketPO> esQueryRespPO = esSearchRepository.searchAggregation(queryPO);
    }
}

到此這篇關(guān)于springboot2+es7使用RestHighLevelClient的示例代碼的文章就介紹到這了,更多相關(guān)springboot2 es7使用RestHighLevelClient內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java判斷字符串是正整數(shù)的實(shí)例

    java判斷字符串是正整數(shù)的實(shí)例

    今天小編就為大家分享一篇java判斷字符串是正整數(shù)的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • 手把手教你用Java給暗戀對(duì)象發(fā)送一份表白郵件

    手把手教你用Java給暗戀對(duì)象發(fā)送一份表白郵件

    隨著我們學(xué)習(xí)java的深入,也漸漸發(fā)現(xiàn)了它的一些樂趣,比如發(fā)送郵件,下面這篇文章主要給大家介紹了關(guān)于如何利用Java給暗戀對(duì)象發(fā)送一份表白郵件的相關(guān)資料,需要的朋友可以參考下
    2021-11-11
  • 使用springboot通過spi機(jī)制加載mysql驅(qū)動(dòng)的過程

    使用springboot通過spi機(jī)制加載mysql驅(qū)動(dòng)的過程

    這篇文章主要介紹了使用springboot通過spi機(jī)制加載mysql驅(qū)動(dòng)的過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 淺談一下SpringCloud中Hystrix服務(wù)熔斷和降級(jí)原理

    淺談一下SpringCloud中Hystrix服務(wù)熔斷和降級(jí)原理

    這篇文章主要介紹了淺談一下SpringCloud中Hystrix服務(wù)熔斷和降級(jí)原理,Hystrix 是 Netflix 的一款開源的容錯(cuò)框架,通過服務(wù)隔離來避免由于依賴延遲、異常,引起資源耗盡導(dǎo)致系統(tǒng)不可用的解決方案,需要的朋友可以參考下
    2023-05-05
  • Springboot工具類ReflectionUtils使用教程

    Springboot工具類ReflectionUtils使用教程

    這篇文章主要介紹了Springboot內(nèi)置的工具類之ReflectionUtils的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧
    2022-12-12
  • Java單例模式的幾種常見寫法

    Java單例模式的幾種常見寫法

    這篇文章主要介紹了Java單例模式的幾種寫法,單例模式是面試中的??土耍R妼懛ㄓ?4?種:餓漢模式、懶漢模式、靜態(tài)內(nèi)部類和枚舉,接下來我們一起進(jìn)入文章看看吧
    2022-05-05
  • Java中為什么start方法不能重復(fù)調(diào)用而run方法可以?

    Java中為什么start方法不能重復(fù)調(diào)用而run方法可以?

    這篇文章主要介紹了Java中為什么start方法不能重復(fù)調(diào)用而run方法可以?帶著疑問一起學(xué)習(xí)下面文章的詳細(xì)內(nèi)容吧
    2022-05-05
  • Spring Boot 搭建 ELK正確看日志的配置流程

    Spring Boot 搭建 ELK正確看日志的配置流程

    這篇文章主要介紹了Spring Boot 搭建 ELK正確看日志的配置流程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • java多線程關(guān)鍵字final和static詳解

    java多線程關(guān)鍵字final和static詳解

    這篇文章主要介紹了java多線程關(guān)鍵字final和static詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01
  • IDEA中的clean,清除項(xiàng)目緩存圖文教程

    IDEA中的clean,清除項(xiàng)目緩存圖文教程

    這篇文章主要介紹了IDEA中的clean,清除項(xiàng)目緩存圖文教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09

最新評(píng)論