Binary and Multi-Class Classification of PE Files with Machine Learning and Deep Learning
Posted by herosunly
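1. Introduction
While surveying available data, I found that sophos-ai has released a dataset of 20 million PE files: https://github.com/sophos-ai/SOREL-20M. A dataset of this size makes it practical to train reasonably effective models, not only for binary classification of PE files as benign or malicious, but also for finer-grained classification of malware (trojans, worms, ransomware, and so on).
There are two main ways to classify PE files statically (that is, without executing them in a sandbox):
- Extract features first and then classify, for example using LIEF for feature extraction.
- Treat the PE file as a grayscale image and classify it as an image.
This article focuses on the first approach: extracting features with a tool and then classifying.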
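For contrast, the second approach can be illustrated with a minimal sketch. It assumes the simplest possible layout, where every byte of the file becomes one pixel intensity and rows have a fixed width. The function name bytes_to_grayscale and the default width of 256 are hypothetical choices for illustration, not something the article prescribes.

```python
import math
from typing import List


def bytes_to_grayscale(data: bytes, width: int = 256) -> List[List[int]]:
    """Lay raw bytes out row by row; each byte (0-255) is one pixel intensity."""
    if width <= 0:
        raise ValueError("width must be positive")
    height = math.ceil(len(data) / width)
    # Zero-pad the tail so that the last row is full width.
    padded = data + bytes(height * width - len(data))
    return [list(padded[r * width:(r + 1) * width]) for r in range(height)]


# Stand-in for open("sample.exe", "rb").read(): a fake "MZ" header plus filler bytes.
sample = b"MZ" + bytes(range(8))
image = bytes_to_grayscale(sample, width=4)
print(len(image), len(image[0]))
```

The resulting rows could then be handed to any image classifier; the rest of this article does not use this representation.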
2. Installing the libraries
2.1 Installing lief
pip install lief==0.11.4
Source code: https://github.com/lief-project/LIEF
2.2 Installing ember
pip install git+https://github.com/elastic/ember.git
Source code: https://github.com/elastic/ember
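3. Data preprocessing
Assume each file is named md5_label. For binary classification, label 0 means a benign file and label 1 means a malicious file. For multi-class classification, the label identifies the class; for example, with 11 classes:
- adware
- flooder
- ransomware
- dropper
- spyware
- packed
- crypto_miner
- file_infector
- installer
- worm
- downloader
Imports: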
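The naming convention above can be sketched as a small parser. This is only an illustration: the helper name parse_label is hypothetical, and the index-to-tag ordering in TAGS is an assumption that simply follows the order of the list above.

```python
import os

# Assumed ordering: index 0 is the first entry of the list above, and so on.
TAGS = ["adware", "flooder", "ransomware", "dropper", "spyware", "packed",
        "crypto_miner", "file_infector", "installer", "worm", "downloader"]


def parse_label(path):
    """Return (md5, label) for a file named md5_label, or None if the label is unusable."""
    name = os.path.basename(path)
    md5, sep, label = name.rpartition("_")
    if not sep or label == "nan":
        return None  # no underscore in the name, or the label is missing
    try:
        # int(float(...)) also accepts labels written as floats such as "1.0".
        return md5, int(float(label))
    except ValueError:
        return None


print(parse_label("/data/d41d8cd98f00b204e9800998ecf8427e_2.0"))
```

With the assumed ordering, TAGS[2] would give "ransomware" for the sample file name used here.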
import sys
import os
import glob
import ember
import json
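Save the extracted raw features to a JSON file:
pe_extractor = ember.features.PEFeatureExtractor()
json_content_list = []
for i in glob.iglob('/home/learn/PE/data/*'):
    # File names look like md5_label; take the part after the underscore.
    label = os.path.basename(i).split('_')[1]
    if label == 'nan':
        continue  # skip samples without a usable label
    with open(i, 'rb') as f:
        json_content = pe_extractor.raw_features(f.read())
    # int(float(...)) also accepts labels written as floats such as "1.0".
    json_content['label'] = int(float(label))
    json_content_list.append(json_content)
# Write one JSON object per line, separated by real newlines.
with open('/home/learn/PE/result/processed_data.json', 'w') as f:
    f.write('\n'.join(json.dumps(i) for i in json_content_list))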
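The file written above holds one JSON object per line, which is why the separator has to be a real newline character. A minimal round-trip sketch, using toy records in place of real ember features and a temporary directory in place of the article's paths:

```python
import json
import os
import tempfile

# Toy stand-ins for the dictionaries returned by raw_features().
records = [{"sha256": "aa", "label": 0}, {"sha256": "bb", "label": 1}]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "processed_data.json")
    # One JSON object per line.
    with open(path, "w") as f:
        f.write("\n".join(json.dumps(r) for r in records))
    # Reading back line by line recovers every record.
    with open(path) as f:
        loaded = [json.loads(line) for line in f]

nrows = len(loaded)
print(nrows, [r["label"] for r in loaded])
```

Counting lines in this way is also how the number of rows is obtained in the vectorization step.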
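Read the JSON file and save the feature vectors and labels to .dat files:
# NOTE: the previous step wrote processed_data.json under /home/learn/PE/result/;
# data_dir must point at the directory that actually contains that file.
data_dir = '/home/learn/PE/data/result/'
extractor = ember.features.PEFeatureExtractor()
print("Vectorizing training set")
X_path = os.path.join(data_dir, "X_train.dat")
y_path = os.path.join(data_dir, "y_train.dat")
raw_feature_paths = [os.path.join(data_dir, "processed_data.json")]
# Each line of the JSON file is one sample, so the line count is the row count.
nrows = sum([1 for fp in raw_feature_paths for line in open(fp)])
ember.vectorize_subset(X_path, y_path, raw_feature_paths, extractor, nrows)
# Load the vectorized "train" subset back (feature version 2).
X_train, y_train = ember.read_vectorized_features(data_dir, "train", 2)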
4. Training the model
For example, to train the model with lightgbm:
import lightgbm as lgb
params = {
    "boosting": "gbdt",
    "objective": "binary",
    "num_iterations": 1000,
    "learning_rate": 0.05,
    "num_leaves": 100,
    "max_depth": 15,
    "min_data_in_leaf": 20,
    "feature_fraction": 0.5,
}
# Keep only labeled rows (a label of -1 marks an unlabeled sample).
train_rows = (y_train != -1)
# Train
lgbm_dataset = lgb.Dataset(X_train[train_rows], y_train[train_rows].astype('int32'))
lgbm_model = lgb.train(params, lgbm_dataset)
lgbm_model.save_model('lightgbm.model')
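The parameters above train a binary model. As a minimal sketch, and not something the article itself does, the same dictionary could be adapted to the 11-class case through LightGBM's multiclass objective together with its num_class parameter. The helper name to_multiclass is hypothetical.

```python
binary_params = {
    "boosting": "gbdt",
    "objective": "binary",
    "num_iterations": 1000,
    "learning_rate": 0.05,
    "num_leaves": 100,
    "max_depth": 15,
    "min_data_in_leaf": 20,
    "feature_fraction": 0.5,
}


def to_multiclass(params, num_class):
    """Return a copy of params that uses LightGBM's multiclass objective."""
    if num_class < 2:
        raise ValueError("num_class must be at least 2")
    out = dict(params)  # copy, so the binary settings stay untouched
    out["objective"] = "multiclass"
    out["num_class"] = num_class
    return out


multiclass_params = to_multiclass(binary_params, 11)
print(multiclass_params["objective"], multiclass_params["num_class"])
```

A model trained this way returns one score per class for each sample rather than a single score, so the prediction code would need to take the index of the largest score.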
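5. Predicting on new data
5.1 Binary prediction
import lightgbm as lgb
import ember
binary_path = 'putty.exe'
with open(binary_path, 'rb') as f:
    file_data = f.read()
lgbm_model = lgb.Booster(model_file='lightgbm.model')
# predict_sample extracts features from the raw bytes and returns the model's score.
score = ember.predict_sample(lgbm_model, file_data)
print(score)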
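The printed score is a value between 0 and 1, where higher means more likely malicious. Turning it into a verdict needs a cutoff; the sketch below assumes a threshold of 0.5, which is an illustrative default rather than a tuned value, and the function name verdict is hypothetical.

```python
def verdict(score, threshold=0.5):
    """Map a score in [0, 1] to a label using an assumed threshold."""
    if not 0.0 <= score <= 1.0:
        raise ValueError("score must lie in [0, 1]")
    return "malicious" if score >= threshold else "benign"


print(verdict(0.03), verdict(0.97))
```

In practice the threshold would be chosen on a validation set to hit a target false positive rate.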
5.2 Multi-class prediction
import ember
import torch
from torch import nn
import torch.nn.functional as F

class PENetwork(nn.Module):
    """
    This is a simple network loosely based on the one used in ALOHA: Auxiliary Loss Optimization for Hypothesis Augmentation (https://arxiv.org/abs/1903.05700)
    Note that it uses fewer (and smaller) layers, as well as a single layer for all tag predictions; performance will suffer accordingly.
    """
    def __init__(self, use_malware=True, use_counts=False, use_tags=True, n_tags=11, feature_dimension=2381, layer_sizes=None):
        super(PENetwork, self).__init__()
        self.use_malware = use_malware
        self.use_counts = use_counts
        self.use_tags = use_tags
        self.n_tags = n_tags
        if self.use_tags and self.n_tags is None:
            raise ValueError("n_tags was None but we're trying to predict tags. Please include n_tags")
        p = 0.05  # dropout probability
        layers = []
        if layer_sizes is None:
            layer_sizes = [512, 512, 128]
        # Shared base: Linear -> LayerNorm -> ELU -> Dropout for each layer size.
        for i, ls in enumerate(layer_sizes):
            if i == 0:
                layers.append(nn.Linear(feature_dimension, ls))
            else:
                layers.append(nn.Linear(layer_sizes[i-1], ls))
            layers.append(nn.LayerNorm(ls))
            layers.append(nn.ELU())
            layers.append(nn.Dropout(p))
        self.model_base = nn.Sequential(*tuple(layers))
        # Output heads: malware probability, detection count, and per-tag probabilities.
        self.malware_head = nn.Sequential(nn.Linear(layer_sizes[-1], 1),
                                          nn.Sigmoid())
        self.count_head = nn.Linear(layer_sizes[-1], 1)
        self.sigmoid = nn.Sigmoid()
        self.tag_head = nn.Sequential(nn.Linear(layer_sizes[-1], 64),
                                      nn.ELU(),
                                      nn.Linear(64, 64),
                                      nn.ELU(),
                                      nn.Linear(64, n_tags),
                                      nn.Sigmoid())

    def forward(self, data):
        rv = {}
        base_result = self.model_base.forward(data)
        if self.use_malware:
            rv['malware'] = self.malware_head(base_result)
        if self.use_counts:
            rv['count'] = self.count_head(base_result)
        if self.use_tags:
            rv['tags'] = self.tag_head(base_result)
        return rv

pe_extractor = ember.features.PEFeatureExtractor()
binary_path = 'malware.exe'
with open(binary_path, 'rb') as f:
    file_data = f.read()
# feature_vector returns a 1-D float32 array of length 2381.
X_test = pe_extractor.feature_vector(file_data)
with torch.no_grad():
    model = PENetwork()
    model.load_state_dict(torch.load('nn.pt'))
    model.eval()  # disable dropout for inference
    y_pred = model(torch.from_numpy(X_test))
    # Index of the tag with the highest predicted probability.
    print(y_pred['tags'].argmax())
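The printed value is only an index. Because the tag head ends in a sigmoid, each of the 11 outputs is an independent probability, so several tags can be active at once. The sketch below decodes such a vector into names; the ordering of TAGS is an assumption that follows the class list in section 3, and both the function name decode_tags and the 0.5 threshold are hypothetical.

```python
# Assumed ordering, following the class list in section 3.
TAGS = ["adware", "flooder", "ransomware", "dropper", "spyware", "packed",
        "crypto_miner", "file_infector", "installer", "worm", "downloader"]


def decode_tags(probs, threshold=0.5):
    """Return (most likely tag, every tag whose probability reaches the threshold)."""
    if len(probs) != len(TAGS):
        raise ValueError("expected one probability per tag")
    top = max(range(len(probs)), key=probs.__getitem__)
    active = [tag for tag, p in zip(TAGS, probs) if p >= threshold]
    return TAGS[top], active


# Toy probabilities standing in for y_pred['tags'].tolist().
probs = [0.1] * 11
probs[2] = 0.9
probs[5] = 0.6
print(decode_tags(probs))
```

With the model above, the input would be y_pred['tags'].tolist(), provided the model was trained with the same tag ordering.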
6. Code download
To make testing easier, the code provided here covers prediction with the saved models only. Download link: https://download.csdn.net/download/herosunly/20721893.