OpCode使用N-gram归一化实践
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OpCode使用N-gram归一化实践相关的知识,希望对你有一定的参考价值。
基于静态OpCode的聚类与数据分析实践思路;在OpCode的基础上还可以拓展其它特征,如minhash、共享代码(重要)。
|--> 1. 利用IDA反汇编样本集-提取OpCode(带有注释和垃圾指令)。
IDA批量反汇编(python脚本):
import sys
import os
import datetime
# Configuration for the IDA batch-disassembly driver.
# Windows paths are raw strings: in a normal literal "\1" is an octal escape
# (chr(1)) and "\a" is the bell character, which silently corrupts the path.
# idal -c -A -S//usr//local//src//ida-pro-6.4//idc//analysis_fullname.idc inputfile
# idalPath = "//usr//local//src//ida-pro-6.4//idal"
idalPath = r"E:\Tools\IDA_Old_Version\1\1\idaw.exe"
# idcPath = "//usr//local//src//ida-pro-6.4//idc//analysis_fullname.idc"
idcPath = r"E:\Tools\IDA_Old_Version\1\1\idc\analysis_fullname.idc"
# PATH = './/resource//vxheaven//class//virus.win//compress//compress/'
normalPath = r"E:\TestVirusAsm"
# unpackPath = ""
# One log file per run, named after the start time.
logName = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
logPath = ".log"
def genAsm(filepath, total):
    """Run IDA in batch mode on *filepath* and return ``total + 1``.

    The command is ``idalPath -c -A -S<idcPath> <filepath>``, i.e. the same
    switches as the original command line, built from the module-level
    ``idalPath`` / ``idcPath`` settings instead of duplicated literals.
    """
    import subprocess  # local import keeps this block self-contained

    # An argument list (no shell) keeps sample paths containing spaces or
    # shell metacharacters intact.
    try:
        subprocess.call([idalPath, "-c", "-A", "-S" + idcPath, filepath])
    except OSError as err:
        # Stay best-effort like os.system(): report and continue the batch.
        print('[-] Cannot run IDA: %s' % err)
    # clear idb
    return total + 1
def traveseFile(path, initClean=False):
    """Walk *path* and either clean old output or disassemble the samples.

    With ``initClean`` true, every file is passed to ``cleanFile`` (which
    deletes ``.asm`` / ``.idb`` files) and nothing is disassembled.
    Otherwise each file that ``cleanFile`` does not reject is handed to
    ``genAsm``, and per-directory counts are written to the log.
    """
    for parent, dirnames, filenames in os.walk(path):
        if initClean:
            log('Cleaning', '', '[-]')
            for filename in filenames:
                cleanFile(filename, os.path.join(parent, filename))
            continue

        log('Entering', parent)
        # normal file
        # log('origin', str(len(filenames)))
        # unpack file
        log('origin', str(countFile(parent, 'dump')))

        total = 0
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            # cleanFile() returns True for files that must be skipped.
            if cleanFile(filename, filepath):
                continue
            log('asming', filename)
            total = genAsm(filepath, total)
        log('genasm', str(countFile(parent, 'asm')))
def countFile(dirpath, suffix=''):
    """Return how many entries of *dirpath* end in ``.<suffix>``.

    The extension is whatever follows the last ``.`` in the entry name.
    """
    return sum(1 for name in os.listdir(dirpath)
               if name.split('.')[-1] == suffix)
def cleanFile(filename, filepath):
    """Delete stale IDA output and tell the caller whether to skip the file.

    Returns True when the file must be skipped (it was a ``.asm`` / ``.idb``
    file and has been removed, or it is not a ``.bin`` sample) and False
    when it is a ``.bin`` sample that should be disassembled.
    """
    filetype = filename.split('.')[-1]
    if filetype in ('asm', 'idb'):
        os.remove(filepath)
        print('[-] Clean  ' + filename)
        return True
    # unpack file
    if filetype == 'bin':
        return False
    return True
    # return False
def getNowTime():
    """Return the current local time formatted as ``MM-DD-HH:MM:SS``."""
    return datetime.datetime.now().strftime('%m-%d-%H:%M:%S')
def log(action, content, prefix='[+]', suffix='', subpath=''):
    """Append one timestamped line to this run's log file.

    The line is ``prefix + time + ' ' + action + ' ' + content + suffix``
    and goes to ``logPath/subpath/logName``; the directory is created on
    demand.
    """
    logDir = os.path.join(logPath, subpath)
    if not os.path.exists(logDir):
        try:
            os.makedirs(logDir)
        except OSError:
            # Only filesystem errors are expected here; a bare except would
            # also hide KeyboardInterrupt and programming errors.
            print('[-] Mkdir error')
    logpath = os.path.join(logDir, logName)
    with open(logpath, 'a+') as logfile:
        logfile.write(''.join(
            [prefix, getNowTime(), ' ', action, ' ', content, suffix, '\n']))
