asm_to_csv/ngram.py
2024-03-09 15:26:16 +08:00

236 lines
11 KiB
Python

from collections import defaultdict
from tqdm import tqdm
import pandas as pd
import os
import csv
import argparse
import statistics
import plotly.express as px
import concurrent.futures
from functools import partial
import logging
import contextlib
###################################################################################################
## Program shall take two csv files of different classes - benign and malware
## It will compute ngrams for each of the classes separately and find the delta frequencies
## of each computed ngram. delta_frequencies = (class1 - class2)
###################################################################################################
# --------------------------------------------------------------------------------------------------
## Generate ngrams given the corpus and factor n
def generate_N_grams(corpus, n=1):
    """Return every run of ``n`` consecutive tokens in ``corpus``.

    ``corpus`` is split on single spaces and each window of ``n`` adjacent
    tokens is joined back together with a single space, in order of
    appearance. The result is empty when the corpus holds fewer than ``n``
    tokens or when ``n`` is not positive.
    """
    tokens = corpus.split(" ")
    # One view of the token list per offset; zipping the views walks a window
    # of width ``n`` over the tokens and stops at the shortest view.
    shifted_views = (tokens[offset:] for offset in range(n))
    return [" ".join(window) for window in zip(*shifted_views)]
# --------------------------------------------------------------------------------------------------
## Creates ngrams for the corpus List for given N and Filters it based on following criteria
# file count >= percent of Total corpus len (percent in [1..100])
# Selects high frequency ngram until the mean value
# Returns both complete and filtered dictionary of ngrams
def filter_N_grams(corpusList, N, percent, filterFreq=0):
    """Count the N-grams of every corpus entry and filter them.

    Args:
        corpusList: Sequence of corpus entries, one per file. Entries that
            are not strings (for example NaN cells read from a CSV) and
            empty strings contribute no n-grams but still count towards the
            total number of files.
        N: Order of the n-grams to generate.
        percent: An n-gram is kept only if it occurs in at least
            ``percent`` % of ``len(corpusList)`` files.
        filterFreq: When non-zero, n-grams whose total frequency is below
            this value are dropped from the filtered result as well.

    Returns:
        A ``(ngramDictionary, filteredNgramDictionary)`` tuple of
        ``defaultdict(int)`` objects mapping n-gram -> total frequency; the
        first holds every n-gram, the second only those passing the filters.
    """
    total = len(corpusList)
    ngramDictionary = defaultdict(int)
    ngramFileCount = defaultdict(int)

    for opcodes in corpusList:
        if not isinstance(opcodes, str) or not opcodes:
            continue
        # Tally this file on its own first, so that the file count below is
        # incremented once per file and only for n-grams that really occur
        # in it (walking the cumulative dictionary here would credit every
        # previously seen n-gram to every later file).
        perFile = defaultdict(int)
        for item in generate_N_grams(opcodes, N):
            perFile[item] += 1
        for item, count in perFile.items():
            ngramDictionary[item] += count
            ngramFileCount[item] += 1

    # Minimum number of files an n-gram must appear in (truncated).
    filterCnt = int((percent * total) / 100)

    filteredNgramDictionary = defaultdict(int)
    for item, fileCount in ngramFileCount.items():
        if fileCount < filterCnt:
            continue
        frequency = ngramDictionary[item]
        if filterFreq and frequency < filterFreq:
            continue
        filteredNgramDictionary[item] = frequency
    return ngramDictionary, filteredNgramDictionary
# --------------------------------------------------------------------------------------------------
# Calculate a normalization factor for the frequency values of class1 and class2.
# When one class has higher frequencies simply because of its larger sample size, a normalization
# factor may be required to correctly rescale the frequencies of the smaller class.
# Input: lists of frequencies for class1 and class2.
def normalization_factor(class1, class2):
    """Return the ratio of the mean of ``class1`` to the mean of ``class2``.

    Both arguments are collections of frequency values. Errors raised by
    ``statistics.mean`` (empty input) or by the division (zero mean of
    ``class2``) propagate to the caller.
    """
    return statistics.mean(class1) / statistics.mean(class2)
