"""Extract consecutive-instruction pairs from executables using radare2.

Each sample is analysed with r2pipe; every valid instruction in the selected
functions is normalised into a space-separated token string, and each pair of
adjacent instructions is appended to an output file as ``before\\tafter``.
"""
import concurrent.futures
import logging
import os
import threading

import r2pipe
from tqdm import tqdm

from my_utils import setup_logger

# Default logger so the functions also work when this module is imported;
# the script entry point below replaces it with the configured one.
logger = logging.getLogger(__name__)

# Several worker threads append to the same output file (the file name is
# derived from ``file_index % worker_count``), so writes are serialised.
_WRITE_LOCK = threading.Lock()

# Only functions whose name starts with one of these prefixes are processed.
_FUNCTION_PREFIXES = ('fcn.', 'loc.', 'main', 'entr')


def extract_opcode(disasm_text):
    """Split one disassembly line into opcode/operand tokens.

    The text is split on single spaces, commas are removed, and square
    brackets are emitted as separate ``'['`` / ``']'`` tokens around the
    token they were attached to.

    Example: ``"mov eax, [ebx+4]"`` -> ``['mov', 'eax', '[', 'ebx+4', ']']``.

    Args:
        disasm_text: Disassembly text of a single instruction.

    Returns:
        The list of tokens, in source order.
    """
    tokens = []
    for item in disasm_text.split(' '):
        item = item.strip().replace(',', '')
        if '[' in item:
            tokens.append('[')
        tokens.append(item.replace('[', '').replace(']', ''))
        if ']' in item:
            tokens.append(']')
    return tokens


def double_exe_op_list(op_list):
    """Return every pair of adjacent elements of ``op_list``.

    Args:
        op_list: List of items (here: normalised instruction strings).

    Returns:
        A list of ``(op_list[i], op_list[i + 1])`` tuples; empty when
        ``op_list`` has fewer than two elements.
    """
    return list(zip(op_list, op_list[1:]))


def _collect_instructions(r2):
    """Return the normalised instruction strings of all selected functions."""
    instructions = []
    # cmdj may yield None when radare2 returns no JSON; treat it as "nothing".
    for function in r2.cmdj("aflj") or []:
        if not function['name'].startswith(_FUNCTION_PREFIXES):
            continue
        blocks = r2.cmdj("afbj @" + str(function['offset'])) or []
        for block in blocks:
            # Disassemble exactly the instructions of this basic block.
            disasm = r2.cmdj(
                "pdj " + str(block["ninstr"]) + " @" + str(block["addr"])
            )
            for op in disasm or []:
                if op.get("type") == "invalid" or op.get("opcode") == "invalid":
                    continue
                text = op.get("disasm")
                if text is None:
                    continue
                instructions.append(' '.join(extract_opcode(text)))
    return instructions


def get_all_from_exe(file, out_file):
    """Append the adjacent-instruction pairs of ``file`` to ``out_file``.

    Args:
        file: Path of the executable to analyse.
        out_file: Path of the text file the pairs are appended to, one
            ``before\\tafter`` line per pair.

    Returns:
        ``(True, '', '')`` on success, or ``(False, file, exception)`` when
        anything went wrong; the error is also logged.
    """
    r2 = None
    try:
        r2 = r2pipe.open(os.fspath(file), flags=['-2'])
        r2.cmd("aaa")
        # NOTE(review): the architecture is set after analysis, as in the
        # original code — confirm this ordering is intended.
        r2.cmd('e arch=x86')
        pairs = double_exe_op_list(_collect_instructions(r2))
        lines = [before + '\t' + after + '\n' for before, after in pairs]
        # One locked write per sample keeps lines from different worker
        # threads from interleaving in the shared output file.
        with _WRITE_LOCK:
            with open(out_file, 'a') as f:
                f.writelines(lines)
    except Exception as e:
        logger.error(f"Error: get function list failed in {file} ,error {e}")
        return False, file, e
    finally:
        # Always shut the radare2 process down, including on the error path.
        if r2 is not None:
            r2.quit()
    return True, '', ''


def main():
    """Process up to 1000 samples in a thread pool and report failures."""
    sample_file_path = '/mnt/d/bishe/dataset/sample_malware/'
    sample_file_list = os.listdir(sample_file_path)[:1000]
    out_file_path = '../dataset/all'
    os.makedirs(out_file_path, exist_ok=True)

    # os.cpu_count() may return None; fall back to a single worker.
    worker_count = os.cpu_count() or 1

    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
        print(f"start with {worker_count} works.")
        future_to_args = {
            executor.submit(
                get_all_from_exe,
                os.path.join(sample_file_path, file_name),
                os.path.join(
                    out_file_path,
                    f'inst.{file_index % worker_count}.pos.txt',
                ),
            ): file_index
            for file_index, file_name in enumerate(sample_file_list)
        }
        for future in tqdm(concurrent.futures.as_completed(future_to_args),
                           total=len(sample_file_list)):
            try:
                ok, failed_file, error = future.result()
                if not ok:
                    print(f"Error file: {failed_file}, msg {error}")
            except Exception as exc:
                logger.error(f"Error: {exc}")
                print(f"Error: {exc}")


if __name__ == '__main__':
    logger = setup_logger('exe2all', '../log/exe2all.log')
    main()