Корутины и задачи

В этом разделе описываются высокоуровневые API-интерфейсы Asyncio для работы с coroutines и Tasks.

Корутины

Источник: Lib/asyncio/coroutines.py


Coroutines, объявленные с использованием синтаксиса async/await, являются предпочтительным способом написания приложений asyncio. Например, следующий фрагмент кода печатает «hello», ждет 1 секунду, а затем печатает «world»:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

Обратите внимание, что простой вызов coroutine не приведет к планированию его выполнения:

>>> main()
<coroutine object main at 0x1053bb7c8>

Для фактического запуска coroutine asyncio предоставляет следующие механизмы:

  • Функция asyncio.run() для запуска функции верхнего уровня «main()» (см. пример выше).

  • Ожидание в корутине. Следующий фрагмент кода выведет «hello» после ожидания в течение 1 секунды, а затем выведет «world» после ожидания в течение еще 2 секунд:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    Ожидаемый результат:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • Функция asyncio.create_task() для одновременного запуска coroutines как asyncio Tasks.

    Давайте модифицируем приведенный выше пример и запустим две say_after coroutines concurrent:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    Обратите внимание, что ожидаемый результат теперь показывает, что фрагмент выполняется на 1 секунду быстрее, чем раньше:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    
  • Класс asyncio.TaskGroup представляет собой более современную альтернативу create_task(). Используя этот API, последний пример становится:

    async def main():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(
                say_after(1, 'hello'))
    
            task2 = tg.create_task(
                say_after(2, 'world'))
    
            print(f"started at {time.strftime('%X')}")
    
        # The await is implicit when the context manager exits.
    
        print(f"finished at {time.strftime('%X')}")
    

    Время и результат должны быть такими же, как и в предыдущей версии.

    Added in version 3.11: asyncio.TaskGroup.

Ожидания

Мы говорим, что объект является awaitable объектом, если он может быть использован в выражении await. Многие API asyncio разработаны для приема awaitables.

Существует три основных типа ожидаемых объектов: коробочки, задачи и фьючерсы.

Корутины

Корутины Python являются ожидаемыми и поэтому могут быть ожидаемы от других корутин:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

Важно

В этой документации термин «coroutine» может использоваться для обозначения двух тесно связанных понятий:

  • коробочная функция: функция async def;

  • coroutine object: объект, возвращаемый при вызове coroutine function.

Задачи

Задачи используются для планирования корутинов постоянно.

Когда корутина обернута в Task с функциями типа asyncio.create_task(), корутина автоматически планируется к выполнению в ближайшее время:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Фьючерсы

Future - это специальный низкоуровневый ожидаемый объект, который представляет собой эвентуальный результат асинхронной операции.

Когда объект Future ожидается, это означает, что coroutine будет ждать, пока Future не будет разрешен в каком-то другом месте.

Объекты будущего в asyncio нужны для того, чтобы код, основанный на обратных вызовах, можно было использовать с async/await.

Обычно нет необходимости создавать объекты Future в коде на уровне приложения.

Будущие объекты, иногда открываемые библиотеками и некоторыми API asyncio, можно ожидать:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

Хорошим примером низкоуровневой функции, возвращающей объект Future, является loop.run_in_executor().

Создание заданий

Источник: Lib/asyncio/tasks.py


asyncio.create_task(coro, *, name=None, context=None)

Заверните coro coroutine в Task и запланируйте его выполнение. Верните объект Task.

Если name не является None, оно устанавливается как имя задачи с помощью Task.set_name().

Необязательный аргумент context, содержащий только ключевое слово, позволяет указать пользовательский contextvars.Context, в котором будет выполняться coro. Если аргумент context не указан, создается копия текущего контекста.

Задача выполняется в цикле, возвращаемом get_running_loop(), RuntimeError поднимается, если в текущем потоке нет запущенного цикла.

Примечание

asyncio.TaskGroup.create_task() - это новая альтернатива, использующая структурный параллелизм; она позволяет ожидать выполнения группы связанных задач с надежными гарантиями безопасности.

Важно

