使用python生成大量數據寫入es數據庫并查詢操作(2)
前言 :
上一篇文章:如何使用python生成大量數據寫入es數據庫并查詢操作
模擬學生個人信息寫入es數據庫,包括姓名、性別、年齡、特點、科目、成績、創建時間。
方案一
在寫入數據時未提前創建索引mapping,而是每插入一條數據都包含了索引的信息,由ES根據寫入的數據自動生成mapping(見下方自動創建的mapping)。
示例代碼:【多線程寫入數據】【一次性寫入10000*1000條數據】 【本人親測耗時3266秒】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Empty, Queue
import random
import time
import threading

# One client instance shared by every writer thread.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

# No index is created beforehand: the first bulk request lets ES create
# INDEX_NAME with a dynamically inferred mapping.
INDEX_NAME = "personal_info_10000000"
BATCH_SIZE = 10000   # documents per bulk request
BATCH_COUNT = 1000   # number of batches -> BATCH_SIZE * BATCH_COUNT documents
WORKER_COUNT = 10    # number of writer threads

# Value pools that each generated document samples from at random.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,愿意虛心向他人學習',
    '愿意以謙虛態度贊揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document carries the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def save_to_es(num):
    """Bulk-write one batch of randomly generated student documents.

    Batch ``num`` covers document ids ``BATCH_SIZE * num`` up to
    ``BATCH_SIZE * (num + 1) - 1``; each id is used both as ``_id`` and as
    the ``id`` field. No ``_type`` is sent: mapping types are deprecated in
    Elasticsearch 7 and removed in 8, so the default type is used.

    :param num: zero-based batch number.
    """
    start = time.time()
    first_id = BATCH_SIZE * num
    action = [
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    ]
    helpers.bulk(es, action)
    end = time.time()
    print(f"{num}耗時{end - start}s!")


def run():
    """Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    ``get_nowait`` replaces the former ``qsize() > 0`` check followed by a
    blocking ``get()``. That pair raced between threads: when several
    workers saw the last item, the losers blocked forever and ``join()``
    never returned. Any exception raised while writing a batch is reported
    and that batch is skipped, instead of silently killing the worker.
    """
    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:  # thread boundary: report and keep draining
            print(f"batch {num} failed: {exc!r}")


if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; each worker pulls one batch at a time.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the writer threads, then wait for all of them to finish.
    consumer_lst = []
    for _ in range(WORKER_COUNT):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
