asm_to_csv/main.py
2024-03-07 15:08:07 +08:00

109 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import re
import pandas as pd
import os
from tqdm import tqdm
from log_utils import setup_logger
import time
from datetime import datetime
# Largest opcode count seen among sequences that exceed the over-length limit
# (200); updated by findOpcode_in_asm_file and logged at the end of the run.
max_opcode_num = 0
def csv_write(file_name, data: list, out_dir: str = './out', chunksize: int = 1000):
    """Append rows to a CSV file inside ``out_dir``.

    The rows are converted to a ``pandas.DataFrame`` and appended to the
    target file in slices of ``chunksize`` rows, without a header line and
    without the index column. Existing file content is preserved because the
    file is always opened in append mode.

    Args:
        file_name: Name of the CSV file, relative to ``out_dir``.
        data: Rows to write; anything ``pandas.DataFrame`` accepts.
        out_dir: Directory that receives the file. It is created when missing
            (previously a missing ``./out`` directory made the write fail).
        chunksize: Number of rows written per append operation.

    Returns:
        Always ``True``. When ``data`` is empty nothing is written and no
        file is created.

    Raises:
        ValueError: If ``chunksize`` is not a positive integer.
    """
    if chunksize <= 0:
        raise ValueError("chunksize must be a positive integer")

    # Make sure the destination exists before the first append.
    os.makedirs(out_dir, exist_ok=True)
    target = os.path.join(out_dir, file_name)

    frame = pd.DataFrame(data)
    for start in range(0, len(frame), chunksize):
        frame.iloc[start:start + chunksize].to_csv(
            target, mode='a', header=False, index=False
        )
    return True
# A mnemonic is the first word that follows two tab characters and is itself
# followed by one whitespace character.
_OPCODE_PATTERN = re.compile(r'\t{2}(\w+)\s')

# Tokens stripped from every sequence. NOTE(review): these look like assembler
# data/alignment directives rather than executable opcodes -- confirm.
_NON_INSTRUCTION_TOKENS = frozenset({'align', 'dp', 'dd', 'db', 'dq', 'dw'})


def findOpcode_in_asm_file(content, logger, file_type, over_threshold=200):
    """Extract opcode sequences from the text of an assembly listing.

    The text is split on blank lines ("\\n\\n"). Each resulting section is
    handled independently:

    * sections that start with ``;`` are skipped;
    * sections yielding fewer than two matches, or whose first match is
      ``retn``, are skipped (this check runs before token filtering);
    * the tokens in ``_NON_INSTRUCTION_TOKENS`` are removed, and the section
      is skipped if nothing is left.

    Sequences are never truncated: a sequence longer than ``over_threshold``
    is still returned in full, it is only reported through the flag, the log
    and the module-level ``max_opcode_num`` counter.

    Args:
        content: Object with a ``read()`` method returning the whole listing
            as a string (for example an open text file).
        logger: Logger with an ``info`` method; receives one message per
            over-length sequence.
        file_type: Label stored in the first two columns of every row.
        over_threshold: Sequence length above which a sequence counts as
            over-length. Defaults to 200.

    Returns:
        A tuple ``(over_num_flag, none_flag, result)`` where
        ``over_num_flag`` is True if at least one sequence is longer than
        ``over_threshold``, ``none_flag`` is True if no sequence was
        collected, and ``result`` is a list of rows of the form
        ``[file_type, file_type, opcode_count, "op1 op2 ..."]``.
    """
    global max_opcode_num

    result = []
    over_num_flag = False

    for section in content.read().split("\n\n"):
        if section.startswith(';'):
            continue
        # TODO: decide whether the function is an external one, e.g. by
        # checking that its name is 'start', 'start_0' or contains 'sub_'.
        instructions = _OPCODE_PATTERN.findall(section)

        # Evaluated on the raw matches, i.e. before the tokens are removed.
        if len(instructions) < 2 or instructions[0] == 'retn':
            continue

        instructions = [
            op for op in instructions if op not in _NON_INSTRUCTION_TOKENS
        ]
        if not instructions:
            continue

        count = len(instructions)
        result.append([file_type, file_type, count, ' '.join(instructions)])

        if count > over_threshold:
            # The counter only tracks over-length sequences.
            max_opcode_num = max(max_opcode_num, count)
            over_num_flag = True
            logger.info(f"over {over_threshold} Opcode is {instructions},list len {count}")

    return over_num_flag, not result, result
