OpCode使用N-gram归一化实践

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OpCode使用N-gram归一化实践相关的知识,希望对你有一定的参考价值。

基于静态OpCode做聚类与数据分析的实践思路;可在OpCode基础上拓展其它特征,例如用minhash发现共享代码(重要)。

|--> 1. 利用IDA反汇编样本集-提取OpCode(带有注释和垃圾指令)。

IDA批量反汇编(python脚本):

import sys
import os
import datetime

# idal -c -A -S//usr//local//src//ida-pro-6.4//idc//analysis_fullname.idc inputfile
# idalPath = "//usr//local//src//ida-pro-6.4//idal"
# Raw strings are required for the Windows paths below: in a normal literal
# "\1" is the octal escape for chr(1) and "\a" is the bell character, which
# silently corrupts the path.
idalPath = r"E:\Tools\IDA_Old_Version\1\1\idaw.exe"
# idcPath = "//usr//local//src//ida-pro-6.4//idc//analysis_fullname.idc"
idcPath = r"E:\Tools\IDA_Old_Version\1\1\idc\analysis_fullname.idc"
# PATH = './/resource//vxheaven//class//virus.win//compress//compress/'
normalPath = r"E:\TestVirusAsm"

# unpackPath = ""
# Name of this run's log file: the start-up time as YYYYmmddHHMMSS.
logName = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# Directory (relative to the working directory) that holds the log files.
logPath = ".log"

def genAsm(filepath, total):
    """Run IDA in batch mode on one sample and bump the running counter.

    Invokes ``<idalPath> -c -A -S<idcPath> <filepath>``. The exit status
    of IDA is not inspected.

    Args:
        filepath: path of the sample to disassemble.
        total: number of samples handled so far.

    Returns:
        ``total + 1``.
    """
    # Local import: subprocess is not in this script's import header.
    import subprocess

    # An argument list (no shell) keeps a sample path that contains spaces
    # or shell metacharacters as a single argument. The module-level
    # idalPath / idcPath are used instead of repeating the literals here.
    subprocess.call([idalPath, "-c", "-A", "-S" + idcPath, filepath])

    # clear idb

    return total + 1

def traveseFile(path, initClean=False):
    """Walk ``path`` and either clean it or disassemble the samples in it.

    Args:
        path: root directory, walked recursively.
        initClean: when true, only ``cleanFile`` is run over every file
            and nothing is disassembled.
    """
    for parent, dirnames, filenames in os.walk(path):

        if initClean:
            log('Cleaning', '', '[-]')

            for filename in filenames:
                cleanFile(filename, os.path.join(parent, filename))

            continue

        log('Entering', parent)
        # normal file
        # log('origin', str(len(filenames)))
        # unpack file
        log('origin', str(countFile(parent, 'dump')))

        total = 0
        for filename in filenames:
            filepath = os.path.join(parent, filename)

            # cleanFile() returns True for every file that must not be
            # handed to IDA.
            if cleanFile(filename, filepath):
                continue

            log('asming', filename)

            total = genAsm(filepath, total)

        # NOTE(review): indentation was lost in the source; this per-directory
        # placement is a reconstruction -- confirm.
        log('genasm', str(countFile(parent, 'asm')))

def countFile(dirpath, suffix=''):
    """Count the entries of ``dirpath`` whose extension equals ``suffix``.

    The extension is the text after the last ``.`` in the name. A name
    without any ``.`` has no extension and is never counted, so a file
    that is literally called ``asm`` is not mistaken for a ``.asm`` file.

    Args:
        dirpath: directory to list (not recursive).
        suffix: extension to match, without the leading dot.

    Returns:
        The number of matching entries.
    """
    count = 0
    for name in os.listdir(dirpath):
        _, dot, ext = name.rpartition('.')
        if dot and ext == suffix:
            count += 1
    return count

