使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作(2)
前言 :
上一篇文章:如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作
模擬學(xué)生個人信息寫入es數(shù)據(jù)庫,包括姓名、性別、年齡、特點(diǎn)、科目、成績,創(chuàng)建時間。
方案一
在寫入數(shù)據(jù)時未提前創(chuàng)建索引mapping,而是每插入一條數(shù)據(jù)都包含了索引的信息。
示例代碼:【多線程寫入數(shù)據(jù)】【一次性寫入10000*1000條數(shù)據(jù)】 【本人親測耗時3266秒】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負(fù),不以自我為中心',
'努力、積極、樂觀、拼搏是我的人生信條',
'抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境',
'敢做敢拼,腳踏實地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)',
'愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實,熱情,對生活充滿激情',
'主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力',
'忠實誠信,講原則,說到做到,決不推卸責(zé)任',
'有自制力,做事情始終堅持有始有終,從不半途而廢',
'肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)',
'愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者',
'會用100%的熱情和精力投入到工作中;平易近人',
'為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實地',
'有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def save_to_es(num):
"""
批量寫入數(shù)據(jù)到es數(shù)據(jù)庫
:param num:
:return:
"""
start = time.time()
action = [
{
"_index": "personal_info_10000000",
"_type": "doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"age": random.choice(age),
"character": random.choice(character),
"subject": random.choice(subjects),
"grade": random.choice(grades),
"create_time": create_time
}
} for i in range(10000 * num, 10000 * num + 10000)
]
helpers.bulk(es, action)
end = time.time()
print(f"{num}耗時{end - start}s!")
def run():
global queue
while queue.qsize() > 0:
num = queue.get()
print(num)
save_to_es(num)
if __name__ == '__main__':
start = time.time()
queue = Queue()
# 序號數(shù)據(jù)進(jìn)隊列
for num in range(1000):
queue.put(num)
# 多線程執(zhí)行程序
consumer_lst = []
for _ in range(10):
thread = threading.Thread(target=run)
thread.start()
consumer_lst.append(thread)
for consumer in consumer_lst:
consumer.join()
end = time.time()
print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)運(yùn)行結(jié)果:



自動創(chuàng)建的索引mapping:
GET personal_info_10000000/_mapping
{
"personal_info_10000000" : {
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"create_time" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}方案二
1.順序插入5000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000,確定好mapping后,再插入數(shù)據(jù)。
新建索引并設(shè)置mapping信息:
PUT personal_info_5000000
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 32
}
}
},
"sex": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 8
}
}
},
"age": {
"type": "long"
},
"character": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subject": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"grade": {
"type": "long"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}查看新建索引信息:
GET personal_info_5000000
{
"personal_info_5000000" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
},
"analyzer" : "ik_smart"
},
"create_time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 32
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 8
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "3",
"provided_name" : "personal_info_50000000",
"creation_date" : "1663471072176",
"number_of_replicas" : "1",
"uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
"version" : {
"created" : "7170699"
}
}
}
}
}開始插入數(shù)據(jù):
示例代碼: 【單線程寫入數(shù)據(jù)】【一次性寫入10000*500條數(shù)據(jù)】 【本人親測耗時7916秒】
from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負(fù),不以自我為中心',
'努力、積極、樂觀、拼搏是我的人生信條',
'抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境',
'敢做敢拼,腳踏實地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)',
'愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實,熱情,對生活充滿激情',
'主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力',
'忠實誠信,講原則,說到做到,決不推卸責(zé)任',
'有自制力,做事情始終堅持有始有終,從不半途而廢',
'肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)',
'愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者',
'會用100%的熱情和精力投入到工作中;平易近人',
'為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實地',
'有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加程序耗時的功能
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
end = time.time()
print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
return res
return wrapper
@timer
def save_to_es(num):
"""
順序?qū)懭霐?shù)據(jù)到es數(shù)據(jù)庫
:param num:
:return:
"""
body = {
"id": num,
"name": random.choice(names),
"sex": random.choice(sexs),
"age": random.choice(age),
"character": random.choice(character),
"subject": random.choice(subjects),
"grade": random.choice(grades),
"create_time": create_time
}
# 此時若索引不存在時會新建
es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)
def run():
global queue
while queue.qsize() > 0:
num = queue.get()
print(num)
save_to_es(num)
if __name__ == '__main__':
start = time.time()
queue = Queue()
# 序號數(shù)據(jù)進(jìn)隊列
for num in range(5000000):
queue.put(num)
# 多線程執(zhí)行程序
consumer_lst = []
for _ in range(10):
thread = threading.Thread(target=run)
thread.start()
consumer_lst.append(thread)
for consumer in consumer_lst:
consumer.join()
end = time.time()
print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)運(yùn)行結(jié)果:

