threading — Параллелизм на основе потоков

Источник: Lib/threading.py


Этот модуль строит высокоуровневые потоковые интерфейсы поверх низкоуровневого модуля _thread.

Изменено в версии 3.7: Раньше этот модуль был необязательным, теперь он доступен всегда.

См.также

concurrent.futures.ThreadPoolExecutor предлагает интерфейс более высокого уровня для передачи задач в фоновый поток, не блокируя выполнение вызывающего потока и сохраняя возможность получения результатов при необходимости.

queue предоставляет потокобезопасный интерфейс для обмена данными между запущенными потоками.

asyncio предлагает альтернативный подход к достижению параллелизма на уровне задач, не требующий использования нескольких потоков операционной системы.

Примечание

В серии Python 2.x этот модуль содержал имена camelCase для некоторых методов и функций. Начиная с Python 3.10 они устарели, но по-прежнему поддерживаются для совместимости с Python 2.5 и ниже.

Детали реализации CPython: В CPython, благодаря использованию Global Interpreter Lock, только один поток может выполнять код Python одновременно (хотя некоторые библиотеки, ориентированные на производительность, могут преодолеть это ограничение). Если вы хотите, чтобы ваше приложение лучше использовало вычислительные ресурсы многоядерных машин, рекомендуется использовать multiprocessing или concurrent.futures.ProcessPoolExecutor. Однако многопоточность все еще является подходящей моделью, если вы хотите одновременно выполнять несколько задач, связанных с вводом-выводом.

Availability: не WASI.

Этот модуль не работает или недоступен на WebAssembly. Дополнительную информацию см. в разделе Платформы WebAssembly.

Этот модуль определяет следующие функции:

threading.active_count()

Возвращает количество объектов Thread, существующих в данный момент. Возвращаемое число равно длине списка, возвращаемого командой enumerate().

Функция activeCount является устаревшим псевдонимом для этой функции.

threading.current_thread()

Возвращает текущий объект Thread, соответствующий потоку управления вызывающей стороны. Если поток управления вызывающей стороны не был создан через модуль threading, возвращается фиктивный объект потока с ограниченной функциональностью.

Функция currentThread является устаревшим псевдонимом для этой функции.

threading.excepthook(args, /)

Обработка не пойманного исключения, поднятого Thread.run().

Аргумент args имеет следующие атрибуты:

  • exc_type: Тип исключения.

  • exc_value: Значение исключения, может быть None.

  • exc_traceback: Отслеживание исключений, может быть None.

  • thread: Поток, вызвавший исключение, может быть None.

Если exc_type равен SystemExit, то исключение молча игнорируется. В противном случае исключение выводится на sys.stderr.

Если эта функция вызывает исключение, то для его обработки вызывается sys.excepthook().

threading.excepthook() можно переопределить, чтобы контролировать обработку не пойманных исключений, поднятых Thread.run().

Хранение exc_value с помощью пользовательского хука может создать цикл ссылок. Его следует явно очистить, чтобы разорвать цикл ссылок, когда исключение больше не нужно.

Хранение thread с помощью пользовательского хука может воскресить его, если он установлен на объект, который завершается. Избегайте хранения thread после завершения работы пользовательского хука, чтобы избежать воскрешения объектов.

См.также

sys.excepthook() обрабатывает не пойманные исключения.

Added in version 3.8.

threading.__excepthook__

Хранит исходное значение threading.excepthook(). Сохраняется для того, чтобы можно было восстановить исходное значение в случае, если они будут заменены сломанными или другими объектами.

Added in version 3.10.

threading.get_ident()

Возвращает «идентификатор потока» текущего потока. Это ненулевое целое число. Его значение не имеет прямого смысла; оно предназначено в качестве магического cookie, которое можно использовать, например, для индексации словаря данных, специфичных для потока. Идентификаторы потоков могут быть повторно использованы при выходе из потока и создании другого потока.

Added in version 3.3.

threading.get_native_id()

Возвращает собственный интегральный Thread ID текущего потока, назначенный ядром. Это неотрицательное целое число. Его значение может использоваться для уникальной идентификации данного конкретного потока в масштабах всей системы (до тех пор, пока поток не завершится, после чего значение может быть утилизировано ОС).

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD, GNU/kFreeBSD.

Added in version 3.8.

Изменено в версии 3.13: Добавлена поддержка GNU/kFreeBSD.

threading.enumerate()

Возвращает список всех объектов Thread, активных в данный момент. Список включает демонические потоки и объекты фиктивных потоков, созданные current_thread(). Он исключает завершенные потоки и потоки, которые еще не были запущены. Однако главный поток всегда входит в результат, даже если он завершен.