def cleanFile(filename, filepath):
    """Remove stale IDA output and tell the caller whether to skip the file.

    ``.asm`` and ``.idb`` files are deleted. Only ``.bin`` files are
    reported as processable; every other file is skipped.

    Args:
        filename: bare file name, used to determine the extension.
        filepath: full path of the same file, used for deletion.

    Returns:
        False for a ``.bin`` file, True for everything else.

    Raises:
        OSError: if a ``.asm``/``.idb`` file cannot be removed.
    """
    # A name without a dot has no extension; without this check a sample
    # literally named "asm" or "idb" would be deleted.
    _, dot, filetype = filename.rpartition('.')
    if not dot:
        filetype = ''

    if filetype in ('asm', 'idb'):
        os.remove(filepath)
        print('[-] Clean  ' + filename)
        return True

    # unpack file
    return filetype != 'bin'
    # return False

def getNowTime():
    """Return the current local time formatted as ``MM-DD-HH:MM:SS``."""
    return datetime.datetime.now().strftime('%m-%d-%H:%M:%S')

def log(action, content, prefix='[+]', suffix='', subpath=''):
    """Append one line to this run's log file.

    The line written is ``prefix + time + ' ' + action + ' ' + content +
    suffix`` followed by a newline. The file is ``logPath/subpath/logName``.

    Args:
        action: short verb describing the step.
        content: detail text, e.g. a path or a count (must be a string).
        prefix: marker written before the timestamp.
        suffix: text appended after ``content``.
        subpath: optional sub-directory below ``logPath``.
    """
    logDir = os.path.join(logPath, subpath)
    if not os.path.exists(logDir):
        try:
            os.makedirs(logDir)
        except OSError:
            # Only directory-creation failures are expected here; a bare
            # except would also swallow KeyboardInterrupt.
            print('[-] Mkdir error')

    logpath = os.path.join(logDir, logName)

    with open(logpath, 'a+') as logfile:
        logfile.write(''.join(
            [prefix, getNowTime(), ' ', action, ' ', content, suffix, '\n']))

if __name__ == '__main__':

    log('Starting', '', '********', '********')

    # normal file
    # First pass only cleans the tree; the second pass runs IDA over the
    # files that cleanFile() lets through.
    traveseFile(normalPath, True)
    traveseFile(normalPath)

    # unpack file
    # traveseFile(unpackPath, True)
    # traveseFile(unpackPath)

上面脚本中的analysis_fullname.idc

#include <idc.idc>

// Batch entry point: sets the AF2_DODATA analysis flag, calls Wait(), writes
// the listing to "<input path>.asm", saves the database to "<input path>.idb"
// and exits with code 0.
static main()

{

// turn on coagulation of data in the final pass of analysis

SetShortPrm(INF_AF2, GetShortPrm(INF_AF2) | AF2_DODATA);

// NOTE(review): presumably blocks until auto-analysis has finished -- confirm
// against the IDC reference.
Wait();

// Both output files are placed next to the input file, named after it.
auto file = GetInputFilePath();

auto asmfile = file + ".asm";

auto idbfile = file + ".idb";

// The range 0..BADADDR is passed as start/end of the text output.
WriteTxt(asmfile, 0, BADADDR); // create the assembler file

SaveBase(idbfile, 0); // save the idb database

Exit(0); // exit to OS, error code 0 - success

}

从.asm文件中提取OpCode指令(python脚本)——过滤注释和垃圾数据

import os
# from wingenasm import log

# BASEPATH = './/resource//vxheaven//class//virus.win//'
# Raw string: in a normal literal "\123" is the octal escape for "S", which
# would turn this path into E:\TestVirusAsmS.
BASEPATH = r'E:\TestVirusAsm\123'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'

def checkDir(dirpath):
    """Create ``dirpath`` (and any missing parents) if it does not exist.

    Best effort: a failure to create the directory is reported on stdout
    and not raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Mkdir error')

def checkFile(filepath):
    """Delete ``filepath`` if it exists, so the caller starts from scratch.

    Best effort: a failure to delete is reported on stdout and not raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Delete error')

