阅读sqlmap源代码,编写burpsuite插件--sqlmapapi

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阅读sqlmap源代码,编写burpsuite插件--sqlmapapi相关的知识,希望对你有一定的参考价值。

burpsuite插件编写---sql injection

0x00 概要

在安全测试过程中,大部分人会使用burpsuite的scanner模块进行测试,可以发现一些浅显的漏洞:比如xss、sql injection、cf、xxe、Arbitrary file existence disclosure in Act、明文传输等。
说到sql injection,测试人员都会有一种想法是否存在一款自动化工具,可以将某一网站的所有链接都去尝试一边,尽可能的发现所有的sql injection。有了这种想法后大家会去想解决方案,有一种解决方案是编写burpsuite插件。
本文接上一篇文章,上一文记录到继承IHttpListener接口编写插件,影响测试效率,本文将介绍另一种方式,将sqlmapapi 和burpsuite进行对比

0x01 继承IScannerCheck接口(方法2)

先上插件代码:

> from burp import IBurpExtender
> from burp import IScannerCheck
> from java.io import PrintWriter
> import re
> import urllib
> import urllib2
> import time
> import json
> from threading import Thread
> import requests
> 
> 
> 
> 
> class BurpExtender(IBurpExtender, IScannerCheck):
> 
>     #
>     #implement IBurpExtender
>     #
>     def    registerExtenderCallbacks(self, callbacks):
>         # keep a reference to our callbacks object
>         self._callbacks = callbacks
> 
>         # set our extension name
>         callbacks.setExtensionName("fanyingjie")
> 
>         # obtain our output stream
>         self._stdout = PrintWriter(callbacks.getStdout(), True)
> 
>         self._helpers  = callbacks.getHelpers()
> 
>         # register ourselves as an
>         callbacks.registerScannerCheck(self)
> 
> 
>     def doActiveScan(self, baseRequestResponse, insertionPoint):
>         pass
>     def doPassiveScan(self, baseRequestResponse):
>         a=self._helpers.analyzeRequest(baseRequestResponse)
>         method=a.getMethod()
>         url=str(a.getUrl())
>         if(("?" in url) and (method=="GET")):
>             self._stdout.println("start")
>             t=AutoSqli(target=url,stdout=self._stdout,method=method)
>             t.run()
> 
>     def consolidateDuplicateIssues(self, existingIssue, newIssue):
>         pass
> 
>             
> 
> class AutoSqli(Thread):
>     def __init__(self,target,stdout,method):
>         self.server="http://192.168.159.134:8775"
>         self.taskid = ‘‘
>         self.target=target
>         self.method=method
>         self._stdout=stdout
>         self.start_time = time.time()
> 
>     def task_new(self):
>         self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
>         self._stdout.println(‘Created new task: ‘ + self.taskid )
>         if len(self.taskid) > 0:
>             return True
>         return False
> 
>     def task_delete(self):
>         if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
>             self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
>             return True
>         return False
> 
> 
>     def scan_start(self):
>         headers = {‘Content-Type‘: ‘application/json‘}
>         payload = {‘url‘:self.target}
>         url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
>         #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
>         
>         req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
>         t=json.loads(urllib2.urlopen(req).read())
>         self._stdout.println("start "+ self.taskid)
> 
>         if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
>             return True
>         return False
> 
>     def scan_status(self):
>         status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
>         if status == ‘running‘:
>             return ‘running‘
>         if status == ‘terminated‘:
>             return ‘terminated‘
>         return "error"
> 
>     def scan_data(self):
>         
>         data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
>         if len(data) == 0:
>             self._stdout.println(‘not injection:	‘ + self.target)
>             return False
>         else:
>             self._stdout.println(‘injection:	‘ + self.target)
>             return True
> 
>     def scan_kill(self):
>         json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
>         self._stdout.println("%s kill")%(self.taskid)
> 
> 
>     def scan_stop(self):
>         json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
>         self._stdout.println("%s stop")%(self.taskid)
> 
>     def run(self):
>         try:
>             if not self.task_new():
>                 return False
>             if not self.scan_start():
>                 return False
>             while True:
>                 if self.scan_status() == ‘running‘:
>                     time.sleep(10)
>                 elif self.scan_status() == ‘terminated‘:
>                     break
>                 else:
>                     break
>                 #print self.target + ":	" + str(time.time() - self.start_time)
>                 if time.time() - self.start_time > 500:
>                     self.scan_stop()
>                     self.scan_kill()
>                     break
>             self.scan_data()
>             #self.task_delete()
> 
>         except Exception as e:
>             pass
> 

