Rust整合Elasticsearch的詳細(xì)過(guò)程(收藏)
全文搜索Elasticsearch是什么
Lucene:Java實(shí)現(xiàn)的搜索引擎類庫(kù)
- 易擴(kuò)展
- 高性能
- 僅限Java開(kāi)發(fā)
- 不支持水平擴(kuò)展
Elasticsearch:基于Lucene開(kāi)發(fā)的分布式搜索和分析引擎
- 支持分布式、水平擴(kuò)展
- 提高RestfulAPI,可被任何語(yǔ)言調(diào)用
Elastic Stack是什么
ELK(Elastic Stack):Elasticsearch結(jié)合Kibana、Logstash、Beats實(shí)現(xiàn)日志數(shù)據(jù)分析、實(shí)時(shí)監(jiān)控
Elasticsearch:負(fù)責(zé)存儲(chǔ)、搜索、分析數(shù)據(jù)Kibana:數(shù)據(jù)可視化Logstash、Beats:數(shù)據(jù)抓取(一般用Debezium、Flink、RisingWave…)
Elasticsearch能做什么
實(shí)時(shí)數(shù)據(jù)分析:支持對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行索引和分析,可快速處理大量的日志、指標(biāo)和事件數(shù)據(jù)
實(shí)時(shí)監(jiān)控:對(duì)系統(tǒng)指標(biāo)、業(yè)務(wù)數(shù)據(jù)和用戶行為進(jìn)行實(shí)時(shí)監(jiān)控
電商搜索:為電商平臺(tái)提供商品搜索功能,幫助用戶快速找到所需的商品
知識(shí)庫(kù)搜索:為企業(yè)內(nèi)部的文檔、知識(shí)庫(kù)和業(yè)務(wù)數(shù)據(jù)提供搜索功能,提高員工的工作效率
Elasticsearch 索引
傳統(tǒng)數(shù)據(jù)庫(kù)使用正向索引,依據(jù)id構(gòu)建B+樹(shù),根據(jù)索引id查快,對(duì)于非索引文檔如商品描述查需要全表掃描
倒排索引:將文檔分為詞條和id進(jìn)行存儲(chǔ),先查文檔獲取id,再根據(jù)id查數(shù)據(jù)庫(kù)
- 文檔(Document):每條數(shù)據(jù)就是一個(gè)Json文檔
- 詞條(Term):文檔按語(yǔ)義分成的詞語(yǔ)
索引(Index):相同類型文檔的集合
映射(Mapping):索引中的文檔約束信息
字段(Fielf):Json文檔中的字段
DSL:Json風(fēng)格的請(qǐng)求語(yǔ)句,用來(lái)實(shí)現(xiàn)CRUD
Docker安裝Elasticsearch、Kibana、IK
1、先創(chuàng)建自定義網(wǎng)絡(luò)
使用默認(rèn)
bridge只能通過(guò)ip通信,這里加入了自定義網(wǎng)絡(luò),自定義網(wǎng)絡(luò)可以自動(dòng)解析容器名
- docker network ls查看已有網(wǎng)絡(luò)
- 創(chuàng)建自定義網(wǎng)絡(luò)docker network create pub-network
- 手動(dòng)連接網(wǎng)絡(luò)docker network connect pub-network container_name_or_id
- 刪除網(wǎng)絡(luò)docker network rm network_name_or_idid
2、創(chuàng)建文件夾
mkdir -p /opt/es/data mkdir -p /opt/es/plugins mkdir -p /opt/es/logs
3、授權(quán)
chmod -R 777 /opt/es/data chmod -R 777 /opt/es/logs
安裝IK分詞器
由于ES對(duì)中文分詞無(wú)法理解語(yǔ)義,需要IK插件
https://release.infinilabs.com/analysis-ik/stable/
Elasticsearch、Kibana、IK所有版本保持一致,解壓后使用shell工具將整個(gè)文件夾上傳到/opt/es/plugins
離線部署Elasticsearch、Kibana
在能訪問(wèn)的地方拉取鏡像
docker pull elasticsearch:8.15.2 docker pull kibana:8.15.2
這里使用wsl,wsl進(jìn)入wsl,然后進(jìn)入win的D盤
cd /mnt/d
打包鏡像,這個(gè)文件可以在win D盤找到
docker save elasticsearch:8.15.2 > elasticsearch.tar docker save kibana:8.15.2 > kibana.tar
使用shell工具如Windterm上傳文件
加載鏡像
docker load -i elasticsearch.tar docker load -i kibana.tar
查看鏡像
docker images
然后命令部署或者docker-compose部署即可
命令部署Elasticsearch、Kibana
部署Elasticsearch
docker run -d \ --name es \ --network pub-network \ --restart always \ -p 9200:9200 \ -p 9300:9300 \ -e "xpack.security.enabled=false" \ -e "discovery.type=single-node" \ -e "http.cors.enabled=true" \ -e "http.cors.allow-origin:*" \ -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ -v /opt/es/data:/usr/share/elasticsearch/data \ -v /opt/es/plugins:/usr/share/elasticsearch/plugins \ -v /opt/es/logs:/usr/share/elasticsearch/logs \ --privileged=true \ elasticsearch:8.15.2
xpack.security.enabled=false禁用密碼登錄
如果要使用token: -e "xpack.security.enrollment.enabled=true" \
docker部署一般用于開(kāi)發(fā),不要為難自己,使用token會(huì)有很多問(wèn)題,生產(chǎn)環(huán)境再開(kāi),使用SSl需要證書
部署Kibana
docker run -d \ --name kibana \ --network pub-network \ --restart always \ -p 5601:5601 \ -e CSP_STRICT=false \ -e I18N_LOCALE=zh-CN \ kibana:8.15.2
報(bào)錯(cuò)kibana 服務(wù)器尚未準(zhǔn)備就緒,是因?yàn)榕渲昧?code>ELASTICSEARCH_HOSTS
docker-compose部署Elasticsearch、Kibana
es:
image: elasticsearch:8.15.2
container_name: es
network_mode: pub-network
restart: always
ports:
# 9200:對(duì)外暴露的端口
- 9200:9200
# 9300:節(jié)點(diǎn)間通信端口
- 9300:9300
environment:
# 禁用密碼登錄
xpack.security.enabled: 'false'
# 單節(jié)點(diǎn)運(yùn)行
discovery.type: single-node
# 允許跨域
http.cors.enabled: 'true'
# 允許所有訪問(wèn)
http.cors.allow-origin: '*'
# 堆內(nèi)存大小
ES_JAVA_OPTS: '-Xms512m -Xmx512m'
volumes:
# 數(shù)據(jù)掛載
- /opt/es/data:/usr/share/elasticsearch/data
# 插件掛載
- /opt/es/plugins:/usr/share/elasticsearch/plugins
# 日志掛載
- /opt/es/logs:/usr/share/elasticsearch/logs
# 允許root用戶運(yùn)行
privileged: true
kibana:
image: kibana:8.15.2
container_name: kibana
network_mode: pub-network
restart: always
ports:
- 5601:5601
environment:
# 禁用安全檢查
CSP_STRICT: 'false'
# 設(shè)置中文
I18N_LOCALE: zh-CN
networks:
pub-network:
name: pub-network部署
docker-compose up -d
刪除Elasticsearch、Kibana
docker rm -f es docker rm -f kibana
開(kāi)啟安全配置(可選,如果要用密碼和token)
es8開(kāi)始需要密碼訪問(wèn),kibana通過(guò)token訪問(wèn)
# 生成密碼 docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic # 生成kibana訪問(wèn)token docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana
訪問(wèn)Elasticsearch、Kibana
Elasticsearch:127.0.0.1:9200,看到以下界面就部署成功了