threading.main_thread()

Возвращает объект main Thread. В обычных условиях главный поток - это поток, из которого был запущен интерпретатор Python.

Added in version 3.4.

threading.settrace(func)

Задайте функцию трассировки для всех потоков, запущенных из модуля threading. Функция func будет передана в sys.settrace() для каждого потока перед вызовом его метода run().

threading.settrace_all_threads(func)

Установите функцию трассировки для всех потоков, запущенных из модуля threading, и всех потоков Python, которые выполняются в данный момент.

Этот func будет передан в sys.settrace() для каждого потока, прежде чем будет вызван его метод run().

Added in version 3.12.

threading.gettrace()

Получите функцию трассировки, заданную settrace().

Added in version 3.10.

threading.setprofile(func)

Задайте функцию профиля для всех потоков, запускаемых из модуля threading. Функция func будет передана в sys.setprofile() для каждого потока, прежде чем будет вызван его метод run().

threading.setprofile_all_threads(func)

Установите функцию профиля для всех потоков, запущенных из модуля threading, и всех потоков Python, которые выполняются в данный момент.

Этот func будет передан в sys.setprofile() для каждого потока, прежде чем будет вызван его метод run().

Added in version 3.12.

threading.getprofile()

Получение функции профилировщика, заданной setprofile().

Added in version 3.10.

threading.stack_size([size])

Возвращает размер стека потоков, используемый при создании новых потоков. Необязательный аргумент size задает размер стека, который будет использоваться для последующих созданных потоков, и должен быть равен 0 (использование платформы или конфигурации по умолчанию) или целому положительному значению не менее 32 768 (32 килобайта). Если size не указан, используется 0. Если изменение размера стека потоков не поддерживается, возникает ошибка RuntimeError. Если указанный размер стека недопустим, выдается сообщение ValueError, и размер стека не изменяется. В настоящее время минимальный поддерживаемый размер стека составляет 32 килобайта, чтобы гарантировать достаточное пространство стека для самого интерпретатора. Обратите внимание, что некоторые платформы могут иметь особые ограничения на значения размера стека, например, требовать минимальный размер стека > 32 килобайт или требовать выделения кратного размера страницы системной памяти - для получения дополнительной информации следует обратиться к документации платформы (страницы размером 4 килобайта являются обычными; при отсутствии более конкретной информации рекомендуется использовать для размера стека кратное 4096).

Availability: Windows, pthreads.

Платформы Unix с поддержкой потоков POSIX.

В этом модуле также определена следующая константа:

threading.TIMEOUT_MAX

Максимальное значение, допустимое для параметра timeout блокирующих функций (Lock.acquire(), RLock.acquire(), Condition.wait() и т. д.). Указание таймаута, превышающего это значение, приведет к возникновению ошибки OverflowError.

Added in version 3.2.

Этот модуль определяет ряд классов, которые подробно описаны в следующих разделах.

Дизайн этого модуля в значительной степени основан на потоковой модели Java. Однако если в Java блокировки и переменные состояния являются базовым поведением каждого объекта, то в Python это отдельные объекты. Класс Thread в Python поддерживает подмножество поведения класса Thread в Java; в настоящее время нет приоритетов, нет групп потоков, и потоки не могут быть уничтожены, остановлены, приостановлены, возобновлены или прерваны. Статические методы класса Thread из Java, когда они реализованы, отображаются на функции уровня модуля.

Все описанные ниже методы выполняются атомарно.

Локальные данные

Локальные данные - это данные, значения которых зависят от конкретного потока. Чтобы управлять потоково-локальными данными, достаточно создать экземпляр local (или подкласс) и хранить на нем атрибуты:

mydata = threading.local()
mydata.x = 1

Значения экземпляра будут разными для разных потоков.

class threading.local

Класс, представляющий локальные данные потока.

Более подробную информацию и обширные примеры можно найти в строке документации модуля _threading_local: Lib/_threading_local.py.

Нитяные объекты

Класс Thread представляет деятельность, которая выполняется в отдельном потоке управления. Есть два способа указать деятельность: передать конструктору вызываемый объект или переопределить метод run() в подклассе. Никакие другие методы (кроме конструктора) не должны переопределяться в подклассе. Другими словами, переопределяйте только методы __init__() и run() этого класса.

После создания объекта потока его деятельность должна быть начата вызовом метода start() потока. Это вызовет метод run() в отдельном потоке управления.

Как только деятельность потока начата, он считается «живым». Он перестает быть «живым», когда его метод run() завершается - либо нормально, либо путем поднятия необработанного исключения. Метод is_alive() проверяет, жив ли поток.

Другие потоки могут вызывать метод join() потока. Это блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызывается, не будет завершен.

