Compare commits

...

3 Commits

Author SHA1 Message Date
8e9c7e31c4 外部函数测试 2024-03-13 15:09:12 +08:00
3f4bde2989 线程池版本 2024-03-09 15:26:16 +08:00
52b0cf6db3 asm提取 2024-03-07 15:08:07 +08:00
4 changed files with 449 additions and 23 deletions

121
OpcodeGet.py Normal file
View File

@ -0,0 +1,121 @@
import concurrent.futures
import os
import re
from log_utils import setup_logger
from tqdm import tqdm
import r2pipe
import pandas as pd
csv_lock = 0
def Opcode_to_csv(opcode_list, file_type):
csv_write(f'output_{file_type}.csv', opcode_list)
logger.info(f"done {done_file_num} files")
def csv_write(file_name, data: list):
"""write data to csv"""
logger.info("*======================start write==================================*")
df = pd.DataFrame(data)
chunksize = 1000
for i in range(0, len(df), chunksize):
df.iloc[i:i + chunksize].to_csv(f'./out/{file_name}', mode='a', header=False, index=False)
logger.info(f"done rows {len(df)}")
logger.info("*=================write to csv success==============================*")
return True
def extract_opcode(disasm_text):
"""
从反汇编文本中提取操作码和操作数
正则表达式用于匹配操作码和操作数考虑到操作数可能包含空格和逗号
"""
match = re.search(r"^\s*(\S+)(?:\s+(.*))?$", disasm_text)
if match:
opcode = match.group(1)
# operands_str = match.group(2) if match.group(2) is not None else ""
# split_pattern = re.compile(r",(?![^\[]*\])") # 用于切分操作数的正则表达式
# operands = split_pattern.split(operands_str)
# return opcode, [op.strip() for op in operands if op.strip()]
return opcode
return ""
def get_graph_r2pipe(file_type, file_name):
# 获取基础块内的操作码序列
r2pipe_open = r2pipe.open(os.path.join(file_path, file_name), flags=['-2'])
opcode_Sequence = []
try:
# 获取函数列表
r2pipe_open.cmd("aaa")
r2pipe_open.cmd('e arch=x86')
function_list = r2pipe_open.cmdj("aflj")
for function in function_list:
# 外部函数测试
# if function['name'] == 'sub.TNe_U':
# print(function)
# block_list = r2pipe_open.cmdj("afbj @" + str(function['offset']))
# for block in block_list:
# # print(block)
# # 获取基本块的反汇编指令
# disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"]))
# if disasm:
# for op in disasm:
# print(extract_opcode(op["opcode"]))
block_list = r2pipe_open.cmdj("afbj @" + str(function['offset']))
block_opcode_Sequence = []
for block in block_list:
# print(block)
# 获取基本块的反汇编指令
disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"]))
if disasm:
for op in disasm:
if op["type"] == "invalid" or op["opcode"] == "invalid":
continue
block_opcode_Sequence.append(extract_opcode(op["opcode"]))
opcode_Sequence.append(
[file_type, file_type, len(block_opcode_Sequence), ' '.join(block_opcode_Sequence)])
except Exception as e:
logger.error(f"Error: get function list failed in {file_name}")
print(f"Error: get function list failed in {file_name} ,error info {e}")
r2pipe_open.quit()
return opcode_Sequence
if __name__ == '__main__':
file_type = 'malware'
logger = setup_logger('logger', f'./log/opcode_{file_type}.log')
file_path = os.path.join('/mnt/d/bishe/dataset/sample_20230130_458')
print(f"max works {os.cpu_count()}")
file_list = os.listdir(file_path)[:10000]
done_file_num = 0
done_list = [['class', 'sub-class', 'size', 'corpus']]
process_bar = tqdm(desc=f'Processing {file_type}...', leave=True, total=len(file_list))
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: # 调整线程池大小
future_to_args = {
executor.submit(get_graph_r2pipe, file_type, file_name): file_name for file_name in file_list
}
for future in concurrent.futures.as_completed(future_to_args):
try:
tmp = future.result()
done_list.extend(tmp if len(tmp) > 0 else [])
if len(done_list) > 100000:
csv_write(f'output_{file_type}.csv', done_list)
done_file_num += 1
done_list.clear()
except Exception as e:
logger.error(f"Error: {e}")
print(f"Error: {e}")
finally:
process_bar.update(1)
else:
csv_write(f'output_{file_type}.csv', done_list)

