如何使用python生成大量數(shù)據(jù)寫(xiě)入es數(shù)據(jù)庫(kù)并查詢操作
前言:
模擬學(xué)生成績(jī)信息寫(xiě)入es數(shù)據(jù)庫(kù),包括姓名、性別、科目、成績(jī)。
示例代碼1:【一次性寫(xiě)入10000*1000條數(shù)據(jù)】 【本人親測(cè)耗時(shí)5100秒】
from elasticsearch import Elasticsearch from elasticsearch import helpers import random import time es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] subjects = ['語(yǔ)文', '數(shù)學(xué)', '英語(yǔ)', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] datas = [] start = time.time() # 開(kāi)始批量寫(xiě)入es數(shù)據(jù)庫(kù) # 批量寫(xiě)入數(shù)據(jù) for j in range(1000): print(j) action = [ { "_index": "grade", "_type": "doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "subject": random.choice(subjects), "grade": random.choice(grades) } } for i in range(10000 * j, 10000 * j + 10000) ] helpers.bulk(es, action) end = time.time() print('花費(fèi)時(shí)間:', end - start)
elasticsearch-head中顯示:
示例代碼2:【一次性寫(xiě)入10000*5000條數(shù)據(jù)】 【本人親測(cè)耗時(shí)23000秒】
from elasticsearch import Elasticsearch from elasticsearch import helpers import random import time es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] subjects = ['語(yǔ)文', '數(shù)學(xué)', '英語(yǔ)', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] datas = [] start = time.time() # 開(kāi)始批量寫(xiě)入es數(shù)據(jù)庫(kù) # 批量寫(xiě)入數(shù)據(jù) for j in range(5000): print(j) action = [ { "_index": "grade3", "_type": "doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "subject": random.choice(subjects), "grade": random.choice(grades) } } for i in range(10000 * j, 10000 * j + 10000) ] helpers.bulk(es, action) end = time.time() print('花費(fèi)時(shí)間:', end - start)
示例代碼3:【一次性寫(xiě)入10000*9205條數(shù)據(jù)】 【耗時(shí)過(guò)長(zhǎng)】
from elasticsearch import Elasticsearch from elasticsearch import helpers import random import time es = Elasticsearch(hosts='http://127.0.0.1:9200') names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十'] sexs = ['男', '女'] subjects = ['語(yǔ)文', '數(shù)學(xué)', '英語(yǔ)', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] datas = [] start = time.time() # 開(kāi)始批量寫(xiě)入es數(shù)據(jù)庫(kù) # 批量寫(xiě)入數(shù)據(jù) for j in range(9205): print(j) action = [ { "_index": "grade2", "_type": "doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "subject": random.choice(subjects), "grade": random.choice(grades) } } for i in range(10000*j, 10000*j+10000) ] helpers.bulk(es, action) end = time.time() print('花費(fèi)時(shí)間:', end - start)
查詢數(shù)據(jù)并計(jì)算各種方式的成績(jī)總分。
示例代碼4:【一次性獲取所有的數(shù)據(jù),在程序中分別計(jì)算所耗的時(shí)間】
from elasticsearch import Elasticsearch import time def search_data(es, size=10): query = { "query": { "match_all": {} } } res = es.search(index='grade', body=query, size=size) # print(res) return res if __name__ == '__main__': start = time.time() es = Elasticsearch(hosts='http://192.168.1.1:9200') # print(es) size = 10000 res = search_data(es, size) # print(type(res)) # total = res['hits']['total']['value'] # print(total) all_source = [] for i in range(size): source = res['hits']['hits'][i]['_source'] all_source.append(source) # print(source) # 統(tǒng)計(jì)查詢出來(lái)的所有學(xué)生的所有課程的所有成績(jī)的總成績(jī) start1 = time.time() all_grade = 0 for data in all_source: all_grade += int(data['grade']) print('所有學(xué)生總成績(jī)之和:', all_grade) end1 = time.time() print("耗時(shí):", end1 - start1) # 統(tǒng)計(jì)查詢出來(lái)的每個(gè)學(xué)生的所有課程的所有成績(jī)的總成績(jī) start2 = time.time() names1 = [] all_name_grade = {} for data in all_source: if data['name'] in names1: all_name_grade[data['name']] += data['grade'] else: names1.append(data['name']) all_name_grade[data['name']] = data['grade'] print(all_name_grade) end2 = time.time() print("耗時(shí):", end2 - start2) # 統(tǒng)計(jì)查詢出來(lái)的每個(gè)學(xué)生的每門課程的所有成績(jī)的總成績(jī) start3 = time.time() names2 = [] subjects = [] all_name_all_subject_grade = {} for data in all_source: if data['name'] in names2: if all_name_all_subject_grade[data['name']].get(data['subject']): all_name_all_subject_grade[data['name']][data['subject']] += data['grade'] else: all_name_all_subject_grade[data['name']][data['subject']] = data['grade'] else: names2.append(data['name']) all_name_all_subject_grade[data['name']] = {} all_name_all_subject_grade[data['name']][data['subject']] = data['grade'] print(all_name_all_subject_grade) end3 = time.time() print("耗時(shí):", end3 - start3) end = time.time() print('總耗時(shí):', end - start)
運(yùn)行結(jié)果:
在示例代碼4中當(dāng)把size由10000改為 2000000時(shí),運(yùn)行效果如下所示:
在項(xiàng)目中一般不用上述代碼4中所統(tǒng)計(jì)成績(jī)的方法,面對(duì)大量的數(shù)據(jù)是比較耗時(shí)的,要使用es中的聚合查詢。計(jì)算數(shù)據(jù)中所有成績(jī)之和。
示例代碼5:【使用普通計(jì)算方法和聚類方法做對(duì)比驗(yàn)證】
from elasticsearch import Elasticsearch import time def search_data(es, size=10): query = { "query": { "match_all": {} } } res = es.search(index='grade', body=query, size=size) # print(res) return res def search_data2(es, size=10): query = { "aggs": { "all_grade": { "terms": { "field": "grade", "size": 1000 } } } } res = es.search(index='grade', body=query, size=size) # print(res) return res if __name__ == '__main__': start = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') size = 2000000 res = search_data(es, size) all_source = [] for i in range(size): source = res['hits']['hits'][i]['_source'] all_source.append(source) # print(source) # 統(tǒng)計(jì)查詢出來(lái)的所有學(xué)生的所有課程的所有成績(jī)的總成績(jī) start1 = time.time() all_grade = 0 for data in all_source: all_grade += int(data['grade']) print('200萬(wàn)數(shù)據(jù)所有學(xué)生總成績(jī)之和:', all_grade) end1 = time.time() print("耗時(shí):", end1 - start1) end = time.time() print('200萬(wàn)數(shù)據(jù)總耗時(shí):', end - start) # 聚合操作 start_aggs = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') # size = 2000000 size = 0 res = search_data2(es, size) # print(res) aggs = res['aggregations']['all_grade']['buckets'] print(aggs) sum = 0 for agg in aggs: sum += (agg['key'] * agg['doc_count']) print('1000萬(wàn)數(shù)據(jù)總成績(jī)之和:', sum) end_aggs = time.time() print('1000萬(wàn)數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)
運(yùn)行結(jié)果:
計(jì)算數(shù)據(jù)中每個(gè)同學(xué)的各科總成績(jī)之和。
示例代碼6: 【子聚合】【先分組,再計(jì)算】
from elasticsearch import Elasticsearch import time def search_data(es, size=10): query = { "query": { "match_all": {} } } res = es.search(index='grade', body=query, size=size) # print(res) return res def search_data2(es): query = { "size": 0, "aggs": { "all_names": { "terms": { "field": "name.keyword", "size": 10 }, "aggs": { "total_grade": { "sum": { "field": "grade" } } } } } } res = es.search(index='grade', body=query) # print(res) return res if __name__ == '__main__': start = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') size = 2000000 res = search_data(es, size) all_source = [] for i in range(size): source = res['hits']['hits'][i]['_source'] all_source.append(source) # print(source) # 統(tǒng)計(jì)查詢出來(lái)的每個(gè)學(xué)生的所有課程的所有成績(jī)的總成績(jī) start2 = time.time() names1 = [] all_name_grade = {} for data in all_source: if data['name'] in names1: all_name_grade[data['name']] += data['grade'] else: names1.append(data['name']) all_name_grade[data['name']] = data['grade'] print(all_name_grade) end2 = time.time() print("200萬(wàn)數(shù)據(jù)耗時(shí):", end2 - start2) end = time.time() print('200萬(wàn)數(shù)據(jù)總耗時(shí):', end - start) # 聚合操作 start_aggs = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') res = search_data2(es) # print(res) aggs = res['aggregations']['all_names']['buckets'] # print(aggs) dic = {} for agg in aggs: dic[agg['key']] = agg['total_grade']['value'] print('1000萬(wàn)數(shù)據(jù):', dic) end_aggs = time.time() print('1000萬(wàn)數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)
運(yùn)行結(jié)果:
計(jì)算數(shù)據(jù)中每個(gè)同學(xué)的每科成績(jī)之和。
示例代碼7:
from elasticsearch import Elasticsearch import time def search_data(es, size=10): query = { "query": { "match_all": {} } } res = es.search(index='grade', body=query, size=size) # print(res) return res def search_data2(es): query = { "size": 0, "aggs": { "all_names": { "terms": { "field": "name.keyword", "size": 10 }, "aggs": { "all_subjects": { "terms": { "field": "subject.keyword", "size": 5 }, "aggs": { "total_grade": { "sum": { "field": "grade" } } } } } } } } res = es.search(index='grade', body=query) # print(res) return res if __name__ == '__main__': start = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') size = 2000000 res = search_data(es, size) all_source = [] for i in range(size): source = res['hits']['hits'][i]['_source'] all_source.append(source) # print(source) # 統(tǒng)計(jì)查詢出來(lái)的每個(gè)學(xué)生的每門課程的所有成績(jī)的總成績(jī) start3 = time.time() names2 = [] subjects = [] all_name_all_subject_grade = {} for data in all_source: if data['name'] in names2: if all_name_all_subject_grade[data['name']].get(data['subject']): all_name_all_subject_grade[data['name']][data['subject']] += data['grade'] else: all_name_all_subject_grade[data['name']][data['subject']] = data['grade'] else: names2.append(data['name']) all_name_all_subject_grade[data['name']] = {} all_name_all_subject_grade[data['name']][data['subject']] = data['grade'] print('200萬(wàn)數(shù)據(jù):', all_name_all_subject_grade) end3 = time.time() print("耗時(shí):", end3 - start3) end = time.time() print('200萬(wàn)數(shù)據(jù)總耗時(shí):', end - start) # 聚合操作 start_aggs = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') res = search_data2(es) # print(res) aggs = res['aggregations']['all_names']['buckets'] # print(aggs) dic = {} for agg in aggs: dic[agg['key']] = {} for sub in agg['all_subjects']['buckets']: dic[agg['key']][sub['key']] = sub['total_grade']['value'] print('1000萬(wàn)數(shù)據(jù):', dic) end_aggs = time.time() print('1000萬(wàn)數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)
運(yùn)行結(jié)果:
在上面查詢計(jì)算示例代碼中,當(dāng)使用含有1000萬(wàn)數(shù)據(jù)的索引grade時(shí),普通方法查詢計(jì)算是比較耗時(shí)的,使用聚合查詢能夠大大節(jié)約大量時(shí)間。當(dāng)面對(duì)9205萬(wàn)數(shù)據(jù)的索引grade2時(shí),這時(shí)使用普通計(jì)算方法所消耗的時(shí)間太大了,在線上開(kāi)發(fā)環(huán)境中是不可用的,所以必須使用聚合方法來(lái)計(jì)算。
示例代碼8:
from elasticsearch import Elasticsearch import time def search_data(es): query = { "size": 0, "aggs": { "all_names": { "terms": { "field": "name.keyword", "size": 10 }, "aggs": { "all_subjects": { "terms": { "field": "subject.keyword", "size": 5 }, "aggs": { "total_grade": { "sum": { "field": "grade" } } } } } } } } res = es.search(index='grade2', body=query) # print(res) return res if __name__ == '__main__': # 聚合操作 start_aggs = time.time() es = Elasticsearch(hosts='http://127.0.0.1:9200') res = search_data(es) # print(res) aggs = res['aggregations']['all_names']['buckets'] # print(aggs) dic = {} for agg in aggs: dic[agg['key']] = {} for sub in agg['all_subjects']['buckets']: dic[agg['key']][sub['key']] = sub['total_grade']['value'] print('9205萬(wàn)數(shù)據(jù):', dic) end_aggs = time.time() print('9205萬(wàn)數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)
運(yùn)行結(jié)果:
注意:寫(xiě)查詢語(yǔ)句時(shí)建議使用kibana去寫(xiě),然后復(fù)制查詢語(yǔ)句到代碼中,kibana會(huì)提示查詢語(yǔ)句。
到此這篇關(guān)于如何使用python生成大量數(shù)據(jù)寫(xiě)入es數(shù)據(jù)庫(kù)并查詢操作的文章就介紹到這了,更多相關(guān)python es 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
快速排序的算法思想及Python版快速排序的實(shí)現(xiàn)示例
快速排序算法來(lái)源于分治法的思想策略,這里我們將來(lái)為大家簡(jiǎn)單解析一下快速排序的算法思想及Python版快速排序的實(shí)現(xiàn)示例:2016-07-07深入淺析Python中l(wèi)ist的復(fù)制及深拷貝與淺拷貝
這篇文章主要介紹了Python中l(wèi)ist的復(fù)制及深拷貝與淺拷貝及區(qū)別解析 ,需要的朋友可以參考下2018-09-09Python中關(guān)于浮點(diǎn)數(shù)的冷知識(shí)
這篇文章主要給大家介紹了Python中關(guān)于浮點(diǎn)數(shù)的冷知識(shí),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Python3爬蟲(chóng)之自動(dòng)查詢天氣并實(shí)現(xiàn)語(yǔ)音播報(bào)
這篇文章主要介紹了Python3爬蟲(chóng)之自動(dòng)查詢天氣并實(shí)現(xiàn)語(yǔ)音播報(bào),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02