if __name__ == '__main__':
    log('Starting', '', '********', '********')
    # normal file
    # First pass removes leftover .asm/.idb files, second pass disassembles.
    traveseFile(normalPath, True)
    traveseFile(normalPath)
    # unpack file
    # traveseFile(unpackPath, True)
    # traveseFile(unpackPath)
上面脚本中的analysis_fullname.idc
#include <idc.idc>
static main()
{
    // Enable coagulation of data in the final pass of analysis.
    SetShortPrm(INF_AF2, GetShortPrm(INF_AF2) | AF2_DODATA);
    Wait();

    // Both outputs are written next to the input file.
    auto inputPath = GetInputFilePath();
    auto asmPath = inputPath + ".asm";
    auto idbPath = inputPath + ".idb";

    WriteTxt(asmPath, 0, BADADDR);  // create the assembler file
    SaveBase(idbPath, 0);           // save the idb database
    Exit(0);                        // exit to OS, error code 0 - success
}
从.asm文件中提取OpCode指令(python脚本)——过滤注释和垃圾数据:
import os
# from wingenasm import log
# BASEPATH = './/resource//vxheaven//class//virus.win//'
# Raw string: in a normal literal "\123" is an octal escape (the letter 'S').
BASEPATH = r'E:\TestVirusAsm\123'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'
def checkDir(dirpath):
    """Create *dirpath* (including parents) if it does not exist yet.

    Failures are reported on stdout instead of being raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Catch filesystem errors only; a bare except would also swallow
        # KeyboardInterrupt and genuine bugs.
        print('[-] Mkdir error')
def checkFile(filepath):
    """Delete *filepath* if it exists so the output starts from scratch.

    Failures are reported on stdout instead of being raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Catch filesystem errors only, not every exception.
        print('[-] Delete error')
def getOpCode(content, filename):
    """Extract one mnemonic per listing line into ``BASEPATH/opcode/<filename>``.

    For every line of *content* the first whitespace-delimited token is kept
    when it is purely alphabetic; blank lines and lines starting with ``.``,
    ``;`` or ``/`` are skipped. Each kept token is written on its own line.
    No output file is created when nothing was extracted.

    NOTE(review): the whitespace delimiters of the published listing were
    collapsed during extraction (``prefix[0:2] == ' '`` could never match),
    so this version splits on any run of whitespace - confirm against real
    IDA ``.asm`` output.
    """
    opfiledir = os.path.join(BASEPATH, 'opcode')
    checkDir(opfiledir)
    opfilepath = os.path.join(opfiledir, filename)
    checkFile(opfilepath)

    opcodes = []
    for line in content:
        stripped = line.strip()
        if not stripped:
            continue
        # Directives, comments and '/'-prefixed lines carry no instruction.
        if stripped[0] in '.;/':
            continue
        opcode = stripped.split()[0]
        if not opcode.isalpha():
            continue
        opcodes.append(opcode + '\n')

    # Single write instead of reopening the file for every opcode.
    if opcodes:
        with open(opfilepath, 'a+') as opfile:
            opfile.writelines(opcodes)
    print("getOpcode Success!")
def isOpCodeFile(lines):
    """Return False if any line carries the ``; Format : Binary file`` marker.

    Whitespace runs in each line are collapsed to single spaces before the
    comparison, so the marker is found whatever padding the header uses.
    """
    marker = '; Format : Binary file'
    return not any(marker in ' '.join(line.split()) for line in lines)
def getByteCode(parent, filename):
    """Dump the raw sample behind *filename* as one hex byte per line.

    *filename* is the listing name; its last four characters (``.asm``) are
    dropped to get the sample name inside *parent*. The dump is written to
    ``BASEPATH/bytecode/<sample name>``. An empty sample produces no file.
    """
    import binascii  # local import keeps this block self-contained

    rawname = filename[0:-4]
    desfiledir = os.path.join(BASEPATH, 'bytecode')
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, rawname)
    checkFile(desfilepath)

    rawpath = os.path.join(parent, rawname)
    chunksize = 64 * 1024
    with open(rawpath, 'rb') as rawfile:
        chunk = rawfile.read(chunksize)
        # `not chunk` detects EOF for both '' (Python 2) and b'' (Python 3).
        if not chunk:
            return
        # Read in chunks and keep the output open, instead of read(1) plus
        # one open()/close() per byte.
        with open(desfilepath, 'a+') as bytefile:
            while chunk:
                hexstr = binascii.hexlify(chunk).decode('ascii')
                bytefile.write(''.join(
                    hexstr[i:i + 2] + '\n' for i in range(0, len(hexstr), 2)))
                chunk = rawfile.read(chunksize)
