阅读sqlmap源代码,编写burpsuite插件--sqlmapapi
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阅读sqlmap源代码,编写burpsuite插件--sqlmapapi相关的知识,希望对你有一定的参考价值。
burpsuite插件编写---sql injection
0x00 概要
在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、cf、xxe、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法:是否存在一款自动化工具,可以将某一网站的所有链接都尝试一遍,尽可能地发现所有的sql injection。有了这种想法后大家会去想解决方案,其中一种解决方案是编写burpsuite插件。
本文接上一篇文章。上一篇记录了通过继承IHttpListener接口编写插件的方式,该方式会影响测试效率;本文将介绍另一种方式,并将sqlmapapi和burpsuite scanner模块进行对比。
0x01 继承IScannerCheck接口(方法2)
先上插件代码:
> from burp import IBurpExtender
> from burp import IScannerCheck
> from java.io import PrintWriter
> import re
> import urllib
> import urllib2
> import time
> import json
> from threading import Thread
> import requests
>
>
>
>
class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp extension that submits parameterised GET requests to sqlmapapi.

    The extension registers itself as a scanner check. For every passively
    scanned request whose method is GET and whose URL contains a ``?``, an
    ``AutoSqli`` scan of that URL is started in the background.
    """

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Keep Burp's callback/helper objects and register the scanner check.

        Args:
            callbacks: the callbacks object Burp passes to the extension.
        """
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream (second argument enables auto-flush)
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers = callbacks.getHelpers()

        # register ourselves as a scanner check
        callbacks.registerScannerCheck(self)

    #
    # implement IScannerCheck
    #
    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """No active check is implemented; nothing is reported."""
        return None

    def doPassiveScan(self, baseRequestResponse):
        """Start a sqlmapapi scan for GET requests that carry a query string.

        Args:
            baseRequestResponse: the request/response pair handed over by Burp.

        Returns:
            None -- results are only written to the extension's output
            stream, no Burp issues are created.
        """
        request_info = self._helpers.analyzeRequest(baseRequestResponse)
        method = request_info.getMethod()
        url = str(request_info.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            scanner = AutoSqli(target=url, stdout=self._stdout, method=method)
            # AutoSqli.run() polls the scan status with 10 second sleeps for
            # up to 500 seconds; calling it directly would block the thread
            # Burp uses for passive scanning.  A fresh Thread with run() as
            # its target is used (instead of scanner.start()) so this does not
            # rely on AutoSqli having initialised its own Thread base class.
            worker = Thread(target=scanner.run)
            worker.setDaemon(True)  # never keep Burp alive because of a scan
            worker.start()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Tell Burp to keep both issues.

        This check never reports issues itself, so the value is only a safe
        default; the Burp API expects an int here (0 = report both issues),
        not None.
        """
        return 0
>
>
>
class AutoSqli(Thread):
    """Run one sqlmapapi scan against a single URL and print the verdict.

    The instance talks to the sqlmapapi REST server at ``self.server``: it
    creates a task, starts a scan of ``target``, polls the scan status and
    finally prints whether the scan returned any data.

    Args:
        target: URL to scan.
        stdout: object with a ``println(text)`` method; receives all output.
        method: HTTP method of the original request (stored only).
    """

    # Seconds to sleep between two status polls while the scan is running.
    POLL_INTERVAL = 10
    # Seconds since construction after which a running scan is stopped/killed.
    SCAN_TIMEOUT = 500

    def __init__(self, target, stdout, method):
        # Initialise the Thread base class so start() works as well as run().
        Thread.__init__(self)
        self.server = "http://192.168.159.134:8775"
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _request_json(self, path, payload=None):
        """Call ``self.server + path`` and return the decoded JSON reply.

        A GET is sent when ``payload`` is None; otherwise ``payload`` is
        JSON-encoded and POSTed with a JSON content type.  The response is
        always closed.
        """
        url = self.server + path
        if payload is None:
            response = urllib2.urlopen(url)
        else:
            headers = {'Content-Type': 'application/json'}
            request = urllib2.Request(url, data=json.dumps(payload), headers=headers)
            response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a task, store its id in ``self.taskid``; True if non-empty."""
        self.taskid = self._request_json('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        """Delete the current task; return True when the server reports success."""
        if self._request_json('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def scan_start(self):
        """Start scanning ``self.target``; return True when the server accepted it."""
        payload = {'url': self.target}
        reply = self._request_json('/scan/' + self.taskid + '/start', payload)
        self._stdout.println("start " + self.taskid)

        if len(str(reply['engineid'])) > 0 and reply['success']:
            return True
        return False

    def scan_status(self):
        """Return 'running', 'terminated', or "error" for any other status."""
        status = self._request_json('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print and return whether the scan produced any data.

        A non-empty ``data`` list is reported as an injection.
        """
        data = self._request_json('/scan/' + self.taskid + '/data')['data']
        if len(data) == 0:
            self._stdout.println('not injection: ' + self.target)
            return False
        self._stdout.println('injection: ' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the scan and print a confirmation line."""
        # The reply must contain 'success' (KeyError otherwise); its value is
        # not acted upon.
        self._request_json('/scan/' + self.taskid + '/kill')['success']
        # Format the message first: println() returns nothing to apply % to.
        self._stdout.println("%s kill" % (self.taskid))

    def scan_stop(self):
        """Ask the server to stop the scan and print a confirmation line."""
        self._request_json('/scan/' + self.taskid + '/stop')['success']
        self._stdout.println("%s stop" % (self.taskid))

    def run(self):
        """Create the task, run the scan to completion and print the result.

        Returns:
            False when the task could not be created or the scan could not
            be started, otherwise None.  Any exception is reported on the
            output stream instead of being raised.
        """
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration; anything but 'running' ends the wait.
                if self.scan_status() != 'running':
                    break
                time.sleep(self.POLL_INTERVAL)
                if time.time() - self.start_time > self.SCAN_TIMEOUT:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
        except Exception as e:
            # Best effort: a failed scan must not propagate into the caller,
            # but it should not disappear silently either.
            self._stdout.println('[%s] scan failed: %s' % (self.taskid, e))
>
使用burp scanner模块和上面写的插件对http://172.16.173.136/sqli-labs/Less-8/?id=234 进行sql注入测试,burp scanner模块可以测试出存在注入,但是插件无法测试出;
因此对插件进行改写,修改sqlmapapi的默认配置:在插件中添加以下代码(我只修改了level和risk)
def optionSet(self):
    """Set the task's ``level`` option to "5" and ``risk`` to "3".

    POSTs the options as JSON to ``/option/<taskid>/set`` on the sqlmapapi
    server and prints a confirmation line.  ``self.taskid`` must already
    hold the id of a created task.
    """
    headers = {'Content-Type': 'application/json'}
    payload = {"level": "5", "risk": "3"}
    url = self.server + '/option/' + self.taskid + '/set'
    req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
    response = urllib2.urlopen(req)
    try:
        # Decode the reply so a non-JSON answer still raises, as before;
        # the decoded value itself is not used.
        json.loads(response.read())
    finally:
        response.close()
    self._stdout.println("set option " + self.taskid)
然后在新建扫描后,调用配置设置函数,修改后的代码:
from burp import IBurpExtender
from burp import IScannerCheck
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests
class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp extension that submits parameterised GET requests to sqlmapapi.

    The extension registers itself as a scanner check. For every passively
    scanned request whose method is GET and whose URL contains a ``?``, an
    ``AutoSqli`` scan of that URL is started in the background.
    """

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Keep Burp's callback/helper objects and register the scanner check.

        Args:
            callbacks: the callbacks object Burp passes to the extension.
        """
        # keep a reference to our callbacks object
        self._callbacks = callbacks
        # set our extension name
        callbacks.setExtensionName("fanyingjie")
        # obtain our output stream (second argument enables auto-flush)
        self._stdout = PrintWriter(callbacks.getStdout(), True)
        self._helpers = callbacks.getHelpers()
        # register ourselves as a scanner check
        callbacks.registerScannerCheck(self)

    #
    # implement IScannerCheck
    #
    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """No active check is implemented; nothing is reported."""
        return None

    def doPassiveScan(self, baseRequestResponse):
        """Start a sqlmapapi scan for GET requests that carry a query string.

        Args:
            baseRequestResponse: the request/response pair handed over by Burp.

        Returns:
            None -- results are only written to the extension's output
            stream, no Burp issues are created.
        """
        request_info = self._helpers.analyzeRequest(baseRequestResponse)
        method = request_info.getMethod()
        url = str(request_info.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            scanner = AutoSqli(target=url, stdout=self._stdout, method=method)
            # AutoSqli.run() polls the scan status with 10 second sleeps for
            # up to 500 seconds; calling it directly would block the thread
            # Burp uses for passive scanning.  A fresh Thread with run() as
            # its target is used (instead of scanner.start()) so this does not
            # rely on AutoSqli having initialised its own Thread base class.
            worker = Thread(target=scanner.run)
            worker.setDaemon(True)  # never keep Burp alive because of a scan
            worker.start()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Tell Burp to keep both issues.

        This check never reports issues itself, so the value is only a safe
        default; the Burp API expects an int here (0 = report both issues),
        not None.
        """
        return 0
class AutoSqli(Thread):
    """Run one sqlmapapi scan against a single URL and print the verdict.

    The instance talks to the sqlmapapi REST server at ``self.server``: it
    creates a task, sets the level/risk options, starts a scan of
    ``target``, polls the scan status and finally prints whether the scan
    returned any data.

    Args:
        target: URL to scan.
        stdout: object with a ``println(text)`` method; receives all output.
        method: HTTP method of the original request (stored only).
    """

    # Seconds to sleep between two status polls while the scan is running.
    POLL_INTERVAL = 10
    # Seconds since construction after which a running scan is stopped/killed.
    SCAN_TIMEOUT = 500

    def __init__(self, target, stdout, method):
        # Initialise the Thread base class so start() works as well as run().
        Thread.__init__(self)
        self.server = "http://172.16.173.136:8775"
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def _request_json(self, path, payload=None):
        """Call ``self.server + path`` and return the decoded JSON reply.

        A GET is sent when ``payload`` is None; otherwise ``payload`` is
        JSON-encoded and POSTed with a JSON content type.  The response is
        always closed.
        """
        url = self.server + path
        if payload is None:
            response = urllib2.urlopen(url)
        else:
            headers = {'Content-Type': 'application/json'}
            request = urllib2.Request(url, data=json.dumps(payload), headers=headers)
            response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def task_new(self):
        """Create a task, store its id in ``self.taskid``; True if non-empty."""
        self.taskid = self._request_json('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        """Delete the current task; return True when the server reports success."""
        if self._request_json('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def optionSet(self):
        """Set the task's ``level`` option to "5" and ``risk`` to "3"."""
        payload = {"level": "5", "risk": "3"}
        self._request_json('/option/' + self.taskid + '/set', payload)
        self._stdout.println("set option " + self.taskid)

    def scan_start(self):
        """Start scanning ``self.target``; return True when the server accepted it."""
        payload = {'url': self.target}
        reply = self._request_json('/scan/' + self.taskid + '/start', payload)
        self._stdout.println("start " + self.taskid)
        if len(str(reply['engineid'])) > 0 and reply['success']:
            return True
        return False

    def scan_status(self):
        """Return 'running', 'terminated', or "error" for any other status."""
        status = self._request_json('/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print and return whether the scan produced any data.

        A non-empty ``data`` list is reported as an injection.
        """
        data = self._request_json('/scan/' + self.taskid + '/data')['data']
        if len(data) == 0:
            self._stdout.println('not injection: ' + self.target)
            return False
        self._stdout.println('injection: ' + self.target)
        return True

    def scan_kill(self):
        """Ask the server to kill the scan and print a confirmation line."""
        # The reply must contain 'success' (KeyError otherwise); its value is
        # not acted upon.
        self._request_json('/scan/' + self.taskid + '/kill')['success']
        # Format the message first: println() returns nothing to apply % to.
        self._stdout.println("%s kill" % (self.taskid))

    def scan_stop(self):
        """Ask the server to stop the scan and print a confirmation line."""
        self._request_json('/scan/' + self.taskid + '/stop')['success']
        self._stdout.println("%s stop" % (self.taskid))

    def run(self):
        """Create the task, configure it, run the scan and print the result.

        Returns:
            False when the task could not be created or the scan could not
            be started, otherwise None.  Any exception is reported on the
            output stream instead of being raised.
        """
        try:
            if not self.task_new():
                return False
            self.optionSet()
            if not self.scan_start():
                return False
            while True:
                # Poll once per iteration; anything but 'running' ends the wait.
                if self.scan_status() != 'running':
                    break
                time.sleep(self.POLL_INTERVAL)
                if time.time() - self.start_time > self.SCAN_TIMEOUT:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
        except Exception as e:
            # Best effort: a failed scan must not propagate into the caller,
            # but it should not disappear silently either.
            self._stdout.println('[%s] scan failed: %s' % (self.taskid, e))
再次对上面代码进行测试,可以测试出存在注入
插件的输出:
start
Created new task: 42f685742c94a985
set option 42f685742c94a985
start 42f685742c94a985
injection: http://172.16.173.136:80/sqli-labs/Less-8/?id=234
比较sqlmapapi和burpsuite scanner模块:
通过修改sqlmapapi默认设置可以提高注入检测的准确率,但是修改默认配置后sqlmapapi不见得比burpsuite scanner模块更厉害,而且sqlmapapi只能检测get请求类型,且不能检测登录后的get请求,因此看来在安全测试过程中,如果目的只是发现网站是否存在sql注入,还是使用burpsuite scanner 模块比较好用;在知道存在sql注入后,可以使用sqlmap来进行后续的操作。
以上是关于阅读sqlmap源代码,编写burpsuite插件--sqlmapapi的主要内容,如果未能解决你的问题,请参考以下文章