def getOpCode(content, filename):
    """Extract the instruction mnemonics from the lines of an IDA listing.

    The mnemonics are written one per line to ``BASEPATH/opcode/<filename>``.
    Any earlier output of that name is removed first, and no file is
    created when the listing yields no mnemonic.

    Args:
        content: the lines of the .asm listing.
        filename: name of the output file inside the ``opcode`` directory.
    """
    opfiledir = os.path.join(BASEPATH, 'opcode')
    checkDir(opfiledir)

    opfilepath = os.path.join(opfiledir, filename)
    checkFile(opfilepath)

    opcodes = []
    for line in content:
        prefix = line.split(' ')[0]
        # Only lines whose first space-delimited token starts with two tabs
        # are considered.
        # NOTE(review): the whitespace inside this literal and the split
        # below was reconstructed as tab characters -- confirm.
        if not (len(prefix) > 2 and prefix[0:2] == '\t\t'):
            continue

        prefix = prefix.strip()
        if prefix == '':
            continue

        # Skip tokens that start with '.', ';' or '/'.
        if prefix[0] in '.;/':
            continue

        opcode = prefix.split('\t')[0]
        if not opcode.isalpha():
            continue

        opcodes.append(opcode + '\n')

    # One write instead of reopening the file for every opcode. Written
    # only when something was found, as the per-opcode append did.
    if opcodes:
        with open(opfilepath, 'a+') as opfile:
            opfile.writelines(opcodes)

    print("getOpcode Success!")

def isOpCodeFile(lines):
    """Tell whether a listing should go through opcode extraction.

    Args:
        lines: the lines of the .asm listing.

    Returns:
        False if any line contains the header text
        ``; Format      :   Binary file``, True otherwise (including for
        an empty listing).
    """
    marker = '; Format      :   Binary file'
    return not any(marker in line for line in lines)

def getByteCode(parent, filename):
    """Dump the raw sample behind a listing as one hex byte per line.

    The sample is ``parent/<filename without its last 4 characters>``.
    The dump goes to ``BASEPATH/bytecode/<same name>``, replacing any
    earlier output. Each output line holds two lowercase hex digits.

    Args:
        parent: directory that holds the listing and the raw sample.
        filename: name of the listing file (expected to end in ``.asm``).

    Raises:
        IOError/OSError: if the raw sample cannot be opened.
    """
    # Local import: binascii is not in this script's import header.
    import binascii

    rawname = filename[0:-4]  # drop the trailing ".asm"

    desfiledir = os.path.join(BASEPATH, 'bytecode')
    checkDir(desfiledir)

    desfilepath = os.path.join(desfiledir, rawname)
    checkFile(desfilepath)

    rawpath = os.path.join(parent, rawname)

    # Read in chunks and keep the output open for the whole dump instead of
    # one read() and one open() per byte. Testing "not chunk" also ends the
    # loop when read() returns b'' (comparing bytes to '' never matches).
    with open(rawpath, 'rb') as rawfile, open(desfilepath, 'w') as bytefile:
        while True:
            chunk = rawfile.read(65536)
            if not chunk:
                break
            hexstr = binascii.hexlify(chunk).decode('ascii')
            bytefile.write(''.join(
                hexstr[i:i + 2] + '\n' for i in range(0, len(hexstr), 2)))

def checkFileType(filename, type='asm'):
    """Return True when the extension of ``filename`` equals ``type``.

    The extension is the text after the last ``.``. A name without any
    ``.`` has no extension, so a file literally named ``asm`` does not
    match.

    Args:
        filename: bare file name.
        type: extension to match, without the leading dot.
    """
    _, dot, ext = filename.rpartition('.')
    return bool(dot) and ext == type

