Rust整合Elasticsearch的詳細過程(收藏)
全文搜索Elasticsearch是什么
Lucene:Java實現(xiàn)的搜索引擎類庫
- 易擴展
- 高性能
- 僅限Java開發(fā)
- 不支持水平擴展
Elasticsearch:基于Lucene開發(fā)的分布式搜索和分析引擎
- 支持分布式、水平擴展
- 提高RestfulAPI,可被任何語言調(diào)用
Elastic Stack是什么
ELK(Elastic Stack):Elasticsearch結(jié)合Kibana、Logstash、Beats實現(xiàn)日志數(shù)據(jù)分析、實時監(jiān)控
Elasticsearch
:負責存儲、搜索、分析數(shù)據(jù)Kibana
:數(shù)據(jù)可視化Logstash
、Beats
:數(shù)據(jù)抓?。ㄒ话阌肈ebezium、Flink、RisingWave…)
Elasticsearch能做什么
實時數(shù)據(jù)分析:支持對實時數(shù)據(jù)進行索引和分析,可快速處理大量的日志、指標和事件數(shù)據(jù)
實時監(jiān)控:對系統(tǒng)指標、業(yè)務數(shù)據(jù)和用戶行為進行實時監(jiān)控
電商搜索:為電商平臺提供商品搜索功能,幫助用戶快速找到所需的商品
知識庫搜索:為企業(yè)內(nèi)部的文檔、知識庫和業(yè)務數(shù)據(jù)提供搜索功能,提高員工的工作效率
Elasticsearch 索引
傳統(tǒng)數(shù)據(jù)庫使用正向索引,依據(jù)id構(gòu)建B+樹,根據(jù)索引id查快,對于非索引文檔如商品描述查需要全表掃描
倒排索引:將文檔分為詞條和id進行存儲,先查文檔獲取id,再根據(jù)id查數(shù)據(jù)庫
- 文檔(Document):每條數(shù)據(jù)就是一個Json文檔
- 詞條(Term):文檔按語義分成的詞語
索引(Index):相同類型文檔的集合
映射(Mapping):索引中的文檔約束信息
字段(Fielf):Json文檔中的字段
DSL:Json風格的請求語句,用來實現(xiàn)CRUD
Docker安裝Elasticsearch、Kibana、IK
1、先創(chuàng)建自定義網(wǎng)絡
使用默認
bridge
只能通過ip通信,這里加入了自定義網(wǎng)絡,自定義網(wǎng)絡可以自動解析容器名
- docker network ls查看已有網(wǎng)絡
- 創(chuàng)建自定義網(wǎng)絡docker network create pub-network
- 手動連接網(wǎng)絡docker network connect pub-network container_name_or_id
- 刪除網(wǎng)絡docker network rm network_name_or_idid
2、創(chuàng)建文件夾
mkdir -p /opt/es/data mkdir -p /opt/es/plugins mkdir -p /opt/es/logs
3、授權(quán)
chmod -R 777 /opt/es/data chmod -R 777 /opt/es/logs
安裝IK分詞器
由于ES對中文分詞無法理解語義,需要IK插件
https://release.infinilabs.com/analysis-ik/stable/
Elasticsearch、Kibana、IK所有版本保持一致,解壓后使用shell工具將整個文件夾上傳到/opt/es/plugins
離線部署Elasticsearch、Kibana
在能訪問的地方拉取鏡像
docker pull elasticsearch:8.15.2 docker pull kibana:8.15.2
這里使用wsl,wsl
進入wsl,然后進入win的D盤
cd /mnt/d
打包鏡像,這個文件可以在win D盤找到
docker save elasticsearch:8.15.2 > elasticsearch.tar docker save kibana:8.15.2 > kibana.tar
使用shell工具如Windterm上傳文件
加載鏡像
docker load -i elasticsearch.tar docker load -i kibana.tar
查看鏡像
docker images
然后命令部署或者docker-compose部署即可
命令部署Elasticsearch、Kibana
部署Elasticsearch
docker run -d \ --name es \ --network pub-network \ --restart always \ -p 9200:9200 \ -p 9300:9300 \ -e "xpack.security.enabled=false" \ -e "discovery.type=single-node" \ -e "http.cors.enabled=true" \ -e "http.cors.allow-origin:*" \ -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ -v /opt/es/data:/usr/share/elasticsearch/data \ -v /opt/es/plugins:/usr/share/elasticsearch/plugins \ -v /opt/es/logs:/usr/share/elasticsearch/logs \ --privileged=true \ elasticsearch:8.15.2
xpack.security.enabled=false
禁用密碼登錄
如果要使用token: -e "xpack.security.enrollment.enabled=true" \
docker部署一般用于開發(fā),不要為難自己,使用token會有很多問題,生產(chǎn)環(huán)境再開,使用SSl需要證書
部署Kibana
docker run -d \ --name kibana \ --network pub-network \ --restart always \ -p 5601:5601 \ -e CSP_STRICT=false \ -e I18N_LOCALE=zh-CN \ kibana:8.15.2
報錯kibana 服務器尚未準備就緒,是因為配置了ELASTICSEARCH_HOSTS
docker-compose部署Elasticsearch、Kibana
es: image: elasticsearch:8.15.2 container_name: es network_mode: pub-network restart: always ports: # 9200:對外暴露的端口 - 9200:9200 # 9300:節(jié)點間通信端口 - 9300:9300 environment: # 禁用密碼登錄 xpack.security.enabled: 'false' # 單節(jié)點運行 discovery.type: single-node # 允許跨域 http.cors.enabled: 'true' # 允許所有訪問 http.cors.allow-origin: '*' # 堆內(nèi)存大小 ES_JAVA_OPTS: '-Xms512m -Xmx512m' volumes: # 數(shù)據(jù)掛載 - /opt/es/data:/usr/share/elasticsearch/data # 插件掛載 - /opt/es/plugins:/usr/share/elasticsearch/plugins # 日志掛載 - /opt/es/logs:/usr/share/elasticsearch/logs # 允許root用戶運行 privileged: true kibana: image: kibana:8.15.2 container_name: kibana network_mode: pub-network restart: always ports: - 5601:5601 environment: # 禁用安全檢查 CSP_STRICT: 'false' # 設(shè)置中文 I18N_LOCALE: zh-CN networks: pub-network: name: pub-network
部署
docker-compose up -d
刪除Elasticsearch、Kibana
docker rm -f es docker rm -f kibana
開啟安全配置(可選,如果要用密碼和token)
es8開始需要密碼訪問,kibana通過token訪問
# 生成密碼 docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic # 生成kibana訪問token docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana
訪問Elasticsearch、Kibana
Elasticsearch:127.0.0.1:9200
,看到以下界面就部署成功了
Kibana:127.0.0.1:5601
看到以下界面就部署成功了
訪問:http://127.0.0.1:9200/.kibana
跨域查看有沒有發(fā)現(xiàn)可視化工具kibana
我們選擇手動配置,使用http://es:9200
,我們沒有配置ssl只能用http,容器名為es
在終端運行命令查看日志中的驗證碼
docker logs kibana
使用
GET /_analyze { "analyzer": "ik_max_word", "text": "好好學習天天向上" }
如果一個字為一個詞條,就說明分詞插件IK沒裝好,重新安裝后重啟容器docker restart es
分詞原理
依據(jù)字典進行分詞
對于一些新詞語,如鋁合金鍵盤被稱為“鋁坨坨”,詞典中沒有這個詞語,會將其逐字分詞
分詞流程
- 1、
character filters
:字符過濾器,進行原始處理,如轉(zhuǎn)換編碼、去停用詞、轉(zhuǎn)小寫 - 2、
tokenizer
:分詞器,將文本流進行分詞為詞條 - 3、
tokenizer filter
:將詞條進行進一步處理,如同義詞處理、拼音處理
擴展詞庫
在IK插件config/IKAnalyzer.cfg.xml
中添加
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴展配置</comment> <!--用戶可以在這里配置自己的擴展字典 --> <entry key="ext_dict">ext.dic</entry> <!--用戶可以在這里配置自己的擴展停止詞字典--> <entry key="ext_stopwords">stopword.dic</entry> <!--用戶可以在這里配置遠程擴展字典 --> <!-- <entry key="remote_ext_dict">words_location</entry> --> <!--用戶可以在這里配置遠程擴展停止詞字典--> <!-- <entry key="remote_ext_stopwords">words_location</entry> --> </properties>
停用詞庫
例如敏感詞
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴展配置</comment> <!--用戶可以在這里配置自己的擴展字典 --> <entry key="ext_stopwords">stopword.dic</entry> </properties>
使用
生產(chǎn)使用可以用AI、ELP進行分詞
修改配置,添加擴展詞庫和停用詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/IKAnalyzer.cfg.xml
這里新建一個詞庫
touch /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
編輯擴展詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
添加分詞
鋁坨坨
編輯停用詞庫
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/stopword.dic
添加
的
重啟ES
docker restart es
測試分詞
GET /_analyze{ "analyzer": "ik_max_word", "text": "重重的鋁坨坨"}
可以看到擴展詞庫的“鋁坨坨”被分詞識別出來了,“的”沒有被分詞
分詞作用
- 創(chuàng)建倒排索引時對文檔分詞
- 用戶搜索時對輸入的內(nèi)容分詞
IK分詞模式
- ik_smart:智能切分,粗粒度
- ik_max_word:最細切分,細粒度
DSL 索引操作
- 僅允許GET, PUT, DELETE, HEAD
- mapping:對索引庫中文檔的約束,常見的屬性有
- type:字段數(shù)據(jù)類型
- 字符串:text(可分詞的文本)、keyword(不分詞的精確值,合在一起有意義的詞,如國家、品牌)
- 數(shù)值:long、integer、short、byte、double、float
- 布爾:boolean
- 日期:date
- 對象:object
- index:是否創(chuàng)建倒排索引,默認true
- analyzer:使用哪種分詞器
- properties:字段的子字段
- type:字段數(shù)據(jù)類型
添加索引庫,每次寫入操作版本都會+1,如添加(POST)、更新(PUT)
索引庫mgr
PUT /mgr { "mappings": { "properties": { "info": { "type": "text", "analyzer": "ik_smart" }, "email": { "type": "keyword", "index": false }, "name": { "type": "object", "properties": { "firstName": { "type": "keyword" }, "lastName": { "type": "keyword" } } } } } }
查詢索引庫
GET /mgr
更新索引庫(索引庫禁止修改,因為索引庫建立倒排索引后無法修改,只能添加新字段)
PUT /mgr/_mapping { "properties":{ "age":{ "type":"integer" } } }
刪除索引庫
DELETE /mgr
DSL文檔操作
添加文檔
索引庫mgr/文檔/文檔id
POST /mgr/_doc/1 { "info": "鋁坨坨鍵盤", "email": "11111@gmail.com", "name": { "firstName": "C", "lastName": "I" } }
查詢文檔
GET /mgr/_doc/1
更新文檔
全量更新,刪除舊文檔,添加新文檔
如果文檔id不存在則與添加文檔功能相同
PUT /mgr/_doc/1 { "info": "鋁坨坨鍵盤", "email": "222@gmail.com", "name": { "firstName": "C", "lastName": "I" } }
增量更新(局部更新)
指定
_update
,指定文檔doc
POST /mgr/_update/1{ "doc": { "email": "333@gmail.com" }}
刪除文檔
DELETE /mgr/_doc/1
Rust客戶端操作Elasticsearch
添加Cargo.toml
elasticsearch = "8.15.0-alpha.1" # 序列化和反序列化數(shù)據(jù) serde = { version = "1.0.127", features = ["derive"] } # 序列化JSON serde_json = "1.0.128" tokio = { version = "1", features = ["full"] } # 異步鎖 once_cell = "1.20.2"
添加環(huán)境變量.env
# 指定當前配置文件 RUN_MODE=development
添加配置settings\development.toml
debug = true # 指定開發(fā)環(huán)境配置 profile = "development" [es] host = "127.0.0.1"
獲取配置config\es.rs
use serde::Deserialize; #[derive(Debug, Deserialize, Clone)] pub struct EsConfig { host: String, port: u16, } impl EsConfig { // 獲取redis連接地址 pub fn get_url(&self) -> String { format!("http://{host}:{port}", host = self.host, port = self.port) } }
將配置存放到AppConfig
#[derive(Debug, Deserialize, Clone)] pub struct AppConfig { pub es:EsConfig, } impl AppConfig { pub fn read(env_src: Environment) -> Result<Self, config::ConfigError> { // 獲取配置文件目錄 let config_dir = get_settings_dir()?; info!("config_dir: {:#?}", config_dir); // 獲取配置文件環(huán)境 let run_mode = std::env::var("RUN_MODE") .map(|env| Profile::from_str(&env).map_err(|e| ConfigError::Message(e.to_string()))) .unwrap_or_else(|_e| Ok(Profile::Dev))?; // 當前配置文件名 let profile_filename = format!("{run_mode}.toml"); // 獲取配置 let config = config::Config::builder() // 添加默認配置 .add_source(config::File::from(config_dir.join("default.toml"))) // 添加自定義前綴配置 .add_source(config::File::from(config_dir.join(profile_filename))) // 添加環(huán)境變量 .add_source(env_src) .build()?; info!("Successfully read config profile: {run_mode}."); // 反序列化 config.try_deserialize() } } // 獲取配置文件目錄 pub fn get_settings_dir() -> Result<std::path::PathBuf, ConfigError> { Ok(get_project_root() .map_err(|e| ConfigError::Message(e.to_string()))? .join("settings")) } #[cfg(test)] mod tests { use crate::config::profile::Profile; use self::env::get_env_source; pub use super::*; #[test] pub fn test_profile_to_string() { // 設(shè)置dev模式 let profile: Profile = Profile::try_from("development").unwrap(); println!("profile: {:#?}", profile); assert_eq!(profile, Profile::Dev) } #[test] pub fn test_read_app_config_prefix() { // 讀取配置 let config = AppConfig::read(get_env_source("APP")).unwrap(); println!("config: {:#?}", config); } }
將配置存放到全局constant\mod.rs
// 環(huán)境變量前綴 pub const ENV_PREFIX: &str = "APP"; // 配置 pub static CONFIG: Lazy<crate::config::AppConfig> = Lazy::new(|| crate::config::AppConfig::read(get_env_source(ENV_PREFIX)).unwrap() );
加載配置文件client\builder.rs
use crate::config::AppConfig; // 傳輸配置文件到客戶端 pub trait ClientBuilder: Sized { fn build_from_config(config: &AppConfig) -> Result<Self,InfraError>; }
Es客戶端client\es.rs
InfraError為自定義錯誤,請修改為你想要的錯誤,如標準庫錯誤
// 類型別名 pub type EsClient = Arc<Elasticsearch>; // 加載配置文件 pub trait EsClientExt: Sized { fn build_from_config(config: &AppConfig) -> impl Future<Output = Result<Self, InfraError>>; } impl EsClientExt for EsClient { async fn build_from_config(config: &AppConfig) -> Result<Self, InfraError> { // 1、使用single_node方式創(chuàng)建client // let transport = Transport::single_node(&config.es.get_url()).unwrap(); // let client = Elasticsearch::new(transport); // Ok(Arc::new(client)) // 2、使用builder方式創(chuàng)建client,可以添加多個url let url = config.es.get_url(); let url_parsed = url .parse::<elasticsearch::http::Url>() .map_err(|_| InfraError::OtherError("url err".to_string()))?; let conn_pool = SingleNodeConnectionPool::new(url_parsed); let transport = TransportBuilder::new(conn_pool) .disable_proxy() .build() .map_err(|_| InfraError::OtherError("transport err".to_string()))?; let client = Elasticsearch::new(transport); Ok(Arc::new(client)) } }
測試client\es.rs
,所有請求在body()
中定義DSL語句,通過send()
發(fā)送
#[cfg(test)] mod tests { use elasticsearch::{ cat::CatIndicesParts, DeleteParts, IndexParts, UpdateParts }; use serde_json::json; use super::*; use crate::constant::CONFIG; #[tokio::test] async fn test_add_document() { let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); let response = client .index(IndexParts::IndexId("mgr", "1")) .body( json!({ "id": 1, "user": "cci", "post_date": "2024-01-15T00:00:00Z", "message": "Trying out Elasticsearch, so far so good?" }) ) .send().await; assert!(response.is_ok()); let response = response.unwrap(); assert!(response.status_code().is_success()); } #[tokio::test] async fn test_get_indices() { let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); let get_index_response = client .cat() .indices(CatIndicesParts::Index(&["*"])) .send().await; assert!(get_index_response.is_ok()); } #[tokio::test] async fn test_update_document() { let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); let update_response = client .update(UpdateParts::IndexId("mgr", "1")) .body( json!({ "doc": { "message": "Updated message" } }) ) .send().await; assert!(update_response.is_ok()); let update_response = update_response.unwrap(); assert!(update_response.status_code().is_success()); } #[tokio::test] async fn test_delete_document() { let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); let delete_response = client.delete(DeleteParts::IndexId("mgr", "1")).send().await; assert!(delete_response.is_ok()); let delete_response = delete_response.unwrap(); assert!(delete_response.status_code().is_success()); } }
使用流程
// 1、創(chuàng)建client let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); // 2、定義DSL語句 let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4); // 添加文檔 body.push(json!({"index": {"_id": "1"}}).into()); body.push( json!({ "id": 1, "user": "kimchy", "post_date": "2009-11-15T00:00:00Z", "message": "Trying out Elasticsearch, so far so good?" }).into() ); // 添加文檔 body.push(json!({"index": {"_id": "2"}}).into()); body.push( json!({ "id": 2, "user": "forloop", "post_date": "2020-01-08T00:00:00Z", "message": "Bulk indexing with the rust client, yeah!" }).into() ); // 3、發(fā)送請求 let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();
項目地址:https://github.com/VCCICCV/MGR
分析數(shù)據(jù)結(jié)構(gòu)
mapping
要考慮的問題:字段名、數(shù)據(jù)類型、是否參與搜索(建立倒排索引"index":false
,默認true)、是否分詞(參與搜索的字段,text分詞,keyword、數(shù)據(jù)類型不分詞)、分詞器
- 地理坐標:
- geo_point:由經(jīng)度(longitude)和緯度(latitude)確定的一個點,如
[ 13.400544, 52.530286 ]
- geo_shape:由多個
geo_point
組成的幾何圖形,如一條線[[13.0, 53.0], [14.0, 52.0]]
- geo_point:由經(jīng)度(longitude)和緯度(latitude)確定的一個點,如
copy_to
:將多個字段組合為一個字段進行索引 Rust客戶端操作索引庫
生產(chǎn)環(huán)境不要使用
unwrap()
這里演示在請求正文中操作,使用send()
Transport
支持的方法Method
:
Get
:獲取資源Put
:創(chuàng)建或更新資源(全量更新)Post
:創(chuàng)建或更新資源(部分更新)Delete
:刪除資源Head
:獲取頭信息
send()
請求正文需要包含的參數(shù):
method
:必須path
:必須headers
:必須query_string
:可選body
:可選timeout
:可選
添加索引庫
#[tokio::test] async fn test_create_index() { // 1、創(chuàng)建client let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); // 2、定義DSL語句 let index_name = "mgr"; let index_definition = json!({ "mappings":{ "properties":{ "age":{ "type":"integer" } } } }); let body = Some(serde_json::to_vec(&index_definition).unwrap()); let path = format!("/{}", index_name); let headers = HeaderMap::new(); let query_string = None; let timeout = None; let method = Method::Put; // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( method, &path, headers, query_string, body, timeout ).await; assert!(response.is_ok()); let response = response.unwrap(); assert_eq!(response.status_code().is_success(), true); }
你也可以將其簡化
#[tokio::test] async fn test_create_index() { // 1、創(chuàng)建client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、定義DSL let index_definition = json!({ "mappings":{ "properties":{ "age":{ "type":"integer" } } } }); // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Put, format!("/mgr").as_str(), HeaderMap::new(), None, Some(index_definition.to_string().as_bytes().to_vec()), None ).await; assert!(response.is_ok()); let response = response.unwrap(); assert_eq!(response.status_code().is_success(), true); }
查詢索引庫是否存在
#[tokio::test] async fn test_query_index() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、定義查詢 DSL 語句 let query = json!({ "query": { "match_all": {} } }); // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Get, format!("/mgr/_search").as_str(), HeaderMap::new(), None, Some(query.to_string().as_bytes().to_vec()), None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
也可以不定義DSL查詢
#[tokio::test] async fn test_query_index2() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Get, format!("/mgr").as_str(), HeaderMap::new(), None, None, None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
更新索引庫
#[tokio::test] async fn test_update_index() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、定義查詢 DSL 語句 let update_content = json!({ "properties":{ "age":{ "type":"integer" } } }); // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Put, format!("/mgr/_mapping").as_str(), HeaderMap::new(), None, Some(update_content.to_string().as_bytes().to_vec()), None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
刪除索引庫
#[tokio::test] async fn test_delete_index() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、發(fā)送請求 let response = client.send::<(), ()>( Method::Delete, format!("/mgr").as_str(), HeaderMap::new(), None, None, None ).await; assert!(response.is_ok()); let response = response.unwrap(); assert_eq!(response.status_code().is_success(), true); }
Rust客戶端操作文檔
添加文檔
#[tokio::test] async fn test_create_doc() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、定義查詢 DSL 語句 let doc_content = json!({ "id": "1", "user": "kimchy", "post_date": "2009-11-15T00:00:00Z", "message": "Trying out Elasticsearch, so far so good?" }); // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Post, format!("/mgr/_doc/1").as_str(), HeaderMap::new(), None, Some(doc_content.to_string().as_bytes().to_vec()), None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
查詢文檔是否存在
#[tokio::test] async fn test_get_doc() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Get, format!("/mgr/_doc/1").as_str(), HeaderMap::new(), None, None, None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
更新文檔
#[tokio::test] async fn test_update_doc() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、定義查詢 DSL 語句 let doc_content = json!({ "doc": { "message": "Updated message" } }); // 3、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Post, format!("/mgr/_update/1").as_str(), HeaderMap::new(), None, Some(doc_content.to_string().as_bytes().to_vec()), None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
刪除文檔
#[tokio::test] async fn test_delete_doc() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2、發(fā)送請求 let response = client.send::<Vec<u8>, ()>( Method::Delete, format!("/mgr/_doc/1").as_str(), HeaderMap::new(), None, None, None ).await; assert!(response.is_ok()); let response = response.unwrap(); println!("{:?}", response); assert_eq!(response.status_code().is_success(), true); }
批量添加文檔
#[tokio::test] async fn test_bulk_add_to_mgr() { // 1、創(chuàng)建client let client_result = EsClient::build_from_config(&CONFIG).await; assert!(client_result.is_ok()); let client = client_result.unwrap(); // 2、定義DSL語句 let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4); // 添加第一個操作和文檔 body.push(json!({"index": {"_id": "1"}}).into()); body.push( json!({ "id": 1, "user": "kimchy", "post_date": "2009-11-15T00:00:00Z", "message": "Trying out Elasticsearch, so far so good?" }).into() ); // 添加第二個操作和文檔 body.push(json!({"index": {"_id": "2"}}).into()); body.push( json!({ "id": 2, "user": "forloop", "post_date": "2020-01-08T00:00:00Z", "message": "Bulk indexing with the rust client, yeah!" }).into() ); // 3、發(fā)送請求 let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap(); assert!(response.status_code().is_success()); }
Rust客戶端操作搜索
這里演示在請求體body
中進行API調(diào)用
- 查詢所有:查出所有數(shù)據(jù)
- 全文檢索查詢(full text):利用分詞器對內(nèi)容分詞,從倒排索引庫中查詢
- match_query
- multi_match_query
- 精確查詢:根據(jù)精確值查詢,如integer、keyword、日期
- id
- range:根據(jù)值的范圍查詢
- term:根據(jù)詞條精確值查詢
- 地理坐標查詢(geo):根據(jù)經(jīng)緯度查詢
- geo_distance:查詢geo_point指定距離范圍內(nèi)的所有文檔
- geo_bounding_box:查詢geo_point值落在某個矩形范圍內(nèi)的所有文檔
- 復合查詢(compound):將上述條件組合起來
查詢所有
默認10條
#[tokio::test] async fn test_search_match_all() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "match_all": { } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
等價于
GET /mgr/_search { "query": { "match_all": {} } }
全文搜索
message
為文檔中的字段
#[tokio::test] async fn test_search_match() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "match": { "message": "good" } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
相當于
GET /mgr/_search { "query": { "match": { "message": "good" } } }
多字段查詢
多字段查詢效率低,一般在創(chuàng)建時使用copy_to
到一個字段中
#[tokio::test] async fn test_search_multi_match() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "multi_match": { "query": "good", "fields": [ "message", "user" ] } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
相當于
GET /mgr/_search { "query": { "multi_match": { "query": "good", "fields": [ "message", "user" ] } } }
根據(jù)范圍查詢(range)
gte
大于等于,lte
小于等于;gt
大于lt
小于
#[tokio::test] async fn test_search_range() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "range": { "id": { "gte": 1, "lte": 1 } } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
相當于
GET /mgr/_search { "query": { "range": { "id": { "gte": 1, "lte": 1 } } } }
根據(jù)詞條精確查詢(term)
#[tokio::test] async fn test_search_term() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "term": { "user": "kimchy" } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
相當于
GET /mgr/_search { "query": { "term": { "user": "kimchy" } } }
根據(jù)地理距離查詢
GET /mgr/_search { "query": { "geo_distance": { "distance": "100km", "location": "31.04, 45.12" } } }
根據(jù)指定矩形范圍查詢
左上經(jīng)緯度與右下經(jīng)緯度
geo
為文檔中的字段
GET /mgr/_search { "query": { "geo_bounding_box": { "geo": { "top_left": { "lon": 124.45, "lat": 32.11 }, "bottom_right": { "lon": 125.12, "lat": 30.21 } } } } }
復合查詢
查詢時文檔會對搜索詞條的關(guān)聯(lián)度打分_score
,返回結(jié)果時按照降序排列
關(guān)聯(lián)度計算方法
- TF-IDF算法(ES5.0之前)
TF(詞條頻率)= 詞條出現(xiàn)次數(shù)/文檔中詞條總數(shù)
IDF(逆文檔頻率)=log(文檔總數(shù)/包含詞條的文檔總數(shù))
score = ∑(??=1,??)(TF*IDF):將詞條頻率與逆文檔頻率相乘再求和
- BM25算法(ES5.0之后)
默認采用BM25算法:考慮了TF、IDF、文檔長度等因素,能夠平衡長短文的關(guān)聯(lián)度
function_score
修改關(guān)聯(lián)度
指定文檔和算分函數(shù)
GET /mgr/_search { "query": { "function_score": { "query": { "match": {// 查詢方法 "message": "good" } }, "functions": [ // 算分函數(shù) { "filter": {// 只有符合過濾條件的才被計算 "term": {// 根據(jù)詞條精確查詢 "id": 1 } }, "weight": 3 // 指定加權(quán)函數(shù) } ], // 加權(quán)模式:相乘 "boost_mode": "multiply" } } }
weight
:給定常量值,還可以指定以下值field_value_factor
:用文檔中的指定字段值作為函數(shù)結(jié)果random_score
:隨機生成一個值script_score
:自定義計算公式boost_mode
:加權(quán)模式,multiply
與原來的_score
相乘,還可以配置:replace
:替換原來的_score
sum
:求和avg
:取平均值min
:取最小值max
:取最大值
相當于
#[tokio::test] async fn test_function_score_query() { // 1、創(chuàng)建 client let client = EsClient::build_from_config(&CONFIG).await.unwrap(); // 2. 執(zhí)行搜索 let response = client .search(SearchParts::Index(&["mgr"])) .from(0) .size(5) .body( json!({ "query": { "function_score": { "query": { "match": {// 查詢方法 "message": "good" } }, "functions": [ // 算分函數(shù) { "filter": {// 只有符合過濾條件的才被計算 "term": {// 根據(jù)詞條精確查詢 "id": 1 } }, "weight": 3 // 指定加權(quán)函數(shù) } ], // 加權(quán)模式:相乘 "boost_mode": "multiply" } } }) ) .send().await .unwrap(); // 3. 解析響應 let response_body = response.json::<Value>().await.unwrap(); // 搜索耗時 let took = response_body["took"].as_i64().unwrap(); println!("took: {}ms", took); // 搜索結(jié)果 for hit in response_body["hits"]["hits"].as_array().unwrap() { println!("{:?}", hit["_source"]); } }
boolean query 布爾查詢
布爾查詢是一個或多個子句查詢的組合,組合方式有
must
:必須匹配每個子查詢,類似于“與”should
:選擇性匹配子查詢,類似于“或”must_not
:必須不匹配,不參與算分,類似于“非”filter
:必須匹配,
查詢message中包含rust,post_date不小于2020年1月1日的文檔
GET /mgr/_search { "query": { "bool": { "must": [ { "match_phrase": { "message": "rust" } } ], "must_not": [ { "range": { "post_date": { "lt": "2020-01-01T00:00:00Z" } } } ] } } }
搜索結(jié)果處理
排序
GET /mgr/_search { "query": { "match_all": {} }, "sort": [ { "id": "desc"http:// ASC升序,DESC降序 } ] }
地理位置排序
GET /mgr/_search { "query": { "match_all": {} }, "sort": [ { "_geo_distance":{ "FIELD": { "lat": 40,// 緯度 "lon": -70// 經(jīng)度 }, "order":"asc",// 排序方式 "unit":"km" // 單位 } } ] }
分頁
1、from+size
分頁查詢(默認10條數(shù)據(jù))
GET /mgr/_search { "query": { "match_all": {} }, "from":1,// 分頁開始位置 "size":10,// 期望獲取的文檔總數(shù) "sort": [ { "id": "desc"http:// ASC升序,DESC降序 } ] }
深度分頁問題:一般將ES作為分布式部署,當需要"from"=990,"size"=10
查數(shù)據(jù)時:
1、先從每個數(shù)據(jù)分片上查詢前1000
條數(shù)據(jù)
2、將所有節(jié)點的結(jié)果聚合,在內(nèi)存中重新排序選出前1000
條文檔
3、在這1000條文檔中選取"from"=990,"size"=10
的數(shù)據(jù)
如果搜索頁數(shù)過深,或者結(jié)果集(from+size)越大,對內(nèi)存和CPU的消耗越高,因此ES設(shè)定的查詢上限是
10000
深度分頁解決方案:
2、search after
分頁查詢:分頁時排序,從上一次的排序值開始查詢下一頁文檔(只能向后查詢)
3、scroll
分頁查詢:將排序數(shù)據(jù)形成快照,保存在內(nèi)存中(內(nèi)存消耗大,官方不推薦)
高亮處理
搜索鍵盤時關(guān)鍵字高亮
highlight
指定高亮字段
默認搜索字段和高亮字段匹配才高亮
GET /mgr/_search { "query": { "match": { "message":"rust"http:// 搜索message中包含rust的文檔 } }, "highlight":{ "fields":{ "message":{// 指定高亮字段 "require_field_match":"false"http:// 搜索字段和高亮字段可以不匹配 } } } }
數(shù)據(jù)聚合
聚合(aggregations)可以實現(xiàn)對文檔數(shù)據(jù)的統(tǒng)計、分析、運算,聚合分類:
- 桶(Buket):用來對數(shù)據(jù)分組
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket.html
- TermAggregation:按文檔字段或詞條值分組
- Date Histogram:按日期階梯分組,如一周為一組
- 度量(Metric):用于計算一些值,如最大值、最小值、平均值
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Sum:求和
- Stats:同時求Max、Min、Avg、Sum等
- 管道(pipeline):以其他聚合的結(jié)果作為聚合的基礎(chǔ)
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html
桶(Buket)
Buket默認統(tǒng)計其中的文檔數(shù)量_count
,并且按照降序排序
GET /mgr/_search { "size":0,// 文檔大小,結(jié)果不包含文檔,只包含聚合結(jié)果 "aggs": {//指定聚合 "idAgg": {// 聚合名 "terms": {// 精確查詢 "field":"id",// 指定字段 "order":{ "_count":"asc"http:// 按升序排序 } } } } }
度量(Metric)
GET /mgr/_search { "size":0,// 文檔大小,結(jié)果不包含文檔,只包含聚合結(jié)果 "aggs": {//指定聚合 "idAgg": {// 聚合名 "terms": {// 精確查詢 "field":"id",// 指定字段 "size":20 }, "aggs":{// 子聚合 "score_stats":{// 聚合名 "max":{//聚合類型,min、max、avg等 "field":"score"http:// 聚合字段 } } } } } }
自動補全
拼音補全
如果你想要通過拼音補全,請下載解壓拼音分詞器上傳到/opt/es/plugins
目錄然后重啟es
https://github.com/infinilabs/analysis-pinyin/releases
- 補全字段必須是
completion
類型 - 拼音分詞需要自定義分詞器
進行拼音分詞:創(chuàng)建索引并設(shè)置字段類型為completion
,同時指定先分詞再根據(jù)詞條過濾(如果不自定義分詞器,默認將每個漢字單獨分為拼音,所以先分詞詞條再進行拼音處理),其他設(shè)置見github倉庫
PUT /test { "settings": {// 設(shè)置 "analysis": { "analyzer": {// 設(shè)置分詞器 "my_analyzer": {// 分詞器名 "filters": [ "lowercase",// 轉(zhuǎn)小寫 "stop"http:// 去停用詞 ], "tokenizer": "ik_max_word", // 分詞器 "filter": "py" // 過濾時進行拼音 } } }, "filter": { // 自定義tokenizer filter "py": { // 過濾器名稱 "type": "pinyin", // 過濾器類型,這里是pinyin "keep_full_pinyin": false,// 是否保留完整的拼音形式 "keep_joined_full_pinyin": true,// 是否保留連接起來的完整拼音形式 "keep_original": true,// 是否保留原始的文本內(nèi)容 "limit_first_letter_length": 16,// 限制拼音首字母的長度為 16 "remove_duplicated_term": true,// 是否移除重復的詞條 "none_chinese_pinyin_tokenize": false// 不對非中文字符進行拼音分詞 } } }, "mappings": { "properties": { "user": { "type": "completion" } } } }
不進行拼音分詞:創(chuàng)建索引并設(shè)置字段類型為completion
PUT /test { "mappings": { "properties": { "user": { "type": "completion" } } } }
添加文檔
POST /test/_doc/1 { "id": 1, "message": "Trying out Elasticsearch, so far so good?", "post_date": "2009-11-15T00:00:00Z", "user": "kimchy" }
根據(jù)關(guān)鍵字查詢補全
GET /test/_search { "suggest": { "YOUR_SUGGESTION": {// 指定自動補全查詢名字 "text": "k",// 關(guān)鍵字前綴 "completion": {// 自動補全類型 "field": "user",// 補全字段 "skip_duplicates": true,// 是否跳過重復的建議 "size": 10 // 獲取前10條結(jié)果 } } } }
所有代碼地址:https://github.com/VCCICCV/MGR/blob/main/auth/infrastructure/src/client/es.rs
到此這篇關(guān)于Rust整合Elasticsearch的詳細過程(收藏)的文章就介紹到這了,更多相關(guān)Rust整合Elasticsearch內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!