import argparse
import concurrent.futures
import contextlib
import csv
import logging
import os
import statistics
import threading
from collections import defaultdict
from functools import partial

import pandas as pd
import plotly.express as px
from tqdm import tqdm

###################################################################################################
## Program shall take two csv files of different classes - benign and malware
## It will compute ngrams for each of the classes separately and find the delta frequencies
## of each computed ngram. delta_frequencies = (class1 - class2)
###################################################################################################


# --------------------------------------------------------------------------------------------------
## Generate ngrams given the corpus and factor n
def generate_N_grams(corpus, n=1):
    """Return the list of space-joined n-grams of ``corpus``.

    ``corpus`` is split on single spaces; every run of ``n`` consecutive
    tokens becomes one n-gram string.  Fewer than ``n`` tokens yields ``[]``.
    """
    words = corpus.split(" ")
    # zip over n shifted views of the token list -> sliding window of width n
    windows = zip(*[words[offset:] for offset in range(n)])
    return [' '.join(window) for window in windows]


# --------------------------------------------------------------------------------------------------
## Creates ngrams for the corpus List for given N and Filters it based on following criteria
# file count >= percent of Total corpus len (percent in [1..100])
# frequency >= filterFreq (only when filterFreq is non-zero)
# Returns both complete and filtered dictionary of ngrams
def filter_N_grams(corpusList, N, percent, filterFreq=0):
    """Count N-grams over ``corpusList`` and filter them by file coverage.

    Args:
        corpusList: indexable collection of corpus strings; entries that are
            not ``str`` (e.g. NaN cells) or are empty are skipped.
        N: n-gram size.
        percent: minimum share (0..100) of ``len(corpusList)`` files an
            n-gram must occur in to be kept.
        filterFreq: if non-zero, minimum total frequency an n-gram must
            reach to be kept.

    Returns:
        ``(all_ngrams, filtered_ngrams)`` - two ``defaultdict(int)`` mapping
        n-gram -> total frequency.
    """
    total = len(corpusList)
    ngramDictionary = defaultdict(int)
    ngramFileCount = defaultdict(int)

    for idx in range(total):
        opcodes = corpusList[idx]
        if not isinstance(opcodes, str) or not opcodes:
            continue
        grams = generate_N_grams(opcodes, N)
        # total frequency of every ngram
        for item in grams:
            ngramDictionary[item] += 1
        # file count: each distinct ngram counts once for the file it occurs in
        for item in set(grams):
            ngramFileCount[item] += 1

    # Minimum number of files an ngram has to appear in
    filterCnt = int((percent * total) / 100)

    filteredNgramDictionary = defaultdict(int)
    for item, fileCount in ngramFileCount.items():
        if fileCount < filterCnt:
            continue
        frequency = ngramDictionary[item]
        # Drop the item when it is below the frequency threshold
        if filterFreq and frequency < filterFreq:
            continue
        filteredNgramDictionary[item] = frequency

    return ngramDictionary, filteredNgramDictionary


# --------------------------------------------------------------------------------------------------
# Calculate a normalization factor for frequency values of class1 and class2
# For a class which is high in frequency due to its sample size, a normalization may be required
# to correctly resize the frequencies of the smaller class set.
# input list of frequencies of class1 and class 2
def normalization_factor(class1, class2):
    """Return ``mean(class1) / mean(class2)``.

    Raises:
        statistics.StatisticsError: if either input is empty.
        ZeroDivisionError: if the mean of ``class2`` is zero.
    """
    mean1 = statistics.mean(class1)
    mean2 = statistics.mean(class2)
    return mean1 / mean2


