import os import re from log_utils import setup_logger from tqdm import tqdm import r2pipe import pandas as pd def Opcode_to_csv(opcode_list, file_type): logger.info("*======================start write==================================*") csv_write(f'output_{file_type}.csv', opcode_list) logger.info(f"done {done_file_num} files") logger.info("*=================write to csv success==============================*") def csv_write(file_name, data: list): """write data to csv""" df = pd.DataFrame(data) chunksize = 1000 for i in range(0, len(df), chunksize): df.iloc[i:i + chunksize].to_csv(f'./out/{file_name}', mode='a', header=False, index=False) return True def extract_opcode(disasm_text): """ 从反汇编文本中提取操作码和操作数 正则表达式用于匹配操作码和操作数,考虑到操作数可能包含空格和逗号 """ match = re.search(r"^\s*(\S+)(?:\s+(.*))?$", disasm_text) if match: opcode = match.group(1) # operands_str = match.group(2) if match.group(2) is not None else "" # split_pattern = re.compile(r",(?![^\[]*\])") # 用于切分操作数的正则表达式 # operands = split_pattern.split(operands_str) # return opcode, [op.strip() for op in operands if op.strip()] return opcode return "" def get_graph_r2pipe(r2pipe_open, file_type): # 获取基础块内的操作码序列 opcode_Sequence = [] try: # 获取函数列表 function_list = r2pipe_open.cmdj("aflj") for function in function_list: # 外部函数测试 # if function['name'] == 'sub.TNe_U': # print(function) # block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) # for block in block_list: # # print(block) # # 获取基本块的反汇编指令 # disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"])) # if disasm: # for op in disasm: # print(extract_opcode(op["opcode"])) block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) block_opcode_Sequence = [] for block in block_list: # print(block) # 获取基本块的反汇编指令 disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"])) if disasm: for op in disasm: if op["type"] == "invalid": continue block_opcode_Sequence.append(extract_opcode(op["opcode"])) opcode_Sequence.append([file_type, file_type, len(block_opcode_Sequence), ' '.join(block_opcode_Sequence)]) except: print("Error: get function list failed") return opcode_Sequence if __name__ == '__main__': logger = setup_logger('logger', 'log/opcode_benign.log') file_type = 'benign' file_path = os.path.join('/mnt/d/bishe/dataset/train_benign') file_list = os.listdir(file_path)[:10000] done_file_num = 0 process_bar = tqdm(desc='Processing...', leave=True, total=len(file_list)) done_list = [['class', 'sub-class','size', 'corpus']] for file_name in file_list: r2pipe_open = r2pipe.open(os.path.join(file_path, file_name), flags=['-2']) r2pipe_open.cmd("aaa") done_list.extend(get_graph_r2pipe(r2pipe_open, file_type)) if len(done_list) > 100000: csv_write(f'output_{file_type}.csv', done_list) done_file_num += 1 done_list.clear() process_bar.update(1) else: csv_write(f'output_{file_type}.csv', done_list) # node_list = [] # edge_list = [] # temp_edge_list = [] # node_info_list = [] # # for function in function_list: # block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) # # for block in block_list: # node_list.append(block["addr"]) # # # 获取基本块的反汇编指令 # disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"])) # node_info = [] # if disasm: # for op in disasm: # if op["type"] == "invalid": # continue # opcode, operands = extract_opcode_and_operands(op["disasm"]) # # 处理跳转指令 # if "jump" in op and op["jump"] != 0: # temp_edge_list.append([block["addr"], op["jump"]]) # node_info.append([op["offset"], op["bytes"], opcode, op["jump"]]) # else: # node_info.append([op["offset"], op["bytes"], opcode, None]) # node_info_list.append(node_info) # # # 完成 CFG 构建后, 检查并清理不存在的出边 # for temp_edge in temp_edge_list: # if temp_edge[1] in node_list: # edge_list.append(temp_edge) # # # 获取排序后元素的原始索引 # sorted_indices = [i for i, v in sorted(enumerate(node_list), key=lambda x: x[1])] # # 根据这些索引重新排列 # node_list = [node_list[i] for i in sorted_indices] # node_info_list = [node_info_list[i] for i in sorted_indices] # # return True, "二进制可执行文件解析成功", node_list, edge_list, node_info_list # except Exception as e: # return False, e, None, None, None