# --------------------------------------------------------------------------------------------------
# Write the data into the given csv file handle
def WriteCSV(file, csvFields, dataDictionary):
    """Write a header row followed by one CSV row per dictionary.

    Args:
        file: Writable text file handle that receives the CSV output.
        csvFields: Column names, in output order.
        dataDictionary: Iterable of dicts keyed by the names in ``csvFields``.
    """
    out = csv.DictWriter(file, fieldnames=csvFields)
    out.writeheader()
    for row in dataDictionary:
        out.writerow(row)
# Configure the root logger when the module is loaded: INFO level, with each
# record formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_csv_file(csvfile, ngram_type, file_percent_filter, frequency_filter,
                     *, ngram_size=None, chunk_size=10000):
    """Load a corpus CSV and compute its n-gram frequencies chunk by chunk.

    The ``corpus`` column is cut into slices of ``chunk_size`` rows; each
    slice is submitted to ``filter_N_grams`` on a thread pool and the
    per-slice dictionaries are summed into the returned totals.

    Args:
        csvfile: Path of the CSV file to read; it must contain a ``corpus``
            column.
        ngram_type: Label used in the progress-bar description (the script
            passes the class name, e.g. ``'malware'``).
        file_percent_filter: Forwarded to ``filter_N_grams`` as ``percent``.
        frequency_filter: Forwarded to ``filter_N_grams`` as ``filterFreq``.
        ngram_size: Order ``N`` of the n-grams. When omitted, the order is
            taken from the module-level loop variable ``idx`` (0-based, so
            ``idx + 1``), which is how the script's main loop publishes it.
        chunk_size: Number of rows per submitted task; must be positive.

    Returns:
        A ``(ngram_list, filtered_ngram_list)`` tuple of ``defaultdict(int)``
        objects mapping n-gram -> summed frequency over all chunks.

    NOTE(review): the percentage and frequency filters are evaluated by
    ``filter_N_grams`` per chunk, not over the whole file, so the filtered
    result depends on ``chunk_size`` -- confirm this is intended.
    NOTE(review): ``filter_N_grams`` looks like pure-Python CPU work, so the
    thread pool presumably gives little real parallelism -- verify.
    """
    if ngram_size is None:
        # Legacy contract: read the current order from the script's global
        # loop variable. Raises NameError if that global does not exist.
        ngram_size = idx + 1

    print(f"start load csv file:{os.path.basename(csvfile)}")
    dataframe = pd.read_csv(csvfile, encoding="utf8")
    print("end load")

    corpus = dataframe['corpus'].values
    total = len(corpus)
    ngram_list = defaultdict(int)
    filtered_ngram_list = defaultdict(int)

    # The context manager closes the progress bar even if a task fails.
    with tqdm(total=total, desc=f'Computing {ngram_type}-gram on files') as process_bar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            # Map each future to the number of rows it covers so the bar is
            # advanced by the real chunk length (the last one may be short).
            pending = {
                executor.submit(
                    filter_N_grams,
                    corpus[start:start + chunk_size],
                    ngram_size,
                    file_percent_filter,
                    frequency_filter,
                ): min(chunk_size, total - start)
                for start in range(0, total, chunk_size)
            }
            for future in concurrent.futures.as_completed(pending):
                try:
                    sub_ngram_list, sub_filtered_ngram_list = future.result()
                except Exception as exc:
                    # Best effort: report the failed chunk and keep going.
                    logging.error(f"Error processing {ngram_size}-gram: {exc}")
                    continue
                # Add only the chunk's counts; folding the accumulator into
                # itself as well would double every total on each chunk.
                for key, value in sub_ngram_list.items():
                    ngram_list[key] += value
                for key, value in sub_filtered_ngram_list.items():
                    filtered_ngram_list[key] += value
                process_bar.update(pending[future])
    return ngram_list, filtered_ngram_list