Сохраните ссылку на результат этой функции, чтобы избежать исчезновения задачи в середине выполнения. В цикле событий хранятся только слабые ссылки на задачи. Задача, на которую нет других ссылок, может быть собрана в мусор в любой момент, даже до того, как она будет выполнена. Для обеспечения надежности фоновых задач, выполняемых по принципу «поставил и забыл», собирайте их в коллекцию:

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

Added in version 3.7.

Изменено в версии 3.8: Добавлен параметр name.

Изменено в версии 3.11: Добавлен параметр context.

Отмена задания

Задачи можно легко и безопасно отменять. При отмене задачи asyncio.CancelledError будет поднят в задаче при следующей возможности.

Рекомендуется, чтобы корутины использовали блоки try/finally для надежного выполнения логики очистки. В случае, если asyncio.CancelledError явно пойман, он, как правило, должен быть распространен после завершения очистки. asyncio.CancelledError непосредственно является подклассом BaseException, поэтому большинству кода не нужно будет знать о нем.

Компоненты asyncio, обеспечивающие структурированный параллелизм, такие как asyncio.TaskGroup и asyncio.timeout(), внутри реализованы с использованием отмены и могут повести себя неправильно, если корутин проглотит asyncio.CancelledError. Аналогично, пользовательский код обычно не должен вызывать uncancel. Однако в случаях, когда подавление asyncio.CancelledError действительно желательно, необходимо также вызвать uncancel(), чтобы полностью удалить состояние отмены.

Целевые группы

Группы задач сочетают в себе API для создания задач с удобным и надежным способом дождаться завершения всех задач в группе.

class asyncio.TaskGroup

В asynchronous context manager содержится группа заданий. Задачи могут быть добавлены в группу с помощью create_task(). Все задачи ожидают выхода из контекстного менеджера.

Added in version 3.11.

create_task(coro, *, name=None, context=None)

Создайте задачу в этой группе задач. Подпись совпадает с подписью asyncio.create_task(). Если группа задач неактивна (например, еще не введена, уже завершена или находится в процессе закрытия), мы закроем данную coro.

Изменено в версии 3.13: Закройте данную корутину, если группа задач не активна.

Пример:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print(f"Both tasks have completed now: {task1.result()}, {task2.result()}")

Оператор async with будет ждать завершения всех задач в группе. Во время ожидания в группу могут быть добавлены новые задачи (например, путем передачи tg в одну из корутин и вызова tg.create_task() в этой корутине). После завершения последней задачи и выхода из блока async with в группу нельзя добавлять новые задачи.

При первом сбое любой из задач, входящих в группу, с исключением, отличным от asyncio.CancelledError, остальные задачи в группе отменяются. После этого никакие другие задачи не могут быть добавлены в группу. В этот момент, если тело оператора async with все еще активно (т. е. __aexit__() еще не вызван), задача, непосредственно содержащая оператор async with, также отменяется. Результирующий asyncio.CancelledError прервет await, но не выйдет из содержащего его оператора async with.

После завершения всех задач, если какая-либо задача завершилась неудачей с исключением, отличным от asyncio.CancelledError, эти исключения объединяются в исключение ExceptionGroup или BaseExceptionGroup (в зависимости от ситуации; см. документацию), которое затем поднимается.

Два базовых исключения обрабатываются особым образом: Если какая-либо задача проваливается с KeyboardInterrupt или SystemExit, группа задач все равно отменяет оставшиеся задачи и ждет их, но тогда вместо ExceptionGroup или BaseExceptionGroup снова поднимается начальная KeyboardInterrupt или SystemExit.

Если тело оператора async with завершается с исключением (поэтому __aexit__() вызывается с установленным исключением), то это происходит так же, как если бы одна из задач завершилась неудачей: оставшиеся задачи отменяются и затем ожидаются, а исключения, не связанные с отменой, объединяются в группу исключений и поднимаются. Исключение, переданное в __aexit__(), если оно не является asyncio.CancelledError, также включается в группу исключений. Для KeyboardInterrupt и SystemExit предусмотрен тот же особый случай, что и в предыдущем параграфе.