# --------------------------------------------------------------------------------------------------
# Write the data into the given csv file handle
def WriteCSV(file, csvFields, dataDictionary):
    """Write a header row and ``dataDictionary`` (an iterable of row dicts)
    to the open file handle ``file`` using ``csvFields`` as column names."""
    writer = csv.DictWriter(file, fieldnames=csvFields)
    writer.writeheader()
    writer.writerows(dataDictionary)


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def process_csv_file(csvfile, ngram_type, file_percent_filter, frequency_filter,
                     n=1, chunk_size=10000):
    """Process a CSV file and compute its n-grams chunk by chunk in a thread pool.

    Args:
        csvfile: path of a CSV file that has a ``corpus`` column.
        ngram_type: label used in the progress-bar description.
        file_percent_filter: forwarded to ``filter_N_grams`` as ``percent``.
        frequency_filter: forwarded to ``filter_N_grams`` as ``filterFreq``.
        n: n-gram size (previously read from a global loop variable).
        chunk_size: number of rows handed to each worker; must be positive.

    Returns:
        ``(ngram_list, filtered_ngram_list)`` - two ``defaultdict(int)`` with
        the per-chunk results summed together.
    """
    print(f"start load csv file:{os.path.basename(csvfile)}")
    dataframe = pd.read_csv(csvfile, encoding="utf8")
    print("end load")

    corpus = dataframe['corpus'].values
    ngram_list = defaultdict(int)
    filtered_ngram_list = defaultdict(int)

    # NOTE(review): filtering happens per chunk, so the file-percent threshold
    # is relative to each chunk rather than to the whole file - confirm that
    # this is intended.
    # NOTE(review): the work is pure Python, so a thread pool presumably gives
    # little real parallelism on a GIL build - consider a process pool.
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = [
            executor.submit(filter_N_grams, corpus[start:start + chunk_size], n,
                            file_percent_filter, frequency_filter)
            for start in range(0, len(corpus), chunk_size)
        ]
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures),
                           desc=f'Computing {ngram_type}-gram on files'):
            try:
                sub_ngram_list, sub_filtered_ngram_list = future.result()
            except Exception as exc:
                # best effort: a failed chunk is reported and skipped
                logging.error("Error processing %s-gram: %s", n, exc)
                continue
            # Add only the chunk result; the accumulator must not be re-added
            # to itself, which would double every count on each iteration.
            for key, value in sub_ngram_list.items():
                ngram_list[key] += value
            for key, value in sub_filtered_ngram_list.items():
                filtered_ngram_list[key] += value

    return ngram_list, filtered_ngram_list


# --------------------------------------------------------------------------------------------------
# CSV header: class, sub-class, size, corpus
def build_csv(ngram_list, filter_list, maxgrams, file_type):
    """Write ``./out/{file_type}-{maxgrams}-gram.csv`` with columns ngram/count.

    For sequence arguments, row ``i`` pairs ``ngram_list[i]`` with
    ``filter_list[i]``.  For dict arguments each n-gram of ``ngram_list`` is
    paired with its count in ``filter_list`` (0 when absent).
    NOTE(review): the dict pairing is an assumption about the intended output;
    positional indexing cannot work on the dicts produced by
    ``process_csv_file`` - confirm against the consumer of this file.
    """
    csv_file_header = ['ngram', 'count']
    csv_path = os.path.join('./out', f'{file_type}-{maxgrams}-gram.csv')
    progress_label = f'Building {maxgrams}-gram csv'

    if isinstance(ngram_list, dict):
        ngramDicList = [
            {'ngram': ngram, 'count': filter_list.get(ngram, 0)}
            for ngram in tqdm(ngram_list, desc=progress_label)
        ]
    else:
        ngramDicList = [
            {'ngram': ngram, 'count': filter_list[index]}
            for index, ngram in enumerate(tqdm(ngram_list, desc=progress_label))
        ]

    try:
        # newline='' is what the csv module expects for files it writes
        with open(csv_path, 'w', newline='', encoding='utf-8') as handle:
            WriteCSV(handle, csv_file_header, ngramDicList)
    except OSError as e:
        print(f"Error opening {csv_path} for writing: {e}")


