asm_to_csv/exe2json.py

275 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import r2pipe
import hashlib
from my_utils import *
import json
# 基础块抽取
# from bert.obtain_inst_vec import bb2vec
from tqdm import tqdm
import numpy as np
import os
import concurrent.futures
import multiprocessing
# Mnemonics of return/trap-style instructions; consulted by
# get_graph_cfg_r2pipe when deciding whether an instruction has a
# fall-through successor.
ret_trap_opcode_family = ["ret", "hlt", "int3", "ud2"]
# Sample category; interpolated into the log file name, the JSON output
# directory and the sample input directory used by this script.
sample_type = 'malware'
def extract_opcode(disasm_text):
    """Tokenize one disassembled instruction into a space-separated string.

    The text is split on whitespace, commas are dropped, and square brackets
    are emitted as stand-alone ``[`` / ``]`` tokens around the operand they
    enclosed, producing the token stream fed to the BERT model.

    Runs of whitespace and tokens that become empty after stripping commas
    and brackets (e.g. a lone ``,`` or ``[``) do not produce empty tokens,
    so the result never contains consecutive spaces.

    Args:
        disasm_text: Disassembly text of a single instruction.

    Returns:
        The normalized tokens joined by single spaces ('' for empty input).
    """
    tokens = []
    # split() without arguments collapses any run of whitespace, so no empty
    # items are produced for double spaces or tabs.
    for raw in disasm_text.split():
        item = raw.replace(',', '')
        if '[' in item:
            tokens.append('[')
        operand = item.replace('[', '').replace(']', '')
        if operand:
            tokens.append(operand)
        if ']' in item:
            tokens.append(']')
    return ' '.join(tokens)