У потока есть имя. Имя может быть передано конструктору, а также прочитано или изменено через атрибут name.

Если метод run() вызывает исключение, то для его обработки вызывается threading.excepthook(). По умолчанию threading.excepthook() молча игнорирует SystemExit.

Поток может быть помечен как «поток-демон». Значение этого флага заключается в том, что вся программа Python завершается, когда остаются только потоки-демоны. Начальное значение наследуется от создающего потока. Флаг может быть установлен через свойство daemon или аргумент конструктора daemon.

Примечание

Потоки демонов резко останавливаются при завершении работы. Их ресурсы (такие как открытые файлы, транзакции базы данных и т. д.) могут быть освобождены неправильно. Если вы хотите, чтобы ваши потоки останавливались изящно, сделайте их не демонами и используйте подходящий механизм сигнализации, например Event.

Существует объект «главный поток»; он соответствует начальному потоку управления в программе Python. Это не поток демона.

Существует вероятность создания «фиктивных потоковых объектов». Это объекты потоков, соответствующие «чужим потокам», то есть потокам управления, запущенным вне модуля потоков, например, непосредственно из кода на языке C. Объекты фиктивных потоков имеют ограниченную функциональность; они всегда считаются живыми и демоническими и не могут быть joined. Они никогда не удаляются, поскольку невозможно обнаружить завершение чужих потоков.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Этот конструктор всегда должен вызываться с аргументами в виде ключевых слов. Аргументами являются:

group должна быть None; зарезервировано для будущего расширения, когда будет реализован класс ThreadGroup.

target - вызываемый объект, который будет вызван методом run(). По умолчанию принимает значение None, что означает, что ничего не вызывается.

name - это имя потока. По умолчанию уникальное имя строится в виде «Thread-N», где N - маленькое десятичное число, или «Thread-N (target)», где «target» - target.__name__, если указан аргумент target.

args - список или кортеж аргументов для целевого вызова. По умолчанию принимает значение ().

kwargs - словарь аргументов ключевых слов для целевого вызова. По умолчанию имеет значение {}.

Если не None, daemon явно устанавливает, является ли данный поток демоническим. Если None (по умолчанию), то свойство daemonic наследуется от текущего потока.

Если подкласс переопределяет конструктор, он должен обязательно вызвать конструктор базового класса (Thread.__init__()), прежде чем делать что-либо еще с потоком.

Изменено в версии 3.3: Добавлен параметр daemon.

Изменено в версии 3.10: Используйте имя цели, если аргумент имя опущен.

start()

Начните активность потока.

Она может быть вызвана не более одного раза для каждого объекта потока. Он организует вызов метода run() объекта в отдельном потоке управления.

Этот метод вызовет ошибку RuntimeError, если будет вызван более одного раза на одном и том же объекте потока.

run()

Метод, представляющий активность потока.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный конструктору объекта в качестве аргумента target, если таковой имеется, с позиционными и ключевыми аргументами, взятыми из аргументов args и kwargs, соответственно.

Используя список или кортеж в качестве аргумента args, который передается в Thread, можно добиться того же эффекта.

Пример:

>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
join(timeout=None)

Подождите, пока поток не завершится. Это блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызывается, не завершится - либо нормально, либо через необработанное исключение - или пока не наступит необязательный тайм-аут.

Если аргумент timeout присутствует, а не является None, он должен быть числом с плавающей точкой, задающим тайм-аут операции в секундах (или их долях). Поскольку join() всегда возвращает None, вы должны вызвать is_alive() после join(), чтобы определить, произошел ли таймаут - если поток все еще жив, вызов join() завершился по таймауту.

Если аргумент timeout отсутствует или равен None, операция будет блокироваться до тех пор, пока поток не завершится.

К одной нити можно присоединяться много раз.

join() вызывает исключение RuntimeError при попытке присоединиться к текущему потоку, поскольку это приведет к тупику. Также ошибкой является join() потока до того, как он был запущен, и попытки сделать это вызывают такое же исключение.

name

Строка, используемая только для идентификации. Она не имеет никакой семантики. Одно и то же имя может быть присвоено нескольким потокам. Начальное имя задается конструктором.

getName()
setName()

Устаревший API getter/setter для name; вместо этого используйте его непосредственно как свойство.

Не рекомендуется, начиная с версии 3.10.

ident

Идентификатор потока или None, если поток не был запущен. Это ненулевое целое число. См. функцию get_ident(). Идентификаторы потоков могут быть повторно использованы, когда поток завершается и создается другой поток. Идентификатор доступен даже после выхода потока.

native_id

