sqlmap是安全人员必备的注入神器,没有之一。注入手法,注入思路以及注入语句构造的都是相当厉害。

sqlmap作为集检测与利用一体的注入工具,在注入判断上的速度难免会慢一些。虽然提供了sqlmapapi的形式供DIY调用,但效果差强人意。
所以一直以来都在写着能够快速定位到注入点而非注入利用的工具。最重要的就是快、误报低。

最近研究了一下sqlmap的源码流程和注入点的判断,不得不膜拜sqlmap的作者,膜拜其在软件架构、算法、注入等的造诣。

先来一张图看一下sqlmap运行流程。
https://www.processon.com/view/5835511ce4b0620292bd7285

入口sqlmap.py

前面引入很多lib,以及配置文件,暂时先不关心这些

1
2
3
checkEnvironment()
setPaths(modulePath())
banner()

在main函数中先调用这三个函数
分别是检测环境,一些依赖和目录规范的问题。
设置路径,这个就是为什么我们不管在哪个路径下执行

1
python /root/test/sqlmap.py

都不会报错的原因。
然后输出logo信息。

然后进行参数解析。

1
2
cmdLineOptions.update(cmdLineParser().__dict__)
initOptions(cmdLineOptions)

命令行全部参数配置都在
lib\parse\cmdline.py中,

1
2
3
4
5
6
7
parser.add_option("--hh", dest="advancedHelp",
action="store_true",
help="Show advanced help message and exit")

parser.add_option("--version", dest="showVersion",
action="store_true",
help="Show program's version number and exit")

这个简单了解即可。

在lib\core\option.py中,init()方法,进行一系列初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def init():
"""
Set attributes into both configuration and knowledge base singletons
based upon command line and configuration file options.
"""

_useWizardInterface()
setVerbosity()
_saveConfig()
_setRequestFromFile()
_cleanupOptions()
_cleanupEnvironment()
_dirtyPatches()
_purgeOutput()
_checkDependencies()
_createTemporaryDirectory()
_basicOptionValidation()
_setProxyList()
_setTorProxySettings()
_setDNSServer()
_adjustLoggingFormatter()
_setMultipleTargets()
_setTamperingFunctions()
_setWafFunctions()
_setTrafficOutputFP()
_resolveCrossReferences()
_checkWebSocket()

这部分主要是初始化一些配置信息。还包括一个新手向导的函数,不过基本也不用。

前面对准备工作设置的特别多,数据库、日志、识别waf、加载tamper等。本文主要是探索sqlmap的注入判断方式,所以前面就不提太多。

参数解析

目前市面上很多的扫描器都支持urlencode和json参数的格式。如

id=1&name=2

{“id”:1,”name”:2}

而对两者混合的支持的比较少,如

id=1&data={“name”:1,”age”:4}

而sqlmap在解析时处理可以识别各种类型的参数。
主要实现在lib\core\common.py的walk函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if place in (PLACE.POST, PLACE.GET):
for regex in (r"\A((?:<[^>]+>)+\w+)((?:<[^>]+>)+)\Z", r"\A([^\w]+.*\w+)([^\w]+)\Z"):
match = re.search(regex, testableParameters[parameter])
if match:
try:
candidates = OrderedDict()
def walk(head, current=None):
if current is None:
current = head
if isListLike(current):
for _ in current:
walk(head, _)
elif isinstance(current, dict):
for key in current.keys():
value = current[key]
if isinstance(value, (list, tuple, set, dict)):
if value:
walk(head, value)
elif isinstance(value, (bool, int, float, basestring)):
original = current[key]
if isinstance(value, bool):
current[key] = "%s%s" % (str(value).lower(), BOUNDED_INJECTION_MARKER)
else:
current[key] = "%s%s" % (value, BOUNDED_INJECTION_MARKER)
candidates["%s (%s)" % (parameter, key)] = re.sub("(%s\s*=\s*)%s" % (re.escape(parameter), re.escape(testableParameters[parameter])), r"\g<1>%s" % json.dumps(deserialized), parameters)
current[key] = original
deserialized = json.loads(testableParameters[parameter])
walk(deserialized)

这个思路解决了我在参数解析的混合参数的识别问题。

demo如下

1
2
3
4
python vulscaner.py
url : http://httpbin.org/cookies/set?k1=v1&k2=v2
http://httpbin.org/cookies/set?k1=v1__INSERT_VUL_PAYLOAD_HERE__&k2=v2
http://httpbin.org/cookies/set?k1=v1&k2=v2__INSERT_VUL_PAYLOAD_HERE__

这个函数主要是在变量的位置插入flag标识,我们在进行漏洞检测,如sql注入时,分别替换成对应的payload即可。

注入检测

注入检测方法在lib\controller\checks.py中,按照BUEST的注入分类分别检测的。

1
2
def checkSqlInjection(place, parameter, value):
...

看一下注入判断的逻辑。

bool-base注入

首先看一下布尔类型的盲注吧,因为我之前的工具这个类型准确度不高。

sqlmap中盲注检测是比较复杂的一种

BOOL

首先是生成payload,构造prefix和suffix。
然后就是分别使用正确和错误的payload分别请求。

然后在这里判断

1
if trueResult and not(truePage == falsePage and not kb.nullConnection):

如果两次请求不同,则认为有可能是注入,进入更详细的检测。
在检测方法中,我认为有个算法挺简介高效的。
在lib\request\comparison.py中的comparison中。