# --------------------------------------------------------------------------------------------------
# Execution starts here
# Input paths and filter parameters are currently hard-coded below; no command line
# arguments are parsed.
# CSV header: class, sub-class, size, corpus
if __name__ == '__main__':
    # Input corpora, one CSV per class. The opcode sequence decoded from each
    # PE file is stored in the ``corpus`` column of the CSV.
    malware_csvfile = './out/output_malware.csv'
    benign_csvfile = './out/output_benign.csv'
    maxgrams = 3

    # Error check and exit if either input is not a file
    if not (os.path.isfile(malware_csvfile) and os.path.isfile(benign_csvfile)):
        print(f"Path should be csv file!")
        raise SystemExit(1)

    filePercentFilter = 80  # select ngrams present in x% of files
    frequencyFilter = 20  # select ngrams with a frequency of at least this value

    # Run only for ``maxgrams``; change the lower bound to 0 to run for all
    # values [1..maxgrams].
    # NOTE: ``idx`` has to stay a module-level name (do not move this loop
    # into a function): process_csv_file reads it to learn the n-gram order.
    for idx in range(maxgrams - 1, maxgrams):
        print(f"Computing {idx + 1}gram on files ...")
        print(f"CPU core {os.cpu_count()} on use")

        # Full and filtered ngram -> frequency dictionaries for each class.
        malwareNgram, filteredMalwareNgram = process_csv_file(
            malware_csvfile, 'malware', filePercentFilter, frequencyFilter)
        benignNgram, filteredBenignNgram = process_csv_file(
            benign_csvfile, 'benign', filePercentFilter, frequencyFilter)

        # Relative frequency (benign - malware) for every ngram that passed
        # the filter in at least one class. When an ngram was filtered out of
        # one class, fall back to that class's unfiltered frequency, or to 0
        # when the class never produced it.
        filteredMergedNgram = {}
        for key in set(filteredMalwareNgram) | set(filteredBenignNgram):
            if key in filteredBenignNgram:
                benignFreq = filteredBenignNgram[key]
                if key in filteredMalwareNgram:
                    malwareFreq = filteredMalwareNgram[key]
                else:
                    malwareFreq = malwareNgram.get(key, 0)
            else:
                # Malware-only after filtering. With no benign occurrence the
                # delta is 0 - malware, i.e. negative, consistent with the
                # benign - malware convention used for every other case.
                malwareFreq = filteredMalwareNgram[key]
                benignFreq = benignNgram.get(key, 0)
            filteredMergedNgram[key] = benignFreq - malwareFreq
        print(f"Merged: {idx + 1}gramCnt={len(filteredMergedNgram)}")

        # Sort by relative frequency; ties are broken by the ngram text so
        # the output order does not depend on set iteration order.
        sortedMergedNgramList = sorted(
            filteredMergedNgram.items(), key=lambda pair: (pair[1], pair[0]))

        # NOTE: a plotly scatter plot of relative frequency against absolute
        # frequency used to be written to "<N>gram.html" at this point; it is
        # currently disabled.

        # Write the final ngrams into a file for feature selection. ``count``
        # is the larger of the two filtered class frequencies.
        ngramDictList = [
            {
                'ngram': key,
                'count': max(filteredMalwareNgram.get(key, 0),
                             filteredBenignNgram.get(key, 0)),
            }
            for key, _delta in sortedMergedNgramList
        ]
        csvfields = ['ngram', 'count']
        csvname = f"./out/{idx + 1}gram.csv"
        print("*======================start write csv=======================================*")
        try:
            # newline='' lets the csv module control line endings (avoids
            # blank rows on Windows); the handle is closed on any outcome.
            with open(csvname, 'w', newline='', encoding='utf8') as csvfile:
                WriteCSV(csvfile, csvfields, ngramDictList)
        except OSError as err:
            print(f"Error: writing csvfile {err}")
            raise SystemExit(1)