diff --git a/exe2json.py b/exe2json.py index eac91d3..d2e0d14 100644 --- a/exe2json.py +++ b/exe2json.py @@ -8,12 +8,13 @@ from tqdm import tqdm import numpy as np import os +import concurrent.futures # 禁用分词器多线程 os.environ["TOKENIZERS_PARALLELISM"] = "false" ret_trap_opcode_family = ["ret", "hlt", "int3", "ud2"] -sample_type = 'malware' +sample_type = 'benign' def extract_opcode(disasm_text): @@ -33,6 +34,7 @@ def extract_opcode(disasm_text): res.append(']') return ' '.join(res) + def calc_sha256(file_path): with open(file_path, 'rb') as f: bytes = f.read() @@ -41,7 +43,7 @@ def calc_sha256(file_path): return sha256 -def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): +def get_graph_cfg_r2pipe(r2pipe_open, file_path, output_path, file_name): # CFG提取 acfg_item = [] acfg_feature_item = [] @@ -78,7 +80,8 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): if disasm: last_op = '' if len(disasm) > 200: - logger.warning(f"基础块指令长度异常,{file_path},函数名称{function['name']}基础块地址{block['addr']},长度{len(disasm)}") + logger.warning( + f"基础块指令长度异常,{file_path},函数名称{function['name']}基础块地址{block['addr']},长度{len(disasm)}") for op_index, op in enumerate(disasm): op_disasm = extract_opcode(op["disasm"]) # 防止大量重复的语句造成内存溢出 @@ -86,7 +89,7 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): continue last_op = op_disasm # 提取操作码并转换为bert模型输入格式 - block_Statement.append(extract_opcode(op["disasm"])) + block_Statement.append(op_disasm) # 处理跳转码并构建cfg if 'jump' in op: if op['jump'] == 0: @@ -132,7 +135,6 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): # print(len(block_Statement)) # print(block_Statement) - """ 速度过慢 """ @@ -142,7 +144,7 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): # 暂时将bb地址作为特征 后续将操作码集中转化为特征 block_feature_list = block_addr - acfg_feature_item.append({'addr':block_addr, 'opcode':block_Statement}) + acfg_feature_item.append({'addr': block_addr, 'opcode': block_Statement}) # 过滤不存在的边 for temp_edge in 
temp_edge_list: @@ -150,7 +152,8 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): edge_list.append(temp_edge) # 单独错误信息日志 if block_number == 0: - logger.warning(f"二进制可执行文件解析出错,出错文件:{file_path},出错函数地址:{function['offset']},基础块个数{block_number}") + logger.warning( + f"二进制可执行文件解析出错,出错文件:{file_path},出错函数地址:{function['offset']},基础块个数{block_number}") # cfg构建 acfg = { 'block_number': block_number, @@ -158,7 +161,7 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path, feature_out): 'block_features': block_feature_list } acfg_item.append(acfg) - feature_out.write(json.dumps(acfg_feature_item)) + save_json(os.path.join(file_path, 'feature', file_name + '.jsonl'), acfg_feature_item) return True, "二进制可执行文件解析成功", acfg_item except Exception as e: @@ -182,39 +185,23 @@ def get_graph_fcg_r2pipe(r2pipe_open): pdf = r2pipe_open.cmdj('pdfj') if pdf is None: continue - node_bytes = "" - node_opcode = "" for op in pdf["ops"]: if op["type"] == "invalid": continue - node_bytes += op["bytes"] - opcode = extract_opcode(op["disasm"]) - node_opcode += opcode + " " if "jump" in op and op["jump"] != 0: temp_edge_list.append([function["offset"], op["jump"]]) node_list.append(function["offset"]) - # 完成 FCG 构建后, 检查并清理不存在的出边 for temp_edge in temp_edge_list: if temp_edge[1] in node_list: edge_list.append(temp_edge) - sub_function_name_list = ('sym.','sub','imp') - func_name_list = [func_name for func_name in func_name_list if func_name.startswith(sub_function_name_list)] + func_name_list = [func_name for func_name in func_name_list] return True, "二进制可执行文件解析成功", function_num, edge_list, func_name_list except Exception as e: return False, e, None, None, None -def get_r2pipe(file_path): - # 初始化r2pipe - try: - r2 = r2pipe.open(file_path, flags=['-2']) - r2.cmd("aaa") - r2.cmd('e arch=x86') - return r2 - except Exception as e: - return None def init_logging(): # 初始化日志 @@ -227,14 +214,17 @@ def exe_to_json(file_path): file_fingerprint = calc_sha256(file_path) if 
os.path.exists(os.path.join(output_path, file_fingerprint + '.jsonl')): logger.info(f"二进制可执行文件已解析,文件名{file_path}") - return True + return logger.info(f"开始解析,文件名{file_path}") + # 获取r2pipe并解析文件 解析完即释放r2 - r2 = get_r2pipe(file_path) - fcg_Operation_flag, fcg_Operation_message, function_num, function_fcg_edge_list, function_names = get_graph_fcg_r2pipe(r2) - with open(os.path.join(output_path, 'feature' ,file_fingerprint + '.jsonl'), 'w') as feature_out: - cfg_Operation_flag, cfg_Operation_message, cfg_item = get_graph_cfg_r2pipe(r2,file_path, feature_out) - feature_out.close() + r2 = r2pipe.open(file_path, flags=['-2']) + r2.cmd("aaa") + fcg_Operation_flag, fcg_Operation_message, function_num, function_fcg_edge_list, function_names = get_graph_fcg_r2pipe( + r2) + # cfg_Operation_flag, cfg_Operation_message, cfg_item = get_graph_cfg_r2pipe(r2, file_path, feature_out) + cfg_Operation_flag, cfg_Operation_message, cfg_item = get_graph_cfg_r2pipe(r2, file_path, output_path, + file_fingerprint) r2.quit() # 文件json构建 @@ -253,21 +243,30 @@ def exe_to_json(file_path): logger.error(f"fcg错误:{fcg_Operation_message}") if not cfg_Operation_flag: logger.error(f"cfg错误:{cfg_Operation_message}") - return False + return # json写入 - result = json.dumps(json_obj,ensure_ascii=False, default=lambda x: float(x) if isinstance(x, np.float32) else x) os.makedirs(output_path, exist_ok=True) - with open(os.path.join(output_path, file_fingerprint + '.jsonl'), 'w') as out: - out.write(result) - out.close() + save_json(os.path.join(output_path, file_fingerprint + '.jsonl'), json_obj) logger.info(f"解析完成,文件名{file_path}") - return True + return + if __name__ == '__main__': logger = init_logging() - - sample_file_path = f"/mnt/d/bishe/dataset/sample_{sample_type}" + sample_file_path = f"/mnt/d/bishe/sample_{sample_type}" sample_file_list = os.listdir(sample_file_path) - multi_thread(exe_to_json, [os.path.join(sample_file_path, file_name) for file_name in sample_file_list], thread_num=THREAD_FULL) + 
if __name__ == '__main__':
    logger = init_logging()
    sample_file_path = f"/mnt/d/bishe/sample_{sample_type}"
    sample_file_list = os.listdir(sample_file_path)
    print(f"max worker {os.cpu_count()}")
    # Fan the per-file parsing out over a thread pool; exe_to_json is
    # I/O-bound (r2pipe subprocess + file writes), so threads are fine.
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures_to_args = {
            executor.submit(exe_to_json, os.path.join(sample_file_path, file_name)): file_name
            for file_name in sample_file_list
        }
        for future in tqdm(concurrent.futures.as_completed(futures_to_args), total=len(futures_to_args)):
            try:
                # BUG FIX: the previous loop body was `pass`, so
                # future.result() was never called and every exception
                # raised inside a worker was silently discarded (the old
                # outer `except` also referenced `future` before it was
                # necessarily bound). Call result() per future so errors
                # are surfaced and logged.
                future.result()
            except Exception as exc:
                logger.error('%r generated an exception: %s' % (futures_to_args[future], exc))
    # test_file_path = '/mnt/d/bishe/exe2json/sample/VirusShare_0a3b625380161cf92c4bb10135326bb5'
    # exe_to_json(test_file_path)
def save_json(filename, data):
    """Serialize *data* to JSON and write it to *filename*.

    Args:
        filename: Destination path; the file is created/truncated.
        data: Any ``json.dumps``-serializable object.

    Raises:
        TypeError: If *data* is not JSON-serializable (raised before the
            file is opened, so no empty file is left behind — same order
            as the original: serialize first, then open).

    The original opened the file without a context manager, leaking the
    handle if the write raised; ``with`` guarantees it is closed. It also
    rebound the *data* parameter to its serialized form, which this keeps
    as a separate local for clarity. Output bytes are unchanged (default
    ``json.dumps`` options).

    NOTE(review): the exe2json.py caller previously serialized with
    ``ensure_ascii=False`` and a ``np.float32 -> float`` default handler;
    plain ``json.dumps`` here will raise on numpy scalars — confirm
    callers never pass them, or extend this helper deliberately.
    """
    import json  # local import, mirroring the original helper's style

    serialized = json.dumps(data)
    with open(filename, 'w') as fp:
        fp.write(serialized)