運行結果:
自動創建的索引mapping:
GET personal_info_10000000/_mapping { "personal_info_10000000" : { "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "create_time" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } } }
方案二
1.逐條插入5000000條數據
先創建索引personal_info_5000000,確定好mapping后,再插入數據。
新建索引并設置mapping信息:
PUT personal_info_5000000 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
查看新建索引信息:
GET personal_info_5000000 { "personal_info_5000000" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_50000000", "creation_date" : "1663471072176", "number_of_replicas" : "1", "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ", "version" : { "created" : "7170699" } } } } }
開始插入數據:
示例代碼: 【多線程逐條寫入數據(10個線程,每次請求只寫入1條)】【共寫入5000000條數據】 【本人親測耗時7916秒】
from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Empty, Queue
import functools
import random
import time
import threading

# One client instance shared by every writer thread.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

INDEX_NAME = "personal_info_5000000"  # created beforehand with an explicit mapping
DOC_COUNT = 5000000                   # documents in total, one request each
WORKER_COUNT = 10                     # number of writer threads

# Value pools that each generated document samples from at random.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,愿意虛心向他人學習',
    '愿意以謙虛態度贊揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document carries the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call of ``func`` took.

    The call is labelled with its first positional argument (the document
    id here). The former ``format(*args, ...)`` mislabelled or raised
    whenever the call did not have exactly one positional argument.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        label = args[0] if args else kwargs.get('num')
        print('id{}共耗時約 {:.2f} 秒'.format(label, end - start))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Index a single randomly generated student document with id ``num``.

    ``doc_type`` is no longer passed: mapping types are deprecated in
    Elasticsearch 7 and the argument was removed from the 8.x client.

    :param num: document id, used both as ``_id`` and as the ``id`` field.
    """
    body = {
        "id": num,
        "name": random.choice(names),
        "sex": random.choice(sexs),
        "age": random.choice(age),
        "character": random.choice(character),
        "subject": random.choice(subjects),
        "grade": random.choice(grades),
        "create_time": create_time,
    }
    # If the index does not exist yet, it is created on this call.
    es.index(index=INDEX_NAME, id=num, document=body)


def run():
    """Worker loop: take document ids off the shared ``queue`` until it is empty.

    ``get_nowait`` replaces the former ``qsize() > 0`` check followed by a
    blocking ``get()``, which raced between threads and could leave workers
    blocked forever on an already-drained queue. Any exception raised while
    writing a document is reported and that document is skipped, instead of
    silently killing the worker.
    """
    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:  # thread boundary: report and keep draining
            print(f"document {num} failed: {exc!r}")


if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the document ids; each worker indexes one document at a time.
    for num in range(DOC_COUNT):
        queue.put(num)
    # Start the writer threads, then wait for all of them to finish.
    consumer_lst = []
    for _ in range(WORKER_COUNT):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
運行結果:
2.批量插入5000000條數據
先創建索引personal_info_5000000_v2,確定好mapping后,再插入數據。
新建索引并設置mapping信息:
PUT personal_info_5000000_v2 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
查看新建索引信息:
GET personal_info_5000000_v2 { "personal_info_5000000_v2" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_5000000_v2", "creation_date" : "1663485323617", "number_of_replicas" : "1", "uuid" : "XBPaDn_gREmAoJmdRyBMAA", "version" : { "created" : "7170699" } } } } }
批量插入數據:
通過elasticsearch模塊導入helpers,通過helpers.bulk來批量處理大量的數據。首先將所有的數據定義成字典形式,各字段含義如下:
- _index對應索引名稱。該索引應事先按上面的mapping創建好;若不存在,ES會按動態mapping自動創建它。
- _type對應類型名稱(mapping類型在ES 7中已棄用、ES 8中已移除,可以省略,默認為_doc)。
- _source對應的字典內是每一篇文檔的字段和值,可以有多個字段。
示例代碼: 【程序中途異常,寫入4714000條數據】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Empty, Queue
import functools
import random
import time
import threading

# One client instance shared by every writer thread.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

INDEX_NAME = "personal_info_5000000_v2"  # created beforehand with an explicit mapping
BATCH_SIZE = 10000   # documents per bulk request
BATCH_COUNT = 500    # number of batches -> BATCH_SIZE * BATCH_COUNT documents
WORKER_COUNT = 10    # number of writer threads

# Value pools that each generated document samples from at random.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,愿意虛心向他人學習',
    '愿意以謙虛態度贊揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document carries the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call of ``func`` took.

    The call is labelled with its first positional argument (the batch
    number here). The former ``format(*args, ...)`` mislabelled or raised
    whenever the call did not have exactly one positional argument.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        label = args[0] if args else kwargs.get('num')
        print('id{}共耗時約 {:.2f} 秒'.format(label, end - start))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Bulk-write one batch of randomly generated student documents.

    Batch ``num`` covers document ids ``BATCH_SIZE * num`` up to
    ``BATCH_SIZE * (num + 1) - 1``. The whole batch is built as a list
    before ``helpers.bulk`` is called. No ``_type`` is sent: mapping types
    are deprecated in Elasticsearch 7 and removed in 8.

    :param num: zero-based batch number.
    """
    first_id = BATCH_SIZE * num
    action = [
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    ]
    helpers.bulk(es, action)


def run():
    """Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    ``get_nowait`` replaces the former ``qsize() > 0`` check followed by a
    blocking ``get()``, which raced between threads and could leave workers
    blocked forever on an already-drained queue. Any exception raised while
    writing a batch is reported and that batch is skipped, instead of
    silently killing the worker.
    """
    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:  # thread boundary: report and keep draining
            print(f"batch {num} failed: {exc!r}")


if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; each worker pulls one batch at a time.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the writer threads, then wait for all of them to finish.
    consumer_lst = []
    for _ in range(WORKER_COUNT):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
運行結果:
3.批量插入5000000條數據
先創建索引personal_info_5000000_v3(mapping與personal_info_5000000_v2相同),確定好mapping后,再插入數據。
此過程是在上面批量插入的前提下進行優化,采用python生成器。
建立索引和mapping同上,直接上代碼:
示例代碼: 【程序中途異常,寫入3688000條數據】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Empty, Queue
import functools
import random
import time
import threading

# One client instance shared by every writer thread.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)

# Should be created beforehand with the same mapping as personal_info_5000000_v2;
# otherwise ES creates it on the first bulk request with a dynamic mapping.
INDEX_NAME = "personal_info_5000000_v3"
BATCH_SIZE = 10000   # documents per bulk request
BATCH_COUNT = 500    # number of batches -> BATCH_SIZE * BATCH_COUNT documents
WORKER_COUNT = 10    # number of writer threads

# Value pools that each generated document samples from at random.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,愿意虛心向他人學習',
    '愿意以謙虛態度贊揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document carries the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call of ``func`` took.

    The call is labelled with its first positional argument (the batch
    number here). The former ``format(*args, ...)`` mislabelled or raised
    whenever the call did not have exactly one positional argument.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        label = args[0] if args else kwargs.get('num')
        print('id{}共耗時約 {:.2f} 秒'.format(label, end - start))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Bulk-write one batch of random student documents from a generator.

    Batch ``num`` covers document ids ``BATCH_SIZE * num`` up to
    ``BATCH_SIZE * (num + 1) - 1``. The actions are produced lazily by a
    generator expression instead of being materialised as a list first.
    No ``_type`` is sent: mapping types are deprecated in Elasticsearch 7
    and removed in 8.

    :param num: zero-based batch number.
    """
    first_id = BATCH_SIZE * num
    action = (
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    )
    helpers.bulk(es, action)


def run():
    """Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    ``get_nowait`` replaces the former ``qsize() > 0`` check followed by a
    blocking ``get()``, which raced between threads and could leave workers
    blocked forever on an already-drained queue. Any exception raised while
    writing a batch is reported and that batch is skipped, instead of
    silently killing the worker.
    """
    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:  # thread boundary: report and keep draining
            print(f"batch {num} failed: {exc!r}")


if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; each worker pulls one batch at a time.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the writer threads, then wait for all of them to finish.
    consumer_lst = []
    for _ in range(WORKER_COUNT):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
運行結果:
到此這篇關于使用python生成大量數據寫入es數據庫并查詢操作(2)的文章就介紹到這了,更多相關python生成數據內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解Python prometheus_client使用方式
本文主要介紹了Python prometheus_client使用方式,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下 2022-02-02
python生成每日報表數據(Excel)并郵件發送的實例
今天小編就為大家分享一篇python生成每日報表數據(Excel)并郵件發送的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2019-02-02
python encrypt 實現AES加密的實例詳解
在本篇文章里小編給大家分享的是關于python encrypt 實現AES加密的實例內容,有興趣的朋友們可以參考下。 2020-02-02