def _relative_frequencies(malwareNgram, filteredMalwareNgram, benignNgram, filteredBenignNgram):
    """Return ``{ngram: benignFreq - malwareFreq}`` for the union of both filtered sets.

    When an n-gram passed the filter for only one class, the other class
    contributes its unfiltered frequency (0 if it never occurs there).
    """
    merged = defaultdict(int)
    for key in set(filteredMalwareNgram) | set(filteredBenignNgram):
        benignFreq = filteredBenignNgram.get(key, benignNgram.get(key, 0))
        malwareFreq = filteredMalwareNgram.get(key, malwareNgram.get(key, 0))
        # Always benign - malware, so malware-only ngrams come out negative.
        merged[key] = benignFreq - malwareFreq
    return merged


def _write_ngram_csv(path, csvfields, rows):
    """Write ``rows`` to ``path``; report (but do not raise) a failure to open or write it."""
    try:
        with open(path, 'w', newline='', encoding='utf-8') as csvfile:
            WriteCSV(csvfile, csvfields, rows)
    except OSError as err:
        print(f"Error: writing csvfile {err}")


def main():
    """Compute absolute and relative n-gram frequency CSVs for n = 3, 2, 1."""
    malware_csvfile = os.path.join('./out/output_malware.csv')
    benign_csvfile = os.path.join('./out/output_benign.csv')
    maxgrams_list = [3, 2, 1]

    # Error check and exit if not a file
    if not (os.path.isfile(malware_csvfile) and os.path.isfile(benign_csvfile)):
        print("Path should be csv file!")
        raise SystemExit(1)

    filePercentFilter = 80  ## select ngrams present in x% of files
    frequencyFilter = 20    ## select ngrams with frequency of at least this value
    csvfields = ['ngram', 'count']

    # Build a frequency list for ngrams, one ngram size at a time
    for maxgrams in maxgrams_list:
        print(f"Computing {maxgrams}gram on files ...")
        print(f"CPU core {os.cpu_count()} on use")

        # opcodes decoded from pe file in sequence is stored as corpus in the csv
        malwareNgram, filteredMalwareNgram = process_csv_file(
            malware_csvfile, 'malware', filePercentFilter, frequencyFilter, n=maxgrams)
        benignNgram, filteredBenignNgram = process_csv_file(
            benign_csvfile, 'benign', filePercentFilter, frequencyFilter, n=maxgrams)

        ## relative frequency b/w benign and malware files = benign - malware,
        ## including ngrams present in only one of the classes
        filteredMergedNgram = _relative_frequencies(
            malwareNgram, filteredMalwareNgram, benignNgram, filteredBenignNgram)
        print(f"Merged: {maxgrams}gramCnt={len(filteredMergedNgram)}")

        ## sorted by relative frequency; the ngram itself breaks ties so the
        ## output order is reproducible between runs
        sortedMergedNgramList = sorted(filteredMergedNgram.items(),
                                       key=lambda pair: (pair[1], pair[0]))

        # write the final ngrams into a file for feature selection
        AbsoluteNgramDictList = [
            {'ngram': key,
             'count': max(filteredMalwareNgram.get(key, 0), filteredBenignNgram.get(key, 0))}
            for key, _ in sortedMergedNgramList
        ]
        RelativeNgramDictList = [
            {'ngram': key, 'count': delta} for key, delta in sortedMergedNgramList
        ]

        AbsoluteCsvName = "./out/" + str(maxgrams) + "gram-absolute.csv"
        RelativeCsvName = "./out/" + str(maxgrams) + "gram-relative.csv"
        print("*======================start write csv=======================================*")
        _write_ngram_csv(AbsoluteCsvName, csvfields, AbsoluteNgramDictList)
        _write_ngram_csv(RelativeCsvName, csvfields, RelativeNgramDictList)
        print("*======================end write csv=======================================*")


# --------------------------------------------------------------------------------------------------
# Execution starts here
if __name__ == '__main__':
    main()