Python 調(diào)用 ES、Solr、Phoenix的示例代碼
更新時間:2020年11月23日 10:33:27 作者:LeoZhanggg
這篇文章主要介紹了Python 調(diào)用 ES、Solr、Phoenix的示例代碼,幫助大家更好的理解和學習python,感興趣的朋友可以了解下
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time : 2019/8/12
# @Author : Zhang Fan
# @Desc : Library
# @File : MyDatabases.py
# @Update : 2019/8/23
# *************************************
import elasticsearch
import phoenixdb
import pysolr
import pymysql
class MyELS(object):
"""
===================================================================
===================== MyELS =========================
===================================================================
"""
def __init__(self):
self.els_conn = None
def connect_to_els(self, host, port):
"""
連接到ElasticSearch服務器.
"""
self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
print('Executing : Connect To Elastic Search | %s' % self.els_conn)
def get_els_data(self, query, index):
"""
獲取ElasticSearch數(shù)據(jù)
"""
print('Executing : Search | %s' % query)
try:
rst = self.els_conn.search(index=index, q=query)
return rst['hits']
except Exception as e:
print('Elastic Search Error | %s' % e)
raise Exception(e)
class MyPhoenix(object):
"""
===================================================================
===================== MyPhoenix ======================
===================================================================
"""
def __init__(self):
self.phoenix_conn = None
self.phoenix_cursor = None
def connect_to_phoenix(self, host, port=8765):
"""
連接到phoenix服務器
"""
address = 'http://{0}:{1}/'.format(host, port)
print('Executing : Connect To Phoenix | %s' % address)
self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
self.phoenix_cursor = self.phoenix_conn.cursor()
def set_schema(self, sql, schema):
"""
設置schema
"""
pre_sub, sub, fol_sub = sql.upper().partition('FROM')
fol_sub = ' ' + schema + '.' + fol_sub.strip()
new_sql = ''.join([pre_sub, sub, fol_sub])
return new_sql
def execute_phoenix_sql(self, sql):
"""
執(zhí)行sql語句
"""
# sql = self.set_schema(sql, schema)
print('Executing : Execute | %s' % sql)
self.phoenix_cursor.execute(sql)
def get_from_phoenix(self, sql):
"""
獲取phoenix數(shù)據(jù)
"""
# sql = self.set_schema(sql, schema)
print('Executing : Query | %s' % sql)
try:
self.phoenix_cursor.execute(sql)
except Exception as e:
print('Phoenix Error | %s' % e)
raise Exception(e)
return self.phoenix_cursor.fetchall()
def disconnect_from_phoenix(self):
"""
斷開phoenix連接
"""
print('Executing : Disconnect From HBase')
self.phoenix_cursor.close()
self.phoenix_conn.close()
class MySolr(object):
"""
===================================================================
===================== MySolr =========================
===================================================================
"""
def __init__(self):
self.solr_conn = None
self.base_url = None
def connect_to_solr(self, address, selector):
"""連接到solr服務器.
"""
self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
self.solr_conn = pysolr.Solr(self.base_url)
print('Executing : Connect To Solr | %s' % self.base_url)
def get_solr_data(self, query):
"""
獲取solr數(shù)據(jù)
"""
results = list()
print('Executing : Search | %s' % query)
try:
items = self.solr_conn.search(query)
for item in items:
results.append(item)
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
return results
def add_solr_data(self, data):
"""
添加solr數(shù)據(jù)
"""
print('Executing : add | %s' % data)
try:
self.solr_conn.add([data])
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
def del_solr_byId(self, data):
"""
刪除solr數(shù)據(jù)
"""
print('Executing : del | %s' % data)
try:
self.solr_conn.delete(id=data)
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
if __name__ == '__main__':
print('This is test.')
ms = MySolr()
me = MyELS()
mp = MyPhoenix()
以上就是Python 調(diào)用 ES、Solr、Phoenix的示例代碼的詳細內(nèi)容,更多關于Python 調(diào)用 ES、Solr、Phoenix的資料請關注腳本之家其它相關文章!
您可能感興趣的文章:
- python使用ctypes庫調(diào)用DLL動態(tài)鏈接庫
- Python調(diào)用REST API接口的幾種方式匯總
- 如何使用python的ctypes調(diào)用醫(yī)保中心的dll動態(tài)庫下載醫(yī)保中心的賬單
- python使用ctypes調(diào)用擴展模塊的實例方法
- python中使用ctypes調(diào)用so傳參設置遇到的問題及解決方法
- Python使用ctypes調(diào)用C/C++的方法
- python調(diào)用百度REST API實現(xiàn)語音識別
- python調(diào)用百度語音REST API
- Python調(diào)用C語言的方法【基于ctypes模塊】
- Python如何通過subprocess調(diào)用adb命令詳解
相關文章
解決python多線程報錯:AttributeError: Can''t pickle local object問題
這篇文章主要介紹了解決python多線程報錯:AttributeError: Can't pickle local object問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04
python GUI庫圖形界面開發(fā)之PyQt5復選框控件QCheckBox詳細使用方法與實例
這篇文章主要介紹了python GUI庫圖形界面開發(fā)之PyQt5復選框控件QCheckBox詳細使用方法與實例,需要的朋友可以參考下2020-02-02