def traveseFile(path):
    """Walk ``path`` and convert every ``.asm`` listing found.

    Listings that ``isOpCodeFile`` accepts go through ``getOpCode``; the
    others are dumped with ``getByteCode``.

    Args:
        path: root directory, walked recursively.
    """
    # The output directories live below BASEPATH, which can itself be below
    # ``path``. Opcode output keeps the ".asm" name, so walking into it on a
    # rerun would treat earlier results as listings and delete them.
    outdirs = set(
        os.path.normcase(os.path.abspath(os.path.join(BASEPATH, name)))
        for name in ('opcode', 'bytecode'))

    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='opcode')

        # Pruning dirnames in place stops os.walk from descending into them.
        dirnames[:] = [
            d for d in dirnames
            if os.path.normcase(
                os.path.abspath(os.path.join(parent, d))) not in outdirs]

        for filename in filenames:
            if not checkFileType(filename, 'asm'):
                continue

            filepath = os.path.join(parent, filename)
            print(filepath)

            with open(filepath) as asmfile:
                lines = asmfile.readlines()

            if isOpCodeFile(lines):
                print('opcode')
                # log('OpCoding', filename, subpath='opcode')
                getOpCode(lines, filename)
            else:
                print('Binary')
                # log('Bytecoding', filename, subpath='opcode')
                getByteCode(parent, filename)

if __name__ == '__main__':

    # ('Starting', 'getopcode from benign', '********', '********', subpath='opcode')

    # viruswin
    # winnormalpath = os.path.join(BASEPATH, 'normal')
    # winunpackpath = os.path.join(BASEPATH, 'compress', 'unpack')
    #
    # traveseFile(winnormalpath)
    # traveseFile(winunpackpath)

    # virusdos
    # dosnormalpath = os.path.join(BASEPATH, 'normal')
    #
    # traveseFile(dosnormalpath)

    # benign
    # Raw string so the backslash is never read as an escape sequence.
    benignpath = os.path.join(r'E:\TestVirusAsm')
    #
    # traveseFile(winnormalpath)
    traveseFile(benignpath)

|--> 2. 利用N-gram生成OpCode"特征袋"。 n = 3的效果最佳

import os
# from wingenasm import log

# BASEPATH = './/resource//vxheaven//class//'
# Raw string: in a normal literal "\123" is the octal escape for "S" and
# "\b" is a backspace character, both of which corrupt this path.
BASEPATH = r'E:\TestVirusAsm\123\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'

# Window size and output directory name for each supported n-gram flavour.
__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram'
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram'

def checkDir(dirpath):
    """Create ``dirpath`` (and any missing parents) if it does not exist.

    Best effort: a failure to create the directory is reported on stdout
    and not raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Mkdir error')

def checkFile(filepath):
    """Delete ``filepath`` if it exists, so the caller starts from scratch.

    Best effort: a failure to delete is reported on stdout and not raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Delete error')

def genGram(content, filename):
    """Turn a list of opcode lines into sliding-window n-grams.

    Every window of ``__GRAM_SIZE1__`` consecutive opcodes becomes one
    output line, the opcodes joined by commas (e.g. ``mov,push,call``).
    The result is written to ``BASEPATH/__GRAM_TYPE1__/<filename>``,
    replacing any earlier output.

    Only full-size windows are emitted; the shorter leftovers at the end
    of the input are not n-grams and are dropped. When the input has fewer
    than ``__GRAM_SIZE1__`` lines there is nothing to emit and no file is
    created.

    Args:
        content: opcode lines, one mnemonic per line.
        filename: name of the output file.
    """
    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE1__)
    checkDir(desfiledir)

    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    # Strip the line endings once and join explicitly, instead of stripping
    # brackets, quotes and spaces out of the repr() of a list slice.
    opcodes = [line.strip() for line in content]
    windows = len(opcodes) - __GRAM_SIZE1__ + 1  # <= 0 when input too short
    grams = [','.join(opcodes[i:i + __GRAM_SIZE1__]) + '\n'
             for i in range(windows)]

    if not grams:
        return

    with open(desfilepath, 'w') as desfile:
        desfile.writelines(grams)