2.批量插入5000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000_v2,確定好mapping后,再插入數(shù)據(jù)。
新建索引并設(shè)置mapping信息:
PUT personal_info_5000000_v2
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 32
}
}
},
"sex": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 8
}
}
},
"age": {
"type": "long"
},
"character": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subject": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"grade": {
"type": "long"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}查看新建索引信息:
GET personal_info_5000000_v2
{
"personal_info_5000000_v2" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
},
"analyzer" : "ik_smart"
},
"create_time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 32
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 8
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "3",
"provided_name" : "personal_info_5000000_v2",
"creation_date" : "1663485323617",
"number_of_replicas" : "1",
"uuid" : "XBPaDn_gREmAoJmdRyBMAA",
"version" : {
"created" : "7170699"
}
}
}
}
}批量插入數(shù)據(jù):
通過elasticsearch模塊導(dǎo)入helper,通過helper.bulk來批量處理大量的數(shù)據(jù)。首先將所有的數(shù)據(jù)定義成字典形式,各字段含義如下:
- _index對應(yīng)索引名稱,并且該索引必須存在。
- _type對應(yīng)類型名稱。
- _source對應(yīng)的字典內(nèi),每一篇文檔的字段和值,可有有多個字段。
示例代碼: 【程序中途異常,寫入4714000條數(shù)據(jù)】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負(fù),不以自我為中心',
'努力、積極、樂觀、拼搏是我的人生信條',
'抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境',
'敢做敢拼,腳踏實地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)',
'愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實,熱情,對生活充滿激情',
'主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力',
'忠實誠信,講原則,說到做到,決不推卸責(zé)任',
'有自制力,做事情始終堅持有始有終,從不半途而廢',
'肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)',
'愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者',
'會用100%的熱情和精力投入到工作中;平易近人',
'為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實地',
'有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加程序耗時的功能
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
end = time.time()
print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
return res
return wrapper
@timer
def save_to_es(num):
"""
批量寫入數(shù)據(jù)到es數(shù)據(jù)庫
:param num:
:return:
"""
action = [
{
"_index": "personal_info_5000000_v2",
"_type": "_doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"age": random.choice(age),
"character": random.choice(character),
"subject": random.choice(subjects),
"grade": random.choice(grades),
"create_time": create_time
}
} for i in range(10000 * num, 10000 * num + 10000)
]
helpers.bulk(es, action)
def run():
global queue
while queue.qsize() > 0:
num = queue.get()
print(num)
save_to_es(num)
if __name__ == '__main__':
start = time.time()
queue = Queue()
# 序號數(shù)據(jù)進(jìn)隊列
for num in range(500):
queue.put(num)
# 多線程執(zhí)行程序
consumer_lst = []
for _ in range(10):
thread = threading.Thread(target=run)
thread.start()
consumer_lst.append(thread)
for consumer in consumer_lst:
consumer.join()
end = time.time()
print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)運(yùn)行結(jié)果:


