queue — Класс синхронизированной очереди

Источник: Lib/queue.py


Модуль queue реализует очереди с несколькими производителями и потребителями. Он особенно полезен в многопоточном программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue в этом модуле реализует всю необходимую семантику блокировки.

Модуль реализует три типа очередей, которые отличаются только порядком получения записей. В очереди FIFO первыми извлекаются первые добавленные задачи. В очереди LIFO первой извлекается самая последняя добавленная запись (работает как стек). В приоритетной очереди записи сортируются (с помощью модуля heapq), и первой извлекается запись с наименьшим значением.

Внутри этих трех типов очередей используются блокировки для временного блокирования конкурирующих потоков; однако они не предназначены для обработки реентерабельности внутри потока.

Кроме того, в модуле реализован «простой» тип очереди FIFO, SimpleQueue, специфическая реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.

Модуль queue определяет следующие классы и исключения:

class queue.Queue(maxsize=0)

Конструктор для очереди FIFO. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.LifoQueue(maxsize=0)

Конструктор для очереди LIFO. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.PriorityQueue(maxsize=0)

Конструктор для приоритетной очереди. maxsize - целое число, задающее верхнее ограничение на количество элементов, которые могут быть помещены в очередь. Вставка будет блокироваться по достижении этого размера, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.

Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которую вернет min(entries)). Типичным шаблоном для записей является кортеж в виде: (priority_number, data).

Если элементы данных не сопоставимы, данные можно обернуть в класс, который игнорирует элемент данных и сравнивает только номер приоритета:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Конструктор для неограниченной FIFO очереди. Простым очередям не хватает расширенной функциональности, такой как отслеживание задач.

Added in version 3.7.

exception queue.Empty

Исключение, возникающее при вызове неблокирующего get() (или get_nowait()) на пустом объекте Queue.

exception queue.Full

Исключение, возникающее при вызове неблокирующего put() (или put_nowait()) на заполненном объекте Queue.

exception queue.ShutDown

Исключение, возникающее при вызове put() или get() на объекте Queue, который был закрыт.

Added in version 3.13.

Объекты очереди

Объекты очереди (Queue, LifoQueue или PriorityQueue) предоставляют публичные методы, описанные ниже.

Queue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующая get() не заблокируется, равно как и qsize() < maxsize не гарантирует, что put() не заблокируется.

Queue.empty()

Возвращает True, если очередь пуста, False в противном случае. Если empty() возвращает True, это не гарантирует, что последующий вызов put() не будет заблокирован. Аналогично, если empty() возвращает False, это не гарантирует, что последующий вызов get() не будет заблокирован.

Queue.full()

Возвращает True, если очередь заполнена, False в противном случае. Если full() возвращает True, это не гарантирует, что последующий вызов get() не будет заблокирован. Аналогично, если full() возвращает False, это не гарантирует, что последующий вызов put() не будет заблокирован.

Queue.put(item, block=True, timeout=None)

Помещает элемент в очередь. Если опциональный args block равен true, а timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключение Full, если в течение этого времени свободный слот не был доступен. В противном случае (block равно false), если свободный слот доступен немедленно, поместите элемент в очередь, иначе вызовите исключение Full (timeout в этом случае игнорируется).

Вызывает ShutDown, если очередь была закрыта.

Queue.put_nowait(item)

Эквивалент put(item, block=False).

Queue.get(block=True, timeout=None)

Удаляет и возвращает элемент из очереди. Если опциональный args block равен true и timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключение Empty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключение Empty (timeout в этом случае игнорируется).

До версии 3.0 на POSIX-системах и во всех версиях на Windows, если block равен true и timeout равен None, эта операция переходит в режим непрерывного ожидания на базовой блокировке. Это означает, что не может произойти никаких исключений, и, в частности, SIGINT не вызовет KeyboardInterrupt.

Вызывает ShutDown, если очередь была закрыта и пуста, или если очередь была закрыта немедленно.

Queue.get_nowait()

Эквивалент get(False).

Для отслеживания того, полностью ли обработаны поставленные в очередь задачи потребительскими потоками демона, предлагаются два метода.

Queue.task_done()

Указывает, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого get(), использованного для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если в данный момент join() блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

shutdown(immediate=True) вызывает task_done() для каждого оставшегося элемента в очереди.

Вызывает ошибку ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

Queue.join()

Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается каждый раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает task_done(), чтобы сообщить, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля, join() разблокируется.

Пример ожидания завершения поставленных в очередь задач:

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

Прерывание очередей

Объекты Queue можно сделать так, чтобы предотвратить дальнейшее взаимодействие, закрыв их.

Queue.shutdown(immediate=False)

Отключите очередь, заставив get() и put() поднять ShutDown.

По умолчанию get() в закрытой очереди будет подниматься только после того, как очередь опустеет. Установите immediate в true, чтобы get() поднимался немедленно.

Все заблокированные абоненты put() и get() будут разблокированы. Если значение immediate равно true, задача будет помечена как выполненная для каждого оставшегося элемента в очереди, что может разблокировать абонентов join().

Added in version 3.13.

Объекты SimpleQueue

Объекты SimpleQueue предоставляют публичные методы, описанные ниже.

SimpleQueue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующая get() не будет блокирована.

SimpleQueue.empty()

Возвращает True, если очередь пуста, False в противном случае. Если empty() возвращает False, это не гарантирует, что последующий вызов get() не будет заблокирован.

SimpleQueue.put(item, block=True, timeout=None)

Поместите элемент в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением возможных низкоуровневых ошибок, таких как невозможность выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с Queue.put().

Детали реализации CPython: Реализация этого метода на языке C является реентерабельной. То есть вызов put() или get() может быть прерван другим вызовом put() в том же потоке без тупика или повреждения внутреннего состояния очереди. Это делает ее подходящей для использования в деструкторах, таких как методы __del__ или обратные вызовы weakref.

SimpleQueue.put_nowait(item)

Эквивалент put(item, block=False), предоставлен для совместимости с Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Удаляет и возвращает элемент из очереди. Если опциональный args block равен true и timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключение Empty, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключение Empty (timeout в этом случае игнорируется).

SimpleQueue.get_nowait()

Эквивалент get(False).

См.также

Класс multiprocessing.Queue

Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.

collections.deque - альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append() и popleft(), которые не требуют блокировки и поддерживают индексацию.