import concurrent.futures
import os
import re
import threading

import pandas as pd
import r2pipe
from tqdm import tqdm

from log_utils import setup_logger

# Serializes CSV appends.  The original declared `csv_lock = 0` (an int that
# can never be acquired); a real Lock makes the intent effective.
csv_lock = threading.Lock()


def Opcode_to_csv(opcode_list, file_type):
    """Flush `opcode_list` to the per-type output CSV and log progress.

    NOTE(review): not called anywhere in this file; kept for backward
    compatibility.  Relies on the module-level `logger` and `done_file_num`
    created in the `__main__` section.
    """
    csv_write(f'output_{file_type}.csv', opcode_list)
    logger.info(f"done {done_file_num} files")


def csv_write(file_name, data: list):
    """Append `data` (a list of rows) to ./out/<file_name> in 1000-row chunks.

    No header row is written by pandas; callers emit their own header as the
    first data row.  Returns True on completion.
    """
    logger.info("*======================start write==================================*")
    # Robustness: to_csv fails with FileNotFoundError if ./out does not exist.
    os.makedirs('./out', exist_ok=True)
    df = pd.DataFrame(data)
    chunksize = 1000
    with csv_lock:  # keep chunks from interleaving if ever called concurrently
        for i in range(0, len(df), chunksize):
            df.iloc[i:i + chunksize].to_csv(f'./out/{file_name}', mode='a',
                                            header=False, index=False)
            logger.info(f"done rows {len(df)}")
    logger.info("*=================write to csv success==============================*")
    return True


def extract_opcode(disasm_text):
    """Extract the opcode (first whitespace-delimited token) from one
    disassembly line, e.g. 'mov eax, ebx' -> 'mov'.

    Returns '' for an empty or whitespace-only line.  Operand parsing that
    once lived here was dead code and has been removed.
    """
    match = re.search(r"^\s*(\S+)(?:\s+(.*))?$", disasm_text)
    if match:
        return match.group(1)
    return ""


def get_graph_r2pipe(file_type, file_name):
    """Disassemble one sample with radare2 and build one corpus row per function.

    Each row is [class, sub-class, opcode_count, 'op1 op2 ...'] covering every
    basic block of the function; instructions radare2 marks invalid are
    skipped.  Returns an empty list when analysis fails.

    Reads the module-level `file_path` set up in `__main__`.
    """
    r2 = r2pipe.open(os.path.join(file_path, file_name), flags=['-2'])
    opcode_rows = []
    try:
        r2.cmd("aaa")          # run radare2 full auto-analysis
        r2.cmd('e arch=x86')
        for function in r2.cmdj("aflj"):
            # One opcode sequence per function, accumulated across its blocks.
            block_opcodes = []
            for block in r2.cmdj("afbj @" + str(function['offset'])):
                disasm = r2.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"]))
                if disasm:
                    for op in disasm:
                        if op["type"] == "invalid" or op["opcode"] == "invalid":
                            continue
                        block_opcodes.append(extract_opcode(op["opcode"]))
            opcode_rows.append(
                [file_type, file_type, len(block_opcodes), ' '.join(block_opcodes)])
    except Exception as e:
        logger.error(f"Error: get function list failed in {file_name}")
        print(f"Error: get function list failed in {file_name} ,error info {e}")
    finally:
        # Always release the radare2 subprocess, even on exits the broad
        # except does not catch (e.g. KeyboardInterrupt).  The original
        # called quit() outside any finally and could leak the process.
        r2.quit()
    return opcode_rows


if __name__ == '__main__':
    file_type = 'malware'
    logger = setup_logger('logger', f'./log/opcode_{file_type}.log')
    file_path = os.path.join('/mnt/d/bishe/dataset/sample_20230130_458')
    print(f"max works {os.cpu_count()}")
    file_list = os.listdir(file_path)[:10000]
    done_file_num = 0
    # Header emitted as the first data row (csv_write always uses header=False).
    done_list = [['class', 'sub-class', 'size', 'corpus']]
    process_bar = tqdm(desc=f'Processing {file_type}...', leave=True, total=len(file_list))
    # Threads suit this fan-out: each worker mostly waits on the r2 subprocess.
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        future_to_args = {
            executor.submit(get_graph_r2pipe, file_type, file_name): file_name
            for file_name in file_list
        }
        for future in concurrent.futures.as_completed(future_to_args):
            try:
                done_list.extend(future.result())
                # Bug fix: count every completed file.  The original only
                # incremented done_file_num inside the flush branch below,
                # so it counted flushes rather than processed files.
                done_file_num += 1
                if len(done_list) > 100000:
                    # Flush periodically to bound memory; rows already written
                    # are dropped from the in-memory buffer.
                    csv_write(f'output_{file_type}.csv', done_list)
                    done_list.clear()
            except Exception as e:
                logger.error(f"Error: {e}")
                print(f"Error: {e}")
            finally:
                process_bar.update(1)
        # Final flush of the remaining rows (the original used a for-else).
        csv_write(f'output_{file_type}.csv', done_list)
    process_bar.close()