36
funNameGet.py Normal file
View File

@ -0,0 +1,36 @@
import concurrent.futures
import os
import r2pipe
from tqdm import tqdm
def get_fun_name_list(file_path):
# 读取csv文件
r2 = r2pipe.open(os.path.join(file_path), flags=['-2'])
r2.cmd('aaa')
r2.cmd('e arch=x86')
function_list = r2.cmdj("aflj")
fun_name_list = []
for function in function_list:
fun_name_list.append(function['name'])
r2.quit()
return fun_name_list
if __name__ == '__main__':
file_path = os.path.join('/mnt/d/bishe/dataset/sample_20230130_458')
file_list = os.listdir(file_path)
fun_name_set = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
future_to_args = {
executor.submit(get_fun_name_list, os.path.join(file_path, file_name)): file_name
for file_name in file_list
}
for future in tqdm(concurrent.futures.as_completed(future_to_args), total=len(future_to_args)):
fun_name_list = future.result()
for fun_name in fun_name_list:
if fun_name not in fun_name_set:
fun_name_set[fun_name] = 1
else:
fun_name_set[fun_name] += 1
print(fun_name_set)

70
main.py
View File

@ -7,16 +7,32 @@ from log_utils import setup_logger
import time import time
from datetime import datetime from datetime import datetime
max_opcode_num = 0
def csv_write(data: list):
def csv_write(file_name, data: list):
"""write data to csv"""
df = pd.DataFrame(data) df = pd.DataFrame(data)
chunksize = 1000 chunksize = 1000
for i in range(0, len(df), chunksize): for i in range(0, len(df), chunksize):
df.iloc[i:i + chunksize].to_csv('./out/output.csv', mode='a', header=False, index=False) df.iloc[i:i + chunksize].to_csv(f'./out/{file_name}', mode='a', header=False, index=False)
return True return True
def findOpcode_in_asm_file(content, logger): def findOpcode_in_asm_file(content, logger, file_type):
"""
在给定的汇编文件内容中查找操作码(opcode)
参数:
- content: 文件内容的迭代器预期能逐行读取文件内容
- logger: 日志记录器对象用于记录过程中的信息
返回值:
- over_num_flag: 布尔值如果找到的操作码数量超过200则为True否则为False
- none_flag: 布尔值如果未找到任何操作码则为True否则为False
- result: 列表包含找到的操作码列表如果找到的数量超过200则仅包含前200个
"""
global max_opcode_num
pattern = r'\t{2}(\w+)\s' pattern = r'\t{2}(\w+)\s'
result = [] result = []
sections = content.read().split("\n\n") sections = content.read().split("\n\n")
@ -27,32 +43,43 @@ def findOpcode_in_asm_file(content, logger):
# if acfg.funcname != 'start' and acfg.funcname != 'start_0' and 'sub_' not in acfg.funcname: # if acfg.funcname != 'start' and acfg.funcname != 'start_0' and 'sub_' not in acfg.funcname:
# TODO 判断函数是否为外部函数 # TODO 判断函数是否为外部函数
instructions = re.findall(pattern, item) instructions = re.findall(pattern, item)
if instructions and len(instructions) != 1 and instructions[0] != 'retn': if len(instructions) > 0 and len(instructions) != 1 and instructions[0] != 'retn':
instructions_remove_Opcode_list = {'align', 'dp', 'dd', 'db', 'dq'} instructions_remove_Opcode_list = {'align', 'dp', 'dd', 'db', 'dq', 'dw'}
if not instructions_remove_Opcode_list.isdisjoint(instructions): if not instructions_remove_Opcode_list.isdisjoint(instructions):
instructions[:] = [item for item in instructions if item not in instructions_remove_Opcode_list] instructions[:] = [item for item in instructions if item not in instructions_remove_Opcode_list]
if len(instructions) > 0:
result.append([file_type, file_type, len(instructions), ' '.join(instructions)])
if len(instructions) > 200: if len(instructions) > 200:
max_opcode_num = len(instructions) if len(instructions) > max_opcode_num else max_opcode_num
over_num_flag = True over_num_flag = True
logger.info(f"over 200 Opcode is {instructions},list len {len(instructions)}") logger.info(f"over 200 Opcode is {instructions},list len {len(instructions)}")
result.append(instructions[:200])
else:
result.append(instructions)
none_flag = True if len(result) == 0 else False none_flag = True if len(result) == 0 else False
return over_num_flag, none_flag, result return over_num_flag, none_flag, result
def Opcode_to_csv(opcode_list, file_type):
logger.info("*======================start write==================================*")
csv_write(f'output_{file_type}.csv', opcode_list)
logger.info(f"done {done_file_num} files")
logger.info("*=================write to csv success==============================*")
if __name__ == '__main__': if __name__ == '__main__':
start_time = time.time() start_time = time.time()
logger = setup_logger('asm_to_csv', './log/asm_to_csv.log') # 文件相关设置
file_type = 'malware'
logger = setup_logger('asm_to_csv', f'./log/asm_to_csv_{file_type}.log')
asm_file_path = os.path.join("D:/bishe/dataset/infected/infected_asm/") asm_file_path = os.path.join("D:/bishe/dataset/infected/infected_asm/")
# end
file_list = os.listdir(asm_file_path) file_list = os.listdir(asm_file_path)
Opcode_list = [] Opcode_list = []
none_Opcode_list = [] none_Opcode_list = []
done_file_num = 0 done_file_num = 0
process_bar = tqdm(desc='Processing...', leave=True, total=len(file_list))
for file in file_list: for file in file_list:
try: try:
with open(asm_file_path + file, 'r', errors='ignore') as asm_file: with open(asm_file_path + file, 'r', errors='ignore') as asm_file:
over_flag, flag, result = findOpcode_in_asm_file(asm_file, logger) over_flag, flag, result = findOpcode_in_asm_file(asm_file, logger, file_type)
if flag: if flag:
logger.warning(f"file {file} Opcode is empty") logger.warning(f"file {file} Opcode is empty")
continue continue
@ -62,23 +89,20 @@ if __name__ == '__main__':
Opcode_list.extend(result) Opcode_list.extend(result)
done_file_num += 1 done_file_num += 1
if len(Opcode_list) > 50000: if len(Opcode_list) > 50000:
print("*======================start write==================================*") Opcode_to_csv(Opcode_list, file_type)
write_res = csv_write(Opcode_list)
Opcode_list.clear() Opcode_list.clear()
print("list clear")
print(f"done {done_file_num} files")
print("*=================write to csv success==============================*")
except Exception as e: except Exception as e:
print(f"Error processing file {file}: {e}") logger.error(f"Error processing file {file}: {e}")
finally:
process_bar.update(1)
if len(Opcode_list) > 0: if len(Opcode_list) > 0:
print("*======================start write==================================*") Opcode_to_csv(Opcode_list, file_type)
write_res = csv_write(Opcode_list)
Opcode_list.clear() Opcode_list.clear()
print("list clear")
print(f"done {done_file_num} files")
print("*=================write to csv success==============================*")
logger.debug(f"none Opcode file list {none_Opcode_list} ") logger.debug(f"none Opcode file list {none_Opcode_list} ")
csv_write('none_Opcode_list.csv', none_Opcode_list)
end_time = time.time() end_time = time.time()
print(f"Done processing {done_file_num} files") logger.info(f"max_opcode_num is {max_opcode_num}")
print(f"Total time: {end_time - start_time} " logger.info(f"Done processing {done_file_num} files")
logger.info(f"Total time: {end_time - start_time} "
f"seconds, start at :{datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')}") f"seconds, start at :{datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')}")

