使用mysql_udf與curl庫(kù)完成http_post通信模塊示例
使用mysql_udf與curl庫(kù)完成http_post通信模塊(mysql_udf,multi_curl,http,post)
這個(gè)模塊其目前主要用于xoyo江湖的sns與kingsoft_xoyo自主研發(fā)的TCSQL數(shù)據(jù)庫(kù)做數(shù)據(jù)同步,當(dāng)有feed插入sns數(shù)據(jù)庫(kù),使用觸 發(fā)器調(diào)用該模塊,向tcsql數(shù)據(jù)庫(kù)發(fā)送同步數(shù)據(jù)。也可以使用該模塊與其它使用socket接口的數(shù)據(jù)庫(kù)或程序做轉(zhuǎn)發(fā)與同步。
http_post模塊主要使用mysql_udf接口,與curl庫(kù)兩部分技術(shù)。
mysql_udf是mysql為c語(yǔ)言提供的一個(gè)接口,通過(guò)這個(gè)接口,用戶可以自定義mysql的函數(shù),通過(guò)調(diào)用這些mysql函數(shù),調(diào)用相應(yīng)的c語(yǔ)言 模塊來(lái)執(zhí)行特定功能,實(shí)現(xiàn)mysql數(shù)據(jù)與外部應(yīng)用的交互。curl庫(kù)是一個(gè)比較常用的應(yīng)用層網(wǎng)絡(luò)協(xié)議庫(kù),主要用到的是其中的curl_multi異步通 信api,用來(lái)進(jìn)行網(wǎng)絡(luò)傳輸。
首先參考mysql官方提供的udf_example.c文件,建立3個(gè)主要的接口函數(shù),分別是初始化函數(shù),執(zhí)行函數(shù)與析構(gòu)函數(shù)。
//args是sql語(yǔ)句傳回的參數(shù),message是返回出錯(cuò)信息使用這些都是規(guī)定好的。
my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
//主函數(shù)體
longlong http_post(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);
//析構(gòu)函數(shù)體
void http_post_deinit(UDF_INIT *initid);
//args是sql語(yǔ)句傳回的參數(shù),message是返回出錯(cuò)信息,使用這些都是規(guī)定好的。
//初始化函數(shù)體 my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
//主函數(shù)體 longlong http_post(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);
//析構(gòu)函數(shù)體 void http_post_deinit(UDF_INIT *initid);
在mysql_udf接口中,主函數(shù)體中是不允許使用new或malloc動(dòng)態(tài)分配內(nèi)存,所以如果需要申請(qǐng)內(nèi)存空間,必須用xxxx_init()函數(shù)申 請(qǐng)并將申請(qǐng)的地址賦給initid->ptr指針,然后在主函數(shù)體中使用,并在xxxx_deinit析構(gòu)函數(shù)體中釋放。另外對(duì)于 mysql_udf接口的調(diào)用好像當(dāng)并發(fā)量超過(guò)一定程度,如果是使用動(dòng)態(tài)分配內(nèi)存,會(huì)出現(xiàn)double free的錯(cuò)誤,為了避免這個(gè)錯(cuò)誤,所以在我的程序里使用靜態(tài)空間與動(dòng)態(tài)申請(qǐng)空間相結(jié)合的方式,這樣如果數(shù)據(jù)較小,并發(fā)量較大,不會(huì)出現(xiàn)double free錯(cuò)誤。對(duì)于靜態(tài)申請(qǐng)空間,最大約在160000~170000byte左右,我這里使用的160000,當(dāng)mysql傳送的數(shù)據(jù)大于這個(gè)數(shù)的時(shí) 候,才動(dòng)態(tài)申請(qǐng)內(nèi)存。初始化函數(shù)體如下:
my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
{
if (args->arg_count != 2)
{
strcpy(message,"Wrong arguments to http_post; ");
return 1;
}
if(args->arg_count == 2 && args->args[1]!=NULL)
{
int flexibleLength = strlen(args->args[1]);
if(flexibleLength > 160000)
{
int allocLength = 200 + flexibleLength;
if (!(initid->ptr=(char*) malloc(allocLength) ) )
{
strcpy(message,"Couldn't allocate memory in http_post_init");
return 1;
}
return 0;
}
else
{
initid->ptr=NULL;
}
}
return 0;
}
其中http_post_init需要返回my_bool型。這個(gè)函數(shù)目的是給用戶提供一個(gè)方式,檢驗(yàn)由mysql參數(shù)傳進(jìn)來(lái)的數(shù)據(jù)是否正確,如果正確則 返回0,則mysql會(huì)自動(dòng)調(diào)用定義的主函數(shù),如果返回1,則mysql打印message信息退出,不會(huì)調(diào)用主函數(shù)。所以在設(shè)定返回值的時(shí)候一定注意。
主函數(shù)如下:
longlong http_post( UDF_INIT *initid, UDF_ARGS *args,
char *is_null __attribute__((unused)),
char *error __attribute__((unused)))
{
char* sendBuffer=NULL;
CURL *curl;
CURLM *multi_handle;
int still_running;
int times=0;//try times if select false
int TRY_TIMES=25;
struct timeval timeout;//set a suitable timeout to play around with
timeout.tv_sec = 0;
timeout.tv_usec = 100000;
char sendArray[160000] = "\0";//can not move this into the if
if(initid->ptr == NULL)
{
//char sendArray[160000] = "\0";//error
sendBuffer=sendArray;
}
else
{
sendBuffer = initid->ptr;
TRY_TIMES=100;
}
strcpy(sendBuffer,args->args[1]);
curl = curl_easy_init();
multi_handle = curl_multi_init();
if(curl && multi_handle)
{
/* what URL that receives this POST */
curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);
curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);
curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);
curl_multi_add_handle(multi_handle, curl);
while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle,\ &still_running));
while(still_running && times< TRY_TIMES)
{
int rc; //select() return code
int maxfd;
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep); //get file descriptors from the transfers
curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep,\ &maxfd);
rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
switch(rc)
{
case -1://select error
break;
case 0:
default: // timeout
while(CURLM_CALL_MULTI_PERFORM !== curl_multi_perform(multi_handle, &still_running));
break;
}
times++;
}//end while
curl_multi_remove_handle(multi_handle,curl);
curl_multi_cleanup(multi_handle);//always cleanup
curl_easy_cleanup(curl);
if(times>=TRY_TIMES)
{
return 1;
}
return 0;
}//end if
return 1;
}
在主函數(shù)中,主要使用curl庫(kù)進(jìn)行通信,curl庫(kù)分成3部分,easy是同步模式,multi是異步模式,share模式是多線程共享數(shù)據(jù)的模式。
對(duì)于easy發(fā)送完數(shù)據(jù)后,會(huì)阻塞等待服務(wù)器的response,如果沒 有返回,就會(huì)一直阻塞,當(dāng)然可以設(shè)置一個(gè)timeout,但如果這個(gè)時(shí)間設(shè)小了,easy發(fā)送大數(shù)據(jù)的時(shí)候就會(huì)中斷,設(shè)太大了影響時(shí)間效率,另外當(dāng)接收端 不發(fā)送response的時(shí)候,easy庫(kù)即使發(fā)送完了數(shù)據(jù),也會(huì)阻塞等待,有些時(shí)候?qū)τ诎l(fā)送端來(lái)講不需要等待接收端的respons,當(dāng)發(fā)送完畢就可以 結(jié)束了,這個(gè)時(shí)候easy就不適用。所以最后選擇multi庫(kù)。
如程序所示,首先得初始化,并設(shè)置easy句柄為post模式,指定需要post的數(shù)據(jù),如下:
curl = curl_easy_init();
multi_handle = curl_multi_init();
curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);
curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);
curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);
由于要使用multi模式,必須也要初始化一個(gè)easy模式,并將這個(gè)easy模式的句柄放入所謂的multi函數(shù)執(zhí)行棧:
curl_multi_add_handle(multi_handle, curl);
使用curl_multi_perform(multi_handle, &still_running),來(lái)進(jìn)行異步傳輸,但如果該函數(shù)返回的不是CURLM_CALL_MULTI_PERFORM,則需要重新執(zhí)行。直到循環(huán)while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle, &still_running));結(jié)束。此時(shí)如果剛才函數(shù)體中的still_running被置為1,表明連接建立,正在發(fā)送數(shù)據(jù)。需要配合select機(jī)制來(lái)進(jìn)行數(shù)據(jù)發(fā)送。
函數(shù) curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);會(huì)將最大的描述符寫入maxfd,
然后用select進(jìn)行等待:rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
最后如果select返回值不為-1(error)0(timeout)時(shí)候再次進(jìn)行異步傳輸,即執(zhí)行curl_multi_perform函數(shù),直到
still_running為0,程序結(jié)束退出。
這里設(shè)置了一個(gè)最大執(zhí)行次數(shù)的限制,如果服務(wù)器出現(xiàn)了問(wèn)題,不能發(fā)送response,則still_running不會(huì)變?yōu)?,程序會(huì)死循環(huán),
所以,設(shè)置一個(gè)最大循環(huán)次數(shù)TRY_TIMES,防止這種情況發(fā)生。但是這個(gè)次數(shù)設(shè)小了,數(shù)據(jù)可能沒有發(fā)送完,就退出了,如設(shè)置太大了,程序發(fā)送完了,服務(wù)器沒有response就會(huì)多執(zhí)行多余循環(huán)。所以這個(gè)TRY_TIMES需要根據(jù)數(shù)據(jù)的大小和網(wǎng)絡(luò)狀況來(lái)設(shè)置,比正常
傳輸數(shù)據(jù)的次數(shù)略長(zhǎng)。這里我小數(shù)據(jù)的時(shí)候循環(huán)設(shè)次數(shù)25,大數(shù)據(jù)循環(huán)設(shè)為100.
最后是析構(gòu)函數(shù)體:
void http_post_deinit(UDF_INIT *initid)
{
if (initid!=NULL && initid->ptr!=NULL)
{
free(initid->ptr);
initid->ptr = NULL;
}
}
將初始化函數(shù)設(shè)置的內(nèi)存釋放。
編譯執(zhí)行過(guò)程如下:
將程序保存為http_post.c編譯如下(請(qǐng)根據(jù)機(jī)器上的mysql路徑進(jìn)行調(diào)整):
gcc -wall -I/usr/local/webserver/mysql/include/mysql/ -shared http_post.c -o http_post.so -fPIC
//使用mysql提供的頭文件生成動(dòng)態(tài)鏈接庫(kù)
cp -f http_post.so /usr/local/webserver/mysql/lib/mysql/plugin/http_post.so
//將生成的.so文件放入mysql的plugin文件夾下
//進(jìn)入mysql對(duì)動(dòng)態(tài)鏈接庫(kù)中的函數(shù)進(jìn)行安裝
cd /usr/local/webserver/mysql/bin/mysql
./mysql
//在mysql命令行下輸入如下命令:
mysql> DROP FUNCTION IF EXISTS http_post;
//其目的是如果系統(tǒng)內(nèi)安裝了同名函數(shù)先進(jìn)性drop。
mysql> CREATE FUNCTION http_post RETURNS INTEGER SONAME ‘http_post.so';
//生成http_post函數(shù),并指明調(diào)用來(lái)源是http_post.so。
//最后調(diào)用函數(shù),其目的是向指定ip和端口發(fā)送post數(shù)據(jù)。調(diào)用前先打開指定ip主機(jī)上的網(wǎng)絡(luò)調(diào)試助手,并監(jiān)聽3888端。
mysql> select http_post(‘testpost.com/index.php','sfasfa');
在網(wǎng)絡(luò)助手中可以看到如下結(jié)果:
相關(guān)文章
Mysql單文件存儲(chǔ)刪除數(shù)據(jù)文件容量不會(huì)減少的bug與解決方法
這篇文章主要給大家介紹了Mysql單文件存儲(chǔ)刪除數(shù)據(jù)文件時(shí)容量不會(huì)減少的bug與解決方法,文中給出了詳細(xì)的解決方法,相信對(duì)遇到這個(gè)問(wèn)題的朋友們能帶來(lái)一定的幫助,下面來(lái)一起看看吧。2016-12-12Mysql查詢數(shù)據(jù)庫(kù)或數(shù)據(jù)表中的數(shù)據(jù)量以及數(shù)據(jù)大小
許多數(shù)據(jù)庫(kù)的元數(shù)據(jù)都是存儲(chǔ)在mysql中的,本文主要介紹了Mysql查詢數(shù)據(jù)庫(kù)或數(shù)據(jù)表中的數(shù)據(jù)量以及數(shù)據(jù)大小,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02