concurrent.futures — Запуск параллельных задач

Added in version 3.2.

Источник: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py


Модуль concurrent.futures предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых файлов.

Асинхронное выполнение может осуществляться с помощью потоков, используя ThreadPoolExecutor, или отдельных процессов, используя ProcessPoolExecutor. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом Executor.

Availability: не WASI.

Этот модуль не работает или недоступен на WebAssembly. Дополнительную информацию см. в разделе Платформы WebAssembly.

Объекты исполнителя

class concurrent.futures.Executor

Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.

submit(fn, /, *args, **kwargs)

Планирует выполнение вызываемого объекта fn как fn(*args, **kwargs) и возвращает объект Future, представляющий выполнение вызываемого объекта.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

Аналогично map(fn, *iterables), за исключением:

  • итерации собираются сразу, а не лениво;

  • fn выполняется асинхронно, и несколько вызовов fn могут выполняться одновременно.

Возвращаемый итератор вызывает ошибку TimeoutError, если вызывается __next__() и результат не доступен по истечении timeout секунд с момента первоначального вызова Executor.map(). В качестве timeout может выступать int или float. Если timeout не указано или None, время ожидания не ограничено.

Если вызов fn вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.

При использовании ProcessPoolExecutor этот метод разбивает итерабельные файлы на некоторое количество кусков, которые он отправляет в пул как отдельные задачи. Размер (приблизительный) этих кусков можно задать, установив для chunksize целое положительное число. Для очень длинных итераций использование большого значения chunksize может значительно повысить производительность по сравнению с размером по умолчанию, равным 1. При значении ThreadPoolExecutor chunksize не влияет.

Изменено в версии 3.5: Добавлен аргумент chunksize.

shutdown(wait=True, *, cancel_futures=False)

Сигнал исполнителю, что он должен освободить все используемые им ресурсы, когда текущие ожидающие фьючерсы закончат выполняться. Вызовы Executor.submit() и Executor.map(), сделанные после выключения, вызовут сигнал RuntimeError.

Если wait имеет значение True, то этот метод не вернется, пока все ожидающие фьючерсы не завершат выполнение и ресурсы, связанные с исполнителем, не будут освобождены. Если wait имеет значение False, то этот метод вернется немедленно, а ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие фьючерсы завершат выполнение. Независимо от значения wait, вся программа Python не завершится до тех пор, пока не будут выполнены все ожидающие фьючерсы.

Если значение cancel_futures равно True, этот метод отменит все ожидающие фьючерсы, которые исполнитель не начал выполнять. Любые завершенные или запущенные фьючерсы не будут отменены, независимо от значения cancel_futures.

Если значения cancel_futures и wait равны True, все фьючерсы, которые исполнитель начал выполнять, будут завершены до возвращения этого метода. Оставшиеся фьючерсы будут отменены.

