concurrent.futures
为Python的一个复杂异步编程库,根据https://zhuanlan.zhihu.com/p/438627177的描述,其内部实现机制非常复杂,简单来说就是开辟一个固定大小为n的进程池/线程池。进程池中最多执行n个进程/线程,当任务完成后,从任务队列中取新任务。若池满,则排队等待。
submit()
方法提交任务。submit()
会返回一个Future
对象,这个对象代表了异步执行的操作。通过调用这个Future
对象的result()
方法,可以获取异步操作的结果,这个调用会阻塞,直到异步操作完成并返回结果。ProcessPoolExecutor
用于在一个进程池中执行调用的,适合于需要进行大量计算,且能够被分配到多个进程中并行处理的情况。
用例一
实现要求:利用concurrent.futures
对10个名称类似的数据f'network_{x}.csv'
(x取1~10)进行初始化操作。这一初始化操作首先使用的方法为initialization_method()
,其输入为x
,输出为pandas.DataFrame
数据df_initialized
和x
;之后将这一输出的元组打包进一个空列表tasks
,最后将tasks
输出。利用tqdm.tqdm
制作任务进度条。
from utils import initialization_method
import concurrent.futures
from tqdm import tqdm
TASK_QUANT = 10
def get_tasks():
tasks = []
with tqdm(total=TASK_QUANT) as pbar:
pbar.set_description('Initialization')
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(initialization_method, x) for x in range(1, 1 + TASK_QUANT)]
for future in futures:
tasks.append(future.result())
pbar.update(1)
return tasks
if __name__ == '__main__':
tasks = get_tasks()
用例二
实现要求:利用concurrent.futures
对用例一中获得的tasks
进行分析操作。这一分析操作首先使用的方法是process_frame()
,其输入为df_initialized
和x
,输出为两个pandas.DataFrame
数据df_path
(columns=['query_id', 'path'])和df_flow
(columns=['link_id', 'flow']);之后将这一输出的元组先依照键: 值为(df_path, df_flow): x
的形式打包成字典,用以记录df_path
的对应编号,之后将这些获得的DataFrame数据先添加columns再写入文件,文件命名分别为f'path_{x}.csv'
和f'flow_{x}.csv'
。利用try...except
打印可能产生的错误。
from utils import process_frame
from task1 import get_tasks
import concurrent.futures
import pandas as pd
PATH_COLUMNS = ['query_id', 'path']
FLOW_COLUMNS = ['link_id', 'path']
def result_output():
tasks = get_tasks()
with concurrent.futures.ProcessPoolExecutor() as executor:
future_dict = {executor.submit(process_frame, task[0], task[1]): task[1] for task in tasks}
for future in concurrent.futures.as_completed(future_dict):
x = future_dict[future]
try:
df_path, df_flow = future.result()
df_path.columns = PATH_COLUMNS
df_flow.columns = FLOW_COLUMNS
df_path.to_csv(f'path_{x}.csv', index=False)
df_flow.to_csv(f'flow_{x}.csv', index=False)
except Exception as exc:
print(f'Generated an exception: {exc}')
if __name__ == '__main__':
result_output()
ThreadPoolExecutor
用于在一个线程池中异步执行调用的,适用于那些大部分时间需要等待I/O操作的任务。
用例
博主没用过,看看Ichigo是怎么说的吧:
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return f"完成任务{n}"
with ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)
print(future1.result())
print(future2.result())
task
函数会简单地休眠一段时间,模拟一个耗时操作。我们通过ThreadPoolExecutor
提交了两个任务,然后通过result
方法等待并获取它们的结果。