使用burp scanner模块和上面写的插件对http://172.16.173.136/sqli-labs/Less-8/?id=234 进行sql注入测试,burp scanner模块可以测试出存在注入,但是插件无法测试出;
因此插件进行改写,修改默认配置,在插件中添加以下代码,我只修改了level和risk

    def optionSet(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload={"level":"5","risk":"3"}
        url = self.server + ‘/option/‘ + self.taskid + ‘/set‘
        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
        t = json.loads(urllib2.urlopen(req).read())
        self._stdout.println("set option " + self.taskid)

然后在新建扫描后,调用配置设置函数,修改后的代码:

from burp import IBurpExtender
from burp import IScannerCheck
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests

class BurpExtender(IBurpExtender, IScannerCheck):

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers = callbacks.getHelpers()

        # register ourselves as an
        callbacks.registerScannerCheck(self)

    def doActiveScan(self, baseRequestResponse, insertionPoint):
        pass

    def doPassiveScan(self, baseRequestResponse):
        a = self._helpers.analyzeRequest(baseRequestResponse)
        method = a.getMethod()
        url = str(a.getUrl())
        if (("?" in url) and (method == "GET")):
            self._stdout.println("start")
            t = AutoSqli(target=url, stdout=self._stdout, method=method)
            t.run()

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        pass

class AutoSqli(Thread):
    def __init__(self, target, stdout, method):
        self.server = "http://172.16.173.136:8775"
        self.taskid = ‘‘
        self.target = target
        self.method = method
        self._stdout = stdout
        self.start_time = time.time()

    def task_new(self):
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid)
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False
    def optionSet(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload={"level":"5","risk":"3"}
        url = self.server + ‘/option/‘ + self.taskid + ‘/set‘
        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
        t = json.loads(urllib2.urlopen(req).read())
        self._stdout.println("set option " + self.taskid)

    def scan_start(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘: self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        # t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
        t = json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start " + self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:	‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:	‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill") % (self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop") % (self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            self.optionSet()
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                # print self.target + ":	" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            # self.task_delete()

        except Exception as e:
            pass

再次对上面代码进行测试,可以测试出存在注入
插件的输出:

start
Created new task: 42f685742c94a985
set option 42f685742c94a985
start 42f685742c94a985
injection: http://172.16.173.136:80/sqli-labs/Less-8/?id=234


比较sqlmapapi和burpsuite scanner模块:
通过修改sqlmapapi默认设置可以提高注入检测的准确率,但是修改默认配置后sqlmapapi不见的比burpsuite scanner模块更厉害,而且sqlmapapi只能检测get请求类型,且不能检测登录后的get请求,因此看来在安全测试过程中,如果目的只是发现网站是否存在sql注入,还是使用burpsuite scanner 模块比较好用;在知道存在sql注入后,可以使用sqlmap来进行后续的操作。

以上是关于阅读sqlmap源代码,编写burpsuite插件--sqlmapapi的主要内容,如果未能解决你的问题,请参考以下文章

burpsuite扩展集成sqlmap插件

Burpsuit安装sqlmap插件

sqlmap-tamper编写小结

干货--将sqlmap扩展到burpsuite的详细步骤

BurpSuite插件分享

sqlmap批量扫描burpsuite请求日志记录