diff --git a/exe2json.py b/exe2json.py index 8ca3042..b14b162 100644 --- a/exe2json.py +++ b/exe2json.py @@ -13,6 +13,8 @@ import os os.environ["TOKENIZERS_PARALLELISM"] = "false" ret_trap_opcode_family = ["ret", "hlt", "int3", "ud2"] +sample_type = 'benign' + def extract_opcode(disasm_text): """ @@ -45,7 +47,14 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path): try: # 获取函数列表 function_list = r2pipe_open.cmdj("aflj") + # debug + # for function in tqdm(function_list, total=len(function_list)): for function in function_list: + # print(function['offset']) + # debug + # if function['offset'] != 5368850704: + # continue + node_list = [] edge_list = [] temp_edge_list = [] @@ -54,8 +63,6 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path): block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) # 获取基本块数量 block_number = len(block_list) - - for block in block_list: # 基础块内的语句 block_addr = block["addr"] @@ -64,11 +71,21 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path): node_list.append(block["addr"]) # 获取基本块的反汇编指令 disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"])) + # debug + # if len(disasm) != 49144: + # continue if disasm: + last_op = '' + if len(disasm) > 200: + logger.warning(f"基础块指令长度异常,{file_path},函数名称{function['name']}基础块地址{block['addr']},长度{len(disasm)}") for op_index, op in enumerate(disasm): + op_disasm = extract_opcode(op["disasm"]) + # 防止大量重复的语句造成内存溢出 + if op_disasm == last_op: + continue + last_op = op_disasm # 提取操作码并转换为bert模型输入格式 block_Statement.append(extract_opcode(op["disasm"])) - # 处理跳转码并构建cfg if 'jump' in op: if op['jump'] == 0: @@ -109,9 +126,16 @@ def get_graph_cfg_r2pipe(r2pipe_open, file_path): if op not in ret_trap_opcode_family or op["type"] not in ["ret", "trap"]: temp_edge_list.append([block_addr, op["offset"] + op["size"]]) + # debugger + # print(len(disasm)) + # print(len(block_Statement)) + # print(block_Statement) + + # bert模型转化特征 block_feature_list = bb2vec(block_Statement) # block_feature_list = [] + # 过滤不存在的边 for temp_edge in temp_edge_list: if temp_edge[0] in node_list and temp_edge[1] in node_list: @@ -184,21 +208,24 @@ def get_r2pipe(file_path): def init_logging(): # 初始化日志 - log_file = "./log/exe2json.log" + log_file = f"./log/exe2json_{sample_type}.log" return setup_logger('exe2json', log_file) def exe_to_json(file_path): - output_path = "./out/json/malware" - - + output_path = f"./out/json/{sample_type}" + file_fingerprint = calc_sha256(file_path) + if os.path.exists(os.path.join(output_path, file_fingerprint + '.jsonl')): + logger.info(f"二进制可执行文件已解析,文件名{file_path}") + return True + logger.info(f"开始解析,文件名{file_path}") # 获取r2pipe并解析文件 解析完即释放r2 r2 = get_r2pipe(file_path) fcg_Operation_flag, fcg_Operation_message, function_num, function_fcg_edge_list, function_names = get_graph_fcg_r2pipe(r2) cfg_Operation_flag, cfg_Operation_message, cfg_item = get_graph_cfg_r2pipe(r2,file_path) r2.quit() # 文件json构建 - file_fingerprint = calc_sha256(file_path) + if fcg_Operation_flag and cfg_Operation_flag: json_obj = { 'hash': file_fingerprint, @@ -221,12 +248,15 @@ def exe_to_json(file_path): with open(os.path.join(output_path, file_fingerprint + '.jsonl'), 'w') as out: out.write(result) out.close() + logger.info(f"解析完成,文件名{file_path}") return True if __name__ == '__main__': logger = init_logging() - sample_file_path = "/mnt/d/bishe/dataset/sample_malware" + + sample_file_path = f"/mnt/d/bishe/dataset/sample_{sample_type}" sample_file_list = os.listdir(sample_file_path) - multi_thread(exe_to_json, [os.path.join(sample_file_path, file_name) for file_name in sample_file_list]) + # sample_file_list = ['001b1ca33bf52c5c09b3a852d0ac0254.exe'] + multi_thread(exe_to_json, [os.path.join(sample_file_path, file_name) for file_name in sample_file_list], thread_num=THREAD_FULL) # test_file_path = '/mnt/d/bishe/exe2json/sample/VirusShare_0a3b625380161cf92c4bb10135326bb5' # exe_to_json(test_file_path) diff --git a/funNameGet.py b/external_func/funNameGet.py similarity index 77% rename from funNameGet.py rename to external_func/funNameGet.py index ee129ad..0e3a10d 100644 --- a/funNameGet.py +++ b/external_func/funNameGet.py @@ -17,21 +17,22 @@ def get_fun_name_list(file_path): for function in function_list: fun_name_list.append(function['name']) + print(function['name']) except Exception as err: print(f'error at {file_path} , {err}') r2.quit() return fun_name_list def fun_name_count(): - file_path = os.path.join('/mnt/d/bishe/dataset/sample_20230130_458') - bengin_file_path = os.path.join('/mnt/d/bishe/dataset/train_benign') + file_path = os.path.join('/mnt/d/bishe/dataset/sample_malware') + bengin_file_path = os.path.join('/mnt/d/bishe/dataset/sample_benign') file_list = [os.path.join(file_path, file_name) for file_name in os.listdir(file_path)] file_list.extend([os.path.join(bengin_file_path, file_name) for file_name in os.listdir(bengin_file_path)]) fun_name_set = {} with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor: future_to_args = { executor.submit(get_fun_name_list, file_name): file_name - for file_name in file_list + for file_name in file_list[:1] } for future in tqdm(concurrent.futures.as_completed(future_to_args), total=len(future_to_args)): fun_name_list = future.result() @@ -41,16 +42,16 @@ def fun_name_count(): fun_name_set[fun_name] = 1 else: fun_name_set[fun_name] += 1 - pd.DataFrame(fun_name_set.items(), columns=['fun_name', 'count']).to_csv('./out/fun_name.csv', index=False, mode='a') + # pd.DataFrame(fun_name_set.items(), columns=['fun_name', 'count']).to_csv('../out/external/fun_name.csv', index=False, mode='a') def fun_name_sort(): - fun_name_df = pd.read_csv('./out/fun_name.csv') + fun_name_df = pd.read_csv('../out/external/fun_name.csv') # 去除fun_name中fun_name列中的局部函数 for item in ['fcn.', 'loc.', 'main', 'entr']: fun_name_df = fun_name_df[fun_name_df['fun_name'].apply(lambda x: item not in x and item not in x)] fun_name_df = fun_name_df.sort_values(by='count', ascending=False)[:10000] - fun_name_df.to_csv('fun_name_sort.csv', index=False) + fun_name_df.to_csv('../out/external/fun_name_sort.csv', index=False) if __name__ == '__main__': fun_name_count() - fun_name_sort() + # fun_name_sort() diff --git a/log_utils.py b/log_utils.py deleted file mode 100644 index bf12cf2..0000000 --- a/log_utils.py +++ /dev/null @@ -1,45 +0,0 @@ -import logging -import os - - -def setup_logger(name, log_file, level=logging.INFO): - """Function setup as many loggers as you want""" - if not os.path.exists(os.path.dirname(log_file)): - os.makedirs(os.path.dirname(log_file)) - - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - - handler = logging.FileHandler(log_file) - handler.setFormatter(formatter) - - # 控制台是否输出日志信息 - # stream_handler = logging.StreamHandler() - # stream_handler.setFormatter(formatter) - - logger = logging.getLogger(name) - logger.setLevel(level) - logger.addHandler(handler) - # 控制台 - # logger.addHandler(stream_handler) - - # 刷新原有log文件 - - if os.path.exists(log_file): - open(log_file, 'w').close() - - return logger - - -""" -用法示例 -""" - - -def main(): - log_file = "app.log" - logger = setup_logger(__name__, log_file) - - logger.info("Application started.") - logger.debug("Debug message.") - logger.warning("Warning message.") - logger.error("Error occurred.") diff --git a/OpcodeGet.py b/ngram/OpcodeGet.py similarity index 76% rename from OpcodeGet.py rename to ngram/OpcodeGet.py index 0510138..d08c84e 100644 --- a/OpcodeGet.py +++ b/ngram/OpcodeGet.py @@ -56,19 +56,8 @@ def get_graph_r2pipe(file_type, file_name): r2pipe_open.cmd('e arch=x86') function_list = r2pipe_open.cmdj("aflj") for function in function_list: - - # 外部函数测试 - # if function['name'] == 'sub.TNe_U': - # print(function) - # block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) - # for block in block_list: - # # print(block) - # # 获取基本块的反汇编指令 - # disasm = r2pipe_open.cmdj("pdj " + str(block["ninstr"]) + " @" + str(block["addr"])) - # if disasm: - # for op in disasm: - # print(extract_opcode(op["opcode"])) - + if function['name'][:4] not in ['fcn.', 'loc.']: + continue block_list = r2pipe_open.cmdj("afbj @" + str(function['offset'])) block_opcode_Sequence = [] for block in block_list: @@ -92,30 +81,27 @@ def get_graph_r2pipe(file_type, file_name): if __name__ == '__main__': file_type = 'malware' logger = setup_logger('logger', f'./log/opcode_{file_type}.log') - file_path = os.path.join('/mnt/d/bishe/dataset/sample_20230130_458') + file_path = os.path.join(f'/mnt/d/bishe/dataset/sample_{file_type}') print(f"max works {os.cpu_count()}") - file_list = os.listdir(file_path)[:10000] + file_list = os.listdir(file_path)[:1000] done_file_num = 0 done_list = [['class', 'sub-class', 'size', 'corpus']] - process_bar = tqdm(desc=f'Processing {file_type}...', leave=True, total=len(file_list)) - with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: # 调整线程池大小 + with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()/2) as executor: # 调整线程池大小 future_to_args = { executor.submit(get_graph_r2pipe, file_type, file_name): file_name for file_name in file_list } - for future in concurrent.futures.as_completed(future_to_args): + for future in tqdm(concurrent.futures.as_completed(future_to_args), total=len(future_to_args), desc=f'Processing {file_type}...',): try: tmp = future.result() done_list.extend(tmp if len(tmp) > 0 else []) if len(done_list) > 100000: - csv_write(f'output_{file_type}.csv', done_list) + csv_write(f'output_{file_type}_test.csv', done_list) done_file_num += 1 done_list.clear() except Exception as e: logger.error(f"Error: {e}") print(f"Error: {e}") - finally: - process_bar.update(1) else: - csv_write(f'output_{file_type}.csv', done_list) + csv_write(f'output_{file_type}_test.csv', done_list) diff --git a/ngram.py b/ngram/ngram.py similarity index 96% rename from ngram.py rename to ngram/ngram.py index 8948457..7968da4 100644 --- a/ngram.py +++ b/ngram/ngram.py @@ -93,11 +93,17 @@ def process_csv_file(csvfile, ngram_type, file_percent_filter, frequency_filter) print(f"start load csv file:{os.path.basename(csvfile)}") dataframe = pd.read_csv(csvfile, encoding="utf8") print(f"end load") + print(f"computer {idx + 1} ngram for {ngram_type}") + # reader = pd.read_csv(csvfile, chunksize=10000) + print(f"init file {csvfile} done") ngram_list = defaultdict(int) filtered_ngram_list = defaultdict(int) - process_bar = tqdm(total=len(dataframe['corpus'].values), desc=f'Computing {ngram_type}-gram on files') with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: # 调整线程池大小 future_to_args = { + # executor.submit(filter_N_grams, values['corpus'].values, idx + 1, file_percent_filter, frequency_filter): + # values for values in reader + + executor.submit(filter_N_grams, dataframe['corpus'].values[start: start + 10000], idx + 1, file_percent_filter, frequency_filter): start for start in range(0, len(dataframe['corpus'].values), 10000) @@ -128,7 +134,7 @@ def process_csv_file(csvfile, ngram_type, file_percent_filter, frequency_filter) def build_csv(ngram_list, filter_list, maxgrams, file_type): ngramDicList = [] csv_file_header = ['ngram', 'count'] - csv_file = os.path.join('./out', f'{file_type}-{maxgrams}-gram.csv') + csv_file = os.path.join('../out', f'{file_type}-{maxgrams}-gram.csv') for index in tqdm(range(len(ngram_list)), desc=f'Building {maxgrams}-gram csv'): ngramDicList.append({ 'ngram': ngram_list[index], diff --git a/ngramSort.py b/ngram/ngramSort.py similarity index 100% rename from ngramSort.py rename to ngram/ngramSort.py