部分代码,(忽略我加的调试代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _comparison(page, headers, code, getRatioValue, pageLength):
threadData = getCurrentThreadData()
if kb.testMode:
threadData.lastComparisonHeaders = listToStrValue([_ for _ in headers.headers if not _.startswith("%s:" % URI_HTTP_HEADER)]) if headers else ""
threadData.lastComparisonPage = page
threadData.lastComparisonCode = code
print 'kb.testMode'

if page is None and pageLength is None:
return None
print "2-->"
print conf.string
if any((conf.string, conf.notString, conf.regexp)):
rawResponse = "%s%s" % (listToStrValue([_ for _ in headers.headers if not _.startswith("%s:" % URI_HTTP_HEADER)]) if headers else "", page)
# String to match in page when the query is True and/or valid
if conf.string:
print conf.string
return conf.string in rawResponse

# String to match in page when the query is False and/or invalid
if conf.notString:
return conf.notString not in rawResponse

# Regular expression to match in page when the query is True and/or valid
if conf.regexp:
return re.search(conf.regexp, rawResponse, re.I | re.M) is not None
print 'can you here'

这个方法是把正确和错误的返回内容分割为单个字符的set去比较,包括返回头信息。

比较算法如下,把两次返回的set取差集。计算出正确内容的flag标识,也就是sqlmap的–string参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
originalSet = set(getFilteredPageContent(kb.pageTemplate, True, "\n").split("\n"))
trueSet = set(getFilteredPageContent(truePage, True, "\n").split("\n"))
falseSet = set(getFilteredPageContent(falsePage, True, "\n").split("\n"))
print "in 2"

if originalSet == trueSet != falseSet:
candidates = trueSet - falseSet

if candidates:
candidates = sorted(candidates, key=lambda _: len(_))
for candidate in candidates:
if re.match(r"\A[\w.,! ]+\Z", candidate) and ' ' in candidate and len(candidate) > CANDIDATE_SENTENCE_MIN_LENGTH:
conf.string = candidate
injectable = True

infoMsg = "%s parameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % (paramType, parameter, title, repr(conf.string).lstrip('u').strip("'"))
logger.info(infoMsg)

break

获取到–string的参数后,就可以根据这个flag来判断页面返回是true或false了。

其他情况下:

注入逻辑是根据页面的相似度来进行判断true或false。我之前用的是simhash,sqlmap这里使用的是difflib。

当页面相似度阀值过低,就认为该位置可能存在注入。(ps : 忽略我加的调试代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
else:
seq1 = getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a
seq2 = getFilteredPageContent(page, True) if conf.textOnly else page

if seq1 is None or seq2 is None:
return None

seq1 = seq1.replace(REFLECTED_VALUE_MARKER, "")
seq2 = seq2.replace(REFLECTED_VALUE_MARKER, "")
print '-----seq-------'
print seq2
seqMatcher.set_seq1(seq1)
seqMatcher.set_seq2(seq2)

ratio = round(seqMatcher.quick_ratio(), 3)
print ratio
print '--'

error-base注入

这个实现起来相对简单一些。

sqlmap的规则在xml\errors.xml中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# In case of error-based SQL injection
elif method == PAYLOAD.METHOD.GREP:
# Perform the test's request and grep the response
# body for the test's <grep> regular expression
try:
page, headers = Request.queryPage(reqPayload, place, content=True, raise404=False)
output = extractRegexResult(check, page, re.DOTALL | re.IGNORECASE) \
or extractRegexResult(check, listToStrValue( \
[headers[key] for key in headers.keys() if key.lower() != URI_HTTP_HEADER.lower()] \
if headers else None), re.DOTALL | re.IGNORECASE) \
or extractRegexResult(check, threadData.lastRedirectMsg[1] \
if threadData.lastRedirectMsg and threadData.lastRedirectMsg[0] == \
threadData.lastRequestUID else None, re.DOTALL | re.IGNORECASE)

if output:
result = output == "1"

if result:
infoMsg = "%s parameter '%s' is '%s' injectable " % (paramType, parameter, title)
logger.info(infoMsg)

injectable = True

在给定的payload中存在数据库的报错信息,则初步认定为是注入点。

time-base注入

时间类型的主要是判定两次请求的时间差。
网络访问控制实现主要在lib\request\connect.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# In case of time-based blind or stacked queries
# SQL injections
elif method == PAYLOAD.METHOD.TIME:
# Perform the test's request
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)
trueCode = threadData.lastCode

if trueResult:
# Extra validation step (e.g. to check for DROP protection mechanisms)
if SLEEP_TIME_MARKER in reqPayload:
falseResult = Request.queryPage(reqPayload.replace(SLEEP_TIME_MARKER, "0"), place, timeBasedCompare=True, raise404=False)
if falseResult:
continue

# Confirm test's results
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)

if trueResult:
infoMsg = "%s parameter '%s' appears to be '%s' injectable " % (paramType, parameter, title)
logger.info(infoMsg)

injectable = True

源码中还有union注入方法等。

PS : 看了两天源码,改了sqlmap的一些东西。加了一堆调试,最后成功把sqlmap改坏了 :(

结语

由于python水平有限,而且本次看sqlmap源码主要目的是研究一下其注入判断的方式,所以sqlmap在注入利用的源码部分没细看。
在参数解析和布尔注入的思路如醍醐灌顶。
总的来说,sqlmap的注入思想还是值得每一个web安全研究者去拜读的。