Идентификатор потока (TID), присвоенный ОС (ядром). Это целое неотрицательное число, или None, если поток не был запущен. См. функцию get_native_id(). Это значение может использоваться для уникальной идентификации данного потока в рамках всей системы (до тех пор, пока поток не завершится, после чего значение может быть утилизировано ОС).

Примечание

Как и идентификаторы процессов, идентификаторы потоков действительны (гарантированно уникальны в масштабах всей системы) только с момента создания потока и до его завершения.

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD.

Added in version 3.8.

is_alive()

Возвращает, жив ли данный поток.

Этот метод возвращает True непосредственно перед началом работы метода run() и сразу после завершения работы метода run(). Функция модуля enumerate() возвращает список всех живых потоков.

daemon

Булево значение, указывающее, является ли этот поток демоном (True) или нет (False). Это значение должно быть установлено до вызова start(), иначе будет вызван RuntimeError. Его начальное значение наследуется от создающего потока; главный поток не является демонским, и поэтому все потоки, созданные в главном потоке, по умолчанию имеют значение daemon = False.

Вся программа Python завершается, когда не остается живых потоков, не являющихся демонами.

isDaemon()
setDaemon()

Устаревший API getter/setter для daemon; вместо этого используйте его непосредственно как свойство.

Не рекомендуется, начиная с версии 3.10.

Заблокировать объекты

Примитивная блокировка - это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python это самый низкоуровневый примитив синхронизации, реализуемый непосредственно модулем расширения _thread.

Примитивная блокировка находится в одном из двух состояний: «заблокировано» или «разблокировано». Он создается в состоянии «разблокирован». У нее есть два основных метода, acquire() и release(). Когда состояние разблокировано, acquire() меняет состояние на заблокированное и немедленно возвращается. Когда состояние заблокировано, acquire() блокирует его до тех пор, пока вызов release() в другом потоке не изменит его на разблокированное, после чего вызов acquire() сбрасывает его на заблокированное и возвращается. Метод release() следует вызывать только в заблокированном состоянии; он меняет состояние на разблокированное и немедленно возвращается. Если будет предпринята попытка освободить разблокированную блокировку, будет вызвана ошибка RuntimeError.

Замки также поддерживают context management protocol.

Если несколько потоков заблокированы в acquire() в ожидании перехода в состояние unlocked, только один поток продолжит выполнение, когда вызов release() сбросит состояние в unlocked; какой из ожидающих потоков продолжит выполнение, не определено, и может отличаться в разных реализациях.

Все методы выполняются атомарно.

class threading.Lock

Класс, реализующий примитивные объекты блокировки. Если поток получил блокировку, последующие попытки получить ее блокируются, пока она не будет освобождена; любой поток может освободить ее.

Изменено в версии 3.13: Lock теперь является классом. В ранних версиях Python Lock была фабричной функцией, которая возвращала экземпляр базового частного типа блокировки.

acquire(blocking=True, timeout=-1)

Получение блокировки, блокирующей или неблокирующей.

При вызове с аргументом blocking, установленным в True (по умолчанию), блокируется до тех пор, пока блокировка не будет разблокирована, затем устанавливается значение locked и возвращается True.

При вызове с аргументом blocking, установленным в False, блокировка не производится. Если вызов с blocking, установленным в True, заблокировал бы, немедленно верните False; в противном случае установите блокировку на locked и верните True.

При вызове с аргументом timeout с плавающей точкой, установленным в положительное значение, блокирует не более чем на количество секунд, указанное в timeout, и до тех пор, пока блокировка не может быть получена. Аргумент timeout, равный -1, указывает на неограниченное ожидание. Запрещено указывать timeout, когда blocking равен False.

Возвращаемое значение True, если блокировка получена успешно, False, если нет (например, если истек тайм-аут).

Изменено в версии 3.2: Параметр timeout является новым.

Изменено в версии 3.2: Получение блокировки теперь может быть прервано сигналами на POSIX, если базовая реализация потоков поддерживает это.

release()

Освобождение блокировки. Эта функция может быть вызвана из любого потока, а не только из того, который получил блокировку.

Когда блокировка будет заблокирована, сбросьте ее на разблокированную и вернитесь. Если другие потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить выполнение только одному из них.

При вызове на разблокированной блокировке возникает сообщение RuntimeError.

Возвращаемое значение отсутствует.

locked()

Возвращает True, если блокировка получена.

Объекты RLock

Рекуррентная блокировка - это примитив синхронизации, который может быть получен несколько раз одним и тем же потоком. Внутри она использует понятия «владеющий поток» и «уровень рекурсии» в дополнение к заблокированному/разблокированному состоянию, используемому примитивными блокировками. В заблокированном состоянии блокировка принадлежит какому-то потоку, а в разблокированном - ни одному потоку.