Группы задач стараются не смешивать внутреннюю отмену, используемую для «пробуждения» их __aexit__(), с запросами на отмену задачи, в которой они выполняются, поступающими от других сторон. В частности, когда одна группа задач синтаксически вложена в другую и обе одновременно сталкиваются с исключением в одной из своих дочерних задач, внутренняя группа задач обрабатывает свои исключения, а затем внешняя группа задач получает еще одну отмену и обрабатывает свои собственные исключения.

В случае, когда группа задач отменяется извне и также должна поднять ExceptionGroup, она вызовет метод cancel() родительской задачи. Это гарантирует, что asyncio.CancelledError будет поднят при следующем await, так что отмена не будет потеряна.

Группы задач сохраняют количество отмен, о которых сообщает asyncio.Task.cancelling().

Изменено в версии 3.13: Улучшена обработка одновременных внутренних и внешних отмен и корректное сохранение подсчетов отмен.

Спать

coroutine asyncio.sleep(delay, result=None)

Блокировать на задержку секунд.

Если указан result, то он возвращается вызывающей стороне после завершения работы coroutine.

sleep() всегда приостанавливает выполнение текущей задачи, позволяя выполнять другие задачи.

Установка задержки в 0 обеспечивает оптимизированный путь, позволяющий выполнять другие задачи. Это может быть использовано долго выполняющимися функциями, чтобы избежать блокировки цикла событий на весь период вызова функции.

Пример корутина, выводящего текущую дату каждую секунду в течение 5 секунд:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.13: Поднимает ValueError, если задержка равна nan.

Одновременное выполнение задач

awaitable asyncio.gather(*aws, return_exceptions=False)

Выполните awaitable objects в последовательности aws последовательно.

Если какой-либо awaitable в aws является coroutine, он автоматически планируется как Task.

Если все ожидания завершены успешно, результатом будет агрегированный список возвращаемых значений. Порядок значений результата соответствует порядку ожиданий в aws.

Если return_exceptions имеет значение False (по умолчанию), первое поднятое исключение немедленно передается задаче, ожидающей на gather(). Другие ожидающие задачи в последовательности aws не будут отменены и продолжат выполняться.

Если return_exceptions имеет значение True, исключения рассматриваются так же, как и успешные результаты, и объединяются в списке результатов.

Если gather() отменен, все отправленные ожидания (которые еще не завершились) также отменяются.

Если какая-либо задача или будущее из последовательности aws отменяется, она рассматривается так, как если бы она подняла CancelledError. – вызов gather() в этом случае не отменяется. Это делается для того, чтобы отмена одной отправленной задачи/будущего не привела к отмене других задач/будущих.

Примечание

Новая альтернатива для одновременного создания и запуска задач и ожидания их завершения - asyncio.TaskGroup. TaskGroup обеспечивает более надежные гарантии безопасности, чем gather, при планировании вложенности подзадач: если задача (или подзадача, задача, запланированная задачей) вызывает исключение, TaskGroup отменит оставшиеся запланированные задачи, а gather - нет).

Пример:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

Примечание

Если значение return_exceptions равно false, отмена gather() после того, как она была помечена как выполненная, не отменит ни одного из переданных ожиданий. Например, gather может быть помечен как выполненный после передачи исключения вызывающему, поэтому вызов gather.cancel() после перехвата исключения (поднятого одним из ожиданий) из gather не отменит другие ожидания.

Изменено в версии 3.7: Если сам gather отменяется, то отмена распространяется независимо от return_exceptions.

Изменено в версии 3.10: Удален параметр loop.

Не рекомендуется, начиная с версии 3.10: Предупреждение об устаревании выдается, если не указаны позиционные аргументы или не все позиционные аргументы являются Future-like объектами и нет запущенного цикла событий.

Фабрика заданий

asyncio.eager_task_factory(loop, coro, *, name=None, context=None)

Фабрика задач для ускоренного выполнения задач.

При использовании этой фабрики (через loop.set_task_factory(asyncio.eager_task_factory)) корутины начинают выполняться синхронно во время построения Task. Задачи планируются в цикле событий только в том случае, если они блокируются. Это может повысить производительность, так как накладные расходы на планирование циклов исключаются для синхронно выполняющихся коротинов.

