Gencoding/raw-feature-extractor/print_test.py

113 lines
4.5 KiB
Python
Raw Permalink Normal View History

2023-12-02 21:53:57 +08:00
# -*- coding: UTF-8 -*-
2023-12-04 14:15:10 +08:00
import os
2023-12-02 21:53:57 +08:00
import sys
from matplotlib import pyplot as plt
import networkx as nx
import hashlib
import json
def calc_sha256(file_path):
    """Return the hexadecimal SHA-256 digest of the file at *file_path*.

    The file is opened in binary mode and fed to the hash in fixed-size
    chunks, so memory use stays constant regardless of file size.

    :param file_path: path of the file to hash.
    :return: the digest as a lowercase hex string.
    :raises IOError: (``OSError`` on Python 3) if the file cannot be opened
        or read.
    """
    sha256obj = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # iter(callable, sentinel) keeps calling f.read until it returns b''.
        for chunk in iter(lambda: f.read(65536), b''):
            sha256obj.update(chunk)
    return sha256obj.hexdigest()
import pickle
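# sub_10F20 308: the decompiled code contains strings, but the extracted
# features contain none; constants that are referenced indirectly may not be
# recognized. Across all functions the features contain almost no string
# constants, probably because the strings are stored elsewhere and only referenced.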
# sub_166C4 393
2023-12-28 14:10:34 +08:00
2023-12-28 15:43:53 +08:00
# Per-block node attributes copied verbatim, in this order, into the start of
# each block's feature vector: calls, transfer, arithmetic, logic, compare,
# move, termination, data declaration, total instructions.
_BLOCK_COUNT_KEYS = ('numCalls', 'numTIs', 'numAs', 'numLIs', 'numCom',
                     'numMov', 'numTerm', 'numDD', 'numIns')


def _split_edges(edges):
    """Return ``[sources, targets]`` for an iterable of edges indexed as ``edge[0]``/``edge[1]``."""
    sources = []
    targets = []
    for edge in edges:
        sources.append(edge[0])
        targets.append(edge[1])
    return [sources, targets]


def _block_feature_vector(attrs):
    """Build the 11-entry feature list for one basic block from its node attribute dict."""
    features = [attrs[key] for key in _BLOCK_COUNT_KEYS]
    # String or integer constants: the number of strings, or the number of
    # integer constants when the block has no strings.
    features.append(len(attrs['strings']) or len(attrs['consts']))
    # Offspring count.
    features.append(attrs['offs'])
    return features


def _function_record(graph):
    """Return the ``block_number`` / ``block_edges`` / ``block_features`` dict for one function graph."""
    nodes = graph.node
    block_number = len(nodes)
    # Nodes are looked up by position, so this relies on node ids being
    # exactly 0..block_number-1 (as the original code did).
    block_features = [_block_feature_vector(nodes[idx]) for idx in range(block_number)]
    return {"block_number": block_number, "block_edges": _split_edges(graph.edges),
            "block_features": block_features}


def main_op(file_path, jsonl_file):
    """Convert every pickled ACFG file in a directory into one JSON line each.

    Each entry of *file_path* is unpickled; the loaded object is expected to
    expose ``raw_graph_list``, whose items have ``funcname`` and ``old_g``
    (a graph with ``node`` and ``edges``). For every file with at least one
    function, a JSON object with the keys ``function_edges``, ``acfg_list``,
    ``function_names``, ``hash`` and ``function_number`` is appended to
    *jsonl_file*. ``hash`` is the SHA-256 of the pickle file itself.

    Directory entries that do not resolve to an existing path (for example a
    dangling symlink) are removed.

    :param file_path: directory containing the pickled feature files.
    :param jsonl_file: output path, opened in append mode.
    """
    done_index = 0
    file_name_list = os.listdir(file_path)
    # The context manager guarantees the output is flushed and closed even if
    # one of the input files fails to load.
    with open(jsonl_file, mode='a') as sample_file:
        for file_name in file_name_list:
            print(file_name)
            file_path_temp = os.path.join(file_path, file_name)
            if not os.path.exists(file_path_temp):
                # Report and delete the broken entry itself, not the input directory.
                print("删除文件" + file_path_temp)
                if os.path.lexists(file_path_temp):
                    os.remove(file_path_temp)
                continue

            # Pickle data is binary, so read it in 'rb' mode; the handle is
            # closed as soon as loading finishes.
            # NOTE(security): pickle.load executes arbitrary code from the
            # file; only run this on feature files you generated yourself.
            with open(file_path_temp, 'rb') as fr:
                data1 = pickle.load(fr)  # ACFGs of one binary file

            graphs = data1.raw_graph_list
            function_number = len(graphs)
            if function_number == 0:
                continue

            # NOTE(review): "function_edges" is taken from the first
            # function's own graph, not from a call graph - confirm intent.
            function_edges = _split_edges(graphs[0].old_g.edges)
            # Hash the file that was just processed (previously the directory
            # path was passed here, which cannot be opened as a file).
            file_hash = calc_sha256(file_path_temp)

            # Function-level features.
            fun_name_temp = []
            acfg_list = []
            for raw_graph in graphs:
                fun_name_temp.append(raw_graph.funcname)
                acfg_list.append(_function_record(raw_graph.old_g))

            json_temp = {"function_edges": function_edges, "acfg_list": acfg_list,
                         "function_names": fun_name_temp, "hash": file_hash,
                         "function_number": function_number}
            sample_file.write(json.dumps(json_temp) + '\n')
            print("完成写入" + str(done_index))
            done_index += 1
2023-12-28 15:43:53 +08:00
if __name__ == '__main__':
    # Script entry point: read the feature files from the default input
    # directory and append the resulting JSON lines to the default output file.
    file_path, jsonl_path = "../train_malware_result/", "../jsonl_res/malware_result.jsonl"
    main_op(file_path, jsonl_path)