Потоки вызывают метод acquire() блокировки, чтобы заблокировать ее, и метод release(), чтобы разблокировать.

Примечание

Реентерабельные блокировки поддерживают context management protocol, поэтому рекомендуется использовать with вместо того, чтобы вручную вызывать acquire() и release() для обработки получения и освобождения блокировки для блока кода.

Пары вызовов acquire()/release() в RLock могут быть вложенными, в отличие от acquire()/release() в Lock. Только заключительный release() (release() из крайней пары) сбрасывает блокировку в разблокированное состояние и позволяет другому потоку, заблокированному в acquire(), продолжить работу.

acquire()/release() должны использоваться парами: каждое приобретение должно сопровождаться освобождением в потоке, который приобрел блокировку. Если не вызывать release столько раз, сколько раз была приобретена блокировка, это может привести к тупику.

class threading.RLock

Этот класс реализует объекты реентерабельной блокировки. Реентерабельная блокировка должна быть освобождена потоком, который ее приобрел. Если поток приобрел реентерабельную блокировку, этот же поток может получить ее снова без блокировки; поток должен освободить ее один раз за каждый раз, когда он ее приобрел.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр наиболее эффективной версии конкретного класса RLock, поддерживаемой платформой.

acquire(blocking=True, timeout=-1)

Получение блокировки, блокирующей или неблокирующей.

См.также

Using RLock as a context manager

Рекомендуется использовать вместо ручных вызовов acquire() и release(), когда это целесообразно.

При вызове с аргументом blocking, установленным на True (по умолчанию):

  • Если ни один поток не владеет блокировкой, приобретите ее и немедленно вернитесь.

  • Если блокировка принадлежит другому потоку, блокируйте его до тех пор, пока мы не сможем получить блокировку, или timeout, если задано положительное плавающее значение.

  • Если блокировка принадлежит тому же потоку, снова приобретите блокировку и немедленно вернитесь. В этом заключается разница между Lock и RLock; Lock обрабатывает этот случай так же, как и предыдущий, блокируя его до тех пор, пока блокировка не будет получена.

При вызове с аргументом blocking, установленным на False:

  • Если ни один поток не владеет блокировкой, приобретите ее и немедленно вернитесь.

  • Если блокировка принадлежит другому потоку, вернитесь немедленно.

  • Если блокировка принадлежит тому же потоку, приобретите ее снова и немедленно вернитесь.

Во всех случаях, если поток смог получить блокировку, возвращается True. Если поток не смог получить блокировку (т. е. не блокировался или был достигнут тайм-аут), верните False.

При многократном вызове неспособность вызвать release() столько же раз может привести к тупику. Рассмотрите возможность использования RLock в качестве менеджера контекста вместо прямого вызова acquire/release.

Изменено в версии 3.2: Параметр timeout является новым.

release()

Освободите блокировку, декрементируя уровень рекурсии. Если после декремента он равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному потоку), и если какие-либо другие потоки заблокированы в ожидании разблокировки блокировки, разрешите ровно одному из них продолжить выполнение. Если после декремента уровень рекурсии все еще ненулевой, замок остается заблокированным и принадлежит вызывающему потоку.

Вызывайте этот метод только тогда, когда вызывающий поток владеет блокировкой. Если этот метод вызывается, когда блокировка не получена, возникает ошибка RuntimeError.

Возвращаемое значение отсутствует.

Объекты состояния

Переменная условия всегда связана с какой-то блокировкой; ее можно передать или она будет создана по умолчанию. Передача блокировки полезна, когда несколько переменных условий должны использовать одну и ту же блокировку. Блокировка является частью объекта условия: вам не нужно отслеживать ее отдельно.

Переменная условия подчиняется оператору context management protocol: использование оператора with приводит к приобретению связанной блокировки на время действия вложенного блока. Методы acquire() и release() также вызывают соответствующие методы связанной блокировки.

Другие методы должны вызываться с удержанием соответствующей блокировки. Метод wait() освобождает блокировку, а затем блокирует ее до тех пор, пока другой поток не пробудит ее вызовом notify() или notify_all(). После пробуждения wait() снова получает блокировку и возвращается. Также можно указать таймаут.

Метод notify() пробуждает один из потоков, ожидающих переменную условия, если таковые имеются. Метод notify_all() пробуждает все потоки, ожидающие переменную условия.

Примечание: методы notify() и notify_all() не освобождают блокировку; это означает, что пробужденный поток или потоки вернутся из своего вызова wait() не сразу, а только когда поток, вызвавший notify() или notify_all(), окончательно откажется от владения блокировкой.

