用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程
之前博客有用logstash-input-jdbc同步mysql數(shù)據(jù)到ElasticSearch,但是由于同步時間最少是一分鐘一次,無法滿足線上業(yè)務(wù),所以只能自己實現(xiàn)一個,但是時間比較緊,所以簡單實現(xiàn)一個
思路:
網(wǎng)上有很多思路用什么mysql的binlog功能什么的,但是我對mysql了解實在有限,所以用一個很呆板的辦法查詢mysql得到數(shù)據(jù),再插入es,因為數(shù)據(jù)量不大,而且10秒間隔同步一次,效率還可以,為了避免服務(wù)器之間的時間差和mysql更新和查詢產(chǎn)生的時間差,所以在查詢更新時間條件時是和上一次同步開始時間比較,這樣不管數(shù)據(jù)多少,更新耗時多少都不會少數(shù)據(jù),因為原則是同步不漏掉任何數(shù)據(jù),也可以程序多開將時間差和間隔時間差異化,因為用mysql中一個id當(dāng)作es中的id,也避免了重復(fù)數(shù)據(jù)
使用:
只需要按照escongif.py寫配置文件,然后寫sql文件,最后直接執(zhí)行mstes.py就可以了,我這個也是參考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(數(shù)據(jù)庫管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滾和提交的操作
def find(func):
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except Exception as e:
print(traceback.format_exc())
print(str(e))
return traceback.format_exc()
finally:
self.session.close()
return wrapper
class MysqlManager(object):
def __init__(self):
mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
pool_recycle=3600)
# self.DB_Session = sessionmaker(bind=self.engine)
# self.session = self.DB_Session()
self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
self.db = scoped_session(self.DB_Session)
self.session = self.db()
@find
def select_all_dict(self, sql, keys):
a = self.session.execute(sql)
a = a.fetchall()
lists = []
for i in a:
if len(keys) == len(i):
data_dict = {}
for k, v in zip(keys, i):
data_dict[k] = v
lists.append(data_dict)
else:
return False
return lists
# 關(guān)閉
def close(self):
self.session.close()
aa.sql:
select CONVERT(c.`id`,CHAR) as id, c.`code` as code, c.`project_name` as project_name, c.`name` as name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `cc` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
bb.sql:
select CONVERT(c.`id`,CHAR) as id, CONVERT(c.`age`,CHAR) as age, c.`code` as code, c.`name` as name, c.`project_name` as project_name, date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `bb` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名與es中的type名一致
mysql = {
# mysql連接信息
"mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
# sql文件信息
"statement_filespath":[
# sql對應(yīng)的es索引和es類型
{
"index":"a1",
"sqlfile":"aa.sql",
"type":"aa"
},
{
"index":"a1",
"sqlfile":"bb.sql",
"type":"bb"
},
],
}
# es的ip和端口
elasticsearch = {
"hosts":"127.0.0.1:9200",
}
# 字段順序與sql文件字段順序一致,這是存進es中的字段名,這里用es的type名作為標(biāo)識
db_field = {
"aa":
("id",
"code",
"name",
"project_name",
"update_time",
),
"bb":
("id",
"code",
"age",
"project_name",
"name",
"update_time",
),
}
es_config = {
# 間隔多少秒同步一次
"sleep_time":10,
# 為了解決服務(wù)器之間時間差問題
"time_difference":3,
# show_json 用來展示導(dǎo)入的json格式數(shù)據(jù),
"show_json":False,
}
mstes.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
def __init__(self):
try:
# 是否展示json數(shù)據(jù)在控制臺
self.show_json = es_config.get("show_json")
# 間隔多少秒同步一次
self.sleep_time = es_config.get("sleep_time")
# 為了解決同步時數(shù)據(jù)更新產(chǎn)生的誤差
self.time_difference = es_config.get("time_difference")
# 當(dāng)前時間,留有后用
self.datetime_now = ""
# es的ip和端口
es_host = elasticsearch.get("hosts")
# 連接es
self.es = Elasticsearch(es_host)
# 連接mysql
self.mm = MysqlManager()
except :
print(traceback.format_exc())
def tongbu_es_mm(self):
try:
# 同步開始時間
start_time = time.time()
print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
# 這個list用于批量插入es
actions = []
# 獲得所有sql文件list
statement_filespath = mysql.get("statement_filespath",[])
if self.datetime_now:
# 當(dāng)前時間加上時間差(間隔時間加上執(zhí)行同步用掉的時間,等于上一次同步開始時間)再字符串格式化
# sql中格式化時間時年月日和時分秒之間不能空格,不然導(dǎo)入es時報解析錯誤,所以這里的時間格式化也統(tǒng)一中間加一個T
self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
else:
self.datetime_now = "1999-01-01T00:00:00"
if statement_filespath:
for filepath in statement_filespath:
# sql文件
sqlfile = filepath.get("sqlfile")
# es的索引
es_index = filepath.get("index")
# es的type
es_type = filepath.get("type")
# 讀取sql文件內(nèi)容
with open(sqlfile,"r") as opf:
sqldatas = opf.read()
# ::datetime_now是一個自定義的特殊字符串用于增量更新
if "::datetime_now" in sqldatas:
sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
else:
sqldatas = sqldatas
# es和sql字段的映射
dict_set = db_field.get(es_type)
# 訪問mysql,得到一個list,元素都是字典,鍵是字段名,值是數(shù)據(jù)
db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
if db_data_list:
# 將數(shù)據(jù)拼裝成es的格式
for db_data in db_data_list:
action = {
"_index": es_index,
"_type": es_type,
"@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
"_source": db_data
}
# 如果沒有id字段就自動生成
es_id = db_data.get("id", "")
if es_id:
action["_id"] = es_id
# 是否顯示json再終端
if self.show_json:
print(action)
# 將拼裝好的數(shù)據(jù)放進list中
actions.append(action)
# list不為空就批量插入數(shù)據(jù)到es中
if len(actions) > 0 :
helpers.bulk(self.es, actions)
except Exception as e:
print(traceback.format_exc())
else:
end_time = time.time()
print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
self.time_difference = end_time-start_time
finally:
# 報錯就關(guān)閉數(shù)據(jù)庫
self.mm.close()
def main():
tb = TongBu()
# 間隔多少秒同步一次
sleep_time = tb.sleep_time
# 死循環(huán)執(zhí)行導(dǎo)入數(shù)據(jù),加上時間間隔
while True:
tb.tongbu_es_mm()
time.sleep(sleep_time)
if __name__ == '__main__':
main()
以上這篇用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python鼠標(biāo)事件及坐標(biāo)獲取窗口和屏幕坐標(biāo)
這篇文章主要介紹了Python編程中如何通過鼠標(biāo)事件及坐標(biāo)獲取窗口坐標(biāo)和屏幕坐標(biāo)的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10
python等差數(shù)列求和公式前 100 項的和實例
今天小編就為大家分享一篇python等差數(shù)列求和公式前 100 項的和實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-02-02
Python中不同類之間調(diào)用方法的四種方式小結(jié)
類是一種面向?qū)ο蟮木幊谭妒?它允許我們將數(shù)據(jù)和功能封裝在一個實體中,本文主要介紹了Python中不同類之間調(diào)用方法的四種方式小結(jié),具有一定的參考價值,感興趣的可以了解一下2024-02-02
Python實現(xiàn)統(tǒng)計代碼行的方法分析
這篇文章主要介紹了Python實現(xiàn)統(tǒng)計代碼行的方法,結(jié)合實例形式分析了Python針對代碼行數(shù)的計算實現(xiàn)步驟與操作技巧,需要的朋友可以參考下2017-07-07
pycharm遠程調(diào)試openstack的圖文教程
這篇文章主要為大家詳細介紹了pycharm遠程調(diào)試openstack的圖文教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-11-11

