欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程

 更新時間:2018年05月30日 10:38:05   投稿:jingxian  
今天小編就為大家分享一篇用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

之前博客有用logstash-input-jdbc同步mysql數(shù)據(jù)到ElasticSearch,但是由于同步時間最少是一分鐘一次,無法滿足線上業(yè)務(wù),所以只能自己實現(xiàn)一個,但是時間比較緊,所以簡單實現(xiàn)一個

思路:

網(wǎng)上有很多思路用什么mysql的binlog功能什么的,但是我對mysql了解實在有限,所以用一個很呆板的辦法查詢mysql得到數(shù)據(jù),再插入es,因為數(shù)據(jù)量不大,而且10秒間隔同步一次,效率還可以,為了避免服務(wù)器之間的時間差和mysql更新和查詢產(chǎn)生的時間差,所以在查詢更新時間條件時是和上一次同步開始時間比較,這樣不管數(shù)據(jù)多少,更新耗時多少都不會少數(shù)據(jù),因為原則是同步不漏掉任何數(shù)據(jù),也可以程序多開將時間差和間隔時間差異化,因為用mysql中一個id當(dāng)作es中的id,也避免了重復(fù)數(shù)據(jù)

使用:

只需要按照escongif.py寫配置文件,然后寫sql文件,最后直接執(zhí)行mstes.py就可以了,我這個也是參考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(數(shù)據(jù)庫管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滾和提交的操作
def find(func):
 def wrapper(self, *args, **kwargs):
  try:
   return func(self, *args, **kwargs)
  except Exception as e:
   print(traceback.format_exc())
   print(str(e))
   return traceback.format_exc()
  finally:
   self.session.close()
 return wrapper
class MysqlManager(object):
 def __init__(self):
  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
         pool_recycle=3600)
  # self.DB_Session = sessionmaker(bind=self.engine)
  # self.session = self.DB_Session()
  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
  self.db = scoped_session(self.DB_Session)
  self.session = self.db()
 @find
 def select_all_dict(self, sql, keys):
  a = self.session.execute(sql)
  a = a.fetchall()
  lists = []
  for i in a:
   if len(keys) == len(i):
    data_dict = {}
    for k, v in zip(keys, i):
     data_dict[k] = v
    lists.append(data_dict)
   else:
    return False
  return lists
 # 關(guān)閉
 def close(self):
  self.session.close()

aa.sql:

select 
 CONVERT(c.`id`,CHAR)    as id, 
 c.`code`   as code, 
 c.`project_name` as project_name, 
 c.`name`   as name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time, 
from `cc` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

bb.sql:

select 
 CONVERT(c.`id`,CHAR)    as id, 
 CONVERT(c.`age`,CHAR)    as age, 
 c.`code`   as code, 
 c.`name`   as name, 
 c.`project_name` as project_name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, 
from `bb` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

esconfig.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名與es中的type名一致
mysql = {
 # mysql連接信息
 "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
 # sql文件信息
 "statement_filespath":[
  # sql對應(yīng)的es索引和es類型
  {
   "index":"a1",
   "sqlfile":"aa.sql",
   "type":"aa"
  },
  {
   "index":"a1",
   "sqlfile":"bb.sql",
   "type":"bb"
  },
 ],
}
# es的ip和端口
elasticsearch = {
 "hosts":"127.0.0.1:9200",
}
# 字段順序與sql文件字段順序一致,這是存進es中的字段名,這里用es的type名作為標(biāo)識
db_field = {
  "aa":
   ("id",
   "code",
   "name",
   "project_name",
   "update_time",
   ),
 "bb":
  ("id",
   "code",
   "age",
   "project_name",
   "name",
   "update_time",
   ),
}
es_config = {
 # 間隔多少秒同步一次
 "sleep_time":10,
 # 為了解決服務(wù)器之間時間差問題
 "time_difference":3,
 # show_json 用來展示導(dǎo)入的json格式數(shù)據(jù),
 "show_json":False,
}