Kibana:127.0.0.1:5601看到以下界面就部署成功了
訪問(wèn):http://127.0.0.1:9200/.kibana跨域查看有沒(méi)有發(fā)現(xiàn)可視化工具kibana

我們選擇手動(dòng)配置,使用http://es:9200,我們沒(méi)有配置ssl只能用http,容器名為es

在終端運(yùn)行命令查看日志中的驗(yàn)證碼
docker logs kibana

使用

GET /_analyze
{
"analyzer": "ik_max_word",
"text": "好好學(xué)習(xí)天天向上"
}如果一個(gè)字為一個(gè)詞條,就說(shuō)明分詞插件IK沒(méi)裝好,重新安裝后重啟容器docker restart es

分詞原理
依據(jù)字典進(jìn)行分詞
對(duì)于一些新詞語(yǔ),如鋁合金鍵盤被稱為“鋁坨坨”,詞典中沒(méi)有這個(gè)詞語(yǔ),會(huì)將其逐字分詞

分詞流程
- 1、
character filters:字符過(guò)濾器,進(jìn)行原始處理,如轉(zhuǎn)換編碼、去停用詞、轉(zhuǎn)小寫 - 2、
tokenizer:分詞器,將文本流進(jìn)行分詞為詞條 - 3、
tokenizer filter:將詞條進(jìn)行進(jìn)一步處理,如同義詞處理、拼音處理
擴(kuò)展詞庫(kù)
在IK插件config/IKAnalyzer.cfg.xml中添加
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴(kuò)展配置</comment> <!--用戶可以在這里配置自己的擴(kuò)展字典 --> <entry key="ext_dict">ext.dic</entry> <!--用戶可以在這里配置自己的擴(kuò)展停止詞字典--> <entry key="ext_stopwords">stopword.dic</entry> <!--用戶可以在這里配置遠(yuǎn)程擴(kuò)展字典 --> <!-- <entry key="remote_ext_dict">words_location</entry> --> <!--用戶可以在這里配置遠(yuǎn)程擴(kuò)展停止詞字典--> <!-- <entry key="remote_ext_stopwords">words_location</entry> --> </properties>
停用詞庫(kù)
例如敏感詞
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 擴(kuò)展配置</comment> <!--用戶可以在這里配置自己的擴(kuò)展字典 --> <entry key="ext_stopwords">stopword.dic</entry> </properties>
使用
生產(chǎn)使用可以用AI、ELP進(jìn)行分詞
修改配置,添加擴(kuò)展詞庫(kù)和停用詞庫(kù)
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/IKAnalyzer.cfg.xml
這里新建一個(gè)詞庫(kù)
touch /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
編輯擴(kuò)展詞庫(kù)
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic
添加分詞
鋁坨坨
編輯停用詞庫(kù)
vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/stopword.dic
添加
的
重啟ES
docker restart es
測(cè)試分詞
GET /_analyze{ "analyzer": "ik_max_word", "text": "重重的鋁坨坨"}可以看到擴(kuò)展詞庫(kù)的“鋁坨坨”被分詞識(shí)別出來(lái)了,“的”沒(méi)有被分詞

