Очереди¶
Источник: Lib/asyncio/queues.py
Очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue
. Хотя очереди asyncio не являются потокобезопасными, они предназначены для использования именно в коде async/await.
Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for()
, чтобы выполнять операции с очередью с таймаутом.
См. также раздел Examples ниже.
Очередь¶
- class asyncio.Queue(maxsize=0)¶
Очередь «первый вошел - первый вышел» (FIFO).
Если maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше
0
, тоawait put()
блокирует очередь, когда она достигает maxsize, пока элемент не будет удаленget()
.В отличие от стандартной потоковой библиотеки
queue
, размер очереди всегда известен и может быть возвращен вызовом методаqsize()
.Изменено в версии 3.10: Удален параметр loop.
Этот класс - not thread safe.
- maxsize¶
Количество элементов, допустимых в очереди.
- empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае.
- full()¶
Возвращает
True
, если в очереди естьmaxsize
элементов.Если очередь была инициализирована значением
maxsize=0
(по умолчанию), тоfull()
никогда не возвращаетTrue
.
- coroutine get()¶
Удалите и верните элемент из очереди. Если очередь пуста, подождите, пока элемент не будет доступен.
Вызывает
QueueShutDown
, если очередь была закрыта и пуста, или если очередь была закрыта немедленно.
- get_nowait()¶
Верните элемент, если он немедленно доступен, иначе поднимите
QueueEmpty
.
- coroutine join()¶
Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счет уменьшается всякий раз, когда потребительская корутина вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()
разблокируется.
- coroutine put(item)¶
Поместите элемент в очередь. Если очередь переполнена, подождите, пока освободится свободный слот, прежде чем добавлять элемент.
Вызывает
QueueShutDown
, если очередь была закрыта.
- put_nowait(item)¶
Поместите элемент в очередь без блокировки.
Если свободный слот не доступен, поднимите
QueueFull
.
- qsize()¶
Возвращает количество элементов в очереди.
- shutdown(immediate=False)¶
Отключите очередь, заставив
get()
иput()
поднятьQueueShutDown
.По умолчанию
get()
в закрытой очереди будет подниматься только после того, как очередь опустеет. Установите immediate в true, чтобыget()
поднимался немедленно.Все заблокированные абоненты
put()
иget()
будут разблокированы. Если значение immediate равно true, задача будет помечена как выполненная для каждого оставшегося элемента в очереди, что может разблокировать абонентовjoin()
.Added in version 3.13.
- task_done()¶
Указывает, что ранее поставленная задача завершена.
Используется потребителями очереди. Для каждого
get()
, использованного для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если в данный момент
join()
блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).shutdown(immediate=True)
вызываетtask_done()
для каждого оставшегося элемента в очереди.Вызывает
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
Приоритетная очередь¶
Очередь LIFO¶
Исключения¶
- exception asyncio.QueueEmpty¶
Это исключение возникает, когда метод
get_nowait()
вызывается на пустой очереди.
- exception asyncio.QueueFull¶
Исключение, возникающее при вызове метода
put_nowait()
в очереди, которая достигла своего максимального размера.
Примеры¶
Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())