mstes.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
 def __init__(self):
  try:
   # 是否展示json數(shù)據(jù)在控制臺
   self.show_json = es_config.get("show_json")
   # 間隔多少秒同步一次
   self.sleep_time = es_config.get("sleep_time")
   # 為了解決同步時數(shù)據(jù)更新產(chǎn)生的誤差
   self.time_difference = es_config.get("time_difference")
   # 當(dāng)前時間,留有后用
   self.datetime_now = ""
   # es的ip和端口
   es_host = elasticsearch.get("hosts")
   # 連接es
   self.es = Elasticsearch(es_host)
   # 連接mysql
   self.mm = MysqlManager()
  except :
   print(traceback.format_exc())
 def tongbu_es_mm(self):
  try:
   # 同步開始時間
   start_time = time.time()
   print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   # 這個list用于批量插入es
   actions = []
   # 獲得所有sql文件list
   statement_filespath = mysql.get("statement_filespath",[])
   if self.datetime_now:
    # 當(dāng)前時間加上時間差(間隔時間加上執(zhí)行同步用掉的時間,等于上一次同步開始時間)再字符串格式化
    # sql中格式化時間時年月日和時分秒之間不能空格,不然導(dǎo)入es時報解析錯誤,所以這里的時間格式化也統(tǒng)一中間加一個T
    self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
   else:
    self.datetime_now = "1999-01-01T00:00:00"
   if statement_filespath:
    for filepath in statement_filespath:
     # sql文件
     sqlfile = filepath.get("sqlfile")
     # es的索引
     es_index = filepath.get("index")
     # es的type
     es_type = filepath.get("type")
     # 讀取sql文件內(nèi)容
     with open(sqlfile,"r") as opf:
      sqldatas = opf.read()
      # ::datetime_now是一個自定義的特殊字符串用于增量更新
      if "::datetime_now" in sqldatas:
       sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
      else:
       sqldatas = sqldatas
      # es和sql字段的映射
      dict_set = db_field.get(es_type)
      # 訪問mysql,得到一個list,元素都是字典,鍵是字段名,值是數(shù)據(jù)
      db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
      if db_data_list:
       # 將數(shù)據(jù)拼裝成es的格式
       for db_data in db_data_list:
        action = {
         "_index": es_index,
         "_type": es_type,
         "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
         "_source": db_data
        }
        # 如果沒有id字段就自動生成
        es_id = db_data.get("id", "")
        if es_id:
         action["_id"] = es_id
        # 是否顯示json再終端
        if self.show_json:
         print(action)
        # 將拼裝好的數(shù)據(jù)放進list中
        actions.append(action)
   # list不為空就批量插入數(shù)據(jù)到es中
   if len(actions) > 0 :
    helpers.bulk(self.es, actions)
  except Exception as e:
   print(traceback.format_exc())
  else:
   end_time = time.time()
   print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   self.time_difference = end_time-start_time
  finally:
   # 報錯就關(guān)閉數(shù)據(jù)庫
   self.mm.close()
def main():
 tb = TongBu()
 # 間隔多少秒同步一次
 sleep_time = tb.sleep_time
 # 死循環(huán)執(zhí)行導(dǎo)入數(shù)據(jù),加上時間間隔
 while True:
  tb.tongbu_es_mm()
  time.sleep(sleep_time)
if __name__ == '__main__':
 main()

以上這篇用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Pytorch修改ResNet模型全連接層進行直接訓(xùn)練實例

    Pytorch修改ResNet模型全連接層進行直接訓(xùn)練實例

    在本篇文章里小編給大家整理的是關(guān)于Pytorch修改ResNet模型全連接層進行直接訓(xùn)練相關(guān)知識點,有需要的朋友們參考下。
    2019-09-09
  • Pandas多列值合并成一列的實現(xiàn)

    Pandas多列值合并成一列的實現(xiàn)

    本文主要介紹了Pandas多列值合并成一列的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • Python 保持登錄狀態(tài)進行接口測試的方法示例

    Python 保持登錄狀態(tài)進行接口測試的方法示例

    這篇文章主要介紹了Python 保持登錄狀態(tài)進行接口測試的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-08-08
  • Keras自定義IOU方式

    Keras自定義IOU方式

    這篇文章主要介紹了Keras自定義IOU方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-06-06
  • Python編程使用Selenium模擬淘寶登錄實現(xiàn)過程

    Python編程使用Selenium模擬淘寶登錄實現(xiàn)過程

    這篇文章主要介紹了Python編程使用Selenium模擬淘寶登錄的實現(xiàn)過程示例及解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2021-10-10
  • Python讀寫csv文件的超詳細(xì)步驟

    Python讀寫csv文件的超詳細(xì)步驟

    python提供了大量的庫,可以非常方便的進行各種操作,下面這篇文章主要給大家介紹了關(guān)于Python讀寫csv文件的超詳細(xì)步驟,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-05-05
  • Python3多線程處理爬蟲的實戰(zhàn)

    Python3多線程處理爬蟲的實戰(zhàn)

    本文主要介紹了Python3多線程處理爬蟲的實戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 如何在python中執(zhí)行另一個py文件

    如何在python中執(zhí)行另一個py文件

    這篇文章主要介紹了如何在python中執(zhí)行另一個py文件,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-04-04
  • anaconda中更改python版本的方法步驟

    anaconda中更改python版本的方法步驟

    這篇文章主要介紹了anaconda中更改python版本的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • 詳解Python 3D引擎Ursina如何繪制立體圖形

    詳解Python 3D引擎Ursina如何繪制立體圖形

    Python有一個不錯的3D引擎——Ursina。本文就來手把手教你認(rèn)識Ursina并學(xué)會繪制立體圖形,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-01-01

最新評論