Частым примером, когда это полезно, являются корутины, которые используют кэширование или мемоизацию, чтобы избежать фактического ввода-вывода, когда это возможно.

Примечание

Немедленное выполнение корутины - это семантическое изменение. Если корутина возвращается или поднимается, задача никогда не планируется в цикл событий. Если выполнение корутины блокируется, задача назначается в цикл событий. Это изменение может внести изменения в поведение существующих приложений. Например, может измениться порядок выполнения задач в приложении.

Added in version 3.12.

asyncio.create_eager_task_factory(custom_task_constructor)

Создайте фабрику задач, аналогичную eager_task_factory(), используя предоставленный custom_task_constructor при создании новой задачи вместо стандартного Task.

custom_task_constructor должен быть callable с сигнатурой, совпадающей с сигнатурой Task.__init__. Вызываемый объект должен возвращать asyncio.Task-совместимый объект.

Эта функция возвращает callable, предназначенный для использования в качестве фабрики задач цикла событий через loop.set_task_factory(factory).

Added in version 3.12.

Защита от отмены

awaitable asyncio.shield(aw)

Защитите awaitable object от превращения в cancelled.

Если aw является корутином, он автоматически запланирован как задача.

Заявление:

task = asyncio.create_task(something())
res = await shield(task)

эквивалентно:

res = await something()

*За исключением того, что если содержащая его coroutine отменяется, то Task, выполняющаяся в something(), не отменяется. С точки зрения something(), отмены не произошло. Хотя ее вызывающая сторона все еще отменена, поэтому выражение «await» все еще вызывает CancelledError.

Если something() аннулируется другим способом (т.е. изнутри самого себя), это также аннулирует shield().

Если необходимо полностью игнорировать отмену (не рекомендуется), функцию shield() следует объединить с предложением try/except, как показано ниже:

task = asyncio.create_task(something())
try:
    res = await shield(task)
except CancelledError:
    res = None

Важно

Сохраните ссылку на задачи, переданные в эту функцию, чтобы избежать исчезновения задачи в середине выполнения. В цикле событий сохраняются только слабые ссылки на задачи. Задача, на которую нет других ссылок, может быть собрана в мусор в любой момент, даже до того, как она будет выполнена.

Изменено в версии 3.10: Удален параметр loop.

Не рекомендуется, начиная с версии 3.10: Предупреждение об устаревании выдается, если aw не является Future-like объектом и нет запущенного цикла событий.

Тайм-ауты

asyncio.timeout(delay)

Возвращает значение asynchronous context manager, которое можно использовать для ограничения времени ожидания чего-либо.

delay может быть либо None, либо float/int числом секунд ожидания. Если delay имеет значение None, ограничение по времени не будет применяться; это может быть полезно, если задержка неизвестна на момент создания менеджера контекста.

В любом случае, менеджер контекста можно переназначить после создания с помощью Timeout.reschedule().

Пример:

async def main():
    async with asyncio.timeout(10):
        await long_running_task()

Если выполнение long_running_task занимает более 10 секунд, менеджер контекста отменяет текущую задачу и обрабатывает полученный asyncio.CancelledError внутри, преобразуя его в TimeoutError, который может быть пойман и обработан.

Примечание

Контекстный менеджер asyncio.timeout() преобразует asyncio.CancelledError в TimeoutError, что означает, что TimeoutError может быть пойман только вне контекстного менеджера.

Пример ловли TimeoutError:

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

Контекстный менеджер, созданный asyncio.timeout(), может быть перенесен на другой срок и проверен.

class asyncio.Timeout(when)

asynchronous context manager для отмены просроченных корутинов.

when должно быть абсолютным временем, в которое контекст должен завершиться, по часам цикла событий:

  • Если when равно None, тайм-аут никогда не сработает.

  • Если when < loop.time(), то таймаут сработает на следующей итерации цикла событий.

when() float | None

Возвращает текущий крайний срок или None, если текущий срок не установлен.

reschedule(when: float | None)