def Opcode_to_csv(opcode_list, file_type, log=None, done_count=None):
    """Append the collected opcode rows to ``output_<file_type>.csv``.

    The rows are written through ``csv_write`` and progress is reported to a
    logger before and after the write.

    Args:
        opcode_list: Rows to append to the CSV file.
        file_type: Label used to build the output file name.
        log: Logger used for progress messages. When omitted, the module-level
            ``logger`` is used if it exists (it is only created when the file
            runs as a script); otherwise the standard ``asm_to_csv`` logger.
        done_count: Number of files processed so far, used in the progress
            message. When omitted, the module-level ``done_file_num`` is used
            if it exists, otherwise 0.
    """
    # The original relied on globals that only exist under
    # ``if __name__ == '__main__'``; fall back to them without requiring them.
    if log is None:
        log = globals().get('logger')
        if log is None:
            import logging
            log = logging.getLogger('asm_to_csv')
    if done_count is None:
        done_count = globals().get('done_file_num', 0)

    log.info("*======================start write==================================*")
    csv_write(f'output_{file_type}.csv', opcode_list)
    log.info(f"done {done_count} files")
    log.info("*=================write to csv success==============================*")
if __name__ == '__main__':
    # Script entry point: extract opcode sequences from every file in the
    # input directory and append them to a CSV file in batches.
    start_time = time.time()

    # File-related settings
    file_type = 'malware'
    logger = setup_logger('asm_to_csv', f'./log/asm_to_csv_{file_type}.log')
    asm_file_path = os.path.join("D:/bishe/dataset/infected/infected_asm/")
    # end

    file_list = os.listdir(asm_file_path)
    Opcode_list = []        # rows waiting to be flushed to the CSV file
    none_Opcode_list = []   # names of files that produced no opcode sequence
    done_file_num = 0       # files that contributed at least one row

    # The context manager closes the progress bar even if the loop aborts.
    with tqdm(desc='Processing...', leave=True, total=len(file_list)) as process_bar:
        for file in file_list:
            try:
                with open(os.path.join(asm_file_path, file), 'r', errors='ignore') as asm_file:
                    over_flag, flag, result = findOpcode_in_asm_file(asm_file, logger, file_type)

                if flag:
                    logger.warning(f"file {file} Opcode is empty")
                    # Record the file so the final report is not always empty.
                    none_Opcode_list.append(file)
                    continue

                if over_flag:
                    logger.info(f"file {file} Opcode num is over 200")
                Opcode_list.extend(result)
                done_file_num += 1

                # Flush in batches to keep memory usage bounded.
                if len(Opcode_list) > 50000:
                    Opcode_to_csv(Opcode_list, file_type)
                    Opcode_list.clear()
            except Exception as e:
                # Best effort: one unreadable file must not stop the whole run.
                logger.error(f"Error processing file {file}: {e}")
            finally:
                process_bar.update(1)

    # Flush whatever is left after the last full batch.
    if len(Opcode_list) > 0:
        Opcode_to_csv(Opcode_list, file_type)
        Opcode_list.clear()

    logger.debug(f"none Opcode file list {none_Opcode_list} ")
    csv_write('none_Opcode_list.csv', none_Opcode_list)

    end_time = time.time()
    logger.info(f"max_opcode_num is {max_opcode_num}")
    logger.info(f"Done processing {done_file_num} files")
    logger.info(f"Total time: {end_time - start_time} "
                f"seconds, start at :{datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')}")