分詞作用
- 創(chuàng)建倒排索引時(shí)對(duì)文檔分詞
- 用戶搜索時(shí)對(duì)輸入的內(nèi)容分詞
IK分詞模式
- ik_smart:智能切分,粗粒度
- ik_max_word:最細(xì)切分,細(xì)粒度
DSL 索引操作
- 僅允許GET, PUT, DELETE, HEAD
- mapping:對(duì)索引庫(kù)中文檔的約束,常見(jiàn)的屬性有
- type:字段數(shù)據(jù)類型
- 字符串:text(可分詞的文本)、keyword(不分詞的精確值,合在一起有意義的詞,如國(guó)家、品牌)
- 數(shù)值:long、integer、short、byte、double、float
- 布爾:boolean
- 日期:date
- 對(duì)象:object
- index:是否創(chuàng)建倒排索引,默認(rèn)true
- analyzer:使用哪種分詞器
- properties:字段的子字段
- type:字段數(shù)據(jù)類型
添加索引庫(kù),每次寫入操作版本都會(huì)+1,如添加(POST)、更新(PUT)
索引庫(kù)mgr
PUT /mgr
{
"mappings": {
"properties": {
"info": {
"type": "text",
"analyzer": "ik_smart"
},
"email": {
"type": "keyword",
"index": false
},
"name": {
"type": "object",
"properties": {
"firstName": {
"type": "keyword"
},
"lastName": {
"type": "keyword"
}
}
}
}
}
}查詢索引庫(kù)
GET /mgr
更新索引庫(kù)(索引庫(kù)禁止修改,因?yàn)樗饕龓?kù)建立倒排索引后無(wú)法修改,只能添加新字段)
PUT /mgr/_mapping
{
"properties":{
"age":{
"type":"integer"
}
}
}刪除索引庫(kù)
DELETE /mgr
DSL文檔操作
添加文檔
索引庫(kù)mgr/文檔/文檔id
POST /mgr/_doc/1
{
"info": "鋁坨坨鍵盤",
"email": "11111@gmail.com",
"name": {
"firstName": "C",
"lastName": "I"
}
}查詢文檔
GET /mgr/_doc/1
更新文檔
全量更新,刪除舊文檔,添加新文檔
如果文檔id不存在則與添加文檔功能相同
PUT /mgr/_doc/1
{
"info": "鋁坨坨鍵盤",
"email": "222@gmail.com",
"name": {
"firstName": "C",
"lastName": "I"
}
}增量更新(局部更新)
指定
_update,指定文檔doc
POST /mgr/_update/1{ "doc": { "email": "333@gmail.com" }}刪除文檔
DELETE /mgr/_doc/1
Rust客戶端操作Elasticsearch
添加Cargo.toml
elasticsearch = "8.15.0-alpha.1"
# 序列化和反序列化數(shù)據(jù)
serde = { version = "1.0.127", features = ["derive"] }
# 序列化JSON
serde_json = "1.0.128"
tokio = { version = "1", features = ["full"] }
# 異步鎖
once_cell = "1.20.2"添加環(huán)境變量.env
# 指定當(dāng)前配置文件 RUN_MODE=development
添加配置settings\development.toml
debug = true # 指定開(kāi)發(fā)環(huán)境配置 profile = "development" [es] host = "127.0.0.1"
獲取配置config\es.rs
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct EsConfig {
host: String,
port: u16,
}
impl EsConfig {
// 獲取redis連接地址
pub fn get_url(&self) -> String {
format!("http://{host}:{port}", host = self.host, port = self.port)
}
}將配置存放到AppConfig
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub es:EsConfig,
}
impl AppConfig {
pub fn read(env_src: Environment) -> Result<Self, config::ConfigError> {
// 獲取配置文件目錄
let config_dir = get_settings_dir()?;
info!("config_dir: {:#?}", config_dir);
// 獲取配置文件環(huán)境
let run_mode = std::env::var("RUN_MODE")
.map(|env| Profile::from_str(&env).map_err(|e| ConfigError::Message(e.to_string())))
.unwrap_or_else(|_e| Ok(Profile::Dev))?;
// 當(dāng)前配置文件名
let profile_filename = format!("{run_mode}.toml");
// 獲取配置
let config = config::Config::builder()
// 添加默認(rèn)配置
.add_source(config::File::from(config_dir.join("default.toml")))
// 添加自定義前綴配置
.add_source(config::File::from(config_dir.join(profile_filename)))
// 添加環(huán)境變量
.add_source(env_src)
.build()?;
info!("Successfully read config profile: {run_mode}.");
// 反序列化
config.try_deserialize()
}
}
// 獲取配置文件目錄
pub fn get_settings_dir() -> Result<std::path::PathBuf, ConfigError> {
Ok(get_project_root()
.map_err(|e| ConfigError::Message(e.to_string()))?
.join("settings"))
}
#[cfg(test)]
mod tests {
use crate::config::profile::Profile;
use self::env::get_env_source;
pub use super::*;
#[test]
pub fn test_profile_to_string() {
// 設(shè)置dev模式
let profile: Profile = Profile::try_from("development").unwrap();
println!("profile: {:#?}", profile);
assert_eq!(profile, Profile::Dev)
}
#[test]
pub fn test_read_app_config_prefix() {
// 讀取配置
let config = AppConfig::read(get_env_source("APP")).unwrap();
println!("config: {:#?}", config);
}
}將配置存放到全局constant\mod.rs
// 環(huán)境變量前綴
pub const ENV_PREFIX: &str = "APP";
// 配置
pub static CONFIG: Lazy<crate::config::AppConfig> = Lazy::new(||
crate::config::AppConfig::read(get_env_source(ENV_PREFIX)).unwrap()
);加載配置文件client\builder.rs
use crate::config::AppConfig;
// 傳輸配置文件到客戶端
pub trait ClientBuilder: Sized {
fn build_from_config(config: &AppConfig) -> Result<Self,InfraError>;
}Es客戶端client\es.rs
InfraError為自定義錯(cuò)誤,請(qǐng)修改為你想要的錯(cuò)誤,如標(biāo)準(zhǔn)庫(kù)錯(cuò)誤
// 類型別名
pub type EsClient = Arc<Elasticsearch>;
// 加載配置文件
pub trait EsClientExt: Sized {
fn build_from_config(config: &AppConfig) -> impl Future<Output = Result<Self, InfraError>>;
}
impl EsClientExt for EsClient {
async fn build_from_config(config: &AppConfig) -> Result<Self, InfraError> {
// 1、使用single_node方式創(chuàng)建client
// let transport = Transport::single_node(&config.es.get_url()).unwrap();
// let client = Elasticsearch::new(transport);
// Ok(Arc::new(client))
// 2、使用builder方式創(chuàng)建client,可以添加多個(gè)url
let url = config.es.get_url();
let url_parsed = url
.parse::<elasticsearch::http::Url>()
.map_err(|_| InfraError::OtherError("url err".to_string()))?;
let conn_pool = SingleNodeConnectionPool::new(url_parsed);
let transport = TransportBuilder::new(conn_pool)
.disable_proxy()
.build()
.map_err(|_| InfraError::OtherError("transport err".to_string()))?;
let client = Elasticsearch::new(transport);
Ok(Arc::new(client))
}
}測(cè)試client\es.rs,所有請(qǐng)求在body()中定義DSL語(yǔ)句,通過(guò)send()發(fā)送
#[cfg(test)]
mod tests {
use elasticsearch::{ cat::CatIndicesParts, DeleteParts, IndexParts, UpdateParts };
use serde_json::json;
use super::*;
use crate::constant::CONFIG;
#[tokio::test]
async fn test_add_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let response = client
.index(IndexParts::IndexId("mgr", "1"))
.body(
json!({
"id": 1,
"user": "cci",
"post_date": "2024-01-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
})
)
.send().await;
assert!(response.is_ok());
let response = response.unwrap();
assert!(response.status_code().is_success());
}
#[tokio::test]
async fn test_get_indices() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let get_index_response = client
.cat()
.indices(CatIndicesParts::Index(&["*"]))
.send().await;
assert!(get_index_response.is_ok());
}
#[tokio::test]
async fn test_update_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let update_response = client
.update(UpdateParts::IndexId("mgr", "1"))
.body(
json!({
"doc": {
"message": "Updated message"
}
})
)
.send().await;
assert!(update_response.is_ok());
let update_response = update_response.unwrap();
assert!(update_response.status_code().is_success());
}
#[tokio::test]
async fn test_delete_document() {
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
let delete_response = client.delete(DeleteParts::IndexId("mgr", "1")).send().await;
assert!(delete_response.is_ok());
let delete_response = delete_response.unwrap();
assert!(delete_response.status_code().is_success());
}
}使用流程
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語(yǔ)句
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// 添加文檔
body.push(json!({"index": {"_id": "1"}}).into());
body.push(
json!({
"id": 1,
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
}).into()
);
// 添加文檔
body.push(json!({"index": {"_id": "2"}}).into());
body.push(
json!({
"id": 2,
"user": "forloop",
"post_date": "2020-01-08T00:00:00Z",
"message": "Bulk indexing with the rust client, yeah!"
}).into()
);
// 3、發(fā)送請(qǐng)求
let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();項(xiàng)目地址:https://github.com/VCCICCV/MGR
分析數(shù)據(jù)結(jié)構(gòu)
mapping要考慮的問(wèn)題:字段名、數(shù)據(jù)類型、是否參與搜索(建立倒排索引"index":false,默認(rèn)true)、是否分詞(參與搜索的字段,text分詞,keyword、數(shù)據(jù)類型不分詞)、分詞器
- 地理坐標(biāo):
- geo_point:由經(jīng)度(longitude)和緯度(latitude)確定的一個(gè)點(diǎn),如
[ 13.400544, 52.530286 ] - geo_shape:由多個(gè)
geo_point組成的幾何圖形,如一條線[[13.0, 53.0], [14.0, 52.0]]
- geo_point:由經(jīng)度(longitude)和緯度(latitude)確定的一個(gè)點(diǎn),如
copy_to:將多個(gè)字段組合為一個(gè)字段進(jìn)行索引 Rust客戶端操作索引庫(kù)
生產(chǎn)環(huán)境不要使用
unwrap()
這里演示在請(qǐng)求正文中操作,使用send()
Transport支持的方法Method:
Get:獲取資源Put:創(chuàng)建或更新資源(全量更新)Post:創(chuàng)建或更新資源(部分更新)Delete:刪除資源Head:獲取頭信息
send()請(qǐng)求正文需要包含的參數(shù):
method:必須path:必須headers:必須query_string:可選body:可選timeout:可選
添加索引庫(kù)
#[tokio::test]
async fn test_create_index() {
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語(yǔ)句
let index_name = "mgr";
let index_definition =
json!({
"mappings":{
"properties":{
"age":{
"type":"integer"
}
}
}
});
let body = Some(serde_json::to_vec(&index_definition).unwrap());
let path = format!("/{}", index_name);
let headers = HeaderMap::new();
let query_string = None;
let timeout = None;
let method = Method::Put;
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
method,
&path,
headers,
query_string,
body,
timeout
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}你也可以將其簡(jiǎn)化
#[tokio::test]
async fn test_create_index() {
// 1、創(chuàng)建client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義DSL
let index_definition =
json!({
"mappings":{
"properties":{
"age":{
"type":"integer"
}
}
}
});
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Put,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
Some(index_definition.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}查詢索引庫(kù)是否存在
#[tokio::test]
async fn test_query_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語(yǔ)句
let query = json!({
"query": {
"match_all": {}
}
});
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr/_search").as_str(),
HeaderMap::new(),
None,
Some(query.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}也可以不定義DSL查詢
#[tokio::test]
async fn test_query_index2() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}更新索引庫(kù)
#[tokio::test]
async fn test_update_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語(yǔ)句
let update_content = json!({
"properties":{
"age":{
"type":"integer"
}
}
});
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Put,
format!("/mgr/_mapping").as_str(),
HeaderMap::new(),
None,
Some(update_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}刪除索引庫(kù)
#[tokio::test]
async fn test_delete_index() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請(qǐng)求
let response = client.send::<(), ()>(
Method::Delete,
format!("/mgr").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status_code().is_success(), true);
}Rust客戶端操作文檔
添加文檔
#[tokio::test]
async fn test_create_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語(yǔ)句
let doc_content =
json!({
"id": "1",
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
});
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Post,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
Some(doc_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}查詢文檔是否存在
#[tokio::test]
async fn test_get_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Get,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}更新文檔
#[tokio::test]
async fn test_update_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、定義查詢 DSL 語(yǔ)句
let doc_content =
json!({
"doc": {
"message": "Updated message"
}
});
// 3、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Post,
format!("/mgr/_update/1").as_str(),
HeaderMap::new(),
None,
Some(doc_content.to_string().as_bytes().to_vec()),
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}刪除文檔
#[tokio::test]
async fn test_delete_doc() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2、發(fā)送請(qǐng)求
let response = client.send::<Vec<u8>, ()>(
Method::Delete,
format!("/mgr/_doc/1").as_str(),
HeaderMap::new(),
None,
None,
None
).await;
assert!(response.is_ok());
let response = response.unwrap();
println!("{:?}", response);
assert_eq!(response.status_code().is_success(), true);
}批量添加文檔
#[tokio::test]
async fn test_bulk_add_to_mgr() {
// 1、創(chuàng)建client
let client_result = EsClient::build_from_config(&CONFIG).await;
assert!(client_result.is_ok());
let client = client_result.unwrap();
// 2、定義DSL語(yǔ)句
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// 添加第一個(gè)操作和文檔
body.push(json!({"index": {"_id": "1"}}).into());
body.push(
json!({
"id": 1,
"user": "kimchy",
"post_date": "2009-11-15T00:00:00Z",
"message": "Trying out Elasticsearch, so far so good?"
}).into()
);
// 添加第二個(gè)操作和文檔
body.push(json!({"index": {"_id": "2"}}).into());
body.push(
json!({
"id": 2,
"user": "forloop",
"post_date": "2020-01-08T00:00:00Z",
"message": "Bulk indexing with the rust client, yeah!"
}).into()
);
// 3、發(fā)送請(qǐng)求
let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();
assert!(response.status_code().is_success());
}Rust客戶端操作搜索
這里演示在請(qǐng)求體body中進(jìn)行API調(diào)用
- 查詢所有:查出所有數(shù)據(jù)
- 全文檢索查詢(full text):利用分詞器對(duì)內(nèi)容分詞,從倒排索引庫(kù)中查詢
- match_query
- multi_match_query
- 精確查詢:根據(jù)精確值查詢,如integer、keyword、日期
- id
- range:根據(jù)值的范圍查詢
- term:根據(jù)詞條精確值查詢
- 地理坐標(biāo)查詢(geo):根據(jù)經(jīng)緯度查詢
- geo_distance:查詢geo_point指定距離范圍內(nèi)的所有文檔
- geo_bounding_box:查詢geo_point值落在某個(gè)矩形范圍內(nèi)的所有文檔
- 復(fù)合查詢(compound):將上述條件組合起來(lái)
查詢所有
默認(rèn)10條
#[tokio::test]
async fn test_search_match_all() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"match_all": {
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}等價(jià)于
GET /mgr/_search
{
"query": {
"match_all": {}
}
}全文搜索
message為文檔中的字段
#[tokio::test]
async fn test_search_match() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"match": {
"message": "good"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當(dāng)于
GET /mgr/_search
{
"query": {
"match": {
"message": "good"
}
}
}多字段查詢
多字段查詢效率低,一般在創(chuàng)建時(shí)使用copy_to到一個(gè)字段中
#[tokio::test]
async fn test_search_multi_match() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"multi_match": {
"query": "good",
"fields": [
"message",
"user"
]
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當(dāng)于
GET /mgr/_search
{
"query": {
"multi_match": {
"query": "good",
"fields": [
"message",
"user"
]
}
}
}根據(jù)范圍查詢(range)
gte大于等于,lte小于等于;gt大于lt小于
#[tokio::test]
async fn test_search_range() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"range": {
"id": {
"gte": 1,
"lte": 1
}
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當(dāng)于
GET /mgr/_search
{
"query": {
"range": {
"id": {
"gte": 1,
"lte": 1
}
}
}
}根據(jù)詞條精確查詢(term)
#[tokio::test]
async fn test_search_term() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"term": {
"user": "kimchy"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}相當(dāng)于
GET /mgr/_search
{
"query": {
"term": {
"user": "kimchy"
}
}
}根據(jù)地理距離查詢
GET /mgr/_search
{
"query": {
"geo_distance": {
"distance": "100km",
"location": "31.04, 45.12"
}
}
}根據(jù)指定矩形范圍查詢
左上經(jīng)緯度與右下經(jīng)緯度
geo為文檔中的字段
GET /mgr/_search
{
"query": {
"geo_bounding_box": {
"geo": {
"top_left": {
"lon": 124.45,
"lat": 32.11
},
"bottom_right": {
"lon": 125.12,
"lat": 30.21
}
}
}
}
}復(fù)合查詢
查詢時(shí)文檔會(huì)對(duì)搜索詞條的關(guān)聯(lián)度打分_score,返回結(jié)果時(shí)按照降序排列
關(guān)聯(lián)度計(jì)算方法
- TF-IDF算法(ES5.0之前)
TF(詞條頻率)= 詞條出現(xiàn)次數(shù)/文檔中詞條總數(shù)
IDF(逆文檔頻率)=log(文檔總數(shù)/包含詞條的文檔總數(shù))
score = ∑(??=1,??)(TF*IDF):將詞條頻率與逆文檔頻率相乘再求和
- BM25算法(ES5.0之后)
默認(rèn)采用BM25算法:考慮了TF、IDF、文檔長(zhǎng)度等因素,能夠平衡長(zhǎng)短文的關(guān)聯(lián)度

function_score修改關(guān)聯(lián)度
指定文檔和算分函數(shù)
GET /mgr/_search
{
"query": {
"function_score": {
"query": {
"match": {// 查詢方法
"message": "good"
}
},
"functions": [ // 算分函數(shù)
{
"filter": {// 只有符合過(guò)濾條件的才被計(jì)算
"term": {// 根據(jù)詞條精確查詢
"id": 1
}
},
"weight": 3 // 指定加權(quán)函數(shù)
}
],
// 加權(quán)模式:相乘
"boost_mode": "multiply"
}
}
}weight:給定常量值,還可以指定以下值field_value_factor:用文檔中的指定字段值作為函數(shù)結(jié)果random_score:隨機(jī)生成一個(gè)值script_score:自定義計(jì)算公式boost_mode:加權(quán)模式,multiply與原來(lái)的_score相乘,還可以配置:replace:替換原來(lái)的_scoresum:求和avg:取平均值min:取最小值max:取最大值
相當(dāng)于
#[tokio::test]
async fn test_function_score_query() {
// 1、創(chuàng)建 client
let client = EsClient::build_from_config(&CONFIG).await.unwrap();
// 2. 執(zhí)行搜索
let response = client
.search(SearchParts::Index(&["mgr"]))
.from(0)
.size(5)
.body(
json!({
"query": {
"function_score": {
"query": {
"match": {// 查詢方法
"message": "good"
}
},
"functions": [ // 算分函數(shù)
{
"filter": {// 只有符合過(guò)濾條件的才被計(jì)算
"term": {// 根據(jù)詞條精確查詢
"id": 1
}
},
"weight": 3 // 指定加權(quán)函數(shù)
}
],
// 加權(quán)模式:相乘
"boost_mode": "multiply"
}
}
})
)
.send().await
.unwrap();
// 3. 解析響應(yīng)
let response_body = response.json::<Value>().await.unwrap();
// 搜索耗時(shí)
let took = response_body["took"].as_i64().unwrap();
println!("took: {}ms", took);
// 搜索結(jié)果
for hit in response_body["hits"]["hits"].as_array().unwrap() {
println!("{:?}", hit["_source"]);
}
}boolean query 布爾查詢
布爾查詢是一個(gè)或多個(gè)子句查詢的組合,組合方式有
must:必須匹配每個(gè)子查詢,類似于“與”should:選擇性匹配子查詢,類似于“或”must_not:必須不匹配,不參與算分,類似于“非”filter:必須匹配,
查詢message中包含rust,post_date不小于2020年1月1日的文檔
GET /mgr/_search
{
"query": {
"bool": {
"must": [
{
"match_phrase": {
"message": "rust"
}
}
],
"must_not": [
{
"range": {
"post_date": {
"lt": "2020-01-01T00:00:00Z"
}
}
}
]
}
}
}搜索結(jié)果處理
排序
GET /mgr/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"id": "desc"http:// ASC升序,DESC降序
}
]
}地理位置排序
GET /mgr/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance":{
"FIELD": {
"lat": 40,// 緯度
"lon": -70// 經(jīng)度
},
"order":"asc",// 排序方式
"unit":"km" // 單位
}
}
]
}分頁(yè)
1、from+size分頁(yè)查詢(默認(rèn)10條數(shù)據(jù))
GET /mgr/_search
{
"query": {
"match_all": {}
},
"from":1,// 分頁(yè)開(kāi)始位置
"size":10,// 期望獲取的文檔總數(shù)
"sort": [
{
"id": "desc"http:// ASC升序,DESC降序
}
]
}深度分頁(yè)問(wèn)題:一般將ES作為分布式部署,當(dāng)需要"from"=990,"size"=10查數(shù)據(jù)時(shí):
1、先從每個(gè)數(shù)據(jù)分片上查詢前1000條數(shù)據(jù)
2、將所有節(jié)點(diǎn)的結(jié)果聚合,在內(nèi)存中重新排序選出前1000條文檔
3、在這1000條文檔中選取"from"=990,"size"=10的數(shù)據(jù)
如果搜索頁(yè)數(shù)過(guò)深,或者結(jié)果集(from+size)越大,對(duì)內(nèi)存和CPU的消耗越高,因此ES設(shè)定的查詢上限是
10000
深度分頁(yè)解決方案:
2、search after分頁(yè)查詢:分頁(yè)時(shí)排序,從上一次的排序值開(kāi)始查詢下一頁(yè)文檔(只能向后查詢)
3、scroll分頁(yè)查詢:將排序數(shù)據(jù)形成快照,保存在內(nèi)存中(內(nèi)存消耗大,官方不推薦)
高亮處理
搜索鍵盤時(shí)關(guān)鍵字高亮

