Очереди

Источник: Lib/asyncio/queues.py


Очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они предназначены для использования именно в коде async/await.

Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for(), чтобы выполнять операции с очередью с таймаутом.

См. также раздел Examples ниже.

Очередь

class asyncio.Queue(maxsize=0)

Очередь «первый вошел - первый вышел» (FIFO).

Если maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше 0, то await put() блокирует очередь, когда она достигает maxsize, пока элемент не будет удален get().

В отличие от стандартной потоковой библиотеки queue, размер очереди всегда известен и может быть возвращен вызовом метода qsize().

Изменено в версии 3.10: Удален параметр loop.

Этот класс - not thread safe.

maxsize

Количество элементов, допустимых в очереди.

empty()

Возвращает True, если очередь пуста, False в противном случае.

full()

Возвращает True, если в очереди есть maxsize элементов.

Если очередь была инициализирована значением maxsize=0 (по умолчанию), то full() никогда не возвращает True.

coroutine get()

Удалите и верните элемент из очереди. Если очередь пуста, подождите, пока элемент не будет доступен.

Вызывает QueueShutDown, если очередь была закрыта и пуста, или если очередь была закрыта немедленно.

get_nowait()

Верните элемент, если он немедленно доступен, иначе поднимите QueueEmpty.

coroutine join()

Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счет уменьшается всякий раз, когда потребительская корутина вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля, join() разблокируется.

coroutine put(item)

Поместите элемент в очередь. Если очередь переполнена, подождите, пока освободится свободный слот, прежде чем добавлять элемент.

Вызывает QueueShutDown, если очередь была закрыта.

put_nowait(item)

Поместите элемент в очередь без блокировки.

Если свободный слот не доступен, поднимите QueueFull.

qsize()

Возвращает количество элементов в очереди.

shutdown(immediate=False)

Отключите очередь, заставив get() и put() поднять QueueShutDown.

По умолчанию get() в закрытой очереди будет подниматься только после того, как очередь опустеет. Установите immediate в true, чтобы get() поднимался немедленно.

Все заблокированные абоненты put() и get() будут разблокированы. Если значение immediate равно true, задача будет помечена как выполненная для каждого оставшегося элемента в очереди, что может разблокировать абонентов join().

Added in version 3.13.

task_done()

Указывает, что ранее поставленная задача завершена.

Используется потребителями очереди. Для каждого get(), использованного для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если в данный момент join() блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

shutdown(immediate=True) вызывает task_done() для каждого оставшегося элемента в очереди.

Вызывает ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

Приоритетная очередь

class asyncio.PriorityQueue

Вариант Queue; извлекает записи в порядке приоритета (сначала наименьшие).

Записи обычно представляют собой кортежи вида (priority_number, data).

Очередь LIFO

class asyncio.LifoQueue

Вариант Queue, в котором сначала извлекаются последние добавленные записи (последние входящие, первые исходящие).

Исключения

exception asyncio.QueueEmpty

Это исключение возникает, когда метод get_nowait() вызывается на пустой очереди.

exception asyncio.QueueFull

Исключение, возникающее при вызове метода put_nowait() в очереди, которая достигла своего максимального размера.

exception asyncio.QueueShutDown

Исключение, возникающее при вызове put() или get() в очереди, которая была закрыта.

Added in version 3.13.

Примеры

Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())