Типичный стиль программирования с использованием переменных состояния использует блокировку для синхронизации доступа к некоторому общему состоянию; потоки, заинтересованные в конкретном изменении состояния, вызывают wait() несколько раз, пока не увидят желаемое состояние, а потоки, изменяющие состояние, вызывают notify() или notify_all(), когда они изменяют состояние таким образом, что оно может стать желаемым состоянием для одного из ожидающих. Например, следующий код представляет собой общую ситуацию производитель-потребитель с неограниченной емкостью буфера:

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

Цикл while, проверяющий состояние приложения, необходим, поскольку wait() может вернуться через произвольно долгое время, и условие, вызвавшее вызов notify(), может перестать выполняться. Это свойственно многопоточному программированию. Метод wait_for() позволяет автоматизировать проверку условия и облегчает вычисление таймаутов:

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

Чтобы выбрать между notify() и notify_all(), подумайте, может ли одно изменение состояния быть интересным только для одного или нескольких ожидающих потоков. Например, в типичной ситуации производитель-потребитель добавление одного элемента в буфер должно разбудить только один поток-потребитель.

class threading.Condition(lock=None)

Этот класс реализует объекты переменных условий. Переменная условия позволяет одному или нескольким потокам ждать, пока они не получат уведомление от другого потока.

Если указан аргумент lock, а не None, то это должен быть объект Lock или RLock, и он используется в качестве базовой блокировки. В противном случае создается новый объект RLock, который используется в качестве базовой блокировки.

Изменено в версии 3.3: превратилась из фабричной функции в класс.

acquire(*args)

Приобретите базовую блокировку. Этот метод вызывает соответствующий метод на базовой блокировке; возвращаемое значение - то, что возвращает этот метод.

release()

Освобождение базовой блокировки. Этот метод вызывает соответствующий метод на базовой блокировке; возвращаемое значение отсутствует.

wait(timeout=None)

Ожидайте уведомления или таймаута. Если вызывающий поток не получил блокировку к моменту вызова этого метода, будет поднята ошибка RuntimeError.

Этот метод освобождает базовую блокировку, а затем блокируется до тех пор, пока его не разбудит вызов notify() или notify_all() для той же переменной состояния в другом потоке, или пока не наступит необязательный тайм-аут. После пробуждения или истечения тайм-аута он снова приобретает блокировку и возвращается.

Если аргумент timeout присутствует, а не является None, он должен представлять собой число с плавающей точкой, задающее тайм-аут для операции в секундах (или их долях).

Если базовой блокировкой является RLock, она не освобождается с помощью метода release(), поскольку это может не разблокировать блокировку, если она была получена несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса RLock, который действительно разблокирует ее, даже если она была получена рекурсивно несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии при повторном получении блокировки.

Возвращаемое значение - True, если не истек заданный тайм-аут, в этом случае оно равно False.

Изменено в версии 3.2: Ранее метод всегда возвращал None.

wait_for(predicate, timeout=None)

Дождитесь, пока условие не будет оценено как true. Предикат predicate должен быть вызываемой переменной, результат которой будет интерпретирован как булево значение. Может быть указан timeout, задающий максимальное время ожидания.

Этот метод может вызывать wait() несколько раз, пока предикат не будет выполнен или пока не произойдет таймаут. Возвращаемое значение - это последнее возвращаемое значение предиката, и оно будет равно False, если метод завершил работу по таймауту.

Если не принимать во внимание функцию тайм-аута, вызов этого метода примерно эквивалентен написанию:

while not predicate():
    cv.wait()

Поэтому действуют те же правила, что и для wait(): Блокировка должна удерживаться при вызове и вновь приобретается при возврате. Предикат оценивается с удержанной блокировкой.

Added in version 3.2.

notify(n=1)

По умолчанию пробуждается один поток, ожидающий выполнения этого условия, если таковой имеется. Если вызывающий поток не получил блокировку на момент вызова этого метода, то будет поднята ошибка RuntimeError.

Этот метод пробуждает не более n потоков, ожидающих переменную условия; он не работает, если ни один поток не ожидает.

Текущая реализация пробуждает ровно n потоков, если ожидают хотя бы n потоков. Однако полагаться на такое поведение небезопасно. Будущая оптимизированная реализация может иногда пробуждать больше, чем n потоков.

Обратите внимание: пробудившийся поток не возвращается из вызова wait() до тех пор, пока не сможет снова получить блокировку. Поскольку notify() не освобождает блокировку, это должен сделать его вызывающий.

notify_all()

Разбудить все потоки, ожидающие выполнения этого условия. Этот метод действует аналогично notify(), но пробуждает не один, а все ожидающие потоки. Если вызывающий поток не получил блокировку на момент вызова этого метода, будет вызвана ошибка RuntimeError.