3.批量插入50000000條數(shù)據(jù)
先創(chuàng)建索引personal_info_5000000_v2,確定好mapping后,再插入數(shù)據(jù)。
此過程是在上面批量插入的前提下進(jìn)行優(yōu)化,采用python生成器。
建立索引和mapping同上,直接上代碼:
示例代碼: 【程序中途異常,寫入3688000條數(shù)據(jù)】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負(fù),不以自我為中心',
'努力、積極、樂觀、拼搏是我的人生信條',
'抗壓能力強(qiáng),能夠快速適應(yīng)周圍環(huán)境',
'敢做敢拼,腳踏實地;做事認(rèn)真負(fù)責(zé),責(zé)任心強(qiáng)',
'愛好所學(xué)專業(yè),樂于學(xué)習(xí)新知識;對工作有責(zé)任心;踏實,熱情,對生活充滿激情',
'主動性強(qiáng),自學(xué)能力強(qiáng),具有團(tuán)隊合作意識,有一定組織能力',
'忠實誠信,講原則,說到做到,決不推卸責(zé)任',
'有自制力,做事情始終堅持有始有終,從不半途而廢',
'肯學(xué)習(xí),有問題不逃避,愿意虛心向他人學(xué)習(xí)',
'愿意以謙虛態(tài)度贊揚(yáng)接納優(yōu)越者,權(quán)威者',
'會用100%的熱情和精力投入到工作中;平易近人',
'為人誠懇,性格開朗,積極進(jìn)取,適應(yīng)力強(qiáng)、勤奮好學(xué)、腳踏實地',
'有較強(qiáng)的團(tuán)隊精神,工作積極進(jìn)取,態(tài)度認(rèn)真']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加程序耗時的功能
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
end = time.time()
print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
return res
return wrapper
@timer
def save_to_es(num):
"""
使用生成器批量寫入數(shù)據(jù)到es數(shù)據(jù)庫
:param num:
:return:
"""
action = (
{
"_index": "personal_info_5000000_v3",
"_type": "_doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"age": random.choice(age),
"character": random.choice(character),
"subject": random.choice(subjects),
"grade": random.choice(grades),
"create_time": create_time
}
} for i in range(10000 * num, 10000 * num + 10000)
)
helpers.bulk(es, action)
def run():
global queue
while queue.qsize() > 0:
num = queue.get()
print(num)
save_to_es(num)
if __name__ == '__main__':
start = time.time()
queue = Queue()
# 序號數(shù)據(jù)進(jìn)隊列
for num in range(500):
queue.put(num)
# 多線程執(zhí)行程序
consumer_lst = []
for _ in range(10):
thread = threading.Thread(target=run)
thread.start()
consumer_lst.append(thread)
for consumer in consumer_lst:
consumer.join()
end = time.time()
print('程序執(zhí)行完畢!花費(fèi)時間:', end - start)運(yùn)行結(jié)果:


到此這篇關(guān)于使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作(2)的文章就介紹到這了,更多相關(guān)python生成 數(shù)據(jù) 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Python prometheus_client使用方式
本文主要介紹了Python prometheus_client使用方式,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
python生成每日報表數(shù)據(jù)(Excel)并郵件發(fā)送的實例
今天小編就為大家分享一篇python生成每日報表數(shù)據(jù)(Excel)并郵件發(fā)送的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02
Python實現(xiàn)將圖像轉(zhuǎn)換為ASCII字符圖
使用Python進(jìn)行圖像處理,非??旖莘奖悖喍處仔写a就可以實現(xiàn)功能強(qiáng)大的效果。在這篇文章中,我們將使用Python將圖像轉(zhuǎn)換為ASCII字符照,感興趣的可以了解一下2022-08-08
pycharm工具連接mysql數(shù)據(jù)庫失敗問題
這篇文章主要介紹了pycharm工具連接mysql數(shù)據(jù)庫失敗問題及解決方法,非常不錯大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04
python encrypt 實現(xiàn)AES加密的實例詳解
在本篇文章里小編給大家分享的是關(guān)于python encrypt 實現(xiàn)AES加密的實例內(nèi)容,有興趣的朋友們可以參考下。2020-02-02

