concurrent.futures为Python的一个复杂异步编程库,根据https://zhuanlan.zhihu.com/p/438627177的描述,其内部实现机制非常复杂,简单来说就是开辟一个固定大小为n的进程池/线程池。进程池中最多执行n个进程/线程,当任务完成后,从任务队列中取新任务。若池满,则排队等待。

使用concurrent.futures库进行异步调用时,你会用到submit()方法提交任务。submit()会返回一个Future对象,这个对象代表了异步执行的操作。通过调用这个Future对象的result()方法,可以获取异步操作的结果,这个调用会阻塞,直到异步操作完成并返回结果。
使用concurrent.futures库可以简化多线程和多进程编程的复杂性,让你更加方便地执行并发操作。

ProcessPoolExecutor

用于在一个进程池中执行调用的,适合于需要进行大量计算,且能够被分配到多个进程中并行处理的情况。

用例一

实现要求:利用concurrent.futures对10个名称类似的数据f'network_{x}.csv'(x取1~10)进行初始化操作。这一初始化操作首先使用的方法为initialization_method(),其输入为x,输出为pandas.DataFrame数据df_initializedx;之后将这一输出的元组打包进一个空列表tasks,最后将tasks输出。利用tqdm.tqdm制作任务进度条。

Python
from utils import initialization_method
import concurrent.futures
from tqdm import tqdm

TASK_QUANT = 10

def get_tasks():
    tasks = []
    with tqdm(total=TASK_QUANT) as pbar:
        pbar.set_description('Initialization')
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(initialization_method, x) for x in range(1, 1 + TASK_QUANT)]
            for future in futures:
                tasks.append(future.result())
                pbar.update(1)
    return tasks


if __name__ == '__main__':
    tasks = get_tasks()

用例二

实现要求:利用concurrent.futures用例一中获得的tasks进行分析操作。这一分析操作首先使用的方法是process_frame(),其输入为df_initializedx,输出为两个pandas.DataFrame数据df_path(columns=['query_id', 'path'])和df_flow(columns=['link_id', 'flow']);之后将这一输出的元组先依照键: 值为(df_path, df_flow): x的形式打包成字典,用以记录df_path的对应编号,之后将这些获得的DataFrame数据先添加columns再写入文件,文件命名分别为f'path_{x}.csv'f'flow_{x}.csv'。利用try...except打印可能产生的错误。

Python
from utils import process_frame
from task1 import get_tasks
import concurrent.futures
import pandas as pd

PATH_COLUMNS = ['query_id', 'path']
FLOW_COLUMNS = ['link_id', 'path']

def result_output():
    tasks = get_tasks()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future_dict = {executor.submit(process_frame, task[0], task[1]): task[1] for task in tasks}
        for future in concurrent.futures.as_completed(future_dict):
            x = future_dict[future]
            try:
                df_path, df_flow = future.result()
                df_path.columns = PATH_COLUMNS
                df_flow.columns = FLOW_COLUMNS
                df_path.to_csv(f'path_{x}.csv', index=False)
                df_flow.to_csv(f'flow_{x}.csv', index=False)
            except Exception as exc:
                print(f'Generated an exception: {exc}')


if __name__ == '__main__':
    result_output()

ThreadPoolExecutor

用于在一个线程池中异步执行调用的,适用于那些大部分时间需要等待I/O操作的任务。

用例

博主没用过,看看Ichigo是怎么说的吧:

这里是一个简单的使用ThreadPoolExecutor执行异步调用的例子:

Python
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"完成任务{n}"

with ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
    print(future1.result())
    print(future2.result())

这段代码中,task函数会简单地休眠一段时间,模拟一个耗时操作。我们通过ThreadPoolExecutor提交了两个任务,然后通过result方法等待并获取它们的结果。