def traveseFile(path):
    """Walk ``path`` and generate n-grams for every file found.

    Args:
        path: root directory holding the opcode files, walked recursively.
    """
    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='opcode')

        for filename in filenames:

            filepath = os.path.join(parent, filename)
            print(filepath)

            with open(filepath) as asmfile:
                lines = asmfile.readlines()
            # print lines
            # log('Generating', filename, subpath='opcode')
            genGram(lines, filename)

if __name__ == '__main__':

    # log('Starting', 'generate 2-gram in benign', '********', '********', subpath='opcode')

    # viruswin
    # winopcodepath = os.path.join(BASEPATH, 'virus.win', 'opcode', 'origin')
    # traveseFile(winopcodepath)

    # virusdos
    # dosopcodepath = os.path.join(BASEPATH, 'virus.dos', 'opcode', 'filter')
    # traveseFile(dosopcodepath)

    # virusdos
    # Raw string: in a normal literal "\123" is the octal escape for "S".
    benignpath = os.path.join(r'E:\TestVirusAsm\123\opcode')
    traveseFile(benignpath)

归一化处理,求tf和df

from __future__ import division
import os
import copy
import math
# from wingenasm import log

# virus
# BASEPATH = './/resource//vxheaven//class//opcode//'
# benign
# BASEPATH = './/resource//benign//'
# classfier
# Raw string: in a normal literal "\123" is the octal escape for "S" and
# "\b" is a backspace character, both of which corrupt this path.
BASEPATH = r'E:\TestVirusAsm\123\begin'
# BASEPATH = './/resource//vxheaven//class//virus.dos//'
# PATH = '..//resource//vxheaven//vl//virus.win/'

# Window size and per-document output directory for each n-gram flavour.
__GRAM_SIZE__ = 2
__GRAM_TYPE__ = '2-gram-tf'
__GRAM_SIZE1__ = 3
__GRAM_TYPE1__ = '3-gram-tf'