Перенесите тайм-аут.

expired() bool

Возвращает, превысил ли менеджер контекста свой срок (истек).

Пример:

async def main():
    try:
        # We do not know the timeout when starting, so we pass ``None``.
        async with asyncio.timeout(None) as cm:
            # We know the timeout now, so we reschedule it.
            new_deadline = get_running_loop().time() + 10
            cm.reschedule(new_deadline)

            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")

Контекстные менеджеры тайм-аута могут быть безопасно вложены друг в друга.

Added in version 3.11.

asyncio.timeout_at(when)

Аналогично asyncio.timeout(), только когда - это абсолютное время прекращения ожидания, или None.

Пример:

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

Added in version 3.11.

coroutine asyncio.wait_for(aw, timeout)

Дождитесь завершения aw awaitable с таймаутом.

Если aw является корутином, он автоматически запланирован как задача.

timeout может быть либо None, либо число секунд ожидания в формате float или int. Если timeout равно None, блокируйте выполнение до тех пор, пока будущее не завершится.

Если произошел таймаут, он отменяет задание и поднимает TimeoutError.

Чтобы избежать задачи cancellation, оберните ее в shield().

Функция будет ждать, пока будущее не будет фактически отменено, поэтому общее время ожидания может превысить timeout. Если во время отмены произойдет исключение, оно будет передано.

Если ожидание отменяется, то отменяется и будущее aw.

Пример:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

Изменено в версии 3.7: Когда aw отменяется из-за таймаута, wait_for ждет, пока aw не будет отменен. Ранее она немедленно вызывала TimeoutError.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Поднимает TimeoutError вместо asyncio.TimeoutError.

Примитивы ожидания

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

Одновременный запуск экземпляров Future и Task в итерабельной системе aws и блокировка до выполнения условия, указанного в return_when.

Итерабель aws не должен быть пустым.

Возвращает два набора Задач/Фьючерсов: (done, pending).

Использование:

done, pending = await asyncio.wait(aws)

timeout (число с плавающей запятой или int), если указано, может использоваться для управления максимальным количеством секунд ожидания перед возвратом.

Обратите внимание, что эта функция не поднимает TimeoutError. Фьючерсы или задачи, которые не были выполнены в момент таймаута, просто возвращаются во второй набор.

return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:

Постоянно

Описание

asyncio.FIRST_COMPLETED

Функция вернется, когда любое будущее завершится или будет отменено.

asyncio.FIRST_EXCEPTION

Функция вернется, когда любое будущее завершит свою работу, вызвав исключение. Если ни одно будущее не вызывает исключения, то это эквивалентно ALL_COMPLETED.

asyncio.ALL_COMPLETED

Функция вернется, когда все фьючерсы завершатся или будут отменены.

В отличие от wait_for(), wait() не отменяет фьючерсы при наступлении тайм-аута.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Передача объектов coroutine в wait() напрямую запрещена.

Изменено в версии 3.12: Добавлена поддержка генераторов, дающих задания.

asyncio.as_completed(aws, *, timeout=None)

Одновременно выполняйте awaitable objects в итерабле aws. Возвращенный объект может быть итерирован для получения результатов ожиданий по мере их завершения.

Объект, возвращаемый as_completed(), может быть итерирован как asynchronous iterator или как обычный iterator. Когда используется асинхронная итерация, первоначально предоставленные awaitables выдаются, если они являются задачами или фьючерсами. Это позволяет легко соотнести ранее запланированные задачи с их результатами. Пример:

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

async for earliest_connect in as_completed(tasks):
    # earliest_connect is done. The result can be obtained by
    # awaiting it or calling earliest_connect.result()
    reader, writer = await earliest_connect

    if earliest_connect is ipv6_connect:
        print("IPv6 connection established.")
    else:
        print("IPv4 connection established.")

Во время асинхронной итерации для предоставленных awaitables, которые не являются задачами или фьючерсами, будут выведены неявно созданные задачи.

При использовании в качестве обычного итератора каждая итерация порождает новую корутину, которая возвращает результат или поднимает исключение следующего завершенного ожидаемого. Этот паттерн совместим с версиями Python старше 3.13:

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