Метод notifyAll является устаревшим псевдонимом для этого метода.

Объекты семафора

Это один из старейших примитивов синхронизации в истории информатики, изобретенный голландским компьютерщиком Эдсгером В. Дейкстрой (он использовал имена P() и V() вместо acquire() и release()).

Семафор управляет внутренним счетчиком, который декрементируется при каждом вызове acquire() и инкрементируется при каждом вызове release(). Счетчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какой-нибудь другой поток не вызовет release().

Семафоры также поддерживают context management protocol.

class threading.Semaphore(value=1)

Этот класс реализует объекты семафора. Семафор управляет атомарным счетчиком, представляющим количество вызовов release() минус количество вызовов acquire(), плюс начальное значение. Метод acquire() при необходимости блокируется до тех пор, пока не сможет вернуться, не сделав счетчик отрицательным. Если значение не задано, value по умолчанию равно 1.

Необязательный аргумент задает начальное значение для внутреннего счетчика; по умолчанию оно равно 1. Если заданное значение меньше 0, то будет получено значение ValueError.

Изменено в версии 3.3: превратилась из фабричной функции в класс.

acquire(blocking=True, timeout=None)

Получение семафора.

Вызывается без аргументов:

  • Если при входе внутренний счетчик больше нуля, уменьшите его на единицу и немедленно верните True.

  • Если при входе внутренний счетчик равен нулю, блокируйте его до пробуждения вызовом release(). После пробуждения (и если счетчик больше 0), уменьшите счетчик на 1 и верните True. При каждом вызове release() будет пробужден ровно один поток. Не следует полагаться на порядок, в котором пробуждаются потоки.

При вызове с параметром blocking, установленным в False, блокировка не происходит. Если вызов без аргумента заблокирует, немедленно верните False; в противном случае сделайте то же самое, что и при вызове без аргументов, и верните True.

При вызове с параметром timeout, отличным от None, он будет блокироваться не более чем на timeout секунд. Если acquire не завершится успешно за этот промежуток времени, верните False. В противном случае возвращается True.

Изменено в версии 3.2: Параметр timeout является новым.

release(n=1)

Освободите семафор, увеличив внутренний счетчик на n. Если при входе он был равен нулю и другие потоки ждут, когда он снова станет больше нуля, разбудите n этих потоков.

Изменено в версии 3.9: Добавлен параметр n для одновременного освобождения нескольких ожидающих потоков.

class threading.BoundedSemaphore(value=1)

Класс, реализующий объекты ограниченного семафора. Ограниченный семафор проверяет, не превышает ли его текущее значение начальное значение. Если оно превышает, то поднимается значение ValueError. В большинстве ситуаций семафоры используются для охраны ресурсов с ограниченной емкостью. Если семафор освобождается слишком много раз, это признак ошибки. Если значение не задано, value по умолчанию равно 1.

Изменено в версии 3.3: превратилась из фабричной функции в класс.

Semaphore Пример

Семафоры часто используются для охраны ресурсов с ограниченной емкостью, например, сервера базы данных. В любой ситуации, когда размер ресурса фиксирован, следует использовать ограниченный семафор. Прежде чем породить рабочие потоки, ваш главный поток инициализирует семафор:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

После порождения рабочие потоки вызывают методы приобретения и освобождения семафора, когда им нужно подключиться к серверу:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

Использование ограниченного семафора снижает вероятность того, что ошибка программирования, в результате которой семафор будет освобожден больше, чем получен, останется незамеченной.

Объекты событий

Это один из простейших механизмов взаимодействия между потоками: один поток подает сигнал о событии, а другие потоки ждут его.

Объект события управляет внутренним флагом, который может быть установлен в true с помощью метода set() и сброшен в false с помощью метода clear(). Метод wait() блокирует выполнение до тех пор, пока флаг не станет истинным.

class threading.Event

Класс, реализующий объекты событий. Событие управляет флагом, который может быть установлен в true с помощью метода set() и сброшен в false с помощью метода clear(). Метод wait() блокируется до тех пор, пока флаг не станет истинным. Изначально флаг имеет значение false.

Изменено в версии 3.3: превратилась из фабричной функции в класс.

is_set()

Возвращает True тогда и только тогда, когда внутренний флаг истинен.

Метод isSet является устаревшим псевдонимом для этого метода.

set()

Установите внутренний флаг в true. Все потоки, ожидающие, пока он станет истинным, пробуждаются. Потоки, вызывающие wait() после того, как флаг станет истинным, не будут блокироваться вообще.

clear()

