python結合shell自動創建kafka的連接器實戰教程
環境
cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
[root@localhost ~]# uname -a
Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
python -V
Python 2.7.5
安裝連接oracle的python包
pip install cx_Oracle==7.3
獲取oracle表信息
#!/usr/bin/env python
"""query_oracle.py - dump Oracle table column metadata to oracle.txt.

For every table in USER_TABLES of the connecting user, one line is written
to oracle.txt in the current directory. The line is str() of a flat list:
four values per column (COLUMN_NAME, DATA_TYPE, DATA_PRECISION, DATA_SCALE)
followed by the table name, e.g.

    ['SNO', 'NUMBER', 19, 0, 'SNAME', 'VARCHAR2', None, None, 'STUDENT']

auto.sh turns these lines into the space-separated oracle_tables.txt.
Written to run on Python 2.7 (the documented environment) and Python 3.
"""
import cx_Oracle
import sys
import os
import csv
import traceback

user = "test"
passwd = "test"
listener = '10.0.2.15:1521/orcl'

OUTPUT_FILE = "oracle.txt"

# The table name is passed as a bind variable instead of being spliced into
# the SQL text, and it is compared exactly as USER_TABLES reports it (the old
# upper('%s') never matched mixed-case, quoted table names). ORDER BY
# COLUMN_ID keeps the columns in table-definition order on every run.
COLUMNS_SQL = ("select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE "
               "from cols WHERE TABLE_NAME = :table_name ORDER BY COLUMN_ID")


def main():
    """Write one metadata line per table of the current user to OUTPUT_FILE."""
    # Opened (and so truncated) before connecting, as the original did, so a
    # failed run cannot leave the previous run's dump behind for auto.sh.
    # One handle serves all tables and is flushed/closed on exit.
    with open(OUTPUT_FILE, "w") as out:
        conn = cx_Oracle.connect(user, passwd, listener)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("select table_name from user_tables")
                # Materialise the table list first: the same cursor is
                # re-executed for every table below.
                tables = [row[0] for row in cursor.fetchall()]
                for table in tables:
                    cursor.execute(COLUMNS_SQL, table_name=table)
                    fields = []
                    for column in cursor.fetchall():
                        fields.extend(column)
                    fields.append(table)
                    # Same text the old "print >> f, LIST2" produced.
                    out.write(str(fields) + "\n")
            finally:
                cursor.close()
        finally:
            conn.close()


if __name__ == "__main__":
    main()
去掉多余部分
#!/bin/bash
# auto.sh
#
# Convert oracle.txt (written by query_oracle.py: one Python list repr per
# table, e.g. ['SNO', 'NUMBER', 19, 0, 'STUDENT']) into oracle_tables.txt,
# where each line holds the same values separated by single spaces:
#   SNO NUMBER 19 0 STUDENT
#
# Usage: run query_oracle.py first, then ./auto.sh in the same directory.
set -euo pipefail

readonly IN_FILE=oracle.txt
readonly OUT_FILE=oracle_tables.txt

# Without this check a missing dump silently produced an empty output file.
if [[ ! -f "$IN_FILE" ]]; then
  printf '%s: %s not found; run query_oracle.py first\n' "$0" "$IN_FILE" >&2
  exit 1
fi

# One sed pass:
#   1. every list-repr punctuation character ( [ ] , ' ) becomes a space;
#   2. runs of blanks collapse to a single space;
#   3. the leading and trailing space are removed (the closing ']' used to
#      leave a trailing space on every line).
# [[:blank:]] is the portable spelling of the old GNU-only "[ \t]".
# A single '>' replaces the old truncate-then-append pair.
sed -e "s/[][,']/ /g" \
  -e 's/[[:blank:]][[:blank:]]*/ /g' \
  -e 's/^ //' \
  -e 's/ $//' \
  "$IN_FILE" > "$OUT_FILE"
cat oracle_tables.txt
SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE
SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIME
SNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE
shell 腳本處理表信息文件
#!/bin/bash
# connect.sh
#
# Generate create-connect.sh: one curl call per table that registers a
# Confluent JDBC source connector through the Kafka Connect REST API at
# http://localhost:8083.
#
# Input: oracle_tables.txt. Every line that is not blank and contains no '#'
# describes one table as space-separated fields:
#
#   COL TYPE PRECISION SCALE [COL TYPE PRECISION SCALE ...] TABLE TIME_COL
#
# i.e. four fields per column, then the table name, then the column the
# date-window filter is applied to. auto.sh only writes the table name, so
# TIME_COL has to be appended to each line separately.
#
# Output, in the current directory:
#   <TABLE>.txt        select-list items, one per line
#   <TABLE>.sql        the generated query
#   create-connect.sh  the connector-creation commands
#
# NOTE: the double-brace placeholders below are filled in by Ansible, i.e.
# this file is rendered as a Jinja2 template before it runs. Do not put the
# Jinja comment/statement openers (an opening brace directly followed by
# '#' or '%') anywhere in this file, including comments.
set -euo pipefail