def calc_sha256(file_path):
    """Return the hexadecimal SHA-256 digest of the file at *file_path*.

    The file is hashed in fixed-size chunks so that large samples are never
    loaded into memory in one piece.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The digest as a lowercase hex string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # iter(callable, sentinel) stops once read() returns b'' at EOF.
        for chunk in iter(lambda: f.read(1 << 20), b''):
            digest.update(chunk)
    return digest.hexdigest()
def _is_ret_or_trap(op):
    """Return True when *op* is a ret/trap-family instruction.

    The mnemonic (first token of the disassembly text) must be listed in
    ``ret_trap_opcode_family`` and the op's ``type`` must be ``ret`` or
    ``trap``.
    """
    # NOTE(review): the original compared the whole op dict against the list
    # of mnemonic strings, which can never match. The two-part condition is
    # kept as written; confirm which "type" values radare2 reports for
    # hlt/ud2, otherwise those list entries can never match.
    parts = op.get("disasm", "").split()
    mnemonic = parts[0] if parts else ""
    return mnemonic in ret_trap_opcode_family and op.get("type") in ("ret", "trap")


def _record_op_flow(op, next_op, block_addr, nodes, edges, file_path):
    """Add the CFG nodes/candidate edges implied by one instruction.

    Args:
        op: Instruction dict taken from the ``pdj`` output.
        next_op: The following instruction dict of the same ``pdj`` result,
            or None when *op* is the last one.
        block_addr: Address of the basic block that contains *op*.
        nodes: Set of node addresses, updated in place.
        edges: List of candidate ``[src, dst]`` edges, updated in place.
        file_path: Path of the analysed binary (used in the warning only).
    """
    is_last = next_op is None

    if 'jump' not in op:
        # Instruction without a jump target.
        if _is_ret_or_trap(op):
            if not is_last:
                nodes.add(next_op["offset"])
        elif is_last:
            # The block ends without a terminator: fall through.
            edges.append([block_addr, op["offset"] + op["size"]])
        return

    target = op['jump']
    op_type = op.get('type')
    if target == 0:
        if not is_last:
            nodes.add(next_op['offset'])
    elif op_type == 'jmp':
        edges.append([block_addr, target])
        if not is_last:
            nodes.add(next_op['offset'])
    elif op_type == 'cjmp':
        edges.append([block_addr, target])
        if is_last:
            # Second successor of a block-ending conditional jump is the
            # fall-through address (the original appended the jump target
            # a second time here and never recorded the fall-through).
            edges.append([block_addr, op["offset"] + op["size"]])
        else:
            edges.append([block_addr, next_op["offset"]])
            nodes.add(next_op["offset"])
    elif op_type == 'call':
        # Calls are recorded in both directions (call and return).
        edges.append([block_addr, target])
        edges.append([target, block_addr])
        if is_last:
            edges.append([block_addr, op["offset"] + op["size"]])
    else:
        logger.warning(
            f"二进制可执行文件解析警告,跳转指令识别出错,文件{file_path},指令{op}")


def get_graph_cfg_r2pipe(r2pipe_open, file_path, output_path, file_name):
    """Extract a per-function control-flow graph through an r2pipe session.

    For every function reported by ``aflj`` the basic blocks (``afbj``) and
    their instructions (``pdj``) are read. The tokenized instructions of each
    block are collected and written to
    ``<output_path>/feature/<file_name>.jsonl`` via ``save_json``.

    Args:
        r2pipe_open: Object exposing ``cmdj(command)``; callers pass an open
            r2pipe session.
        file_path: Path of the analysed binary (used in log messages only).
        output_path: Directory under which the ``feature`` folder is written.
        file_name: Base name (without extension) of the feature file.

    Returns:
        ``(True, message, acfg_list)`` on success, where every entry of
        ``acfg_list`` is a dict with ``block_number``, ``block_edges``
        (two parallel lists: sources and destinations) and
        ``block_features``; ``(False, exception, None)`` if anything raised.
    """
    acfg_item = []
    acfg_feature_item = []
    try:
        # Function list.
        function_list = r2pipe_open.cmdj("aflj")
        for function in function_list:
            # A set gives O(1) membership tests when filtering edges below.
            node_set = set()
            temp_edge_list = []
            block_feature_list = []
            # Basic block list; a None reply is treated as "no blocks" so it
            # reaches the zero-block warning below instead of raising.
            block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) or []
            block_number = len(block_list)
            for block in block_list:
                block_addr = block["addr"]
                block_statement = []
                node_set.add(block_addr)
                # Disassembly of the instructions in this basic block.
                disasm = r2pipe_open.cmdj(
                    "pdj " + str(block["ninstr"]) + " @" + str(block_addr))
                if disasm:
                    last_index = len(disasm) - 1
                    last_op = ''
                    for op_index, op in enumerate(disasm):
                        op_disasm = extract_opcode(op["disasm"])
                        # Skip consecutive identical statements so that long
                        # runs of repeats cannot exhaust memory.
                        if op_disasm == last_op:
                            continue
                        last_op = op_disasm
                        block_statement.append(op_disasm)
                        next_op = disasm[op_index + 1] if op_index != last_index else None
                        _record_op_flow(op, next_op, block_addr,
                                        node_set, temp_edge_list, file_path)
                # The BERT-based block embedding was too slow, so the block
                # address currently stands in as the block feature; the
                # tokenized statements are saved separately for later use.
                block_feature_list.append(block_addr)
                acfg_feature_item.append({'addr': block_addr, 'opcode': block_statement})
            # Drop edges whose endpoints are not known nodes.
            edge_list = [edge for edge in temp_edge_list
                         if edge[0] in node_set and edge[1] in node_set]
            # Dedicated log entry for functions without basic blocks.
            if block_number == 0:
                logger.warning(
                    f"二进制可执行文件解析出错,出错文件:{file_path},出错函数地址:{function['offset']},基础块个数{block_number}")
            acfg_item.append({
                'block_number': block_number,
                'block_edges': [[d[0] for d in edge_list], [d[1] for d in edge_list]],
                'block_features': block_feature_list
            })
        feature_dir = os.path.join(output_path, 'feature')
        # Make sure the target directory exists before writing into it.
        os.makedirs(feature_dir, exist_ok=True)
        save_json(os.path.join(feature_dir, file_name + '.jsonl'), acfg_feature_item)
        return True, "二进制可执行文件解析成功", acfg_item
    except Exception as e:
        return False, e, None
def get_graph_fcg_r2pipe(r2pipe_open):
    """Extract the function call graph through an r2pipe session.

    Every function reported by ``aflj`` is visited: the session seeks to its
    offset and reads ``pdfj``. Each non-invalid op with a non-zero ``jump``
    yields a candidate edge ``[function_offset, jump]``. Only functions whose
    ``pdfj`` result is not None count as nodes, and candidate edges whose
    destination is not such a node are dropped.

    Args:
        r2pipe_open: Object exposing ``cmd(command)`` and ``cmdj(command)``;
            callers pass an open r2pipe session.

    Returns:
        ``(True, message, function_count, edge_list, function_names)`` on
        success, or ``(False, exception, None, None, None)`` if anything
        raised.
    """
    try:
        function_list = r2pipe_open.cmdj("aflj")
        function_num = len(function_list)
        func_name_list = []
        # A set gives O(1) membership tests when filtering edges below.
        node_set = set()
        temp_edge_list = []
        for function in function_list:
            func_name_list.append(function["name"])
            func_offset = function["offset"]
            r2pipe_open.cmd('s ' + str(func_offset))
            pdf = r2pipe_open.cmdj('pdfj')
            if pdf is None:
                # No disassembly available: the function is not a node.
                continue
            for op in pdf["ops"]:
                if op.get("type") == "invalid":
                    continue
                if "jump" in op and op["jump"] != 0:
                    temp_edge_list.append([func_offset, op["jump"]])
            node_set.add(func_offset)
        # Once all nodes are known, drop edges that point outside the graph.
        edge_list = [edge for edge in temp_edge_list if edge[1] in node_set]
        return True, "二进制可执行文件解析成功", function_num, edge_list, func_name_list
    except Exception as e:
        return False, e, None, None, None
def init_logging():
    """Return the result of ``setup_logger`` for the 'exe2json' logger.

    The log file path embeds the module-level ``sample_type``.
    """
    return setup_logger('exe2json', f"./log/exe2json_{sample_type}.log")
def exe_to_json(file_path):
    """Analyse one binary with radare2 and write its graph description.

    The file is identified by its SHA-256 digest. If
    ``./out/json/<sample_type>/<digest>.jsonl`` already exists nothing is
    done. Otherwise the file is opened through r2pipe, analysed with ``aaa``,
    and the function call graph and per-function CFGs are extracted. When
    both extractions succeed the combined record is written with
    ``save_json``; otherwise the failure is logged and nothing is written.

    Args:
        file_path: Path of the binary to analyse.

    Returns:
        None.
    """
    output_path = f"./out/json/{sample_type}"
    file_fingerprint = calc_sha256(file_path)
    result_path = os.path.join(output_path, file_fingerprint + '.jsonl')
    if os.path.exists(result_path):
        # Already parsed in an earlier run.
        return

    # Create the output directories up front: the CFG step writes its
    # feature file below <output_path>/feature before the final JSON is saved.
    os.makedirs(os.path.join(output_path, 'feature'), exist_ok=True)

    # Open r2 for this file; the session is always released, even when the
    # analysis raises, so no r2 process is leaked.
    r2 = r2pipe.open(file_path, flags=['-2'])
    try:
        r2.cmd("aaa")
        (fcg_Operation_flag, fcg_Operation_message, function_num,
         function_fcg_edge_list, function_names) = get_graph_fcg_r2pipe(r2)
        cfg_Operation_flag, cfg_Operation_message, cfg_item = get_graph_cfg_r2pipe(
            r2, file_path, output_path, file_fingerprint)
    finally:
        r2.quit()

    if not (fcg_Operation_flag and cfg_Operation_flag):
        logger.error(f"二进制可执行文件解析失败 文件名{file_path}")
        if not fcg_Operation_flag:
            logger.error(f"fcg错误{fcg_Operation_message}")
        if not cfg_Operation_flag:
            logger.error(f"cfg错误{cfg_Operation_message}")
        return

    # Build the per-file JSON record.
    json_obj = {
        'hash': file_fingerprint,
        'function_number': function_num,
        'function_edges': [[int(d[0]) for d in function_fcg_edge_list],
                           [int(d[1]) for d in function_fcg_edge_list]],
        'acfg_list': cfg_item,
        'function_names': function_names
    }
    save_json(result_path, json_obj)
    return
if __name__ == '__main__':
    # Module-level logger used by the functions above.
    logger = init_logging()
    sample_file_path = f"/mnt/d/bishe/sample_{sample_type}"
    sample_file_list = os.listdir(sample_file_path)
    worker_count = os.cpu_count()
    print(f"max worker {worker_count}")
    sample_paths = [os.path.join(sample_file_path, file_name)
                    for file_name in sample_file_list]
    # One worker process per CPU; results arrive in completion order and
    # tqdm shows progress over the whole sample list.
    with multiprocessing.Pool(processes=worker_count) as pool:
        progress = tqdm(pool.imap_unordered(exe_to_json, sample_paths),
                        total=len(sample_file_list))
        result = list(progress)
    # To debug a single sample, call exe_to_json(<path>) directly instead of
    # going through the pool.