def checkDir(dirpath):
    """Create ``dirpath`` (and any missing parents) if it does not exist.

    Best effort: a failure to create the directory is reported on stdout
    and not raised.
    """
    if os.path.exists(dirpath):
        return
    try:
        os.makedirs(dirpath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Mkdir error')

def checkFile(filepath):
    """Delete ``filepath`` if it exists, so the caller starts from scratch.

    Best effort: a failure to delete is reported on stdout and not raised.
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except OSError:
        # Narrowed from a bare except so Ctrl-C is not swallowed.
        print('[-] Delete error')

def genSingleTF(content, filename):
    """Write the term statistics of a single n-gram document.

    The output file is ``BASEPATH/__GRAM_TYPE__/<filename>`` and replaces
    any earlier output. Each line has four ``----``-separated fields:
    the term, its count in this document, the number of distinct terms in
    this document, and the count divided by the count of the most frequent
    term.

    Args:
        content: the lines of the n-gram file, one term per line.
        filename: name of the output file.
    """
    # Local import: collections is not in this script's import header.
    from collections import Counter

    desfiledir = os.path.join(BASEPATH, __GRAM_TYPE__)
    checkDir(desfiledir)

    desfilepath = os.path.join(desfiledir, filename)
    checkFile(desfilepath)

    freq = Counter(line.strip() for line in content)

    # max() raises ValueError on an empty sequence. An empty document has
    # no terms, so the divisor is never used and an empty file is written.
    # float() keeps the ratio fractional even without true division.
    maxterm = float(max(freq.values())) if freq else 1.0
    total = len(freq)  # number of distinct terms, not of lines
    with open(desfilepath, 'w') as desfile:
        for key, count in freq.items():
            desfile.write('----'.join(
                [key, str(count), str(total), str(count / maxterm)]) + '\n')

def getTotalTF(content, tf, df):
    """Fold one document into the corpus-wide tf and df tables, in place.

    Args:
        content: the lines of one document, one term per line.
        tf: dict term -> total occurrences over all documents; each line
            adds one to its (stripped) term.
        df: dict term -> number of documents containing the term; each
            distinct term of this document adds exactly one.
    """
    # Tracking the terms already seen in this document gives the same
    # "once per document" df update as snapshotting df with deepcopy(),
    # without copying the whole table for every file.
    seen = set()
    for line in content:
        term = line.strip()
        tf[term] = tf.get(term, 0) + 1
        if term not in seen:
            seen.add(term)
            df[term] = df.get(term, 0) + 1

def traveseFile(path):
    """Compute per-document and corpus-wide term statistics for ``path``.

    Every file under ``path`` is passed to ``genSingleTF`` and folded into
    the corpus tables with ``getTotalTF``. The corpus table is then written
    to ``BASEPATH/2-gram-totaltf``, one term per line, with eight
    ``----``-separated fields:

        term, total count, number of distinct terms,
        total count / highest total count,
        document count, number of documents,
        document count / highest document count,
        log(number of documents / document count)

    Args:
        path: root directory holding the n-gram files, walked recursively.
    """
    totaltf = dict()
    totaldf = dict()
    totaldocument = 0

    for parent, dirnames, filenames in os.walk(path):
        # log('Entering', parent, subpath='classfier')

        totaldocument += len(filenames)
        for filename in filenames:

            filepath = os.path.join(parent, filename)
            print(filepath)

            with open(filepath) as asmfile:
                lines = asmfile.readlines()

            # log('Generating', filename, subpath='classfier')
            genSingleTF(lines, filename)
            getTotalTF(lines, totaltf, totaldf)

    # max() raises ValueError on an empty sequence, so stop here when the
    # tree held no terms at all.
    if not totaltf:
        print('[-] No terms found under ' + path)
        return

    desfilepath = os.path.join(BASEPATH, '2-gram-totaltf')
    # float() keeps the ratios fractional even without true division.
    maxterm = float(max(totaltf.values()))
    maxdocument = float(max(totaldf.values()))
    totalterm = len(totaltf)

    with open(desfilepath, 'w') as desfile:
        for key in totaltf:
            termcount = totaltf[key]
            doccount = totaldf.get(key, 0)
            tmp = '----'.join([
                key,
                str(termcount),
                str(totalterm),
                str(termcount / maxterm),
                str(doccount),
                str(totaldocument),
                str(doccount / maxdocument),
                str(math.log(totaldocument / float(totaldf.get(key, 1)))),
            ])
            desfile.write(tmp + '\n')
        print("Success tf!")

if __name__ == '__main__':
    # log('Starting', 'caulate 2-gram frequncy for classfier','********', '********', subpath='classfier')

    # 2-gram
    # grampath = os.path.join(BASEPATH, '2-gram')
    # traveseFile(grampath)

    # 2-gram of benign
    # Raw string: in a normal literal "\123" is the octal escape for "S" and
    # "\b" is a backspace character.
    grampath = os.path.join(r'E:\TestVirusAsm\123\begin', '2-gram')
    traveseFile(grampath)

|--> 3. 利用Jaccard计算"特征袋"的相似阈值,应用场景如下:

|----> 3.1 已知:同家族样本,例如对2018年的100个样本和2019年的100个样本做直观对比:相似度越高,意味着版本迭代改动越小;相似度越低,意味着组件框架变动越大。

|----> 3.2 已知:非同家族样本,例如DarkHotel、Ramsay、xxx、xxx各100个样本,均来自韩国地区的APT组织,可以直观分析各APT家族之间的代码重叠情况,用来关联它们的组件库联系和代码共用。

以上是关于OpCode使用N-gram归一化实践的主要内容,如果未能解决你的问题,请参考以下文章

python基础--接口与归一化设计封装异常网络编程

Python__接口与归一化设计

深度学习面试题22:批量归一化在实践中的应用

matlab中怎样将矩阵归一化处理?

模式识别 - 特征归一化 及 測试 代码(Matlab)

matlab 数据归一化问题