Сброс внутреннего флага в false. В дальнейшем потоки, вызывающие wait(), будут блокироваться до тех пор, пока не будет вызван set(), чтобы снова установить внутренний флаг в true.

wait(timeout=None)

Блокирует до тех пор, пока внутренний флаг ложен и не истек таймаут, если он задан. Возвращаемое значение представляет собой причину возврата этого блокирующего метода; True, если возврат произошел потому, что внутренний флаг установлен в true, или False, если задан тайм-аут и внутренний флаг не стал true в течение заданного времени ожидания.

Если аргумент timeout присутствует, а не является None, он должен представлять собой число с плавающей точкой, задающее время ожидания операции в секундах или их долях.

Изменено в версии 3.1: Ранее метод всегда возвращал None.

Объекты таймера

Этот класс представляет действие, которое должно быть выполнено только по истечении определенного времени — таймера. Timer является подклассом Thread и как таковой также служит примером создания пользовательских потоков.

Таймеры запускаются, как и потоки, вызовом их метода Timer.start. Таймер можно остановить (до начала его действия), вызвав метод cancel(). Интервал, который таймер будет ждать перед выполнением своего действия, может не совпадать с интервалом, указанным пользователем.

Например:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)

Создайте таймер, который запустит функцию с аргументами args и ключевыми словами kwargs по истечении интервала секунд. Если args имеет значение None (по умолчанию), то будет использоваться пустой список. Если kwargs имеет значение None (по умолчанию), то будет использован пустой dict.

Изменено в версии 3.3: превратилась из фабричной функции в класс.

cancel()

Остановите таймер и отмените выполнение действия таймера. Это сработает, только если таймер все еще находится в стадии ожидания.

Барьерные объекты

Added in version 3.2.

Этот класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается пройти барьер, вызывая метод wait(), и будет блокироваться до тех пор, пока все потоки не выполнят свои вызовы wait(). В этот момент потоки одновременно освобождаются.

Барьер можно использовать любое количество раз для одного и того же числа потоков.

В качестве примера приведем простой способ синхронизации потока клиента и сервера:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

Создает объект барьера для партии количества потоков. action, если указано, представляет собой вызываемую переменную, которая будет вызываться одним из потоков при их освобождении. timeout - значение таймаута по умолчанию, если для метода wait() не указано ни одного.

wait(timeout=None)

Пройти барьер. Когда все потоки, участвующие в барьере, вызовут эту функцию, все они будут освобождены одновременно. Если указан таймаут, то он используется предпочтительнее, чем тот, который был указан в конструкторе класса.

Возвращаемое значение - целое число в диапазоне от 0 до parties – 1, разное для каждого потока. Это может быть использовано для выбора потока для выполнения особых домашних дел, например:

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

Если конструктору было предоставлено действие, один из потоков вызовет его до освобождения. Если этот вызов приведет к ошибке, барьер будет переведен в состояние разрушения.

Если вызов прерывается, барьер переходит в состояние «сломан».

Этот метод может вызвать исключение BrokenBarrierError, если барьер будет сломан или сброшен во время ожидания потока.

reset()

Возвращает барьер в состояние по умолчанию, пустое. Все потоки, ожидающие его, получат исключение BrokenBarrierError.

Обратите внимание, что использование этой функции может потребовать внешней синхронизации, если есть другие потоки, состояние которых неизвестно. Если барьер нарушен, возможно, лучше просто оставить его и создать новый.

abort()

Переведите барьер в состояние разрушения. Это приводит к тому, что все активные или будущие вызовы wait() завершаются с ошибкой BrokenBarrierError. Используйте это, например, если один из потоков должен прерваться, чтобы избежать тупиковой ситуации в приложении.

Возможно, будет предпочтительнее просто создать барьер с разумным значением timeout для автоматической защиты от сбоев в работе одного из потоков.

parties

Количество нитей, необходимое для прохождения барьера.

n_waiting

Количество потоков, ожидающих в данный момент в барьере.

broken

Булево значение True, если барьер находится в разрушенном состоянии.

exception threading.BrokenBarrierError

Это исключение, являющееся подклассом RuntimeError, возникает, когда объект Barrier сбрасывается или разрушается.

Использование блокировок, условий и семафоров в операторе with

Все объекты, предоставляемые этим модулем и имеющие методы acquire и release, могут быть использованы в качестве менеджеров контекста для оператора with. Метод acquire будет вызван при входе в блок, а release - при выходе из него. Отсюда следующий фрагмент:

with some_lock:
    # do something...

эквивалентно:

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

В настоящее время объекты Lock, RLock, Condition, Semaphore и BoundedSemaphore могут использоваться в качестве менеджеров контекста операторов with.