readonly TABLES_FILE=oracle_tables.txt
readonly OUT_SCRIPT=create-connect.sh

#######################################
# Print the SELECT-list item for one column.
# Arguments: $1 column name, $2 data type, $3 precision, $4 scale
#            (precision/scale are the string "None" where Oracle has NULL).
# Outputs:   the item on stdout.
#######################################
column_expr() {
  local name=$1 type=$2 precision=$3 scale=$4
  if [[ "$type" != "NUMBER" ]]; then
    printf '%s\n' "$name"
  elif [[ "$precision" == "None" && "$scale" == "None" ]]; then
    # Unconstrained NUMBER: there is no precision to cast to, and the old
    # NUMBER(None,None) was not valid SQL, so select the column as-is.
    printf '%s\n' "$name"
  elif [[ "$precision" == "None" ]]; then
    # e.g. INTEGER (precision NULL, scale 0); 38 is Oracle's maximum.
    printf 'CAST(%s AS NUMBER(38,%s)) AS %s\n' "$name" "$scale" "$name"
  elif [[ "$scale" == "None" ]]; then
    printf 'CAST(%s AS NUMBER(%s)) AS %s\n' "$name" "$precision" "$name"
  else
    # NUMBER(p,s) exactly as reported by Oracle.
    printf 'CAST(%s AS NUMBER(%s,%s)) AS %s\n' "$name" "$precision" "$scale" "$name"
  fi
}

#######################################
# Write <TABLE>.txt and <TABLE>.sql for one table and append the curl
# command that creates its connector to OUT_SCRIPT.
# Arguments: the fields of one oracle_tables.txt line (see header).
#######################################
process_table() {
  if (( $# < 6 )); then
    printf 'skipping line without column/table/time-column fields: %s\n' "$*" >&2
    return 0
  fi
  local table="${@: -2:1}"
  local time_col="${@: -1}"
  local -a items=()

  # Consume four fields per column until only TABLE and TIME_COL remain.
  while (( $# >= 6 )); do
    items+=("$(column_expr "$1" "$2" "$3" "$4")")
    shift 4
  done
  printf '%s\n' "${items[@]}" > "${table}.txt"

  local select_list sql
  select_list=$(IFS=,; printf '%s' "${items[*]}")
  # Rows whose TIME_COL falls on the calendar day two days before today:
  # [trunc(sysdate-2), trunc(sysdate-1)).
  sql="select ${select_list} from ${table} where ${time_col}>=trunc(sysdate-2) and ${time_col}<trunc(sysdate-1)"
  printf '%s\n' "$sql" > "${table}.sql"

  cat >> "$OUT_SCRIPT" <<EOF
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
 "name": "jdbc_source_${table}",
 "config": {
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl",
 "connection.user": "{{ ORACLE_USER }}",
 "connection.password": "{{ ORACLE_PASSWD }}",
 "topic.prefix": "YC_${table}",
 "mode": "{{ CONNECT_MODE }}",
 "query": "${sql}"
 }
}' >/dev/null 2>&1
EOF
}

main() {
  if [[ ! -f "$TABLES_FILE" ]]; then
    printf '%s: %s not found\n' "$0" "$TABLES_FILE" >&2
    exit 1
  fi
  : > "$OUT_SCRIPT"

  # Iterate over the table lines themselves. The old loop counted them but
  # then fetched lines 1..N of the raw file by number, so any blank or '#'
  # line shifted the window and silently dropped tables at the end.
  local line
  local -a fields
  while IFS= read -r line; do
    read -r -a fields <<<"$line"
    process_table "${fields[@]}"
  done < <(grep -Ev '#|^[[:space:]]*$' "$TABLES_FILE")
}

main "$@"
說明:腳本中{{ 變量名 }}部分的內容取自ansible中定義的變量,由ansible替換後再執行,所以這個腳本需要和ansible結合使用。
增強版處理表信息腳本
#!/bin/bash
# Generate the Kafka Connect JDBC source connector scripts from
# oracle_time_tables.txt:
#   create-jdbc-connect-time.sh  tables that contain one of the configured
#                                time columns; their query only selects rows
#                                from the calendar day two days before today
#   create-jdbc-connect.sh       all other tables; whole-table query
# Each generated script starts with a bash shebang and an echos() colour
# helper, and prints success/failure for every connector it creates.
#
# Input: every line that is not blank and contains no '#' describes one
# table as space-separated fields:
#   COL TYPE PRECISION SCALE [COL TYPE PRECISION SCALE ...] TABLE
#
# Per-table artifacts are written to ./TABLE_TIME/:
#   <TABLE>_time.txt      select-list items, one per line
#   <TABLE>_TIME_COL.txt  the matched time column (time tables only)
#   <TABLE>_time.sql      query of a time table (left empty otherwise)
#   <TABLE>.sql           query of any other table (left empty otherwise)
#
# NOTE: the double-brace placeholders are filled in by Ansible, i.e. this
# file is rendered as a Jinja2 template before it runs. Do not put the Jinja
# comment/statement openers (an opening brace directly followed by '#' or
# '%') anywhere in this file, including comments.
set -euo pipefail

readonly TABLES_FILE=oracle_time_tables.txt
readonly OUT_DIR=./TABLE_TIME
readonly FULL_SCRIPT=create-jdbc-connect.sh
readonly TIME_SCRIPT=create-jdbc-connect-time.sh

# Candidate time-column names supplied by Ansible; built once here instead
# of once per column.
TIME_COLS=({{ TABLE_TIME_COL }})

#######################################
# (Re)create a generated script containing a bash shebang and the echos()
# colour helper used to report each connector's result.
# Arguments: $1 path of the script to create.
#######################################
write_script_header() {
  cat > "$1" <<'EOF'
#!/bin/bash
echos(){
  case $1 in
    red) echo -e "\033[31m $2 \033[0m";;
    green) echo -e "\033[32m $2 \033[0m";;
    yellow) echo -e "\033[33m $2 \033[0m";;
    blue) echo -e "\033[34m $2 \033[0m";;
    purple) echo -e "\033[35m $2 \033[0m";;
    *) echo "$2";;
  esac
}
EOF
}

