86 lines
3.6 KiB
Python
86 lines
3.6 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import torch
|
|
from torch_geometric.data import Data
|
|
from tqdm import tqdm
|
|
sys.path.append(os.path.dirname(sys.path[0]))
|
|
from src.utils.Vocabulary import Vocab
|
|
|
|
|
|
def parse_json_list_2_pyg_object(jsonl_file: str, label: int, vocab: Vocab, save_path: str, file_type: str):
|
|
# def parse_json_list_2_pyg_object(jsonl_file: str):
|
|
train_type = ['train', 'valid', 'test']
|
|
index = 0
|
|
file_index = 0
|
|
type_index = 0
|
|
valid_flag = True
|
|
test_flag = True
|
|
file_len = len(os.listdir(jsonl_file))
|
|
|
|
for file in tqdm(os.listdir(jsonl_file), desc=file_type):
|
|
if index >= file_len * 0.8 and valid_flag:
|
|
type_index += 1
|
|
valid_flag = False
|
|
file_index = 0
|
|
print("make valid set")
|
|
elif index >= file_len * 0.9 and test_flag:
|
|
type_index += 1
|
|
test_flag = False
|
|
file_index = 0
|
|
print("make test set")
|
|
j = json_to_pt(file=jsonl_file + file, label=label, vocab=vocab, save_path=save_path, file_type=file_type, train_type=train_type[type_index], index=file_index)
|
|
index += 1
|
|
file_index += 1
|
|
|
|
|
|
def json_to_pt(file: str, label: int, vocab: Vocab, save_path: str, file_type: str, train_type: str, index: int):
|
|
if not os.path.exists(save_path+f"{train_type}_{file_type}/"):
|
|
os.makedirs(save_path+f"{train_type}_{file_type}/")
|
|
with open(file, "r", encoding="utf-8") as item:
|
|
line = item.readline()
|
|
try:
|
|
item = json.loads(line)
|
|
except json.decoder.JSONDecodeError as e:
|
|
print(e)
|
|
print(file)
|
|
return False
|
|
item_hash = item['hash']
|
|
acfg_list = []
|
|
for one_acfg in item['acfg_list']: # list of dict of acfg
|
|
block_features = one_acfg['block_features']
|
|
block_edges = one_acfg['block_edges']
|
|
one_acfg_data = Data(x=torch.tensor(block_features, dtype=torch.float),
|
|
edge_index=torch.tensor(block_edges, dtype=torch.long))
|
|
acfg_list.append(one_acfg_data)
|
|
|
|
item_function_names = item['function_names']
|
|
item_function_edges = item['function_edges']
|
|
|
|
local_function_name_list = item_function_names[:len(acfg_list)]
|
|
assert len(acfg_list) == len(
|
|
local_function_name_list), "The length of ACFG_List should be equal to the length of Local_Function_List"
|
|
external_function_name_list = item_function_names[len(acfg_list):]
|
|
|
|
external_function_index_list = [vocab[f_name] for f_name in external_function_name_list]
|
|
|
|
torch.save(Data(hash=item_hash, local_acfgs=acfg_list, external_list=external_function_index_list,
|
|
function_edges=item_function_edges, targets=label),
|
|
save_path + "{}_{}/{}_{}.pt".format(train_type, file_type, file_type, index))
|
|
return True
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
malware_json_path = "/home/king/python/data/jsonl/malware/"
|
|
benign_json_path = "/home/king/python/data/jsonl/benign/"
|
|
train_vocab_file = "/home/king/python/data/fun_name_sort.jsonl"
|
|
save_vocab_file = "/home/king/python/data/DatasetJSON_remake/"
|
|
file_type = ["malware", "benign"]
|
|
max_vocab_size = 10000
|
|
vocabulary = Vocab(freq_file=train_vocab_file, max_vocab_size=max_vocab_size)
|
|
# parse_json_list_2_pyg_object(jsonl_file=malware_json_path, label=1, vocab=vocabulary, save_path=save_vocab_file,
|
|
# file_type=file_type[0])
|
|
parse_json_list_2_pyg_object(jsonl_file=benign_json_path, label=0, vocab=vocabulary, save_path=save_vocab_file,
|
|
file_type=file_type[1])
|