for next_connect in as_completed(tasks):
    # next_connect is not one of the original task objects. It must be
    # awaited to obtain the result value or raise the exception of the
    # awaitable that finishes next.
    reader, writer = await next_connect

Если таймаут наступает до того, как все ожидаемые события завершены, возникает ошибка TimeoutError. Это происходит в цикле async for при асинхронной итерации или в корутинах, возникающих при обычной итерации.

Изменено в версии 3.10: Удален параметр loop.

Не рекомендуется, начиная с версии 3.10: Предупреждение об устаревании выдается, если не все ожидаемые объекты в итерабле aws являются Future-подобными объектами и нет запущенного цикла событий.

Изменено в версии 3.12: Добавлена поддержка генераторов, дающих задания.

Изменено в версии 3.13: Результат теперь может быть использован как asynchronous iterator или как обычный iterator (ранее он был только обычным итератором).

Бег по нитям

coroutine asyncio.to_thread(func, /, *args, **kwargs)

Асинхронный запуск функции func в отдельном потоке.

Любые *args и **kwargs, переданные для этой функции, напрямую передаются в func. Также передается текущее значение contextvars.Context, что позволяет обращаться к контекстным переменным из потока цикла событий в отдельном потоке.

Возвращает корутину, которую можно ожидать, чтобы получить конечный результат func.

Эта корутинная функция предназначена в первую очередь для выполнения функций/методов, связанных с IO, которые в противном случае заблокировали бы цикл событий, если бы выполнялись в главном потоке. Например:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

Прямой вызов blocking_io() в любой корутине заблокирует цикл событий на все его время, что приведет к дополнительной 1 секунде времени выполнения. Вместо этого, используя asyncio.to_thread(), мы можем запустить его в отдельном потоке, не блокируя цикл событий.

Примечание

Из-за наличия GIL, asyncio.to_thread() обычно можно использовать только для того, чтобы сделать функции, связанные с вводом-выводом, неблокируемыми. Однако для модулей расширения, выпускающих GIL, или альтернативных реализаций Python, не имеющих его, asyncio.to_thread() также может использоваться для функций, связанных с процессором.

Added in version 3.9.

Планирование из других потоков

asyncio.run_coroutine_threadsafe(coro, loop)

Послать корутину в заданный цикл событий. Потокобезопасно.

Верните значение concurrent.futures.Future, чтобы дождаться результата от другого потока ОС.

Эта функция должна вызываться из потока ОС, отличного от того, в котором выполняется цикл событий. Пример:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

Если в корутине возникнет исключение, возвращенное Future будет уведомлено об этом. Его также можно использовать для отмены задачи в цикле событий:

try:
    result = future.result(timeout)
except TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

См. раздел concurrency and multithreading в документации.

В отличие от других функций asyncio, эта функция требует явной передачи аргумента loop.

Added in version 3.5.1.

Интроспекция

asyncio.current_task(loop=None)

Возвращает текущий запущенный экземпляр Task или None, если задача не запущена.

Если loop равно None get_running_loop() используется для получения текущего цикла.

Added in version 3.7.

asyncio.all_tasks(loop=None)

Возвращает набор еще не завершенных Task объектов, запущенных циклом.

Если loop имеет значение None, то get_running_loop() используется для получения текущего цикла.

Added in version 3.7.

asyncio.iscoroutine(obj)

Возвращает True, если obj является объектом coroutine.

Added in version 3.4.

Объект задачи

class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)

Объект Future-like, который запускает программу Python coroutine. Не является потокобезопасным.

Задачи используются для запуска программ в циклах событий. Если программа ожидает Future, задача приостанавливает выполнение программы и ожидает завершения Future. Когда Future закончено, выполнение обернутой программы возобновляется.

В циклах событий используется кооперативное планирование: цикл событий выполняет одну задачу за раз. Пока задача ожидает завершения Future, цикл событий запускает другие задачи, обратные вызовы или выполняет операции ввода-вывода.