#######################################
# Succeed if $1 is one of the configured time columns.
#######################################
is_time_col() {
  local candidate
  # The +-expansion keeps set -u quiet when no time columns are configured
  # (bash before 4.4 treats an empty array as unset).
  for candidate in ${TIME_COLS[@]+"${TIME_COLS[@]}"}; do
    if [[ "$1" == "$candidate" ]]; then
      return 0
    fi
  done
  return 1
}

#######################################
# Print the SELECT-list item for one column.
# Arguments: $1 column name, $2 data type, $3 precision, $4 scale
#            (precision/scale are the string "None" where Oracle has NULL).
#######################################
column_expr() {
  local name=$1 type=$2 precision=$3 scale=$4
  if [[ "$type" != "NUMBER" ]]; then
    printf '%s\n' "$name"
  elif [[ "$precision" == "None" && "$scale" == "None" ]]; then
    # Unconstrained NUMBER: the old NUMBER(None,None) was not valid SQL.
    printf '%s\n' "$name"
  elif [[ "$precision" == "None" ]]; then
    # e.g. INTEGER (precision NULL, scale 0); 38 is Oracle's maximum.
    printf 'CAST(%s AS NUMBER(38,%s)) AS %s\n' "$name" "$scale" "$name"
  elif [[ "$scale" == "None" ]]; then
    printf 'CAST(%s AS NUMBER(%s)) AS %s\n' "$name" "$precision" "$name"
  else
    printf 'CAST(%s AS NUMBER(%s,%s)) AS %s\n' "$name" "$precision" "$scale" "$name"
  fi
}

#######################################
# Append the command that creates one connector, plus a success/failure
# report, to a generated script. --fail makes curl exit non-zero on an HTTP
# error status (e.g. the connector already exists), so the generated $?
# check reports real failures; previously only transport errors were caught.
# Arguments: $1 script to append to, $2 connector name, $3 table, $4 query.
#######################################
append_connector() {
  local script=$1 name=$2 table=$3 query=$4
  cat >> "$script" <<EOF
#創(chuàng)建表 ${table} 連接器的命令如下
curl --fail -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
 "name": "${name}",
 "config": {
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
 "connection.user": "{{ ORACLE_USER }}",
 "connection.password": "{{ ORACLE_PASSWD }}",
 "topic.prefix": "YC_${table}_INSERT",
 "poll.interval.ms": "86400000",
 "mode": "{{ CONNECT_MODE }}",
 "numeric.mapping": "best_fit",
 "query": "${query}"
 }
}' >/dev/null 2>&1
#判斷連接器是否創(chuàng)建成功
if [ \$? -eq 0 ]
then
 echos green "\$(date +"%F %H:%M:%S") 創(chuàng)建${name} 連接器成功"
