1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
from flask import Flask, request, jsonify, Response, abort, render_template_string, session
import requests, re
from urllib.parse import urljoin, urlparse
app = Flask(__name__)
MAX_TOTAL_STEPS = 30
ERROR_COUNT = 6
META_REFRESH_RE = re.compile(
r'<meta\s+http-equiv=["\']refresh["\']\s+content=["\']\s*\d+\s*;\s*url=([^"\']+)["\']',
re.IGNORECASE
)
def read(f): return open(f).read()
SECRET = read("/secret").strip()
app.secret_key = "a_test_secret"
def sset(key, value):
session[key] = value
return ""
def sget(key, default=None):
return session.get(key, default)
app.jinja_env.globals.update(sget=sget)
app.jinja_env.globals.update(sset=sset)
@app.route("/_internal/secret")
def internal_flag():
if request.remote_addr not in ("127.0.0.1", "::1"):
abort(403)
body = f'OK Secret: {SECRET}'
return Response(body, mimetype="application/json")
@app.route("/")
def index():
return "welcome"
def _next_by_refresh_header(r, current_url):
refresh = r.headers.get("Refresh")
if not refresh:
return None
try:
part = refresh.split(";", 1)[1]
k, v = part.split("=", 1)
if k.strip().lower() == "url":
return urljoin(current_url, v.strip())
except Exception:
return None
def _next_by_meta_refresh(r, current_url):
m = META_REFRESH_RE.search(r.text[:5000])
if m:
return urljoin(current_url, m.group(1).strip())
return None
def _next_by_authlike_header(r, current_url):
if r.status_code in (401, 407, 429):
nxt = r.headers.get("X-Next")
if nxt:
return urljoin(current_url, nxt)
return None
def my_fetch(url):
session = requests.Session()
current_url = url
count_redirect = 0
history = []
last_resp = None
while count_redirect < MAX_TOTAL_STEPS:
print(count_redirect)
try:
r = session.get(current_url, allow_redirects=False, timeout=5)
print(r.text)
except Exception as e:
return history, None, f"Upstream request failed: {e}"
last_resp = r
history.append({
"url": current_url,
"status": r.status_code,
"headers": dict(r.headers),
"body_preview": r.text[:800]
})
nxt = _next_by_refresh_header(r, current_url)
if nxt:
current_url = nxt
count_redirect += 1
continue
nxt = _next_by_meta_refresh(r, current_url)
if nxt:
current_url = nxt
count_redirect += 1
continue
nxt = _next_by_authlike_header(r, current_url)
if nxt:
current_url = nxt
count_redirect += 1
continue
break
return history, last_resp, None
@app.route("/fetch")
def fetch():
target = request.args.get("url")
if not target:
return jsonify({"error": "no url"}), 400
history, last_resp, err = my_fetch(target)
if err:
return jsonify({"error": err}), 502
if not last_resp:
return jsonify({"error": "no response"}), 502
walked_steps = len(history) - 1
try:
if "application/json" in (last_resp.headers.get("Content-Type") or "").lower():
_ = last_resp.json()
else:
if "MUST_HAVE_FIELD" not in last_resp.text:
raise ValueError("JSON schema mismatch")
return jsonify({"ok": True, "len": len(last_resp.text)})
except Exception as parse_err:
if walked_steps >= ERROR_COUNT:
raw = []
raw.append(last_resp.text[:5000])
return Response("\n".join(raw), mimetype="text/plain", status=500)
else:
return jsonify({"error": "Invalid JSON"}), 500
@app.route("/login")
def login():
username = request.args.get("username")
secret = request.args.get("secret", "")
blacklist = ["config", "_", "read", "{{"]
if secret != SECRET:
return ("forbidden", 403)
if len(username) > 47:
return ("username too long", 400)
if any([n in username.lower() for n in blacklist]):
return ("forbidden", 403)
sset('username', username)
rendered = render_template_string("Welcome: " + username)
return rendered
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
|