import csv
import os
import re
import time
from datetime import datetime

import pandas as pd
from tqdm import tqdm

from log_utils import setup_logger

# Largest instruction count seen among sections that exceeded the
# over-size threshold; it stays 0 when no section exceeded it.
max_opcode_num = 0

# A mnemonic is the word that follows two tab characters.
_OPCODE_PATTERN = re.compile(r'\t{2}(\w+)\s')
# Tokens matched by the pattern that are dropped from the opcode sequence.
_NON_OPCODE_TOKENS = frozenset({'align', 'dp', 'dd', 'db', 'dq', 'dw'})
# Sections with more instructions than this are reported as over-sized.
_OVERSIZE_THRESHOLD = 200
# Number of buffered rows after which the driver flushes to disk.
_FLUSH_THRESHOLD = 50000
_CSV_CHUNK_ROWS = 1000


def csv_write(file_name, data: list):
    """Append ``data`` as rows to ``./out/<file_name>``.

    No header and no index column are written. Nothing is written (and the
    file is not created) when ``data`` yields zero rows.

    Args:
        file_name: Name of the CSV file inside ``./out``.
        data: Rows to append; anything ``pandas.DataFrame`` accepts.

    Returns:
        Always ``True``.
    """
    df = pd.DataFrame(data)
    if len(df) == 0:
        return True
    # The output directory may not exist on a fresh checkout.
    os.makedirs('./out', exist_ok=True)
    # A single append that pandas writes 1000 rows at a time, instead of
    # reopening the file for every slice.
    df.to_csv(f'./out/{file_name}', mode='a', header=False, index=False,
              chunksize=_CSV_CHUNK_ROWS)
    return True


def findOpcode_in_asm_file(content, logger, file_type):
    """Extract one opcode sequence per section of an assembly listing.

    The text is split on blank lines. Sections starting with ``;`` are
    skipped, as are sections with fewer than two matched tokens or whose
    first matched token is ``retn``. Tokens listed in ``_NON_OPCODE_TOKENS``
    are removed from the remaining sequences.

    Side effect: updates the module-level ``max_opcode_num`` whenever a
    section longer than 200 instructions is seen.

    Args:
        content: Open text file (or any object with ``read()``) holding
            the listing.
        logger: Logger used to report over-sized sections.
        file_type: Label stored in the first two columns of every row.

    Returns:
        A tuple ``(over_num_flag, none_flag, result)``:
        - over_num_flag: ``True`` if any kept section has more than 200
          instructions. Such sections are NOT truncated.
        - none_flag: ``True`` if no section produced a row.
        - result: list of rows
          ``[file_type, file_type, instruction_count, "op1 op2 ..."]``.
    """
    global max_opcode_num
    result = []
    over_num_flag = False

    for section in content.read().split("\n\n"):
        if section.startswith(';'):
            continue
        # TODO: decide whether the function is an external one (the earlier
        # idea was to filter on the function name: start / start_0 / sub_*).
        instructions = _OPCODE_PATTERN.findall(section)
        if len(instructions) < 2 or instructions[0] == 'retn':
            continue

        instructions = [op for op in instructions
                        if op not in _NON_OPCODE_TOKENS]
        if not instructions:
            continue

        # NOTE(review): file_type is stored twice; presumably two distinct
        # columns were intended - confirm against the CSV consumers.
        result.append([file_type, file_type, len(instructions),
                       ' '.join(instructions)])

        if len(instructions) > _OVERSIZE_THRESHOLD:
            max_opcode_num = max(max_opcode_num, len(instructions))
            over_num_flag = True
            logger.info(f"over 200 Opcode is {instructions},list len {len(instructions)}")

    return over_num_flag, not result, result


def Opcode_to_csv(opcode_list, file_type, logger=None, done_file_num=None):
    """Append buffered opcode rows to ``./out/output_<file_type>.csv``.

    Args:
        opcode_list: Rows produced by ``findOpcode_in_asm_file``.
        file_type: Label used to build the output file name.
        logger: Optional logger for progress messages. Passed explicitly
            so the function no longer depends on a script-level global.
        done_file_num: Optional count of files processed so far; it is
            only used in the progress message.
    """
    if logger is not None:
        logger.info("*======================start write==================================*")
    csv_write(f'output_{file_type}.csv', opcode_list)
    if logger is not None:
        if done_file_num is not None:
            logger.info(f"done {done_file_num} files")
        logger.info("*=================write to csv success==============================*")


def main(file_type='malware',
         asm_file_path="D:/bishe/dataset/infected/infected_asm/"):
    """Convert every listing in ``asm_file_path`` to opcode CSV rows.

    Args:
        file_type: Label written into each row and used in the log and
            output file names.
        asm_file_path: Directory containing the assembly listings.
    """
    start_time = time.time()
    logger = setup_logger('asm_to_csv', f'./log/asm_to_csv_{file_type}.log')

    file_list = os.listdir(asm_file_path)
    Opcode_list = []
    none_Opcode_list = []
    done_file_num = 0

    with tqdm(desc='Processing...', leave=True,
              total=len(file_list)) as process_bar:
        for file in file_list:
            try:
                with open(os.path.join(asm_file_path, file), 'r',
                          errors='ignore') as asm_file:
                    over_flag, flag, result = findOpcode_in_asm_file(
                        asm_file, logger, file_type)
                if flag:
                    logger.warning(f"file {file} Opcode is empty")
                    # Remember the file so none_Opcode_list.csv is not
                    # always empty.
                    none_Opcode_list.append(file)
                    continue
                if over_flag:
                    logger.info(f"file {file} Opcode num is over 200")
                Opcode_list.extend(result)
                done_file_num += 1
                # Flush periodically to bound memory usage.
                if len(Opcode_list) > _FLUSH_THRESHOLD:
                    Opcode_to_csv(Opcode_list, file_type, logger,
                                  done_file_num)
                    Opcode_list.clear()
            except Exception as e:
                # Per-file boundary: one bad file must not stop the batch.
                logger.error(f"Error processing file {file}: {e}")
            finally:
                process_bar.update(1)

    if Opcode_list:
        Opcode_to_csv(Opcode_list, file_type, logger, done_file_num)
        Opcode_list.clear()

    logger.debug(f"none Opcode file list {none_Opcode_list} ")
    csv_write('none_Opcode_list.csv', none_Opcode_list)

    end_time = time.time()
    logger.info(f"max_opcode_num is {max_opcode_num}")
    logger.info(f"Done processing {done_file_num} files")
    logger.info(f"Total time: {end_time - start_time} "
                f"seconds, start at :{datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')}")


if __name__ == '__main__':
    main()