else
 echos red "\$(date +"%F %H:%M:%S") 創(chuàng)建${name} 連接器失敗"
fi
EOF
}

#######################################
# Build the query for one table, write its artifacts to OUT_DIR and append
# its connector to the time-window or the full-table script.
# Arguments: the fields of one input line (see header).
#######################################
process_table() {
  if (( $# < 5 )); then
    printf 'skipping line without column/table fields: %s\n' "$*" >&2
    return 0
  fi
  local table="${@: -1}"
  local time_col=""
  local -a items=()
  local col_file="${OUT_DIR}/${table}_TIME_COL.txt"

  : > "${OUT_DIR}/${table}_time.sql"
  : > "${OUT_DIR}/${table}.sql"
  # Remove a marker left by an earlier run; otherwise a table whose time
  # column is no longer configured would still be handled as a time table.
  rm -f -- "$col_file"

  # Consume four fields per column until only TABLE remains.
  while (( $# >= 5 )); do
    items+=("$(column_expr "$1" "$2" "$3" "$4")")
    if is_time_col "$1"; then
      # If several columns match, the last one wins (as before).
      time_col=$1
    fi
    shift 4
  done
  printf '%s\n' "${items[@]}" > "${OUT_DIR}/${table}_time.txt"

  local select_list query
  select_list=$(IFS=,; printf '%s' "${items[*]}")
  query="select ${select_list} from {{ ORACLE_TABLES_USER }}.${table}"

  if [[ -n "$time_col" ]]; then
    printf '%s\n' "$time_col" > "$col_file"
    # Rows from [trunc(sysdate-2), trunc(sysdate-1)).
    query+=" where ${time_col}>=trunc(sysdate-2) and ${time_col}<trunc(sysdate-1)"
    printf '%s\n' "$query" > "${OUT_DIR}/${table}_time.sql"
    append_connector "$TIME_SCRIPT" "jdbc_time_${table}" "$table" "$query"
  else
    printf '%s\n' "$query" > "${OUT_DIR}/${table}.sql"
    append_connector "$FULL_SCRIPT" "jdbc_${table}" "$table" "$query"
  fi
}

main() {
  if [[ ! -f "$TABLES_FILE" ]]; then
    printf '%s: %s not found\n' "$0" "$TABLES_FILE" >&2
    exit 1
  fi
  write_script_header "$FULL_SCRIPT"
  write_script_header "$TIME_SCRIPT"
  mkdir -p "$OUT_DIR"

  # Iterate over the table lines themselves. The old loop counted them but
  # then fetched lines 1..N of the raw file by number, so any blank or '#'
  # line shifted the window and silently dropped tables at the end.
  local line
  local -a fields
  while IFS= read -r line; do
    read -r -a fields <<<"$line"
    process_table "${fields[@]}"
  done < <(grep -Ev '#|^[[:space:]]*$' "$TABLES_FILE")
}

main "$@"
到此這篇關于python結合shell自動創建kafka的連接器的文章就介紹到這了,更多相關python創建kafka連接器內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python3實現將本地JSON大數據文件寫入MySQL數據庫的方法
這篇文章主要介紹了Python3實現將本地JSON大數據文件寫入MySQL數據庫的方法,涉及Python針對json大數據文件的逐行讀取、mysql數據庫寫入等相關操作技巧,需要的朋友可以參考下 2018-06-06
Python中線程的MQ消息隊列實現以及消息隊列的優點解析
消息隊列(MQ,Message Queue)在消息數據傳輸中的保存作用為數據通信提供了保障和實時處理上的便利,這里我們就來看一下Python中線程的MQ消息隊列實現以及消息隊列的優點解析 2016-06-06
如何在向量化NumPy數組上進行移動窗口
這篇文章主要介紹了如何在向量化NumPy數組上進行移動窗口的操作,具有很好的參考價值,希望對大家有所幫助。 2021-05-05
python3列表刪除大量重復元素remove()方法的問題詳解
這篇文章主要給大家介紹了關于python3列表刪除大量重復元素remove()方法的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2021-01-01
舉例講解Django中數據模型訪問外鍵值的方法
這篇文章主要介紹了舉例講解Django中數據模型訪問外鍵值的方法,Django是最具人氣的Python web開發框架,需要的朋友可以參考下 2015-07-07