python實(shí)現(xiàn)封裝得到virustotal掃描結(jié)果
更新時(shí)間:2014年10月05日 15:25:34 投稿:shichen2014
這篇文章主要介紹了python實(shí)現(xiàn)封裝得到virustotal掃描結(jié)果的方法,是比較實(shí)用的技巧,可將掃描結(jié)果寫(xiě)入數(shù)據(jù)庫(kù),需要的朋友可以參考下
本文實(shí)例講述了python實(shí)現(xiàn)封裝得到virustotal掃描結(jié)果的方法。分享給大家供大家參考。具體方法如下:
import simplejson
import urllib
import urllib2
import os, sys
import logging
try:
import sqlite3
except ImportError:
sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " \
"Please verify your installation. Exiting...\n")
sys.exit(-1)
MD5 = "5248f774d2ee0a10936d0b1dc89107f1"
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com
APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的
class VirusTotalDatabase:
"""
Database abstraction layer.
"""
def __init__(self, db_file):
log = logging.getLogger("Database.Init")
self.__dbfile = db_file
self._conn = None
self._cursor = None
# Check if SQLite database already exists. If it doesn't exist I invoke
# the generation procedure.
if not os.path.exists(self.__dbfile):
if self._generate():
print("Generated database \"%s\" which didn't" \
" exist before." % self.__dbfile)
else:
print("Unable to generate database")
# Once the database is generated of it already has been, I can
# initialize the connection.
try:
self._conn = sqlite3.connect(self.__dbfile)
self._cursor = self._conn.cursor()
except Exception, why:
print("Unable to connect to database \"%s\": %s."
% (self.__dbfile, why))
log.debug("Connected to SQLite database \"%s\"." % self.__dbfile)
def _generate(self):
"""
Creates database structure in a SQLite file.
"""
if os.path.exists(self.__dbfile):
return False
db_dir = os.path.dirname(self.__dbfile)
if not os.path.exists(db_dir):
try:
os.makedirs(db_dir)
except (IOError, os.error), why:
print("Something went wrong while creating database " \
"directory \"%s\": %s" % (db_dir, why))
return False
conn = sqlite3.connect(self.__dbfile)
cursor = conn.cursor()
cursor.execute("CREATE TABLE virustotal (\n" \
" id INTEGER PRIMARY KEY,\n" \
" md5 TEXT NOT NULL,\n" \
" Kaspersky TEXT DEFAULT NULL,\n" \
" McAfee TEXT DEFAULT NULL,\n" \
" Symantec TEXT DEFAULT NULL,\n" \
" Norman TEXT DEFAULT NULL,\n" \
" Avast TEXT DEFAULT NULL,\n" \
" NOD32 TEXT DEFAULT NULL,\n" \
" BitDefender TEXT DEFAULT NULL,\n" \
" Microsoft TEXT DEFAULT NULL,\n" \
" Rising TEXT DEFAULT NULL,\n" \
" Panda TEXT DEFAULT NULL\n" \
");")
print "create db:%s sucess" % self.__dbfile
return True
def _get_task_dict(self, row):
try:
task = {}
task["id"] = row[0]
task["md5"] = row[1]
task["Kaspersky"] = row[2]
task["McAfee"] = row[3]
task["Symantec"] = row[4]
task["Norman"] = row[5]
task["Avast"] = row[6]
task["NOD32"] = row[7]
task["BitDefender"] = row[8]
task["Microsoft"] = row[9]
task["Rising"] = row[10]
task["Panda"] = row[11]
return task
except Exception, why:
return None
def add_sample(self, md5, virus_dict):
"""
"""
task_id = None
if not self._cursor:
return None
if not md5 or md5 == "":
return None
Kaspersky = virus_dict.get("Kaspersky", None)
McAfee = virus_dict.get("McAfee", None)
Symantec = virus_dict.get("Symantec", None)
Norman = virus_dict.get("Norman", None)
Avast = virus_dict.get("Avast", None)
NOD32 = virus_dict.get("NOD32", None)
BitDefender = virus_dict.get("BitDefender", None)
Microsoft = virus_dict.get("Microsoft", None)
Rising = virus_dict.get("Rising", None)
Panda = virus_dict.get("Panda", None)
self._conn.text_factory = str
try:
self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;",
(md5,))
sample_row = self._cursor.fetchone()
except sqlite3.OperationalError, why:
print "sqlite3 error:%s\n" % str(why)
return False
if sample_row:
try:
sample_row = sample_row[0]
self._cursor.execute("UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, Norman=?, Avast=?, \
NOD32=?, BitDefender=?, Microsoft=?, Rising=?, Panda=? WHERE id = ?;",
(Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft,\
Rising, Panda, sample_row))
self._conn.commit()
task_id = sample_row
except sqlite3.OperationalError, why:
print("Unable to update database: %s." % why)
return False
else: #the sample not in the database
try:
self._cursor.execute("INSERT INTO virustotal " \
"(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\
Microsoft, Rising, Panda) " \
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender,\
Microsoft, Rising, Panda))
self._conn.commit()
task_id = self._cursor.lastrowid
except sqlite3.OperationalError, why:
print "why",str(why)
return None
print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id))
return task_id
def get_sample(self):
"""
Gets a task from pending queue.
"""
log = logging.getLogger("Database.GetTask")
if not self._cursor:
log.error("Unable to acquire cursor.")
return None
# Select one item from the queue table with higher priority and older
# addition date which has not already been processed.
try:
self._cursor.execute("SELECT * FROM virustotal " \
#"WHERE lock = 0 " \
#"AND status = 0 " \
"ORDER BY id, added_on LIMIT 1;")
except sqlite3.OperationalError, why:
log.error("Unable to query database: %s." % why)
return None
sample_row = self._cursor.fetchone()
if sample_row:
return self._get_task_dict(sample_row)
else:
return None
def search_md5(self, md5):
"""
"""
if not self._cursor:
return None
if not md5 or len(md5) != 32:
return None
try:
self._cursor.execute("SELECT * FROM virustotal " \
"WHERE md5 = ? " \
#"AND status = 1 " \
"ORDER BY id DESC;",
(md5,))
except sqlite3.OperationalError, why:
return None
task_dict = {}
for row in self._cursor.fetchall():
task_dict = self._get_task_dict(row)
#if task_dict:
#tasks.append(task_dict)
return task_dict
class VirusTotal:
""""""
def __init__(self, md5):
"""Constructor"""
self._virus_dict = {}
self._md5 = md5
self._db_file = r"./db/virustotal.db"
self.get_report_dict()
def repr(self):
return str(self._virus_dict)
def submit_md5(self, file_path):
import postfile
#submit the file
FILE_NAME = os.path.basename(file_path)
host = "www.virustotal.com"
selector = "https://www.virustotal.com/vtapi/v2/file/scan"
fields = [("apikey", APIKEY)]
file_to_send = open(file_path, "rb").read()
files = [("file", FILE_NAME, file_to_send)]
json = postfile.post_multipart(host, selector, fields, files)
print json
pass
def get_report_dict(self):
result_dict = {}
url = "https://www.virustotal.com/vtapi/v2/file/report"
parameters = {"resource": self._md5,
"apikey": APIKEY}
data = urllib.urlencode(parameters)
req = urllib2.Request(url, data)
response = urllib2.urlopen(req)
json = response.read()
response_dict = simplejson.loads(json)
if response_dict["response_code"]: #has result
scans_dict = response_dict.get("scans", {})
for anti_virus_comany, virus_name in scans_dict.iteritems():
if virus_name["detected"]:
result_dict.setdefault(anti_virus_comany, virus_name["result"])
return result_dict
def write_to_db(self):
""""""
db = VirusTotalDatabase(self._db_file)
virus_dict = self.get_report_dict()
db.add_sample(self._md5, virus_dict)
使用方法如下:
config = {'input':"inputMd5s"}
fp = open(config['input'], "r")
content = fp.readlines()
MD5S = []
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)):
MD5S.append(md5)
print "MD5S",MD5S
fp.close()
from getVirusTotalInfo import VirusTotal
#得到掃描結(jié)果并寫(xiě)入數(shù)庫(kù)
for md5 in MD5S:
virus_total = VirusTotal(md5)
virus_total.write_to_db()
希望本文所述對(duì)大家的Python程序設(shè)計(jì)有所幫助。
您可能感興趣的文章:
- 如何使用VSCode配置Rust開(kāi)發(fā)環(huán)境(Rust新手教程)
- 教你使用RustDesk?搭建一個(gè)自己的遠(yuǎn)程桌面中繼服務(wù)器
- 完美解決node.js中使用https請(qǐng)求報(bào)CERT_UNTRUSTED的問(wèn)題
- IntelliJ安裝并使用Rust IDE插件
- python實(shí)現(xiàn)上傳樣本到virustotal并查詢(xún)掃描信息的方法
- Rust 連接 SQLite 數(shù)據(jù)庫(kù)的過(guò)程解析
- rustysun同學(xué)ASP代碼書(shū)寫(xiě)規(guī)范
- Rust 中的文件操作示例詳解
- 用rust?寫(xiě)一個(gè)jar包?class沖突檢測(cè)工具
相關(guān)文章
Django使用httpresponse返回用戶(hù)頭像實(shí)例代碼
這篇文章主要介紹了Django使用httpresponse返回用戶(hù)頭像實(shí)例代碼2018-01-01
對(duì)Python 文件夾遍歷和文件查找的實(shí)例講解
下面小編就為大家分享一篇對(duì)Python 文件夾遍歷和文件查找的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-04-04
python3操作mysql數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了python3操作mysql數(shù)據(jù)庫(kù)的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
python實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03
Python開(kāi)發(fā)之利用re模塊去除代碼塊注釋
Python的re模塊主要是正則表達(dá)式的操作函數(shù),下面這篇文章主要給大家介紹了關(guān)于Python開(kāi)發(fā)之利用re模塊去除代碼塊注釋的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11