highlight指定高亮字段
默認(rèn)搜索字段和高亮字段匹配才高亮
GET /mgr/_search
{
"query": {
"match": {
"message":"rust"http:// 搜索message中包含rust的文檔
}
},
"highlight":{
"fields":{
"message":{// 指定高亮字段
"require_field_match":"false"http:// 搜索字段和高亮字段可以不匹配
}
}
}
}數(shù)據(jù)聚合
聚合(aggregations)可以實(shí)現(xiàn)對(duì)文檔數(shù)據(jù)的統(tǒng)計(jì)、分析、運(yùn)算,聚合分類:
- 桶(Buket):用來(lái)對(duì)數(shù)據(jù)分組
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket.html
- TermAggregation:按文檔字段或詞條值分組
- Date Histogram:按日期階梯分組,如一周為一組
- 度量(Metric):用于計(jì)算一些值,如最大值、最小值、平均值
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Sum:求和
- Stats:同時(shí)求Max、Min、Avg、Sum等
- 管道(pipeline):以其他聚合的結(jié)果作為聚合的基礎(chǔ)
- https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html
桶(Buket)
Buket默認(rèn)統(tǒng)計(jì)其中的文檔數(shù)量_count,并且按照降序排序
GET /mgr/_search
{
"size":0,// 文檔大小,結(jié)果不包含文檔,只包含聚合結(jié)果
"aggs": {//指定聚合
"idAgg": {// 聚合名
"terms": {// 精確查詢
"field":"id",// 指定字段
"order":{
"_count":"asc"http:// 按升序排序
}
}
}
}
}度量(Metric)
GET /mgr/_search
{
"size":0,// 文檔大小,結(jié)果不包含文檔,只包含聚合結(jié)果
"aggs": {//指定聚合
"idAgg": {// 聚合名
"terms": {// 精確查詢
"field":"id",// 指定字段
"size":20
},
"aggs":{// 子聚合
"score_stats":{// 聚合名
"max":{//聚合類型,min、max、avg等
"field":"score"http:// 聚合字段
}
}
}
}
}
}自動(dòng)補(bǔ)全
拼音補(bǔ)全
如果你想要通過(guò)拼音補(bǔ)全,請(qǐng)下載解壓拼音分詞器上傳到/opt/es/plugins目錄然后重啟es
https://github.com/infinilabs/analysis-pinyin/releases
- 補(bǔ)全字段必須是
completion類型 - 拼音分詞需要自定義分詞器
進(jìn)行拼音分詞:創(chuàng)建索引并設(shè)置字段類型為completion,同時(shí)指定先分詞再根據(jù)詞條過(guò)濾(如果不自定義分詞器,默認(rèn)將每個(gè)漢字單獨(dú)分為拼音,所以先分詞詞條再進(jìn)行拼音處理),其他設(shè)置見(jiàn)github倉(cāng)庫(kù)
PUT /test
{
"settings": {// 設(shè)置
"analysis": {
"analyzer": {// 設(shè)置分詞器
"my_analyzer": {// 分詞器名
"filters": [
"lowercase",// 轉(zhuǎn)小寫
"stop"http:// 去停用詞
],
"tokenizer": "ik_max_word", // 分詞器
"filter": "py" // 過(guò)濾時(shí)進(jìn)行拼音
}
}
},
"filter": { // 自定義tokenizer filter
"py": { // 過(guò)濾器名稱
"type": "pinyin", // 過(guò)濾器類型,這里是pinyin
"keep_full_pinyin": false,// 是否保留完整的拼音形式
"keep_joined_full_pinyin": true,// 是否保留連接起來(lái)的完整拼音形式
"keep_original": true,// 是否保留原始的文本內(nèi)容
"limit_first_letter_length": 16,// 限制拼音首字母的長(zhǎng)度為 16
"remove_duplicated_term": true,// 是否移除重復(fù)的詞條
"none_chinese_pinyin_tokenize": false// 不對(duì)非中文字符進(jìn)行拼音分詞
}
}
},
"mappings": {
"properties": {
"user": {
"type": "completion"
}
}
}
}不進(jìn)行拼音分詞:創(chuàng)建索引并設(shè)置字段類型為completion
PUT /test
{
"mappings": {
"properties": {
"user": {
"type": "completion"
}
}
}
}添加文檔
POST /test/_doc/1
{
"id": 1,
"message": "Trying out Elasticsearch, so far so good?",
"post_date": "2009-11-15T00:00:00Z",
"user": "kimchy"
}根據(jù)關(guān)鍵字查詢補(bǔ)全
GET /test/_search
{
"suggest": {
"YOUR_SUGGESTION": {// 指定自動(dòng)補(bǔ)全查詢名字
"text": "k",// 關(guān)鍵字前綴
"completion": {// 自動(dòng)補(bǔ)全類型
"field": "user",// 補(bǔ)全字段
"skip_duplicates": true,// 是否跳過(guò)重復(fù)的建議
"size": 10 // 獲取前10條結(jié)果
}
}
}
}所有代碼地址:https://github.com/VCCICCV/MGR/blob/main/auth/infrastructure/src/client/es.rs
到此這篇關(guān)于Rust整合Elasticsearch的詳細(xì)過(guò)程(收藏)的文章就介紹到這了,更多相關(guān)Rust整合Elasticsearch內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rust中實(shí)例化動(dòng)態(tài)對(duì)象的示例詳解
這篇文章主要為大家詳細(xì)介紹了Rust中實(shí)例化動(dòng)態(tài)對(duì)象的多種方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-02-02
Rust 標(biāo)準(zhǔn)庫(kù)的結(jié)構(gòu)及模塊路徑詳解
在 Rust 中,標(biāo)準(zhǔn)庫(kù)提供了一組核心功能,以幫助開(kāi)發(fā)者執(zhí)行常見(jiàn)的編程任務(wù),這個(gè)路徑樹(shù)可以作為參考,幫助你更好地理解 Rust 標(biāo)準(zhǔn)庫(kù)的結(jié)構(gòu)和模塊之間的關(guān)系,本文介紹 Rust 標(biāo)準(zhǔn)庫(kù)的結(jié)構(gòu),并提供相應(yīng)的 use 路徑,感興趣的朋友一起看看吧2024-05-05

