queue
— Класс синхронизированной очереди¶
Источник: Lib/queue.py
Модуль queue
реализует очереди с несколькими производителями и потребителями. Он особенно полезен в многопоточном программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue
в этом модуле реализует всю необходимую семантику блокировки.
Модуль реализует три типа очередей, которые отличаются только порядком получения записей. В очереди FIFO первыми извлекаются первые добавленные задачи. В очереди LIFO первой извлекается самая последняя добавленная запись (работает как стек). В приоритетной очереди записи сортируются (с помощью модуля heapq
), и первой извлекается запись с наименьшим значением.
Внутри этих трех типов очередей используются блокировки для временного блокирования конкурирующих потоков; однако они не предназначены для обработки реентерабельности внутри потока.
Кроме того, в модуле реализован «простой» тип очереди FIFO, SimpleQueue
, специфическая реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.
Модуль queue
определяет следующие классы и исключения:
- class queue.Queue(maxsize=0)¶
Конструктор для очереди FIFO. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
- class queue.LifoQueue(maxsize=0)¶
Конструктор для очереди LIFO. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
- class queue.PriorityQueue(maxsize=0)¶
Конструктор для приоритетной очереди. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которую вернет
min(entries)
). Типичным шаблоном для записей является кортеж в виде:(priority_number, data)
.Если элементы данных не сопоставимы, данные можно обернуть в класс, который игнорирует элемент данных и сравнивает только номер приоритета:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
- class queue.SimpleQueue¶
Конструктор для неограниченной FIFO очереди. Простым очередям не хватает расширенной функциональности, такой как отслеживание задач.
Added in version 3.7.
- exception queue.Empty¶
Исключение, возникающее при вызове неблокирующего
get()
(илиget_nowait()
) на пустом объектеQueue
.
- exception queue.Full¶
Исключение, возникающее при вызове неблокирующего
put()
(илиput_nowait()
) на заполненном объектеQueue
.
- exception queue.ShutDown¶
Исключение, возникающее при вызове
put()
илиget()
на объектеQueue
, который был закрыт.Added in version 3.13.
Объекты очереди¶
Объекты очереди (Queue
, LifoQueue
или PriorityQueue
) предоставляют публичные методы, описанные ниже.
- Queue.qsize()¶
Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующая get() не заблокируется, равно как и qsize() < maxsize не гарантирует, что put() не заблокируется.
- Queue.empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Если empty() возвращаетTrue
, это не гарантирует, что последующий вызов put() не будет заблокирован. Аналогично, если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не будет заблокирован.
- Queue.full()¶
Возвращает
True
, если очередь заполнена,False
в противном случае. Если full() возвращаетTrue
, это не гарантирует, что последующий вызов get() не будет заблокирован. Аналогично, если full() возвращаетFalse
, это не гарантирует, что последующий вызов put() не будет заблокирован.
- Queue.put(item, block=True, timeout=None)¶
Помещает элемент в очередь. Если опциональный args block равен true, а timeout равен
None
(по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключениеFull
, если в течение этого времени свободный слот не был доступен. В противном случае (block равно false), если свободный слот доступен немедленно, поместите элемент в очередь, иначе вызовите исключениеFull
(timeout в этом случае игнорируется).Вызывает
ShutDown
, если очередь была закрыта.
- Queue.put_nowait(item)¶
Эквивалент
put(item, block=False)
.
- Queue.get(block=True, timeout=None)¶
Удаляет и возвращает элемент из очереди. Если опциональный args block равен true и timeout равен
None
(по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty
(timeout в этом случае игнорируется).До версии 3.0 на POSIX-системах и во всех версиях на Windows, если block равен true и timeout равен
None
, эта операция переходит в режим непрерывного ожидания на базовой блокировке. Это означает, что не может произойти никаких исключений, и, в частности, SIGINT не вызоветKeyboardInterrupt
.Вызывает
ShutDown
, если очередь была закрыта и пуста, или если очередь была закрыта немедленно.
- Queue.get_nowait()¶
Эквивалент
get(False)
.
Для отслеживания того, полностью ли обработаны поставленные в очередь задачи потребительскими потоками демона, предлагаются два метода.
- Queue.task_done()¶
Указывает, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого
get()
, использованного для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если в данный момент
join()
блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).shutdown(immediate=True)
вызываетtask_done()
для каждого оставшегося элемента в очереди.Вызывает ошибку
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
- Queue.join()¶
Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается каждый раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает
task_done()
, чтобы сообщить, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()
разблокируется.
Пример ожидания завершения поставленных в очередь задач:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Прерывание очередей¶
Объекты Queue
можно сделать так, чтобы предотвратить дальнейшее взаимодействие, закрыв их.
- Queue.shutdown(immediate=False)¶
Отключите очередь, заставив
get()
иput()
поднятьShutDown
.По умолчанию
get()
в закрытой очереди будет подниматься только после того, как очередь опустеет. Установите immediate в true, чтобыget()
поднимался немедленно.Все заблокированные абоненты
put()
иget()
будут разблокированы. Если значение immediate равно true, задача будет помечена как выполненная для каждого оставшегося элемента в очереди, что может разблокировать абонентовjoin()
.Added in version 3.13.
Объекты SimpleQueue¶
Объекты SimpleQueue
предоставляют публичные методы, описанные ниже.
- SimpleQueue.qsize()¶
Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующая get() не будет блокирована.
- SimpleQueue.empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не будет заблокирован.
- SimpleQueue.put(item, block=True, timeout=None)¶
Поместите элемент в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением возможных низкоуровневых ошибок, таких как невозможность выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с
Queue.put()
.Детали реализации CPython: Реализация этого метода на языке C является реентерабельной. То есть вызов
put()
илиget()
может быть прерван другим вызовомput()
в том же потоке без тупика или повреждения внутреннего состояния очереди. Это делает ее подходящей для использования в деструкторах, таких как методы__del__
или обратные вызовыweakref
.
- SimpleQueue.put_nowait(item)¶
Эквивалент
put(item, block=False)
, предоставлен для совместимости сQueue.put_nowait()
.
- SimpleQueue.get(block=True, timeout=None)¶
Удаляет и возвращает элемент из очереди. Если опциональный args block равен true и timeout равен
None
(по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty
(timeout в этом случае игнорируется).
- SimpleQueue.get_nowait()¶
Эквивалент
get(False)
.
См.также
- Класс
multiprocessing.Queue
Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.
collections.deque
- альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append()
и popleft()
, которые не требуют блокировки и поддерживают индексацию.