Sample Code for a SQL Injection Detection Plugin in Python
Mind map of the features the scanner needs to implement

Crawler design
First we need a crawler that collects the site's links. The crawler must keep track of the URLs it has already crawled and the URLs still waiting to be crawled, and deduplicate them; Python's set() handles this nicely (a minimal sketch follows the list below). The rough flow is:
- Take a start URL as input
- Download the page and parse out the URLs it contains
- Deduplicate the URLs and check whether each one belongs to the target site
- Add the new ones to the to-crawl list
- Repeat
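Here is a minimal, self-contained sketch of that loop using two sets. The fetch_links callback and the start URL are placeholders for "download the page and parse out its URLs", not part of the real project:
# minimal sketch of the crawl loop; fetch_links() stands in for "download and parse out URLs"
def crawl(start_url, fetch_links):
    to_crawl = {start_url}      # URLs waiting to be crawled
    crawled = set()             # URLs already crawled
    while to_crawl:
        url = to_crawl.pop()
        crawled.add(url)
        for link in fetch_links(url):
            if link not in crawled and link not in to_crawl:
                to_crawl.add(link)   # set membership gives deduplication for free
    return crawled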
SQL injection detection approach
- Append AND %d=%d or OR NOT (%d=%d) to the URL
- The numbers substituted for %d are random and vary between requests
- Then search the response for database-specific error keywords, for example:
MySQL: SQL syntax.*MySQL
Microsoft SQL Server: Warning.*mssql_
Microsoft Access: Microsoft Access Driver
Oracle: Oracle error
IBM DB2: DB2 SQL error
SQLite: SQLite.Exception
...
These keywords let us identify which database the target is running.
- We also need to check for a WAF or similar protection, and stop immediately if one is present. A simple method is to request a specific URL and look for keywords such as IP banned or firewall in the response, which indicate a WAF. A suitable regular expression is (?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall) (see the sketch after this list).
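As a rough illustration, the WAF check could look like this; the response text below is a made-up example, only the regular expression comes from the approach above:
import re

WAF_PATTERN = r"(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)"
response_text = "Your IP has been banned by the firewall"   # made-up response body
if re.search(WAF_PATTERN, response_text):
    print("WAF detected, stop scanning")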
Development preparation
Install these libraries:
pip install requests
pip install beautifulsoup4
The experiments run on Linux. Create a Code directory, create a work folder inside it, and use that as the working directory.
Directory structure
/w8ay.py // main entry point of the project
/lib/core // core modules
/lib/core/config.py // configuration file
/script // plugins
/exp // exploits and PoCs
Steps
Writing the SQL detection script
DBMS_ERRORS = {
    "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
    "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
    "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
    "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
    "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
    "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
    "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}
These regular expressions are enough to tell which database is in use:
for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
    if re.search(regex, _content):
        return True
Below are the payloads for our test statements:
BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
We then compare the page returned for the "true" condition with the page returned for the "false" condition:
for test_payload in BOOLEAN_TESTS:
    # Right Page
    RANDINT = random.randint(1, 255)
    _url = url + test_payload % (RANDINT, RANDINT)
    content["true"] = Downloader.get(_url)
    _url = url + test_payload % (RANDINT, RANDINT + 1)
    content["false"] = Downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
        return "sql found: %s" % url
The expression
content["origin"] == content["true"] != content["false"]
means that when the original page equals the "true" page but differs from the "false" page, we can conclude that this URL has an injection vulnerability.
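Python chains comparison operators, so this one line is shorthand for two checks; a quick illustration with stand-in page contents:
# stand-in page contents, just to show how the chained comparison behaves
origin, true_page, false_page = "<p>hello</p>", "<p>hello</p>", "<p>error</p>"
# a == b != c is evaluated as (a == b) and (b != c)
print(origin == true_page != false_page)                        # True -> looks injectable
print((origin == true_page) and (true_page != false_page))      # equivalent expanded form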
Full code:
import re, random
from lib.core import Download

def sqlcheck(url):
    if "?" not in url:   # pseudo-static page, nothing to inject into
        return False
    Downloader = Download.Downloader()
    BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
    DBMS_ERRORS = {
        # regular expressions used for DBMS recognition based on error message response
        "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
        "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
        "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
        "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
        "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
        "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
        "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
        "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
    }
    # error-based check: append )("' (URL-encoded) to provoke a database error message
    _url = url + "%29%28%22%27"
    _content = Downloader.get(_url)
    if _content is None:
        return False
    for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
        if re.search(regex, _content):
            return True
    # boolean-based check: compare the unmodified page with the "true" and "false" payload pages
    content = {}
    content["origin"] = Downloader.get(url)
    for test_payload in BOOLEAN_TESTS:
        # Right Page
        RANDINT = random.randint(1, 255)
        _url = url + test_payload % (RANDINT, RANDINT)
        content["true"] = Downloader.get(_url)
        _url = url + test_payload % (RANDINT, RANDINT + 1)
        content["false"] = Downloader.get(_url)
        if content["origin"] == content["true"] != content["false"]:
            return "sql found: %s" % url
Save this file as sqlcheck.py in the /script directory. The check at the top of sqlcheck() tests whether the URL contains a ?; if it does not, as with a pseudo-static page, injection is unlikely to work, so the URL is filtered out.
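As a quick check of that filter, with a couple of made-up URLs:
# made-up URLs, just to show which ones pass the "?" filter
for u in ("http://example.com/article.php?id=12", "http://example.com/news/2024/"):
    print(u, "?" in u)   # only the first, parameterized URL goes on to the injection tests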
Writing the crawler
The crawler's design was covered above. First we handle URL management as a standalone class, saved in /lib/core/UrlManager.py:
#-*- coding:utf-8 -*-
class UrlManager(object):
    def __init__(self):
        self.new_urls = set()
        self.old_urls = set()

    def add_new_url(self, url):
        if url is None:
            return
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def has_new_url(self):
        return len(self.new_urls) != 0

    def get_new_url(self):
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url
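A minimal usage sketch of the class above (with made-up URLs), showing that each URL is handed out exactly once:
manager = UrlManager()
manager.add_new_url("http://example.com/?id=1")
manager.add_new_urls(["http://example.com/?id=1", "http://example.com/?id=2"])   # the duplicate is ignored
while manager.has_new_url():
    print(manager.get_new_url())   # each URL comes out exactly once and is moved to old_urls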
For convenience, the download functionality also gets its own class, saved in lib/core/Downloader.py:
#-*- coding:utf-8 -*-
import requests

class Downloader(object):
    def get(self, url):
        r = requests.get(url, timeout = 10)
        if r.status_code != 200:
            return None
        _str = r.text
        return _str

    def post(self, url, data):
        r = requests.post(url, data)
        _str = r.text
        return _str

    def download(self, url, htmls):
        if url is None:
            return None
        _str = {}
        _str["url"] = url
        try:
            r = requests.get(url, timeout = 10)
            if r.status_code != 200:
                return None
            _str["html"] = r.text
        except Exception as e:
            return None
        htmls.append(_str)
Note that because our crawler is multi-threaded, the class has a download method dedicated to multi-threaded downloading: instead of returning the page, it appends a result dict to a shared list passed in by the caller (a minimal sketch of how it is driven follows).
The crawler itself is written in lib/core/Spider.py:
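For example, a few threads can share one result list; the URLs below are made up, and in the real project the Spider class in the next section does exactly this:
import threading
from lib.core import Downloader

htmls = []   # shared list; every thread appends its result dict here
d = Downloader.Downloader()
urls = ("http://example.com/?id=1", "http://example.com/?id=2")   # made-up URLs
threads = [threading.Thread(target=d.download, args=(u, htmls)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
# htmls now holds one {"url": ..., "html": ...} dict per successful download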
#-*- coding:utf-8 -*-
from lib.core import Downloader, UrlManager
from script import sqlcheck   # SQL check plugin, wired in below (see "Combining the crawler with the SQL check")
import threading
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class SpiderMain(object):
    def __init__(self, root, threadNum):
        self.urls = UrlManager.UrlManager()
        self.download = Downloader.Downloader()
        self.root = root
        self.threadNum = threadNum

    def _judge(self, domain, url):
        if url.find(domain) != -1:
            return True
        return False

    def _parse(self, page_url, content):
        if content is None:
            return
        soup = BeautifulSoup(content, 'html.parser')
        _news = self._get_new_urls(page_url, soup)
        return _news

    def _get_new_urls(self, page_url, soup):
        new_urls = set()
        links = soup.find_all('a')
        for link in links:
            new_url = link.get('href')
            new_full_url = urljoin(page_url, new_url)
            if self._judge(self.root, new_full_url):
                new_urls.add(new_full_url)
        return new_urls

    def craw(self):
        self.urls.add_new_url(self.root)
        while self.urls.has_new_url():
            _content = []
            th = []
            for i in range(self.threadNum):
                if self.urls.has_new_url() is False:
                    break
                new_url = self.urls.get_new_url()
                ## sql check
                try:
                    if sqlcheck.sqlcheck(new_url):
                        print("url:%s sqlcheck is vulnerable" % new_url)
                except:
                    pass
                print("craw:" + new_url)
                t = threading.Thread(target = self.download.download, args = (new_url, _content))
                t.start()
                th.append(t)
            for t in th:
                t.join()
            for _str in _content:
                if _str is None:
                    continue
                # resolve each page's links against the URL it was downloaded from
                new_urls = self._parse(_str["url"], _str["html"])
                self.urls.add_new_urls(new_urls)
The crawler is started by calling the craw() method with a root URL. It downloads the pending pages with multiple threads, parses each downloaded page with BeautifulSoup in the _parse method, and feeds the extracted URLs back into the URL manager. This loops until every page has been crawled, at which point the crawler stops.
The threading library lets us choose how many threads to start. Once started, each thread takes one URL and downloads it; the main loop then blocks on join() until the whole batch of threads has finished before it continues.
Combining the crawler with the SQL check
In lib/core/Spider.py, add from script import sqlcheck, and call it inside the craw() method right where a new URL is taken off the queue:
## sql check
try:
    if sqlcheck.sqlcheck(new_url):
        print("url:%s sqlcheck is vulnerable" % new_url)
except:
    pass
The try block catches any exception the check might raise and simply skips it. Now test everything from w8ay.py:
#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.Spider import SpiderMain

def main():
    root = "https://wmathor.com"
    threadNum = 50
    w8 = SpiderMain(root, threadNum)
    w8.craw()

if __name__ == "__main__":
    main()
One very important point: for the .py files inside the lib and script folders to be recognized as modules, create an __init__.py file in each of lib, lib/core, and script (e.g. with touch). The files can be left completely empty.
Summary
SQL injection detection sends payloads that make the page error out; comparing the original page, the "true" page, and the "false" page tells us whether a SQL injection vulnerability exists.
By matching the SQL error messages against regular expressions, we can also tell which database is in use.
That's all for this article. I hope it is a useful reference for your study or work; if you have any questions, feel free to leave a comment. Thank you for supporting 腳本之家.