Для создания Задач используйте высокоуровневую функцию asyncio.create_task() или низкоуровневые функции loop.create_task() или ensure_future(). Не рекомендуется создавать Задачи вручную.

Для отмены запущенной задачи используйте метод cancel(). Его вызов приведет к тому, что задача выбросит исключение CancelledError в обернутую корутину. Если во время отмены корутина ожидает объект Future, объект Future будет отменен.

cancelled() можно использовать для проверки того, была ли отменена задача. Метод возвращает True, если обернутая корутина не подавила исключение CancelledError и была действительно отменена.

asyncio.Task наследует от Future все его API, кроме Future.set_result() и Future.set_exception().

Необязательный аргумент context, содержащий только ключевое слово, позволяет указать пользовательский contextvars.Context для запуска coro. Если context не указан, задача копирует текущий контекст и впоследствии запускает свою coroutine в скопированном контексте.

Необязательный аргумент eager_start, не имеющий ключевого слова, позволяет начать выполнение asyncio.Task в момент создания задачи. Если установлено значение True и запущен цикл событий, задача начнет выполнять корутину немедленно, до первого блокирования корутины. Если coroutine возвращается или поднимается без блокировки, задача будет завершена с нетерпением и пропустит планирование цикла событий.

Изменено в версии 3.7: Добавлена поддержка модуля contextvars.

Изменено в версии 3.8: Добавлен параметр name.

Не рекомендуется, начиная с версии 3.10: Предупреждение об устаревании выдается, если не указан loop и нет запущенного цикла событий.

Изменено в версии 3.11: Добавлен параметр context.

Изменено в версии 3.12: Добавлен параметр eager_start.

done()

Возвращает True, если задача выполнена.

Задача закончена, когда обернутая coroutine либо вернула значение, либо вызвала исключение, либо задача была отменена.

result()

Возвращает результат выполнения задачи.

Если задача выполнена, возвращается результат обернутой корутины (или, если корутина вызвала исключение, это исключение будет вызвано повторно).

Если задача была отменена, этот метод вызывает исключение CancelledError.

Если результат задачи еще не доступен, этот метод вызывает исключение InvalidStateError.

exception()

Возвращает исключение из задачи.

Если обернутая программа вызвала исключение, то это исключение возвращается. Если обернутая корутина вернулась нормально, этот метод возвращает None.

Если задача была отменена, этот метод вызывает исключение CancelledError.

Если задача еще не выполнена, этот метод вызывает исключение InvalidStateError.

add_done_callback(callback, *, context=None)

Добавьте обратный вызов, который будет выполняться, когда задача будет выполнена.

Этот метод следует использовать только в низкоуровневом коде, основанном на обратных вызовах.

Более подробную информацию см. в документации к Future.add_done_callback().

remove_done_callback(callback)

Удалите callback из списка обратных вызовов.

Этот метод следует использовать только в низкоуровневом коде, основанном на обратных вызовах.

Более подробную информацию см. в документации к Future.remove_done_callback().

get_stack(*, limit=None)

Возвращает список кадров стека для этой задачи.

Если обернутая программа не завершена, возвращается стек, в котором она приостановлена. Если программа завершилась успешно или была отменена, возвращается пустой список. Если работа программы была завершена в результате исключения, возвращается список кадров обратного трассирования.

Кадры всегда располагаются в порядке от самых старых к самым новым.

Для приостановленной корутины возвращается только один кадр стека.

Необязательный аргумент limit задает максимальное количество возвращаемых кадров; по умолчанию возвращаются все доступные кадры. Порядок возвращаемого списка отличается в зависимости от того, возвращается ли стек или трассировка: самые новые кадры стека возвращаются, а самые старые кадры трассировки возвращаются. (Это соответствует поведению модуля traceback).

print_stack(*, limit=None, file=None)

Выведите стек или обратную трассировку для этой задачи.

Это дает вывод, аналогичный выводу модуля traceback для кадров, полученных по get_stack().

Аргумент limit передается в get_stack() напрямую.

Аргумент file - это поток ввода-вывода, в который записывается вывод; по умолчанию вывод записывается в sys.stdout.

get_coro()

Возвращает объект coroutine, обернутый Task.

