# coding=utf-8 import os import subprocess import threading import time from log_utils import setup_logger # 设置最大并发线程数 max_threads = 20 # 创建一个锁和条件变量用于线程同步 thread_lock = threading.Lock() active_threads = 0 threads_completed = 0 condition = threading.Condition(thread_lock) timer_event = threading.Event() def execute_command(cmd, log): """ 在子线程中执行给定的命令。 """ global active_threads, threads_completed process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) stdout, stderr = process.communicate() with condition: if stdout: log.info("stdout: %s" % stdout.decode('gbk')) if stderr: log.warning("stderr: %s\n err_cmd: %cmd" % (stderr.decode('gbk'), cmd)) # 当前线程完成任务后释放一个线程位置 active_threads -= 1 threads_completed += 1 condition.notify_all() print threads_completed def timer_thread(): print 'start timer thread' while True: with condition: # 每隔1秒检查一次并输出已完成命令的数量 while not timer_event.is_set() and threads_completed < len(commands): timer_event.wait(1) print "done file: %d" % threads_completed # 如果所有命令都已完成,则停止计时器线程 if threads_completed == len(commands): timer_event.set() break if __name__ == '__main__': # timer = threading.Thread(target=timer_thread) # timer.start() # timer_event.clear() log = setup_logger('thread_out', 'ida_asm_create_out.log') # 样本文件夹 sample_dir = "D:/bishe/dataset/sample_20230130_458" # 创建并启动线程 commands = [] for file in os.listdir(sample_dir): com = r'D:\IDA_Pro_v6.8\idaq64.exe -c -A -S"D:\bishe\Gencoding_KE\Genius3\raw-feature-extractor\preprocessing_ida.py 0" -oD:\bishe\dataset\out ' commands.append(com + "D:/bishe/dataset/sample_20230130_458/" + file) threads = [] for cmd in commands: while active_threads >= max_threads: with condition: # 等待有线程完成任务 condition.wait() thread = threading.Thread(target=execute_command, args=(cmd, log)) thread.start() active_threads += 1 threads.append(thread) # 等待所有线程完成 for thread in threads: thread.join() print("所有命令已执行完毕.")