springboot2+es7使用RestHighLevelClient的示例代碼
由于spring和es的集成并不是特別友好,es的高低版本兼容問題、api更新頻率高等問題,所以我選擇是官網(wǎng)提供的原生Client(RestHighLevelClient),但又不想去關(guān)注es的配置類以及和spring的集成配置、jar包沖突等問題,所以使用spring-boot-starter-data-elasticsearch。
一、引入依賴jar
? ? ? ? <dependency> ? ? ? ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? ? ? ? <artifactId>spring-boot-starter-data-elasticsearch</artifactId> ? ? ? ? </dependency>
二、application.properties配置
spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202 spring.elasticsearch.rest.connection-timeout=5s spring.elasticsearch.rest.read-timeout=30s logging.level.org.springframework.data.convert.CustomConversions=error
spring-boot-starter-data-elasticsearch中自動(dòng)裝配es的配置類:ElasticsearchRestClientAutoConfiguration、ElasticsearchRestClientProperties。
ElasticsearchRestClientAutoConfiguration:
@ConditionalOnClass({RestHighLevelClient.class}) @ConditionalOnMissingBean({RestClient.class}) @EnableConfigurationProperties({ElasticsearchRestClientProperties.class}) public class ElasticsearchRestClientAutoConfiguration { ? ? ? @Configuration( ? ? ? ? proxyBeanMethods = false ? ? ) ? ? @ConditionalOnMissingBean({RestHighLevelClient.class}) ? ? static class RestHighLevelClientConfiguration { ? ? ? ? RestHighLevelClientConfiguration() { ? ? ? ? } ? ? ? ? ? @Bean ? ? ? ? RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) { ? ? ? ? ? ? return new RestHighLevelClient(restClientBuilder); ? ? ? ? } ? ? } ? ? ? @Configuration( ? ? ? ? proxyBeanMethods = false ? ? ) ? ? @ConditionalOnMissingBean({RestClientBuilder.class}) ? ? static class RestClientBuilderConfiguration { ? ? ? ? RestClientBuilderConfiguration() { ? ? ? ? } ? ? ? ? ? @Bean ? ? ? ? RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) { ? ? ? ? ? ? return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties); ? ? ? ? } ? ? ? ? ? @Bean ? ? ? ? RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties, ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) { ? ? ? ? ? ? HttpHost[] hosts = (HttpHost[])properties.getUris().stream().map(this::createHttpHost).toArray((x$0) -> { ? ? ? ? ? ? ? ? return new HttpHost[x$0]; ? ? ? ? ? ? }); ? ? ? ? ? ? RestClientBuilder builder = RestClient.builder(hosts); ? ? ? ? ? ? builder.setHttpClientConfigCallback((httpClientBuilder) -> { ? ? ? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> { ? ? ? ? ? ? ? ? ? ? customizer.customize(httpClientBuilder); ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? return httpClientBuilder; ? ? ? ? ? ? }); ? ? ? ? ? ? builder.setRequestConfigCallback((requestConfigBuilder) -> { ? ? ? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> { ? ? ? ? ? ? ? ? ? ? customizer.customize(requestConfigBuilder); ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? return requestConfigBuilder; ? ? ? ? ? ? }); ? ? ? ? ? ? builderCustomizers.orderedStream().forEach((customizer) -> { ? ? ? ? ? ? ? ? customizer.customize(builder); ? ? ? ? ? ? }); ? ? ? ? ? ? return builder; ? ? ? ? } ? ? ? ? ? private HttpHost createHttpHost(String uri) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? return this.createHttpHost(URI.create(uri)); ? ? ? ? ? ? } catch (IllegalArgumentException var3) { ? ? ? ? ? ? ? ? return HttpHost.create(uri); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? ? private HttpHost createHttpHost(URI uri) { ? ? ? ? ? ? if (!StringUtils.hasLength(uri.getUserInfo())) { ? ? ? ? ? ? ? ? return HttpHost.create(uri.toString()); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? return HttpHost.create((new URI(uri.getScheme(), (String)null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString()); ? ? ? ? ? ? ? ? } catch (URISyntaxException var3) { ? ? ? ? ? ? ? ? ? ? throw new IllegalStateException(var3); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } }
ElasticsearchRestClientProperties:
@ConfigurationProperties( ? ? prefix = "spring.elasticsearch.rest" ) public class ElasticsearchRestClientProperties { ? ? private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200")); ? ? private String username; ? ? private String password; ? ? private Duration connectionTimeout = Duration.ofSeconds(1L); ? ? private Duration readTimeout = Duration.ofSeconds(30L); ? ? ? public ElasticsearchRestClientProperties() { ? ? } ? ? ? public List<String> getUris() { ? ? ? ? return this.uris; ? ? } ? ? ? public void setUris(List<String> uris) { ? ? ? ? this.uris = uris; ? ? } ? ? ? public String getUsername() { ? ? ? ? return this.username; ? ? } ? ? ? public void setUsername(String username) { ? ? ? ? this.username = username; ? ? } ? ? ? public String getPassword() { ? ? ? ? return this.password; ? ? } ? ? ? public void setPassword(String password) { ? ? ? ? this.password = password; ? ? } ? ? ? public Duration getConnectionTimeout() { ? ? ? ? return this.connectionTimeout; ? ? } ? ? ? public void setConnectionTimeout(Duration connectionTimeout) { ? ? ? ? this.connectionTimeout = connectionTimeout; ? ? } ? ? ? public Duration getReadTimeout() { ? ? ? ? return this.readTimeout; ? ? } ? ? ? public void setReadTimeout(Duration readTimeout) { ? ? ? ? this.readTimeout = readTimeout; ? ? } }
三、使用
ES基本操作持久層
/** ?* es持久層 ?* ?* @author yangzihe ?* @date 2022/1/24 ?*/ @Repository @Slf4j public class EsRepository { ? ? ? @Resource ? ? private RestHighLevelClient highLevelClient; ? ? ? /** ? ? ?* 判斷索引是否存在 ? ? ?*/ ? ? public boolean existIndex(String index) { ? ? ? ? try { ? ? ? ? ? ? return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es持久層異常!index={}", index, e); ? ? ? ? } ? ? ? ? return Boolean.FALSE; ? ? } ? ? ? /** ? ? ?* 創(chuàng)建索引 ? ? ?*/ ? ? public boolean createIndex(String index, Map<String, Object> columnMap) { ? ? ? ? if (existIndex(index)) { ? ? ? ? ? ? return Boolean.FALSE; ? ? ? ? } ? ? ? ? ? CreateIndexRequest request = new CreateIndexRequest(index); ? ? ? ? if (columnMap != null && columnMap.size() > 0) { ? ? ? ? ? ? Map<String, Object> source = new HashMap<>(); ? ? ? ? ? ? source.put("properties", columnMap); ? ? ? ? ? ? request.mapping(source); ? ? ? ? } ? ? ? ? try { ? ? ? ? ? ? highLevelClient.indices().create(request, RequestOptions.DEFAULT); ? ? ? ? ? ? return Boolean.TRUE; ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es持久層異常!index={}, columnMap={}", index, columnMap, e); ? ? ? ? } ? ? ? ? return Boolean.FALSE; ? ? } ? ? ? /** ? ? ?* 刪除索引 ? ? ?*/ ? ? public boolean deleteIndex(String index) { ? ? ? ? try { ? ? ? ? ? ? if (existIndex(index)) { ? ? ? ? ? ? ? ? AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); ? ? ? ? ? ? ? ? return response.isAcknowledged(); ? ? ? ? ? ? } ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? log.error("es持久層異常!index={}", index, e); ? ? ? ? } ? ? ? ? return Boolean.FALSE; ? ? } ? ? ? /** ? ? ?* 數(shù)據(jù)新增 ? ? ?*/ ? ? public boolean insert(String index, String jsonString) { ? ? ? ? IndexRequest indexRequest = new IndexRequest(index); ? ? ? ? ? indexRequest.id(new Snowflake().nextIdStr()); ? ? ? ? indexRequest.source(jsonString, XContentType.JSON); ? ? ? ? ? try { ? ? ? ? ? ? log.info("indexRequest={}", indexRequest); ? ? ? ? ? ? IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT); ? ? ? ? ? ? log.info("indexResponse={}", indexResponse); ? ? ? ? ? ? return Boolean.TRUE; ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es持久層異常!index={}, jsonString={}", index, jsonString, e); ? ? ? ? } ? ? ? ? return Boolean.FALSE; ? ? } ? ? ? /** ? ? ?* 數(shù)據(jù)更新,可以直接修改索引結(jié)構(gòu) ? ? ?*/ ? ? public boolean update(String index, Map<String, Object> dataMap) { ? ? ? ? UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString()); ? ? ? ? updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); ? ? ? ? updateRequest.doc(dataMap); ? ? ? ? try { ? ? ? ? ? ? highLevelClient.update(updateRequest, RequestOptions.DEFAULT); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es持久層異常!index={}, dataMap={}", index, dataMap, e); ? ? ? ? ? ? return Boolean.FALSE; ? ? ? ? } ? ? ? ? return Boolean.TRUE; ? ? } ? ? ? /** ? ? ?* 刪除數(shù)據(jù) ? ? ?*/ ? ? public boolean delete(String index, String id) { ? ? ? ? DeleteRequest deleteRequest = new DeleteRequest(index, id); ? ? ? ? try { ? ? ? ? ? ? highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es持久層異常!index={}, id={}", index, id, e); ? ? ? ? ? ? return Boolean.FALSE; ? ? ? ? } ? ? ? ? return Boolean.TRUE; ? ? } ? }
ES查詢持久層
/** ?* es查詢持久層 ?* ?* @author yangzihe ?* @date 2022/1/25 ?*/ @Repository @Slf4j public class EsSearchRepository { ? ? ? @Resource ? ? private RestHighLevelClient highLevelClient; ? ? ? /** ? ? ?* 分頁查詢 ? ? ?* ? ? ?* @param queryPO 分頁查詢對(duì)象 ? ? ?* ? ? ?* @return 分頁查詢結(jié)果 ? ? ?*/ ? ? public EsQueryRespPO<Map<String, Object>> searchPage(EsQueryReqPO queryPO) { ? ? ? ? // 默認(rèn)分頁參數(shù)設(shè)置 ? ? ? ? if (queryPO.getPageNum() == null) { ? ? ? ? ? ? queryPO.setPageNum(1); ? ? ? ? } ? ? ? ? if (queryPO.getPageSize() == null) { ? ? ? ? ? ? queryPO.setPageSize(10); ? ? ? ? } ? ? ? ? ? // 設(shè)置索引 ? ? ? ? SearchRequest searchRequest = new SearchRequest(queryPO.getIndex()); ? ? ? ? ? // 封裝查詢?cè)磳?duì)象 ? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); ? ? ? ? searchRequest.source(sourceBuilder); ? ? ? ? ? // 查詢條件 ? ? ? ? sourceBuilder.query(queryPO.getQuery()); ? ? ? ? ? // 排序字段 ? ? ? ? if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) { ? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort()); ? ? ? ? ? ? sourceBuilder.sort(order); ? ? ? ? } ? ? ? ? ? // 開始行數(shù),默認(rèn)0 ? ? ? ? sourceBuilder.from((queryPO.getPageNum() - 1) * queryPO.getPageSize()); ? ? ? ? // 頁大小,默認(rèn)10 ? ? ? ? sourceBuilder.size(queryPO.getPageSize()); ? ? ? ? ? // 查詢結(jié)果 ? ? ? ? SearchResponse searchResponse = null; ? ? ? ? try { ? ? ? ? ? ? // log.info("es查詢請(qǐng)求對(duì)象:searchRequest={}", searchRequest); ? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source()); ? ? ? ? ? ? // 執(zhí)行搜索 ? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); ? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e); ? ? ? ? ? ? // 異常處理 ? ? ? ? ? ? return EsQueryRespPO.error("es查詢,IO異常!"); ? ? ? ? } ? ? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) { ? ? ? ? ? ? // 解析對(duì)象 ? ? ? ? ? ? SearchHit[] hits = searchResponse.getHits().getHits(); ? ? ? ? ? ? // 獲取source ? ? ? ? ? ? List<Map<String, Object>> sourceList = Arrays.stream(hits).map(SearchHit::getSourceAsMap).collect(Collectors.toList()); ? ? ? ? ? ? long totalHits = searchResponse.getHits().getTotalHits().value; ? ? ? ? ? ? return EsQueryRespPO.success(sourceList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits); ? ? ? ? } else { ? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); ? ? ? ? ? ? return EsQueryRespPO.error("es查詢返回的狀態(tài)碼異常!"); ? ? ? ? } ? ? } ? ? ? /** ? ? ?* 聚合的分頁查詢 ? ? ?* ? ? ?* @param queryPO 查詢請(qǐng)求對(duì)象 ? ? ?* ? ? ?* @return 聚合分頁查詢結(jié)果 ? ? ?*/ ? ? public EsQueryRespPO<AggregationBucketPO> searchAggregation(EsQueryReqPO queryPO) { ? ? ? ? // 設(shè)置索引 ? ? ? ? SearchRequest searchRequest = new SearchRequest(queryPO.getIndex()); ? ? ? ? ? // 封裝查詢?cè)磳?duì)象 ? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); ? ? ? ? searchRequest.source(sourceBuilder); ? ? ? ? ? // 查詢條件 ? ? ? ? sourceBuilder.query(queryPO.getQuery()); ? ? ? ? ? // 排序字段 ? ? ? ? if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) { ? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort()); ? ? ? ? ? ? sourceBuilder.sort(order); ? ? ? ? } ? ? ? ? ? // 頁大小0,只返回聚合結(jié)果 ? ? ? ? sourceBuilder.size(0); ? ? ? ? ? // 設(shè)置聚合查詢,可以設(shè)置多個(gè)聚合查詢條件,只要聚合查詢命名不同就行 ? ? ? ? // 聚合分組條件, group by ? ? ? ? sourceBuilder.aggregation(queryPO.getTermsAggregation()); ? ? ? ? // 聚合統(tǒng)計(jì)條件, count分組后的數(shù)據(jù),計(jì)算分組后的總大小 ? ? ? ? sourceBuilder.aggregation(queryPO.getCardinalityAggregation()); ? ? ? ? ? // 查詢結(jié)果 ? ? ? ? SearchResponse searchResponse = null; ? ? ? ? try { ? ? ? ? ? ? // log.info("es查詢請(qǐng)求對(duì)象:searchRequest={}", searchRequest); ? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source()); ? ? ? ? ? ? // 執(zhí)行搜索 ? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); ? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e); ? ? ? ? ? ? return EsQueryRespPO.error("es查詢,IO異常!"); ? ? ? ? } ? ? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) { ? ? ? ? ? ? // 解析對(duì)象 ? ? ? ? ? ? Aggregations aggregations = searchResponse.getAggregations(); ? ? ? ? ? ? ? long docTotal = searchResponse.getHits().getTotalHits().value; ? ? ? ? ? ? ? // 遍歷terms聚合結(jié)果 ? ? ? ? ? ? Terms terms = aggregations.get(queryPO.getTermsAggregation().getName()); ? ? ? ? ? ? List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> { ? ? ? ? ? ? ? ? String key = bucket.getKeyAsString(); ? ? ? ? ? ? ? ? long docCount = bucket.getDocCount(); ? ? ? ? ? ? ? ? return new AggregationBucketPO(key, docCount, docTotal); ? ? ? ? ? ? }).collect(Collectors.toList()); ? ? ? ? ? ? ? // 總數(shù)量 ? ? ? ? ? ? Cardinality cardinality = aggregations.get(queryPO.getCardinalityAggregation().getName()); ? ? ? ? ? ? long totalHits = cardinality.getValue(); ? ? ? ? ? ? ? return EsQueryRespPO.success(bucketList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits); ? ? ? ? } else { ? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); ? ? ? ? ? ? return EsQueryRespPO.error("es查詢返回的狀態(tài)碼異常!"); ? ? ? ? } ? ? } }
其中,EsQueryReqPO、EsQueryRespPO、AggregationBucketPO等類如下:
/** ?* es查詢請(qǐng)求對(duì)象 ?*/ @Data public class EsQueryReqPO { ? ? ? /** ? ? ?* 索引名 ? ? ?*/ ? ? String[] index; ? ? ? /** ? ? ?* 查詢條件 ? ? ?*/ ? ? QueryBuilder query; ? ? ? /** ? ? ?* 排序字段 ? ? ?*/ ? ? String sortField; ? ? ? /** ? ? ?* 排序方式 SortOrder.ASC、SortOrder.DESC ? ? ?*/ ? ? SortOrder sort; ? ? ? /** ? ? ?* 頁數(shù) ? ? ?*/ ? ? private Integer pageNum; ? ? ? /** ? ? ?* 頁大小 ? ? ?*/ ? ? private Integer pageSize; ? ? ? /** ? ? ?* 聚合分組條件, group by ? ? ?*/ ? ? private TermsAggregationBuilder termsAggregation; ? ? ? /** ? ? ?* 聚合統(tǒng)計(jì)條件, count分組后的數(shù)據(jù) ? ? ?*/ ? ? private CardinalityAggregationBuilder cardinalityAggregation; } /** ?* es分頁響應(yīng)對(duì)象 ?* ?* @author yangzihe ?* @date 2022/1/25 ?*/ @Data @NoArgsConstructor @AllArgsConstructor public class EsQueryRespPO<T> { ? ? ? /** ? ? ?* 是否成功 ? ? ?*/ ? ? private Boolean success; ? ? ? /** ? ? ?* 信息 ? ? ?*/ ? ? private String message; ? ? ? /** ? ? ?* 頁數(shù) ? ? ?*/ ? ? private Integer pageNum; ? ? ? /** ? ? ?* 頁大小 ? ? ?*/ ? ? private Integer pageSize; ? ? ? /** ? ? ?* 總大小 ? ? ?*/ ? ? private Long totalSize; ? ? ? /** ? ? ?* 數(shù)據(jù) ? ? ?*/ ? ? private List<T> sourceList; ? ? ? public static <T> EsQueryRespPO<T> success(List<T> sourceList, Integer pageNum, Integer pageSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Long totalSize) { ? ? ? ? EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>(); ? ? ? ? esQueryRespPO.setSuccess(true); ? ? ? ? esQueryRespPO.setSourceList(sourceList); ? ? ? ? esQueryRespPO.setPageNum(pageNum); ? ? ? ? esQueryRespPO.setPageSize(pageSize); ? ? ? ? esQueryRespPO.setTotalSize(totalSize); ? ? ? ? return esQueryRespPO; ? ? } ? ? ? public static EsQueryRespPO error() { ? ? ? ? EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); ? ? ? ? esQueryRespPO.setSuccess(false); ? ? ? ? esQueryRespPO.setMessage("es查詢異常"); ? ? ? ? return esQueryRespPO; ? ? } ? ? ? public static EsQueryRespPO error(String message) { ? ? ? ? EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); ? ? ? ? esQueryRespPO.setSuccess(false); ? ? ? ? esQueryRespPO.setMessage(message); ? ? ? ? return esQueryRespPO; ? ? } ? }
/** ?* 聚合桶對(duì)象 ?* ?* @author yangzihe ?* @date 2022/1/26 ?*/ @Data @NoArgsConstructor @AllArgsConstructor public class AggregationBucketPO { ? ? ? /** ? ? ?* 聚合Bucket的key名 ? ? ?*/ ? ? private String key; ? ? ? /** ? ? ?* 聚合Bucket的文檔數(shù)量 ? ? ?*/ ? ? private Long docCount; ? ? ? /** ? ? ?* 文檔總數(shù)量 ? ? ?*/ ? ? private Long docTotal; ? }
ES多級(jí)(二級(jí))聚合分桶查詢
import com.yy.armor.manager.common.exception.EsException; import com.yy.armor.manager.persist.es.po.AggregationBucketPO; import com.yy.armor.manager.persist.es.po.EsMultiAggregationReqPO; ? import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import javax.annotation.Resource; ? import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.shade.org.apache.commons.compress.utils.Lists; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.springframework.stereotype.Repository; ? ? @Repository @Slf4j public class EsSearchRepository { ? ? ? @Resource ? ? private RestHighLevelClient highLevelClient; ? ? ? ? /** ? ? ?* 多級(jí)聚合查詢(二級(jí)聚合) ? ? ?* ? ? ?* @param reqPO 查詢請(qǐng)求對(duì)象 ? ? ?* ? ? ?* @return 聚合查詢結(jié)果 ? ? ?*/ ? ? public List<AggregationBucketPO> searchMultiAggregation(EsMultiAggregationReqPO reqPO) { ? ? ? ? // 設(shè)置索引 ? ? ? ? SearchRequest searchRequest = new SearchRequest(reqPO.getIndex()); ? ? ? ? ? // 封裝查詢?cè)磳?duì)象 ? ? ? ? SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); ? ? ? ? searchRequest.source(sourceBuilder); ? ? ? ? ? // 查詢條件 ? ? ? ? sourceBuilder.query(reqPO.getQuery()); ? ? ? ? ? // 排序字段 ? ? ? ? if (StringUtils.isNotBlank(reqPO.getSortField()) && reqPO.getSort() != null) { ? ? ? ? ? ? FieldSortBuilder order = new FieldSortBuilder(reqPO.getSortField()).order(reqPO.getSort()); ? ? ? ? ? ? sourceBuilder.sort(order); ? ? ? ? } ? ? ? ? ? // 頁大小0,只返回聚合結(jié)果 ? ? ? ? sourceBuilder.size(0); ? ? ? ? ? // 聚合分桶。創(chuàng)建terms桶聚合,聚合名字=terms_by_XXX, 字段=XXX ? ? ? ? TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getField()).field(reqPO.getField()); ? ? ? ? if (reqPO.getFieldSize() != null) { ? ? ? ? ? ? termsAggregationBuilder.size(reqPO.getFieldSize()); ? ? ? ? } ? ? ? ? // 二級(jí)聚合分桶 ? ? ? ? TermsAggregationBuilder subTermsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getSubField()).field(reqPO.getSubField()); ? ? ? ? if (reqPO.getSubFieldSize() != null) { ? ? ? ? ? ? subTermsAggregationBuilder.size(reqPO.getSubFieldSize()); ? ? ? ? } ? ? ? ? termsAggregationBuilder.subAggregation(subTermsAggregationBuilder); ? ? ? ? ? // 聚合分組條件 ? ? ? ? sourceBuilder.aggregation(termsAggregationBuilder); ? ? ? ? ? // 查詢結(jié)果 ? ? ? ? SearchResponse searchResponse = null; ? ? ? ? try { ? ? ? ? ? ? log.info("es查詢請(qǐng)求對(duì)象source:sourceBuilder={}", searchRequest.source()); ? ? ? ? ? ? // 執(zhí)行搜索 ? ? ? ? ? ? searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); ? ? ? ? ? ? log.info("es查詢響應(yīng)結(jié)果:searchResponse={}", searchResponse); ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? log.error("es查詢,IO異常!searchRequest={}", searchRequest, e); ? ? ? ? ? ? throw new EsException("es查詢,IO異常!"); ? ? ? ? } ? ? ? ? ? if (RestStatus.OK.equals(searchResponse.status())) { ? ? ? ? ? ? // 遍歷terms聚合結(jié)果 ? ? ? ? ? ? Terms terms = searchResponse.getAggregations().get(termsAggregationBuilder.getName()); ? ? ? ? ? ? List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> { ? ? ? ? ? ? ? ? // 一級(jí)聚合分桶的數(shù)據(jù) ? ? ? ? ? ? ? ? String key = bucket.getKeyAsString(); ? ? ? ? ? ? ? ? long docCount = bucket.getDocCount(); ? ? ? ? ? ? ? ? // 二級(jí)聚合分桶的數(shù)據(jù) ? ? ? ? ? ? ? ? Terms subTerms = bucket.getAggregations().get(subTermsAggregationBuilder.getName()); ? ? ? ? ? ? ? ? List<AggregationBucketPO> subBucketList = convertTerms(subTerms); ? ? ? ? ? ? ? ? return new AggregationBucketPO(key, docCount, subBucketList); ? ? ? ? ? ? }).collect(Collectors.toList()); ? ? ? ? ? ? ? return bucketList; ? ? ? ? } else { ? ? ? ? ? ? log.error("es查詢返回的狀態(tài)碼異常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); ? ? ? ? ? ? throw new EsException("es查詢返回的狀態(tài)碼異常!"); ? ? ? ? } ? ? } ? ? ? private List<AggregationBucketPO> convertTerms(Terms terms) { ? ? ? ? if (CollectionUtils.isEmpty(terms.getBuckets())) { ? ? ? ? ? ? return Lists.newArrayList(); ? ? ? ? } ? ? ? ? ? return terms.getBuckets().stream().map(bucket -> { ? ? ? ? ? ? String key = bucket.getKeyAsString(); ? ? ? ? ? ? long docCount = bucket.getDocCount(); ? ? ? ? ? ? return new AggregationBucketPO(key, docCount); ? ? ? ? }).collect(Collectors.toList()); ? ? } }
其中,EsMultiAggregationReqPO、AggregationBucketPO類如下:
@Data public class EsMultiAggregationReqPO { ? ?? ? ? /** ? ? ?* 索引名 ? ? ?*/ ? ? String[] index; ? ? ? /** ? ? ?* 查詢條件 ? ? ?*/ ? ? QueryBuilder query; ? ? ? /** ? ? ?* 聚合分桶字段 ? ? ?*/ ? ? private String field; ? ? ? /** ? ? ?* 二級(jí)聚合分桶字段 ? ? ?*/ ? ? private String subField; ? ? ? /** ? ? ?* 聚合分桶大小,非必傳 ? ? ?*/ ? ? private Integer fieldSize; ? ? ? /** ? ? ?* 二級(jí)聚合分桶大小,非必傳 ? ? ?*/ ? ? private Integer subFieldSize; ? ? ? /** ? ? ?* 排序字段,非必傳 ? ? ?*/ ? ? String sortField; ? ? ? /** ? ? ?* 排序方式 SortOrder.ASC、SortOrder.DESC,非必傳 ? ? ?*/ ? ? SortOrder sort; }
@Data @NoArgsConstructor @AllArgsConstructor public class AggregationBucketPO { /** * 聚合Bucket的key名 */ private String key; /** * 聚合Bucket的文檔數(shù)量 */ private Long docCount; /** * 子桶集合 */ private List<AggregationBucketPO> subBucketList; public AggregationBucketPO(String key, Long docCount) { this.key = key; this.docCount = docCount; } }
二級(jí)聚合分桶測(cè)試代碼
? ? @PostConstruct ? ? private void init() { ? ? ? ? // 查詢對(duì)象的封裝 ? ? ? ? EsMultiAggregationReqPO reqPO = new EsMultiAggregationReqPO(); ? ? ? ? ? reqPO.setIndex(new String[]{"test_log"}); ? ? ? ? ? List<Long> ids = Lists.newArrayList(); ? ? ? ? ids.add(140L); ? ? ? ? ids.add(141L); ? ? ? ? ids.add(142L); ? ? ? ? QueryBuilder queryBuilder4 = QueryBuilders.termsQuery("eventId", ids); ? ? ? ? BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder4); ? ? ? ? reqPO.setQuery(queryBuilder); ? ? ? ? ? reqPO.setField("eventId"); ? ? ? ? reqPO.setFieldSize(9999); ? ? ? ? reqPO.setSubField("riskFlag"); ? ? ? ? ? // 執(zhí)行查詢 ? ? ? ? List<AggregationBucketPO> esQueryRespPO = searchMultiAggregation(reqPO); ? ? ? ? System.out.println("esQueryRespPO=" + esQueryRespPO); ? ? }
其它
如果沒有用spring-boot-starter-data-elasticsearch來自動(dòng)注入es組件,那么需要自己做es client的注入,es配置類如下:
/** * @author yangzihe * @date 2022/1/25 */ @Configuration public class EsClientConfig { @Value("${spring.elasticsearch.rest.uris}") private List<String> uris; @Bean public RestHighLevelClient restHighLevelClient() { List<HttpHost> httpHostList = uris.stream().map(HttpHost::create).collect(Collectors.toList()); HttpHost[] httpHost = new HttpHost[uris.size()]; httpHostList.toArray(httpHost); RestClientBuilder clientBuilder = RestClient.builder(httpHost); return new RestHighLevelClient(clientBuilder); } }
Snowflake是hutool包里的,導(dǎo)包:
<!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.14</version> </dependency>
聚合查詢的測(cè)試用例:
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = StartApplication.class) public class EsTest { @Resource private EsSearchRepository esSearchRepository; @Test public void testSearchAggregation() { // 查詢對(duì)象的封裝 EsQueryReqPO queryPO = new EsQueryReqPO(); queryPO.setIndex(new String[]{"yzh1", "yzh2"}); queryPO.setPageNum(1); queryPO.setPageSize(10); // 時(shí)間戳范圍 QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp") .from(System.currentTimeMillis() - 1000) .to(System.currentTimeMillis()); // 登錄標(biāo)識(shí) QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang"); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2); queryPO.setQuery(queryBuilder); // 根據(jù)userName分組。創(chuàng)建terms桶聚合,聚合名字=terms_by_userName, 字段=payload.userName.keyword TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("terms_by_userName").field("payload.userName.keyword"); termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum()); termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null) .from((queryPO.getPageNum() - 1) * queryPO.getPageSize()).size(queryPO.getPageSize())); queryPO.setTermsAggregation(termsAggregationBuilder); // 根據(jù)userName聚合count統(tǒng)計(jì). cardinality名=count_userName, 字段=payload.userName.keyword CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders .cardinality("count_userName").field("payload.userName.keyword"); queryPO.setCardinalityAggregation(cardinalityAggregationBuilder); // 執(zhí)行查詢 EsQueryRespPO<AggregationBucketPO> esQueryRespPO = esSearchRepository.searchAggregation(queryPO); } }
到此這篇關(guān)于springboot2+es7使用RestHighLevelClient的示例代碼的文章就介紹到這了,更多相關(guān)springboot2 es7使用RestHighLevelClient內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring MVC利用Swagger2如何構(gòu)建動(dòng)態(tài)RESTful API詳解
- 詳解spring cloud整合Swagger2構(gòu)建RESTful服務(wù)的APIs
- SpringBoot2.1 RESTful API項(xiàng)目腳手架(種子)項(xiàng)目
- SpringBoot結(jié)合Swagger2自動(dòng)生成api文檔的方法
- SpringBoot集成Swagger2構(gòu)建在線API文檔的代碼詳解
- swagger2隱藏在API文檔顯示某些參數(shù)的操作
- SpringBoot2.7?WebSecurityConfigurerAdapter類過期配置
- 淺談Springboot2.0防止XSS攻擊的幾種方式
- Spring Boot2配置Swagger2生成API接口文檔詳情
相關(guān)文章
手把手教你用Java給暗戀對(duì)象發(fā)送一份表白郵件
隨著我們學(xué)習(xí)java的深入,也漸漸發(fā)現(xiàn)了它的一些樂趣,比如發(fā)送郵件,下面這篇文章主要給大家介紹了關(guān)于如何利用Java給暗戀對(duì)象發(fā)送一份表白郵件的相關(guān)資料,需要的朋友可以參考下2021-11-11使用springboot通過spi機(jī)制加載mysql驅(qū)動(dòng)的過程
這篇文章主要介紹了使用springboot通過spi機(jī)制加載mysql驅(qū)動(dòng)的過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07淺談一下SpringCloud中Hystrix服務(wù)熔斷和降級(jí)原理
這篇文章主要介紹了淺談一下SpringCloud中Hystrix服務(wù)熔斷和降級(jí)原理,Hystrix 是 Netflix 的一款開源的容錯(cuò)框架,通過服務(wù)隔離來避免由于依賴延遲、異常,引起資源耗盡導(dǎo)致系統(tǒng)不可用的解決方案,需要的朋友可以參考下2023-05-05Springboot工具類ReflectionUtils使用教程
這篇文章主要介紹了Springboot內(nèi)置的工具類之ReflectionUtils的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-12-12Java中為什么start方法不能重復(fù)調(diào)用而run方法可以?
這篇文章主要介紹了Java中為什么start方法不能重復(fù)調(diào)用而run方法可以?帶著疑問一起學(xué)習(xí)下面文章的詳細(xì)內(nèi)容吧2022-05-05java多線程關(guān)鍵字final和static詳解
這篇文章主要介紹了java多線程關(guān)鍵字final和static詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01