concurrent.futures
— Запуск параллельных задач¶
Added in version 3.2.
Источник: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py
Модуль concurrent.futures
предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых файлов.
Асинхронное выполнение может осуществляться с помощью потоков, используя ThreadPoolExecutor
, или отдельных процессов, используя ProcessPoolExecutor
. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом Executor
.
Availability: не WASI.
Этот модуль не работает или недоступен на WebAssembly. Дополнительную информацию см. в разделе Платформы WebAssembly.
Объекты исполнителя¶
- class concurrent.futures.Executor¶
Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.
- submit(fn, /, *args, **kwargs)¶
Планирует выполнение вызываемого объекта fn как
fn(*args, **kwargs)
и возвращает объектFuture
, представляющий выполнение вызываемого объекта.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
Аналогично
map(fn, *iterables)
, за исключением:итерации собираются сразу, а не лениво;
fn выполняется асинхронно, и несколько вызовов fn могут выполняться одновременно.
Возвращаемый итератор вызывает ошибку
TimeoutError
, если вызывается__next__()
и результат не доступен по истечении timeout секунд с момента первоначального вызоваExecutor.map()
. В качестве timeout может выступать int или float. Если timeout не указано илиNone
, время ожидания не ограничено.Если вызов fn вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.
При использовании
ProcessPoolExecutor
этот метод разбивает итерабельные файлы на некоторое количество кусков, которые он отправляет в пул как отдельные задачи. Размер (приблизительный) этих кусков можно задать, установив для chunksize целое положительное число. Для очень длинных итераций использование большого значения chunksize может значительно повысить производительность по сравнению с размером по умолчанию, равным 1. При значенииThreadPoolExecutor
chunksize не влияет.Изменено в версии 3.5: Добавлен аргумент chunksize.
- shutdown(wait=True, *, cancel_futures=False)¶
Сигнал исполнителю, что он должен освободить все используемые им ресурсы, когда текущие ожидающие фьючерсы закончат выполняться. Вызовы
Executor.submit()
иExecutor.map()
, сделанные после выключения, вызовут сигналRuntimeError
.Если wait имеет значение
True
, то этот метод не вернется, пока все ожидающие фьючерсы не завершат выполнение и ресурсы, связанные с исполнителем, не будут освобождены. Если wait имеет значениеFalse
, то этот метод вернется немедленно, а ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие фьючерсы завершат выполнение. Независимо от значения wait, вся программа Python не завершится до тех пор, пока не будут выполнены все ожидающие фьючерсы.Если значение cancel_futures равно
True
, этот метод отменит все ожидающие фьючерсы, которые исполнитель не начал выполнять. Любые завершенные или запущенные фьючерсы не будут отменены, независимо от значения cancel_futures.Если значения cancel_futures и wait равны
True
, все фьючерсы, которые исполнитель начал выполнять, будут завершены до возвращения этого метода. Оставшиеся фьючерсы будут отменены.Вы можете избежать явного вызова этого метода, если используете оператор
with
, который выключитExecutor
(ожидание, как если быExecutor.shutdown()
был вызван с wait, установленным наTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Изменено в версии 3.9: Добавлено cancel_futures.
ThreadPoolExecutor¶
ThreadPoolExecutor
- это подкласс Executor
, который использует пул потоков для асинхронного выполнения вызовов.
Тупики могут возникать, когда вызываемый модуль, связанный с Future
, ожидает результатов выполнения другого Future
. Например:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
И:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
Подкласс
Executor
, использующий пул из не более чем max_workers потоков для асинхронного выполнения вызовов.Все потоки, поставленные в очередь на
ThreadPoolExecutor
, будут объединены, прежде чем интерпретатор сможет выйти. Обратите внимание, что обработчик выхода, который это делает, выполняется перед любыми обработчиками выхода, добавленными с помощьюatexit
. Это означает, что исключения в главном потоке должны быть пойманы и обработаны, чтобы сигнализировать потокам об изящном выходе. По этой причине рекомендуется не использоватьThreadPoolExecutor
для долго выполняющихся задач.initializer - необязательный вызываемый модуль, который вызывается при запуске каждого рабочего потока; initargs - кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, находящиеся в процессе выполнения, вызовут
BrokenThreadPool
, а также все попытки отправить в пул дополнительные задания.Изменено в версии 3.5: Если значение max_workers равно
None
или не задано, то по умолчанию оно будет равно количеству процессоров на машине, умноженному на5
, исходя из того, чтоThreadPoolExecutor
часто используется для перекрытия ввода-вывода вместо работы процессора, и количество рабочих должно быть больше, чем количество рабочих дляProcessPoolExecutor
.Изменено в версии 3.6: Добавлен параметр thread_name_prefix, позволяющий пользователям контролировать имена
threading.Thread
для рабочих потоков, создаваемых пулом, для облегчения отладки.Изменено в версии 3.7: Добавлены аргументы initializer и initargs.
Изменено в версии 3.8: Значение по умолчанию max_workers изменено на
min(32, os.cpu_count() + 4)
. Это значение по умолчанию сохраняет не менее 5 рабочих для задач, связанных с вводом/выводом. Оно использует не более 32 ядер процессора для задач, связанных с процессором, которые освобождают GIL. Это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.ThreadPoolExecutor теперь повторно использует простаивающие рабочие потоки перед запуском рабочих потоков max_workers.
Изменено в версии 3.13: Значение по умолчанию max_workers изменено на
min(32, (os.process_cpu_count() or 1) + 4)
.
Пример ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
Класс ProcessPoolExecutor
является подклассом Executor
, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor
использует модуль multiprocessing
, что позволяет ему обойти Global Interpreter Lock, но также означает, что выполняться и возвращаться могут только picklable-объекты.
Модуль __main__
должен быть импортируемым рабочими подпроцессами. Это означает, что ProcessPoolExecutor
не будет работать в интерактивном интерпретаторе.
Вызов методов Executor
или Future
из callable, переданного в ProcessPoolExecutor
, приведет к тупику.
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
Подкласс
Executor
, который выполняет вызовы асинхронно, используя пул из не более чем max_workers процессов. Если max_workers равноNone
или не задано, то по умолчанию будет задано значениеos.process_cpu_count()
. Если max_workers меньше или равно0
, то будет вызвана ошибкаValueError
. В Windows значение max_workers должно быть меньше или равно61
. Если это не так, то будет выдано сообщениеValueError
. Если max_workers равноNone
, то по умолчанию будет выбрано не более61
, даже если доступно больше процессоров. mp_context может быть контекстомmultiprocessing
илиNone
. Он будет использоваться для запуска рабочих. Если mp_context равенNone
или не указан, используется контекст по умолчаниюmultiprocessing
. См. Контексты и методы запуска.initializer - это необязательный вызываемый модуль, который вызывается при запуске каждого рабочего процесса; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, находящиеся в процессе выполнения, вызовут
BrokenProcessPool
, а также все попытки отправить в пул дополнительные задания.max_tasks_per_child - необязательный аргумент, определяющий максимальное количество задач, которые может выполнить один процесс, прежде чем он завершится и будет заменен новым рабочим процессом. По умолчанию max_tasks_per_child равен
None
, что означает, что рабочие процессы будут жить столько же, сколько и пул. Если указано значение max, то при отсутствии параметра mp_context по умолчанию будет использоваться метод запуска мультипроцесса «spawn». Эта функция несовместима с методом запуска «fork».Изменено в версии 3.3: При внезапном завершении одного из рабочих процессов теперь выдается ошибка
BrokenProcessPool
. Ранее поведение было неопределенным, но операции над исполнителем или его фьючерсами часто зависали или заходили в тупик.Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям управлять методом start_method для рабочих процессов, созданных пулом.
Добавлены аргументы initializer и initargs.
Примечание
В Python 3.14 метод запуска
multiprocessing
по умолчанию (см. Контексты и методы запуска) будет заменен на fork. Код, требующий использования fork для своихProcessPoolExecutor
, должен явно указать это, передав параметрmp_context=multiprocessing.get_context("fork")
.Изменено в версии 3.11: Аргумент max_tasks_per_child был добавлен, чтобы позволить пользователям контролировать время жизни рабочих в пуле.
Изменено в версии 3.12: В POSIX-системах, если ваше приложение имеет несколько потоков и контекст
multiprocessing
использует метод"fork"
start: Внутренняя функцияos.fork()
, вызываемая для порождения рабочих, может вызвать ошибкуDeprecationWarning
. Передайте mp_context, настроенный на использование другого метода запуска. Дополнительные пояснения см. в документацииos.fork()
.Изменено в версии 3.13: В max_workers по умолчанию используется
os.process_cpu_count()
, а неos.cpu_count()
.
Пример ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Объекты будущего¶
Класс Future
инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future
создаются Executor.submit()
.
- class concurrent.futures.Future¶
Инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры
Future
создаютсяExecutor.submit()
и не должны создаваться напрямую, кроме как для тестирования.- cancel()¶
Попытка отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод вернет
False
, в противном случае вызов будет отменен и метод вернетTrue
.
- cancelled()¶
Возвращает
True
, если вызов был успешно отменен.
- running()¶
Возвращает
True
, если вызов выполняется в данный момент и не может быть отменен.
- done()¶
Возвращает
True
, если вызов был успешно отменен или завершен.
- result(timeout=None)¶
Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет вызвана ошибка
TimeoutError
. Значение timeout может быть int или float. Если timeout не указано илиNone
, то время ожидания не ограничено.Если будущее отменяется до завершения, то будет поднят
CancelledError
.Если вызов вызвал исключение, этот метод вызовет такое же исключение.
- exception(timeout=None)¶
Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет поднята ошибка
TimeoutError
. Значение timeout может быть int или float. Если timeout не указано илиNone
, то время ожидания не ограничено.Если будущее отменяется до завершения, то будет поднят
CancelledError
.Если вызов завершился без повышения, возвращается
None
.
- add_done_callback(fn)¶
Присоединяет вызываемую функцию fn к будущему. fn будет вызвана, с будущим в качестве единственного аргумента, когда будущее будет отменено или завершит выполнение.
Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем добавившему их процессу. Если вызываемый элемент вызывает подкласс
Exception
, он будет зарегистрирован и проигнорирован. Если вызываемый объект порождает подклассBaseException
, поведение не определено.Если будущее уже завершилось или было отменено, будет немедленно вызвано fn.
Следующие
Future
методы предназначены для использования в модульных тестах иExecutor
реализациях.- set_running_or_notify_cancel()¶
Этот метод должен вызываться только реализациями
Executor
перед выполнением работы, связанной сFuture
, и модульными тестами.Если метод возвращает
False
, тоFuture
был отменен, то есть был вызванFuture.cancel()
, который вернулTrue
. Все потоки, ожидающие завершенияFuture
(т. е. черезas_completed()
илиwait()
), будут разбужены.Если метод возвращает
True
, тоFuture
не был отменен и переведен в состояние выполнения, т. е. вызовыFuture.running()
вернутTrue
.Этот метод может быть вызван только один раз и не может быть вызван после вызова
Future.set_result()
илиFuture.set_exception()
.
- set_result(result)¶
Устанавливает результат работы, связанной с
Future
, в result.Этот метод должен использоваться только в реализациях
Executor
и модульных тестах.Изменено в версии 3.8: Этот метод поднимает
concurrent.futures.InvalidStateError
, еслиFuture
уже выполнен.
Функции модуля¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
Дождитесь завершения работы экземпляров
Future
(возможно, созданных разными экземплярамиExecutor
), переданных fs. Дубликаты фьючерсов, переданные fs, удаляются и будут возвращены только один раз. Возвращает именованный 2-кортеж наборов. Первый набор, названныйdone
, содержит фьючерсы, которые завершились (завершенные или отмененные фьючерсы) до завершения ожидания. Второй набор, названныйnot_done
, содержит фьючерсы, которые не завершились (ожидающие или выполняющиеся фьючерсы).timeout можно использовать для управления максимальным количеством секунд ожидания перед возвратом. Значение timeout может быть int или float. Если timeout не указан или
None
, время ожидания не ограничено.return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:
Постоянно
Описание
- concurrent.futures.FIRST_COMPLETED¶
Функция вернется, когда любое будущее завершится или будет отменено.
- concurrent.futures.FIRST_EXCEPTION¶
Функция вернется, когда любое будущее завершит свою работу, вызвав исключение. Если ни одно будущее не вызывает исключения, то это эквивалентно
ALL_COMPLETED
.- concurrent.futures.ALL_COMPLETED¶
Функция вернется, когда все фьючерсы завершатся или будут отменены.
- concurrent.futures.as_completed(fs, timeout=None)¶
Возвращает итератор по экземплярам
Future
(возможно, созданным разными экземплярамиExecutor
), заданным fs, который возвращает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Любые фьючерсы, заданные fs, которые дублируются, будут возвращены один раз. Любые фьючерсы, завершившиеся до вызоваas_completed()
, будут возвращены первыми. Возвращенный итератор вызывает ошибкуTimeoutError
, если вызывается__next__()
и результат не доступен по истечении timeout секунд с момента первоначального вызоваas_completed()
. В качестве timeout может выступать int или float. Если timeout не указан илиNone
, время ожидания не ограничено.
См.также
- PEP 3148 – фьючерсы - асинхронное выполнение вычислений
Предложение, описывающее эту возможность для включения в стандартную библиотеку Python.
Классы исключений¶
- exception concurrent.futures.CancelledError¶
Возникает, когда будущее отменяется.
- exception concurrent.futures.TimeoutError¶
Устаревший псевдоним
TimeoutError
, поднимаемый, когда будущая операция превышает заданный таймаут.Изменено в версии 3.11: Этот класс был сделан псевдонимом
TimeoutError
.
- exception concurrent.futures.BrokenExecutor¶
Производный от
RuntimeError
, этот класс исключений возникает, когда исполнитель по какой-то причине выходит из строя и не может быть использован для отправки или выполнения новых задач.Added in version 3.7.
- exception concurrent.futures.InvalidStateError¶
Возникает, когда над будущим выполняется операция, недопустимая в текущем состоянии.
Added in version 3.8.
- exception concurrent.futures.thread.BrokenThreadPool¶
Производный от
BrokenExecutor
, этот класс исключений возникает, когда один из рабочихThreadPoolExecutor
не смог инициализироваться.Added in version 3.7.
- exception concurrent.futures.process.BrokenProcessPool¶
Производный от
BrokenExecutor
(ранееRuntimeError
), этот класс исключений возникает, когда один из рабочихProcessPoolExecutor
завершается нечистым образом (например, если он был убит извне).Added in version 3.3.