Вы можете избежать явного вызова этого метода, если используете оператор with, который выключит Executor (ожидание, как если бы Executor.shutdown() был вызван с wait, установленным на True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Изменено в версии 3.9: Добавлено cancel_futures.

ThreadPoolExecutor

ThreadPoolExecutor - это подкласс Executor, который использует пул потоков для асинхронного выполнения вызовов.

Тупики могут возникать, когда вызываемый модуль, связанный с Future, ожидает результатов выполнения другого Future. Например:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

И:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Подкласс Executor, использующий пул из не более чем max_workers потоков для асинхронного выполнения вызовов.

Все потоки, поставленные в очередь на ThreadPoolExecutor, будут объединены, прежде чем интерпретатор сможет выйти. Обратите внимание, что обработчик выхода, который это делает, выполняется перед любыми обработчиками выхода, добавленными с помощью atexit. Это означает, что исключения в главном потоке должны быть пойманы и обработаны, чтобы сигнализировать потокам об изящном выходе. По этой причине рекомендуется не использовать ThreadPoolExecutor для долго выполняющихся задач.

initializer - необязательный вызываемый модуль, который вызывается при запуске каждого рабочего потока; initargs - кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, находящиеся в процессе выполнения, вызовут BrokenThreadPool, а также все попытки отправить в пул дополнительные задания.

Изменено в версии 3.5: Если значение max_workers равно None или не задано, то по умолчанию оно будет равно количеству процессоров на машине, умноженному на 5, исходя из того, что ThreadPoolExecutor часто используется для перекрытия ввода-вывода вместо работы процессора, и количество рабочих должно быть больше, чем количество рабочих для ProcessPoolExecutor.

Изменено в версии 3.6: Добавлен параметр thread_name_prefix, позволяющий пользователям контролировать имена threading.Thread для рабочих потоков, создаваемых пулом, для облегчения отладки.

Изменено в версии 3.7: Добавлены аргументы initializer и initargs.

Изменено в версии 3.8: Значение по умолчанию max_workers изменено на min(32, os.cpu_count() + 4). Это значение по умолчанию сохраняет не менее 5 рабочих для задач, связанных с вводом/выводом. Оно использует не более 32 ядер процессора для задач, связанных с процессором, которые освобождают GIL. Это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.

ThreadPoolExecutor теперь повторно использует простаивающие рабочие потоки перед запуском рабочих потоков max_workers.

Изменено в версии 3.13: Значение по умолчанию max_workers изменено на min(32, (os.process_cpu_count() or 1) + 4).

Пример ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

Класс ProcessPoolExecutor является подклассом Executor, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль multiprocessing, что позволяет ему обойти Global Interpreter Lock, но также означает, что выполняться и возвращаться могут только picklable-объекты.

Модуль __main__ должен быть импортируемым рабочими подпроцессами. Это означает, что ProcessPoolExecutor не будет работать в интерактивном интерпретаторе.

Вызов методов Executor или Future из callable, переданного в ProcessPoolExecutor, приведет к тупику.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

Подкласс Executor, который выполняет вызовы асинхронно, используя пул из не более чем max_workers процессов. Если max_workers равно None или не задано, то по умолчанию будет задано значение os.process_cpu_count(). Если max_workers меньше или равно 0, то будет вызвана ошибка ValueError. В Windows значение max_workers должно быть меньше или равно 61. Если это не так, то будет выдано сообщение ValueError. Если max_workers равно None, то по умолчанию будет выбрано не более 61, даже если доступно больше процессоров. mp_context может быть контекстом multiprocessing или None. Он будет использоваться для запуска рабочих. Если mp_context равен None или не указан, используется контекст по умолчанию multiprocessing. См. Контексты и методы запуска.

initializer - это необязательный вызываемый модуль, который вызывается при запуске каждого рабочего процесса; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, находящиеся в процессе выполнения, вызовут BrokenProcessPool, а также все попытки отправить в пул дополнительные задания.

max_tasks_per_child - необязательный аргумент, определяющий максимальное количество задач, которые может выполнить один процесс, прежде чем он завершится и будет заменен новым рабочим процессом. По умолчанию max_tasks_per_child равен None, что означает, что рабочие процессы будут жить столько же, сколько и пул. Если указано значение max, то при отсутствии параметра mp_context по умолчанию будет использоваться метод запуска мультипроцесса «spawn». Эта функция несовместима с методом запуска «fork».

Изменено в версии 3.3: При внезапном завершении одного из рабочих процессов теперь выдается ошибка BrokenProcessPool. Ранее поведение было неопределенным, но операции над исполнителем или его фьючерсами часто зависали или заходили в тупик.

Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям управлять методом start_method для рабочих процессов, созданных пулом.

Добавлены аргументы initializer и initargs.

Примечание

В Python 3.14 метод запуска multiprocessing по умолчанию (см. Контексты и методы запуска) будет заменен на fork. Код, требующий использования fork для своих ProcessPoolExecutor, должен явно указать это, передав параметр mp_context=multiprocessing.get_context("fork").

Изменено в версии 3.11: Аргумент max_tasks_per_child был добавлен, чтобы позволить пользователям контролировать время жизни рабочих в пуле.

Изменено в версии 3.12: В POSIX-системах, если ваше приложение имеет несколько потоков и контекст multiprocessing использует метод "fork" start: Внутренняя функция os.fork(), вызываемая для порождения рабочих, может вызвать ошибку DeprecationWarning. Передайте mp_context, настроенный на использование другого метода запуска. Дополнительные пояснения см. в документации os.fork().

Изменено в версии 3.13: В max_workers по умолчанию используется os.process_cpu_count(), а не os.cpu_count().

Пример ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Объекты будущего

Класс Future инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future создаются Executor.submit().

class concurrent.futures.Future

Инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future создаются Executor.submit() и не должны создаваться напрямую, кроме как для тестирования.

cancel()

Попытка отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод вернет False, в противном случае вызов будет отменен и метод вернет True.

cancelled()

Возвращает True, если вызов был успешно отменен.

running()

Возвращает True, если вызов выполняется в данный момент и не может быть отменен.

done()

Возвращает True, если вызов был успешно отменен или завершен.

result(timeout=None)

Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет вызвана ошибка TimeoutError. Значение timeout может быть int или float. Если timeout не указано или None, то время ожидания не ограничено.

Если будущее отменяется до завершения, то будет поднят CancelledError.

Если вызов вызвал исключение, этот метод вызовет такое же исключение.

exception(timeout=None)

Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет поднята ошибка TimeoutError. Значение timeout может быть int или float. Если timeout не указано или None, то время ожидания не ограничено.

Если будущее отменяется до завершения, то будет поднят CancelledError.

Если вызов завершился без повышения, возвращается None.

add_done_callback(fn)

Присоединяет вызываемую функцию fn к будущему. fn будет вызвана, с будущим в качестве единственного аргумента, когда будущее будет отменено или завершит выполнение.

Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем добавившему их процессу. Если вызываемый элемент вызывает подкласс Exception, он будет зарегистрирован и проигнорирован. Если вызываемый объект порождает подкласс BaseException, поведение не определено.

Если будущее уже завершилось или было отменено, будет немедленно вызвано fn.

Следующие Future методы предназначены для использования в модульных тестах и Executor реализациях.

set_running_or_notify_cancel()

Этот метод должен вызываться только реализациями Executor перед выполнением работы, связанной с Future, и модульными тестами.

Если метод возвращает False, то Future был отменен, то есть был вызван Future.cancel(), который вернул True. Все потоки, ожидающие завершения Future (т. е. через as_completed() или wait()), будут разбужены.

Если метод возвращает True, то Future не был отменен и переведен в состояние выполнения, т. е. вызовы Future.running() вернут True.

Этот метод может быть вызван только один раз и не может быть вызван после вызова Future.set_result() или Future.set_exception().

set_result(result)

Устанавливает результат работы, связанной с Future, в result.

Этот метод должен использоваться только в реализациях Executor и модульных тестах.

Изменено в версии 3.8: Этот метод поднимает concurrent.futures.InvalidStateError, если Future уже выполнен.

set_exception(exception)

Устанавливает результат работы, связанный с Future, на Exception исключение.

Этот метод должен использоваться только в реализациях Executor и модульных тестах.

Изменено в версии 3.8: Этот метод поднимает concurrent.futures.InvalidStateError, если Future уже выполнен.

Функции модуля

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Дождитесь завершения работы экземпляров Future (возможно, созданных разными экземплярами Executor), переданных fs. Дубликаты фьючерсов, переданные fs, удаляются и будут возвращены только один раз. Возвращает именованный 2-кортеж наборов. Первый набор, названный done, содержит фьючерсы, которые завершились (завершенные или отмененные фьючерсы) до завершения ожидания. Второй набор, названный not_done, содержит фьючерсы, которые не завершились (ожидающие или выполняющиеся фьючерсы).

timeout можно использовать для управления максимальным количеством секунд ожидания перед возвратом. Значение timeout может быть int или float. Если timeout не указан или None, время ожидания не ограничено.

return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:

Постоянно

Описание

concurrent.futures.FIRST_COMPLETED

Функция вернется, когда любое будущее завершится или будет отменено.

concurrent.futures.FIRST_EXCEPTION

Функция вернется, когда любое будущее завершит свою работу, вызвав исключение. Если ни одно будущее не вызывает исключения, то это эквивалентно ALL_COMPLETED.

concurrent.futures.ALL_COMPLETED

Функция вернется, когда все фьючерсы завершатся или будут отменены.

concurrent.futures.as_completed(fs, timeout=None)

Возвращает итератор по экземплярам Future (возможно, созданным разными экземплярами Executor), заданным fs, который возвращает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Любые фьючерсы, заданные fs, которые дублируются, будут возвращены один раз. Любые фьючерсы, завершившиеся до вызова as_completed(), будут возвращены первыми. Возвращенный итератор вызывает ошибку TimeoutError, если вызывается __next__() и результат не доступен по истечении timeout секунд с момента первоначального вызова as_completed(). В качестве timeout может выступать int или float. Если timeout не указан или None, время ожидания не ограничено.

См.также

PEP 3148 – фьючерсы - асинхронное выполнение вычислений

Предложение, описывающее эту возможность для включения в стандартную библиотеку Python.

Классы исключений

exception concurrent.futures.CancelledError

Возникает, когда будущее отменяется.

exception concurrent.futures.TimeoutError

Устаревший псевдоним TimeoutError, поднимаемый, когда будущая операция превышает заданный таймаут.

Изменено в версии 3.11: Этот класс был сделан псевдонимом TimeoutError.

exception concurrent.futures.BrokenExecutor

Производный от RuntimeError, этот класс исключений возникает, когда исполнитель по какой-то причине выходит из строя и не может быть использован для отправки или выполнения новых задач.

Added in version 3.7.

exception concurrent.futures.InvalidStateError

Возникает, когда над будущим выполняется операция, недопустимая в текущем состоянии.

Added in version 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Производный от BrokenExecutor, этот класс исключений возникает, когда один из рабочих ThreadPoolExecutor не смог инициализироваться.

Added in version 3.7.

exception concurrent.futures.process.BrokenProcessPool

Производный от BrokenExecutor (ранее RuntimeError), этот класс исключений возникает, когда один из рабочих ProcessPoolExecutor завершается нечистым образом (например, если он был убит извне).

Added in version 3.3.