def checkFileType(filename, type='asm'):
    """Return True when the text after the last ``.`` in *filename* is *type*."""
    return filename.split('.')[-1] == type
def traveseFile(path):
    """Walk *path* and convert every ``.asm`` listing found.

    Listings accepted by ``isOpCodeFile`` go through ``getOpCode``; the
    others are treated as binary and go through ``getByteCode``.
    """
    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='opcode')
        for filename in filenames:
            if not checkFileType(filename, 'asm'):
                continue
            filepath = os.path.join(parent, filename)
            print(filepath)
            # Close the listing before processing it.
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            if isOpCodeFile(lines):
                print('opcode')
                # log('OpCoding', filename, subpath='opcode')
                getOpCode(lines, filename)
            else:
                print('Binary')
                # log('Bytecoding', filename, subpath='opcode')
                getByteCode(parent, filename)
if __name__ == '__main__':
    # ('Starting', 'getopcode from benign', '********', '********', subpath='opcode')
    # viruswin
    # winnormalpath = os.path.join(BASEPATH, 'normal')
    # winunpackpath = os.path.join(BASEPATH, 'compress', 'unpack')
    #
    # traveseFile(winnormalpath)
    # traveseFile(winunpackpath)
    # virusdos
    # dosnormalpath = os.path.join(BASEPATH, 'normal')
    #
    # traveseFile(dosnormalpath)
    # benign
    # Raw string so backslashes in the Windows path are never read as escapes.
    benignpath = os.path.join(r'E:\TestVirusAsm')
    #
    # traveseFile(winnormalpath)
    traveseFile(benignpath)
|--> 2. 利用N-gram生成OpCode"特征袋"。 n = 3的效果最佳
import os
# from wingenasm import log
# BASEPATH = './/resource//vxheaven//class//'
# Raw string: in a normal literal "\123" is an octal escape ('S') and "\b"
# is a backspace character.
BASEPATH = r'E:\TestVirusAsm\123\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'
__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram'
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram'
def checkDir(dirpath):
    """Create *dirpath* (including parents) if it does not exist yet.

    Failures are reported on stdout instead of being raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Catch filesystem errors only; a bare except would also swallow
        # KeyboardInterrupt and genuine bugs.
        print('[-] Mkdir error')
def checkFile(filepath):
    """Delete *filepath* if it exists so the output starts from scratch.

    Failures are reported on stdout instead of being raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Catch filesystem errors only, not every exception.
        print('[-] Delete error')
def genGram(content, filename):
    """Write the n-grams of *content* to ``BASEPATH/<__GRAM_TYPE1__>/<filename>``.

    *content* is a list of lines, one token per line. Every window of
    ``__GRAM_SIZE1__`` consecutive tokens becomes one output line, with the
    tokens joined by commas. Only complete windows are emitted; input
    shorter than the window size yields an empty file.
    """
    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE1__)
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    tokens = [line.strip() for line in content]
    size = __GRAM_SIZE1__
    # Stop at len - size + 1: iterating to len(content) appended truncated
    # (size-1, size-2, ...) grams at the tail of every file.
    grams = [','.join(tokens[i:i + size])
             for i in range(len(tokens) - size + 1)]

    with open(desfilepath, 'w') as desfile:
        # One join instead of repeated string concatenation.
        desfile.write(''.join(gram + '\n' for gram in grams))
def traveseFile(path):
    """Walk *path* and generate the n-gram file for every file found."""
    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='opcode')
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            print(filepath)
            # Close the input before generating the grams.
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            # log('Generating', filename, subpath='opcode')
            genGram(lines, filename)
if __name__ == '__main__':
    # log('Starting', 'generate 2-gram in benign', '********', '********', subpath='opcode')
    # viruswin
    # winopcodepath = os.path.join(BASEPATH, 'virus.win', 'opcode', 'origin')
    # traveseFile(winopcodepath)
    # virusdos
    # dosopcodepath = os.path.join(BASEPATH, 'virus.dos', 'opcode', 'filter')
    # traveseFile(dosopcodepath)
    # virusdos
    # Raw string: "\123" in a normal literal is an octal escape ('S').
    benignpath = os.path.join(r'E:\TestVirusAsm\123\opcode')
    traveseFile(benignpath)
归一化处理,求tf和df
from __future__ import division
import os
import copy
import math
# from wingenasm import log
# virus
# BASEPATH = './/resource//vxheaven//class//opcode//'
# benign
# BASEPATH = './/resource//benign//'
# classfier
# Raw string: in a normal literal "\123" is an octal escape ('S') and "\b"
# is a backspace character.
BASEPATH = r'E:\TestVirusAsm\123\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'
__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram-tf'
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram-tf'
def checkDir(dirpath):
    """Create *dirpath* (including parents) if it does not exist yet.

    Failures are reported on stdout instead of being raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Catch filesystem errors only; a bare except would also swallow
        # KeyboardInterrupt and genuine bugs.
        print('[-] Mkdir error')
def checkFile(filepath):
    """Delete *filepath* if it exists so the output starts from scratch.

    Failures are reported on stdout instead of being raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Catch filesystem errors only, not every exception.
        print('[-] Delete error')