245
ngram.py Normal file
View File

@ -0,0 +1,245 @@
import threading
from collections import defaultdict
from tqdm import tqdm
import pandas as pd
import os
import csv
import argparse
import statistics
import plotly.express as px
import concurrent.futures
from functools import partial
import logging
import contextlib
###################################################################################################
## Program shall take two csv files of different classes - benign and malware
## It will compute ngrams for each of the classes seperately and find the delta frequencies
## of each computed ngram. delta_frequencies = (class1 - class2)
###################################################################################################
# --------------------------------------------------------------------------------------------------
## Generate ngrams given the corpus and factor n
def generate_N_grams(corpus, n=1):
words = [word for word in corpus.split(" ")]
temp = zip(*[words[i:] for i in range(0, n)])
ngram = [' '.join(n) for n in temp]
return ngram
# --------------------------------------------------------------------------------------------------
## Creates ngrams for the corpus List for given N and Filters it based on following criteria
# file count >= percent of Total corpus len (pecent in [1..100])
# Selects high frequency ngram until the mean value
# Returns both complete and filtered dictionary of ngrams
def filter_N_grams(corpusList, N, percent, filterFreq=0):
total = len(corpusList)
ngramDictionary = defaultdict(int)
ngramFileCount = defaultdict(int)
for idx in range(0, total):
opcodes = corpusList[idx]
if type(opcodes) is not str:
continue
for item in generate_N_grams(opcodes, N):
# compute frequency of all unique ngrams
if len(opcodes) == 0:
continue
ngramDictionary[item] += 1
# compute ngram file count
for item in ngramDictionary:
ngramFileCount[item] += 1
filteredNgramDictionary = defaultdict(int)
# Filter those ngrams which meet percent of Total files criteria
filterCnt = round(int((percent * total) / 100), 0)
for item in ngramFileCount:
if ngramFileCount[item] >= filterCnt:
# Add to filtered dictionary the item which meets file count criteria
filteredNgramDictionary[item] = ngramDictionary[item]
# Filter ngram with a minimum frequency
if (filterFreq):
for item in ngramDictionary:
if ngramDictionary[item] < filterFreq and item in filteredNgramDictionary:
# Remove the item which below the frequency threshold
filteredNgramDictionary.pop(item)
# print(f"Total ngrams:{len(ngramDictionary.items())} => filtered: {len(filteredNgramDictionary.items())}\n")
return ngramDictionary, filteredNgramDictionary
# --------------------------------------------------------------------------------------------------
# Calculate a normalization factor for frequency values of class1 and class2
# For class which are high in frequency due their sample size, a normalization may required to be
# factored for correctly resizgin the frequencies of the small class set.
# input list of frequencies of class1 and class 2
def normalization_factor(class1, class2):
mean1 = statistics.mean(class1)
mean2 = statistics.mean(class2)
return mean1 / mean2
# --------------------------------------------------------------------------------------------------
# Write the data into the given csv file handle
def WriteCSV(file, csvFields, dataDictionary):
writer = csv.DictWriter(file, fieldnames=csvFields)
writer.writeheader()
writer.writerows(dataDictionary)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_csv_file(csvfile, ngram_type, file_percent_filter, frequency_filter):
"""处理CSV文件并并行计算n-gram"""
print(f"start load csv file:{os.path.basename(csvfile)}")
dataframe = pd.read_csv(csvfile, encoding="utf8")
print(f"end load")
ngram_list = defaultdict(int)
filtered_ngram_list = defaultdict(int)
process_bar = tqdm(total=len(dataframe['corpus'].values), desc=f'Computing {ngram_type}-gram on files')
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: # 调整线程池大小
future_to_args = {
executor.submit(filter_N_grams, dataframe['corpus'].values[start: start + 10000],
idx + 1, file_percent_filter, frequency_filter): start for start in
range(0, len(dataframe['corpus'].values), 10000)
}
for future in concurrent.futures.as_completed(future_to_args):
try:
sub_ngram_list, sub_filtered_ngram_list = future.result()
for i in [sub_ngram_list, ngram_list]:
for key, value in i.items():
ngram_list[key] += value
for i in [sub_filtered_ngram_list, filtered_ngram_list]:
for key, value in i.items():
filtered_ngram_list[key] += value
process_bar.update(10000) # 手动更新进度条
except Exception as exc:
logging.error(f"Error processing {idx + 1}-gram: {exc}")
return ngram_list, filtered_ngram_list
# --------------------------------------------------------------------------------------------------
# Execution starts here
# Add command line arguments
# CSV header: class, sub-class, size, corpus
# Execute the parse_args() method
def build_csv(ngram_list, filter_list, maxgrams, file_type):
ngramDicList = []
csv_file_header = ['ngram', 'count']
csv_file = os.path.join('./out', f'{file_type}-{maxgrams}-gram.csv')
for index in tqdm(range(len(ngram_list)), desc=f'Building {maxgrams}-gram csv'):
ngramDicList.append({
'ngram': ngram_list[index],
'count': filter_list[index]
})
try:
csv_file = open(csv_file, 'w')
except Exception as e:
print(f"Error opening {csv_file} for writing: {e}")
WriteCSV(csv_file, csv_file_header, ngramDicList)
csv_file.close()
if __name__ == '__main__':
# Get user arguments
malware_csvfile = os.path.join('./out/output_malware.csv')
benign_csvfile = os.path.join('./out/output_benign.csv')
maxgrams_list = [3,2,1]
# Error check and exit if not a file
if not (os.path.isfile(malware_csvfile) and os.path.isfile(benign_csvfile)):
print(f"Path should be csv file!")
exit(1)
# Read the csv file using pandas into data frame
# Build a frequency list for ngrams
for maxgrams in maxgrams_list:
filePercentFilter = 80 ## select ngrams present in x% of files
frequencyFilter = 20 ## select ngrams with frequency greater than this value
malwareNgram = defaultdict(int) ## full list of ngrams in malware corpus
benignNgram = defaultdict(int) ## full list of ngrams in benign corpus
filteredMalwareNgram = defaultdict(int) ## filtered list of ngrams from malware corpus
filteredBenignNgram = defaultdict(int) ## filtered list of ngrams from benign corpus
## common list ngrams from both malware and benign corpus with relative frequency (benignFreq - malwareFreq)
filteredMergedNgram = defaultdict(int)
# run for only the maxgram provided, change lower value to 0 to run for all values [1..N]
for idx in range(maxgrams - 1, maxgrams):
print(f"Computing {idx + 1}gram on files ...")
print(f"CPU core {os.cpu_count()} on use")
malwareNgram = []
filteredMalwareNgram = []
benignNgram = []
filteredBenignNgram = []
malwareNgram.clear()
filteredMalwareNgram.clear()
benignNgram.clear()
filteredBenignNgram.clear()
filteredMergedNgram.clear()
# opcodes decoded from pe file in sequence is stored as corpus in the csv
malwareNgram, filteredMalwareNgram = process_csv_file(malware_csvfile, 'malware', filePercentFilter,
frequencyFilter)
# build_csv(malwareNgram, filteredMalwareNgram, maxgrams, 'malware')
benignNgram, filteredBenignNgram = process_csv_file(benign_csvfile, 'benign', filePercentFilter,
frequencyFilter)
# build_csv(benignNgram, filteredBenignNgram, maxgrams, 'benign')
# creates a sorted list of ngram tuples with their frequency for 1 .. maxgram
mergedList = list(set().union(filteredMalwareNgram.keys(), filteredBenignNgram.keys()))
## Now find the relative frequency b/w benign and malware files. = benign - malware
## write this for cases where ngrams only present in one of the clases malware or benign
## for reusability in case a union of classes is taken.
for item in mergedList:
key = item # get the ngram only
if key in filteredBenignNgram:
if key in filteredMalwareNgram:
filteredMergedNgram[key] = filteredBenignNgram[key] - filteredMalwareNgram[key]
elif item in malwareNgram:
filteredMergedNgram[key] = filteredBenignNgram[key] - malwareNgram[key]
else:
filteredMergedNgram[key] = filteredBenignNgram[key]
elif key in filteredMalwareNgram:
if key in benignNgram:
filteredMergedNgram[key] = benignNgram[key] - filteredMalwareNgram[key]
else:
filteredMergedNgram[key] = filteredMalwareNgram[key]
print(f"Merged: {idx + 1}gramCnt={len(filteredMergedNgram.keys())}")
# ## get a sorted list of merged ngrams with relative frequencies
sortedMergedNgramList = sorted(filteredMergedNgram.items(), key=lambda x: x[1])
# write the final ngrams into a file for feature selection
AbsoluteNgramDictList = []
RelativeNgramDictList = []
for item in sortedMergedNgramList:
dictItem = {}
key = item[0]
dictItem['ngram'] = key
dictItem['count'] = max(filteredMalwareNgram[key], filteredBenignNgram[key])
AbsoluteNgramDictList.append(dictItem)
RelativeNgramDictList.append({'ngram': item[0], 'count': item[1]})
csvfields = ['ngram', 'count']
AbsoluteCsvName = "./out/" + str(idx + 1) + "gram-absolute.csv"
RelativeCsvName = "./out/" + str(idx + 1) + "gram-relative.csv"
print("*======================start write csv=======================================*")
try:
csvfile = open(AbsoluteCsvName, 'w')
except Exception as err:
print(f"Error: writing csvfile {err}")
WriteCSV(csvfile, csvfields, AbsoluteNgramDictList)
csvfile.close()
try:
csvfile = open(RelativeCsvName, 'w')
except Exception as err:
print(print(f"Error: writing csvfile {err}"))
WriteCSV(csvfile, csvfields, RelativeNgramDictList)
csvfile.close()
print("*======================end write csv=======================================*")