Python對(duì)ElasticSearch獲取數(shù)據(jù)及操作
使用Python對(duì)ElasticSearch獲取數(shù)據(jù)及操作,供大家參考,具體內(nèi)容如下
Version
Python :2.7
ElasticSearch:6.3
代碼:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/7/4 @Author : LiuXueWen @Site : @File : ElasticSearchOperation.py @Software: PyCharm @Description: 對(duì)elasticsearch數(shù)據(jù)的操作,包括獲取數(shù)據(jù),發(fā)送數(shù)據(jù) """ import elasticsearch import json import Util_Ini_Operation class elasticsearch_data(): def __init__(self,hosts,username,password,maxsize,is_ssl): # 初始化ini操作腳本,獲取配置文件 try: # 判斷請(qǐng)求方式是否ssl加密 if is_ssl == "true": # 獲取證書地址 cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs") es_ssl = elasticsearch.Elasticsearch( # 地址 hosts=hosts, # 用戶名密碼 http_auth=(username,password), # 開啟ssl use_ssl=True, # 確認(rèn)有加密證書 verify_certs=True, # 對(duì)應(yīng)的加密證書地址 client_cert=cert_pem ) self.es = es_ssl elif is_ssl == "false": # 創(chuàng)建普通類型的ES客戶端 es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize)) self.es = es_ordinary except Exception as e: print(e) def query_data(self,keywords_list,date): gte = "now-"+str(date) query_data = { # 查詢語(yǔ)句 "query": { "bool": { "must": [ { "query_string": { "query": keywords_list, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gte, "lte": "now", "format": "epoch_millis" } } } ], "must_not": [] } } } return query_data # 從es獲取數(shù)據(jù) def get_datas_by_query(self,index_name,keywords,param,date): ''' :param index_name: 索引名稱 :param keywords: 關(guān)鍵字詞,數(shù)組 :param param: 需要數(shù)據(jù)條件,例如_source :param date: 過(guò)去時(shí)間范圍,字符串格式,例如過(guò)去30分鐘內(nèi)數(shù)據(jù),"30m" :return: all_datas 返回查詢到的所有數(shù)據(jù)(已經(jīng)過(guò)param過(guò)濾) ''' all_datas = [] # 遍歷所有的查詢條件 for keywords_list in keywords: # DSL語(yǔ)句 query_data = self.query_data(keywords_list,date) res = self.es.search( index=index_name, body=query_data ) for hit in res['hits']['hits']: # 獲取指定的內(nèi)容 response = hit[param] # 添加所有數(shù)據(jù)到數(shù)據(jù)集中 all_datas.append(response) # 返回所有數(shù)據(jù)內(nèi)容 return all_datas # 當(dāng)索引不存在創(chuàng)建索引 def create_index(self,index_name): ''' :param index_name: 索引名稱 :return:如果創(chuàng)建成功返回創(chuàng)建結(jié)果信息,試過(guò)已經(jīng)存在創(chuàng)建新的index失敗返回index的名稱 ''' # 獲取索引的映射 # index_mapping = IndexMapping.index_mapping # # 判斷索引是否存在 # if self.es.indices.exists(index=index_name) is not True: # # 創(chuàng)建索引 # res = self.es.indices.create(index=index_name,body=index_mapping) # # 返回結(jié)果 # return res # else: # # 返回索引名稱 # return index_name pass # 插入指定的單條數(shù)據(jù)內(nèi)容 def insert_single_data(self,index_name,doc_type,data): ''' :param index_name: 索引名稱 :param doc_type: 文檔類型 :param data: 需要插入的數(shù)據(jù)內(nèi)容 :return: 執(zhí)行結(jié)果 ''' res = self.es.index(index=index_name,doc_type=doc_type,body=data) return res # 向ES中新增數(shù)據(jù),批量插入 def insert_datas(self,index_name): ''' :desc 通過(guò)讀取指定的文件內(nèi)容獲取需要插入的數(shù)據(jù)集 :param index_name: 索引名稱 :return: 插入成功的數(shù)據(jù)條數(shù) ''' insert_datas = [] # 判斷插入數(shù)據(jù)的索引是否存在 self.createIndex(index_name=index_name) # 獲取插入數(shù)據(jù)的文件地址 data_file_path = self.ini.get_key_value("datafile","datafilepath") # 獲取需要插入的數(shù)據(jù)集 with open(data_file_path,"r+") as data_file: # 獲取文件所有數(shù)據(jù) data_lines = data_file.readlines() for data_line in data_lines: # string to json data_line = json.loads(data_line) insert_datas.append(data_line) # 批量處理 res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True) return res # 從ES中在指定的索引中刪除指定數(shù)據(jù)(根據(jù)id判斷) def delete_data_by_id(self,index_name,doc_type,id): ''' :param index_name: 索引名稱 :param index_type: 文檔類型 :param id: 唯一標(biāo)識(shí)id :return: 刪除結(jié)果信息 ''' res = self.es.delete(index=index_name,doc_type=doc_type,id=id) return res # 根據(jù)條件刪除數(shù)據(jù) def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time): ''' :param index_name:索引名稱,為空查詢所有索引 :param doc_type:文檔類型,為空查詢所有文檔類型 :param param:過(guò)濾條件值 :param gt_time:時(shí)間范圍,大于該時(shí)間 :param lt_time:時(shí)間范圍,小于該時(shí)間 :return:執(zhí)行條件刪除后的結(jié)果信息 ''' # DSL語(yǔ)句 query_data = { # 查詢語(yǔ)句 "query": { "bool": { "must": [ { "query_string": { "query": param, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True) return res # 指定index中刪除指定時(shí)間段內(nèi)的全部數(shù)據(jù) def delete_all_datas(self,index_name,doc_type,gt_time,lt_time): ''' :param index_name:索引名稱,為空查詢所有索引 :param doc_type:文檔類型,為空查詢所有文檔類型 :param gt_time:時(shí)間范圍,大于該時(shí)間 :param lt_time:時(shí)間范圍,小于該時(shí)間 :return:執(zhí)行條件刪除后的結(jié)果信息 ''' # DSL語(yǔ)句 query_data = { # 查詢語(yǔ)句 "query": { "bool": { "must": [ { "match_all": {} }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True) return res # 修改ES中指定的數(shù)據(jù) def update_data_by_id(self,index_name,doc_type,id,data): ''' :param index_name: 索引名稱 :param doc_type: 文檔類型,為空表示所有類型 :param id: 文檔唯一標(biāo)識(shí)編號(hào) :param data: 更新的數(shù)據(jù) :return: 更新結(jié)果信息 ''' res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data) return res
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python正則表達(dá)式re.match()匹配多個(gè)字符方法的實(shí)現(xiàn)
這篇文章主要介紹了python正則表達(dá)式re.match()匹配多個(gè)字符方法的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01用python結(jié)合jieba和wordcloud實(shí)現(xiàn)詞云效果
詞云,顧名思義就是很多個(gè)單詞,然后通過(guò)出現(xiàn)的頻率或者比重之類的標(biāo)準(zhǔn)匯聚成一個(gè)云朵的樣子嘛,其實(shí)呢現(xiàn)在網(wǎng)上已經(jīng)有很多能自動(dòng)生成詞云的工具了,比如Wordle,Tagxedo等等,Python也能實(shí)現(xiàn)這樣的效果,我們通過(guò)jieba庫(kù)和wordcloud庫(kù)也能十分輕松的完成詞云的構(gòu)建2017-09-0913個(gè)最常用的Python深度學(xué)習(xí)庫(kù)介紹
這篇文章主要介紹了13個(gè)最常用的Python深度學(xué)習(xí)庫(kù)介紹,具有一定參考價(jià)值,需要的朋友可以參考下。2017-10-10python unix時(shí)間戳轉(zhuǎn)換毫秒的實(shí)現(xiàn)
Unix時(shí)間戳是一種常見(jiàn)的時(shí)間表示方式,本文主要介紹了python unix時(shí)間戳轉(zhuǎn)換毫秒的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01python實(shí)現(xiàn)飛機(jī)大戰(zhàn)項(xiàng)目
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)飛機(jī)大戰(zhàn)項(xiàng)目,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03python內(nèi)置函數(shù)map/filter/reduce詳解
在Python中,map(), filter(), 和 reduce() 是內(nèi)置的高級(jí)函數(shù)(實(shí)際是class),用于處理可迭代對(duì)象(如列表、元組等)的元素,這篇文章主要介紹了python內(nèi)置函數(shù)map/filter/reduce的相關(guān)知識(shí),需要的朋友可以參考下2024-05-05沒(méi)有安裝Python的電腦運(yùn)行Python代碼教程
你有沒(méi)有遇到過(guò)這種情況,自己辛苦碼完了代碼想發(fā)給別人運(yùn)行看效果,可是對(duì)方竟然沒(méi)安裝Python,這要怎么運(yùn)行呢?本篇文章帶你解決這個(gè)問(wèn)題,需要的朋友快來(lái)看看2021-10-10Python matplotlib畫圖時(shí)圖例說(shuō)明(legend)放到圖像外側(cè)詳解
這篇文章主要介紹了Python matplotlib畫圖時(shí)圖例說(shuō)明(legend)放到圖像外側(cè)詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-05-05