def genSingleTF(content, filename):
    """Write per-file term frequencies to ``BASEPATH/<__GRAM_TYPE__>/<filename>``.

    Each output line is ``term----count----distinct_terms----count/max_count``
    where the last field is the count normalised by the most frequent term
    of the same file. Empty *content* produces an empty output file.
    """
    from collections import Counter  # local import keeps the block self-contained

    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE__)
    checkDir(desfiledir)
    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    freq = Counter(line.strip() for line in content)
    # max() raises ValueError on an empty sequence, so guard the empty file.
    maxterm = max(freq.values()) if freq else 0
    total = len(freq)

    with open(desfilepath, 'w') as desfile:
        for key, count in freq.items():
            # float() gives true division even without the file-level
            # `from __future__ import division`.
            desfile.write('----'.join(
                [key, str(count), str(total), str(float(count) / maxterm)]) + '\n')
def getTotalTF(content, tf, df):
    """Fold one document into the corpus-wide ``tf`` and ``df`` dicts (in place).

    ``tf[term]`` grows by one for every occurrence of the term in *content*;
    ``df[term]`` grows by exactly one if the term occurs at all, however
    often it repeats inside this document.
    """
    seen = set()
    for line in content:
        term = line.strip()
        tf[term] = tf.get(term, 0) + 1
        # A per-document seen-set replaces the deepcopy of the whole df dict
        # and yields the same "at most +1 per document" result.
        if term not in seen:
            seen.add(term)
            df[term] = df.get(term, 0) + 1
def traveseFile(path):
    """Compute per-file and corpus-wide tf/df statistics for files under *path*.

    Every file is passed to ``genSingleTF`` and accumulated with
    ``getTotalTF``. The corpus summary is then written to
    ``BASEPATH/2-gram-totaltf``, one term per line, with the fields
    ``term, tf, distinct_terms, tf/max_tf, df, documents, df/max_df,
    log(documents/df)`` joined by ``----``.
    """
    totaltf = dict()
    totaldf = dict()
    totaldocument = 0

    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='classfier')
        totaldocument += len(filenames)
        for filename in filenames:
            filepath = os.path.join(parent, filename)
            print(filepath)
            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            # log('Generating', filename, subpath='classfier')
            genSingleTF(lines, filename)
            getTotalTF(lines, totaltf, totaldf)

    # max() below raises ValueError on an empty dict (no input files).
    if not totaltf:
        print('[-] No terms found under ' + path)
        return

    desfilepath = os.path.join(BASEPATH, '2-gram-totaltf')
    maxterm = max(totaltf.values())
    maxdocument = max(totaldf.values())
    totalterm = len(totaltf)

    with open(desfilepath, 'w') as desfile:
        for key, termcount in totaltf.items():
            doccount = totaldf.get(key, 0)
            # float() gives true division even without the file-level
            # `from __future__ import division`.
            fields = [
                key,
                str(termcount),
                str(totalterm),
                str(float(termcount) / maxterm),
                str(doccount),
                str(totaldocument),
                str(float(doccount) / maxdocument),
                str(math.log(float(totaldocument) / totaldf.get(key, 1))),
            ]
            desfile.write('----'.join(fields) + '\n')
    print("Success tf!")
if __name__ == '__main__':
    # log('Starting', 'caulate 2-gram frequncy for classfier','********', '********', subpath='classfier')
    # 2-gram
    # grampath = os.path.join(BASEPATH, '2-gram')
    # traveseFile(grampath)
    # 2-gram of benign
    # Raw string: "\123" and "\b" in a normal literal are escape sequences.
    # NOTE(review): the n-gram script above writes to a '3-gram' directory
    # while this one reads '2-gram' - confirm which size is intended.
    grampath = os.path.join(r'E:\TestVirusAsm\123\begin', '2-gram')
    traveseFile(grampath)
|--> 3. 利用Jaccard计算"特征袋"的相似阈值,应用场景如下:
|----> 3.1 已知:同家族样本,如将2018年的100个样本与2019年的100个样本进行直观对比分析:相似度越高,意味着版本迭代的变动越小;相似度越低,意味着组件框架的变动越大。
|----> 3.2 已知:非同家族样本,如100个DarkHotel-100个Ramsay-100个xxx-100个xxx,都是来自于韩国地区APT,可以直观分析APT家族代码重叠情况,用来关联他们组件库联系和代码共用。
以上是关于OpCode使用N-gram归一化实践的主要内容,如果未能解决你的问题,请参考以下文章