"""Small utilities: file logging, thread-pool fan-out and JSON saving.

Logging usage:
    logger = setup_logger(logger_instance_name, log_file_path)
"""
import concurrent.futures
import json
import logging
import os


# ---------------------------------------------------------------------------
# Logging helpers
# ---------------------------------------------------------------------------

def setup_logger(name, log_file, level=logging.INFO):
    """Return the logger called *name*, configured to write to *log_file*.

    The parent directory of *log_file* is created when missing, and the log
    file is emptied on every call so each session starts with a fresh log.
    Calling this function repeatedly with the same *name* and *log_file*
    reuses the already attached file handler instead of stacking a new one
    (which would write every record several times).

    :param name: name passed to ``logging.getLogger``
    :param log_file: path of the log file
    :param level: logging level set on the logger (default ``logging.INFO``)
    :return: the configured ``logging.Logger``
    """
    log_dir = os.path.dirname(log_file)
    # A bare file name has an empty dirname, and os.makedirs('') would raise.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # FileHandler stores the absolute path of its target in ``baseFilename``.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(existing, logging.FileHandler)
        and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        )
        logger.addHandler(handler)
        # To echo records to the console as well, attach a
        # logging.StreamHandler carrying the same formatter here.

    # Reset the existing log file. The handler writes in append mode, so
    # truncating the file underneath it is safe.
    with open(log_file, 'w'):
        pass

    return logger


# ---------------------------------------------------------------------------
# Multi-threading helpers
# ---------------------------------------------------------------------------

# os.cpu_count() may return None; fall back to a single worker in that case.
THREAD_FULL = os.cpu_count() or 1
# Never let the "half" setting drop to 0 workers on single-CPU hosts.
THREAD_HALF = max(1, THREAD_FULL // 2)


def multi_thread(func, args, thread_num=THREAD_FULL):
    """Run ``func(arg)`` for every item of *args* on a thread pool.

    Results are collected in completion order, not submission order. A task
    that raises is logged to ``./multi_thread.log`` and skipped, so the
    returned list may be shorter than *args*. A progress bar is shown when
    ``tqdm`` is installed; otherwise the work runs without one.

    :param func: callable taking a single argument
    :param args: iterable of arguments, one per task (generators are accepted)
    :param thread_num: maximum number of worker threads
    :return: list of the return values of the tasks that succeeded
    """
    try:
        from tqdm import tqdm as progress
    except ImportError:
        # The progress bar is purely cosmetic; run without it.
        def progress(iterable, total=None):
            return iterable

    logger = setup_logger('multi_thread', './multi_thread.log')

    result = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as executor:
        try:
            futures_to_args = {
                executor.submit(func, arg): arg for arg in args
            }
            # Count the submitted futures rather than calling len(args), so
            # unsized iterables such as generators work too.
            completed = concurrent.futures.as_completed(futures_to_args)
            for future in progress(completed, total=len(futures_to_args)):
                try:
                    result.append(future.result())
                except Exception as exc:
                    logger.error('%r generated an exception: %s',
                                 futures_to_args[future], exc)
        except Exception:
            # Failure outside any single task (submission or iteration).
            # No specific future/argument is known here, so log the
            # traceback and return whatever was collected so far.
            logger.exception('multi_thread aborted before all tasks completed')

    return result


def save_json(filename, data):
    """Serialise *data* as JSON and write it to *filename*.

    The data is serialised before the file is opened, so a serialisation
    error does not truncate an existing file.

    :param filename: path of the file to (over)write
    :param data: JSON-serialisable object
    """
    payload = json.dumps(data)
    with open(filename, 'w') as file:
        file.write(payload)