Примечание

Это вернет None для задач, которые уже завершились с нетерпением. Смотрите Eager Task Factory.

Added in version 3.8.

Изменено в версии 3.12: Вновь добавленное ускоренное выполнение задачи означает, что результат может быть None.

get_context()

Возвращает объект contextvars.Context, связанный с задачей.

Added in version 3.12.

get_name()

Возвращает имя задачи.

Если имя не было явно присвоено задаче, стандартная реализация Asyncio Task генерирует имя по умолчанию во время инстанцирования.

Added in version 3.8.

set_name(value)

Задайте имя задачи.

Аргументом значение может быть любой объект, который затем преобразуется в строку.

В стандартной реализации Task имя будет видно в выводе repr() объекта задачи.

Added in version 3.8.

cancel(msg=None)

Запрос на отмену задания.

Таким образом, исключение CancelledError будет брошено в обернутую корутину на следующем цикле цикла событий.

Затем у корутины есть шанс очистить или даже отклонить запрос, подавив исключение с помощью try … … … except CancelledErrorfinally блоком. Поэтому, в отличие от Future.cancel(), Task.cancel() не гарантирует, что задача будет отменена, хотя полное подавление отмены не является обычным делом и активно не рекомендуется. Если корутина все же решит подавить отмену, ей необходимо вызвать Task.uncancel() в дополнение к перехвату исключения.

Изменено в версии 3.9: Добавлен параметр msg.

Изменено в версии 3.11: Параметр msg передается от отмененной задачи к ее ожидателю.

Следующий пример иллюстрирует, как корутины могут перехватывать запрос на отмену:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

Возвращает True, если задача отменена.

Задача отменена, если отмена была запрошена с помощью cancel() и обернутая корутина распространила брошенное в нее исключение CancelledError.

uncancel()

Уменьшите количество запросов на отмену этой задачи.

Возвращает оставшееся количество запросов на отмену.

Обратите внимание, что после завершения выполнения отмененной задачи дальнейшие обращения к uncancel() неэффективны.

Added in version 3.11.

Этот метод используется внутренними компонентами Asyncio и не предполагается, что он будет использоваться в коде конечного пользователя. В частности, если задача успешно отменяется, это позволяет элементам структурированного параллелизма, таким как Целевые группы и asyncio.timeout(), продолжать выполнение, изолируя отмену от соответствующего структурированного блока. Например:

async def make_request_with_timeout():
    try:
        async with asyncio.timeout(1):
            # Structured block affected by the timeout:
            await make_request()
            await make_another_request()
    except TimeoutError:
        log("There was a timeout")
    # Outer code not affected by the timeout:
    await unrelated_code()

В то время как блок с make_request() и make_another_request() может быть отменен из-за таймаута, unrelated_code() должен продолжить выполнение даже в случае таймаута. Это реализовано с помощью uncancel(). Контекстные менеджеры TaskGroup аналогичным образом используют uncancel().

Если код конечного пользователя по какой-то причине подавляет отмену, перехватывая CancelledError, ему необходимо вызвать этот метод, чтобы удалить состояние отмены.

Когда этот метод декрементирует счетчик отмен до нуля, он проверяет, не был ли предыдущий вызов cancel() организован для выброса CancelledError в задачу. Если она еще не была брошена, то эта договоренность будет отменена (путем сброса внутреннего флага _must_cancel).

Изменено в версии 3.13: Изменено для отмены запросов на отмену при достижении нуля.

cancelling()

Возвращает количество ожидающих отмены запросов к этой задаче, т.е. количество обращений к cancel() за вычетом количества обращений к uncancel().

Обратите внимание, что если это число больше нуля, но задача продолжает выполняться, cancelled() все равно вернет False. Это связано с тем, что это число может быть уменьшено вызовом uncancel(), что может привести к тому, что задача не будет отменена, если количество запросов на отмену уменьшится до нуля.

Этот метод используется внутренними компонентами asyncio, и не предполагается, что он будет использоваться кодом конечного пользователя. Более подробную информацию см. в разделе uncancel().

Added in version 3.11.