import json
import os
import sys

import torch
from torch_geometric.data import Data
from tqdm import tqdm

# Put the parent of the script's directory on the import path so that the
# `src` package below can be resolved (presumably the project root -- confirm).
sys.path.append(os.path.dirname(sys.path[0]))
from src.utils.Vocabulary import Vocab


def parse_json_list_2_pyg_object(jsonl_file: str, label: int, vocab: Vocab, save_path: str, file_type: str) -> None:
    """Convert every file of a directory into ``.pt`` files split into train/valid/test.

    The directory is listed once and walked in sorted order. Entries start in
    the ``train`` split; the first entry whose position reaches 80% of the
    listing switches to ``valid`` and a later entry whose position reaches 90%
    switches to ``test``. Each entry is converted with :func:`json_to_pt`.

    Args:
        jsonl_file: Directory holding the input files (despite the name, this
            is a directory path, it is passed to ``os.listdir``).
        label: Target value stored with every sample of this directory.
        vocab: Lookup used by :func:`json_to_pt` for external function names.
        save_path: Root directory under which the split folders are created.
        file_type: Tag used in the progress bar, the split folder name and the
            output file names (e.g. ``"malware"`` or ``"benign"``).
    """
    train_type = ['train', 'valid', 'test']

    # os.listdir returns entries in arbitrary order; sorting makes the split
    # reproducible, and listing once keeps the count and the iteration in sync.
    file_names = sorted(os.listdir(jsonl_file))
    file_len = len(file_names)

    type_index = 0   # position in `train_type` of the split being filled
    file_index = 0   # running number of the next output file in that split
    valid_flag = True  # True until the switch to the valid split has happened
    test_flag = True   # True until the switch to the test split has happened

    for index, file_name in enumerate(tqdm(file_names, desc=file_type)):
        if index >= file_len * 0.8 and valid_flag:
            type_index += 1
            valid_flag = False
            file_index = 0
            print("make valid set")
        elif index >= file_len * 0.9 and test_flag:
            type_index += 1
            test_flag = False
            file_index = 0
            print("make test set")

        saved = json_to_pt(file=os.path.join(jsonl_file, file_name),
                           label=label,
                           vocab=vocab,
                           save_path=save_path,
                           file_type=file_type,
                           train_type=train_type[type_index],
                           index=file_index)
        # Only consume an output number when a file was actually written, so
        # the files of a split stay numbered contiguously from 0 even if some
        # inputs could not be parsed.
        if saved:
            file_index += 1


def json_to_pt(file: str, label: int, vocab: Vocab, save_path: str, file_type: str, train_type: str, index: int) -> bool:
    """Convert the first JSON line of ``file`` into a saved PyG ``Data`` object.

    The JSON object is expected to provide the keys ``hash``, ``acfg_list``
    (a list of dicts with ``block_features`` and ``block_edges``),
    ``function_names`` and ``function_edges``. The first ``len(acfg_list)``
    function names are treated as local functions; the remaining names are
    treated as external and mapped to indices through ``vocab[name]``.

    The result is written to
    ``<save_path>/<train_type>_<file_type>/<file_type>_<index>.pt``; the
    folder is created when missing.

    Args:
        file: Path of the input file; only its first line is read.
        label: Value stored as ``targets`` in the saved object.
        vocab: Object indexable by function name (``vocab[name]``).
        save_path: Root output directory.
        file_type: Tag used in the folder and file name.
        train_type: Split name used in the folder name.
        index: Number used in the output file name.

    Returns:
        ``True`` when the ``.pt`` file was written, ``False`` when the line is
        not valid JSON (the error and the file path are printed).

    Raises:
        AssertionError: If there are fewer function names than ACFGs.
        KeyError: If one of the expected keys is missing from the JSON object.
    """
    out_dir = os.path.join(save_path, f"{train_type}_{file_type}")
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(out_dir, exist_ok=True)

    with open(file, "r", encoding="utf-8") as handle:
        line = handle.readline()

    try:
        item = json.loads(line)
    except json.decoder.JSONDecodeError as e:
        print(e)
        print(file)
        return False

    item_hash = item['hash']

    # One PyG graph per ACFG: node features plus the block-level edge list.
    acfg_list = [
        Data(x=torch.tensor(one_acfg['block_features'], dtype=torch.float),
             edge_index=torch.tensor(one_acfg['block_edges'], dtype=torch.long))
        for one_acfg in item['acfg_list']
    ]

    item_function_names = item['function_names']
    item_function_edges = item['function_edges']

    local_function_name_list = item_function_names[:len(acfg_list)]
    # Explicit raise instead of `assert` so the check survives `python -O`;
    # the exception type and message are unchanged.
    if len(acfg_list) != len(local_function_name_list):
        raise AssertionError(
            "The length of ACFG_List should be equal to the length of Local_Function_List")

    external_function_name_list = item_function_names[len(acfg_list):]
    external_function_index_list = [vocab[f_name] for f_name in external_function_name_list]

    torch.save(Data(hash=item_hash,
                    local_acfgs=acfg_list,
                    external_list=external_function_index_list,
                    function_edges=item_function_edges,
                    targets=label),
               os.path.join(out_dir, f"{file_type}_{index}.pt"))
    return True


if __name__ == '__main__':
    malware_json_path = "/home/king/python/data/jsonl/malware/"
    benign_json_path = "/home/king/python/data/jsonl/benign/"
    train_vocab_file = "/home/king/python/data/fun_name_sort.jsonl"
    # Root directory that receives the generated .pt files.
    save_vocab_file = "/home/king/python/data/DatasetJSON_remake/"
    file_type = ["malware", "benign"]
    max_vocab_size = 10000
    vocabulary = Vocab(freq_file=train_vocab_file, max_vocab_size=max_vocab_size)

    # Only the benign set is processed by default; uncomment the call below to
    # build the malware set as well.
    # parse_json_list_2_pyg_object(jsonl_file=malware_json_path, label=1, vocab=vocabulary,
    #                              save_path=save_vocab_file, file_type=file_type[0])
    parse_json_list_2_pyg_object(jsonl_file=benign_json_path,
                                 label=0,
                                 vocab=vocabulary,
                                 save_path=save_vocab_file,
                                 file_type=file_type[1])