Примитивы синхронизации

Источник: Lib/asyncio/locks.py


Примитивы синхронизации asyncio разработаны таким образом, чтобы быть похожими на примитивы модуля threading с двумя важными оговорками:

  • Примитивы asyncio не являются потокобезопасными, поэтому их не следует использовать для синхронизации потоков ОС (для этого используйте threading);

  • Методы этих примитивов синхронизации не принимают аргумент timeout; для выполнения операций с таймаутами используйте функцию asyncio.wait_for().

В asyncio есть следующие основные примитивы синхронизации:


Замок

class asyncio.Lock

Реализует мьютексную блокировку для задач asyncio. Не является потокобезопасным.

Блокировка asyncio может использоваться для обеспечения эксклюзивного доступа к общему ресурсу.

Предпочтительным способом использования Lock является оператор async with:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

что эквивалентно:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Изменено в версии 3.10: Удален параметр loop.

coroutine acquire()

Приобретите замок.

Этот метод ждет, пока блокировка не будет разблокирована, устанавливает ее в значение заблокирована и возвращает True.

Если несколько программ заблокированы в acquire() в ожидании разблокировки блокировки, то в конечном итоге выполняется только одна из них.

Получение блокировки является справедливым: выполняемая корутина будет первой корутиной, которая начала ожидать блокировку.

release()

Освободите замок.

Если замок заблокирован, сбросьте его на разблокирован и вернитесь.

Если блокировка разблокирована, возникает сообщение RuntimeError.

locked()

Возвращает True, если замок заблокирован.

Событие

class asyncio.Event

Объект события. Не является потокобезопасным.

Событие asyncio может быть использовано для уведомления нескольких задач asyncio о том, что произошло какое-то событие.

Объект Event управляет внутренним флагом, который может быть установлен в значение true с помощью метода set() и сброшен в значение false с помощью метода clear(). Метод wait() блокируется до тех пор, пока флаг не будет установлен в значение true. Изначально флаг устанавливается в значение false.

Изменено в версии 3.10: Удален параметр loop.

Пример:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

Подождите, пока событие не будет установлено.

Если событие установлено, немедленно верните True. В противном случае блокируйте выполнение до тех пор, пока другая задача не вызовет set().

set()

Установите событие.

Все задачи, ожидающие наступления события, будут немедленно разбужены.

clear()

Очистить (снять) событие.

Задачи, ожидающие на wait(), теперь будут блокироваться до тех пор, пока метод set() не будет вызван снова.

is_set()

Возвращает True, если событие установлено.

Состояние

class asyncio.Condition(lock=None)

Объект Condition. Не является потокобезопасным.

Примитив условия asyncio может использоваться задачей для ожидания некоторого события и последующего получения эксклюзивного доступа к общему ресурсу.

По сути, объект Condition сочетает в себе функциональность Event и Lock. Возможно, что несколько объектов Condition совместно используют один Lock, что позволяет координировать эксклюзивный доступ к общему ресурсу между различными задачами, заинтересованными в определенных состояниях этого общего ресурса.

Необязательный аргумент lock должен быть объектом Lock или None. В последнем случае новый объект Lock создается автоматически.

Изменено в версии 3.10: Удален параметр loop.

Предпочтительным способом использования условия является оператор async with:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

что эквивалентно:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

Приобретите базовую блокировку.

Этот метод ждет, пока базовая блокировка не будет разблокирована, устанавливает ее на блокирована и возвращает True.

notify(n=1)

Разбудите n задач (по умолчанию 1), ожидающих выполнения этого условия. Если ожидающих заданий меньше n, они пробуждаются все.

Блокировка должна быть получена до вызова этого метода и освобождена вскоре после него. Если метод вызывается с незаблокированной блокировкой, возникает ошибка RuntimeError.

locked()

Возвращает True, если базовая блокировка получена.

notify_all()

Разбудите все задачи, ожидающие этого условия.

Этот метод действует аналогично notify(), но пробуждает все ожидающие задачи.

Блокировка должна быть получена до вызова этого метода и освобождена вскоре после него. Если метод вызывается с незаблокированной блокировкой, возникает ошибка RuntimeError.

release()

Освободите основной замок.

При вызове на разблокированной блокировке возникает сообщение RuntimeError.

coroutine wait()

Дождитесь уведомления.

Если вызывающая задача не получила блокировку на момент вызова этого метода, то будет вызвана ошибка RuntimeError.

Этот метод освобождает базовую блокировку, а затем блокирует ее до тех пор, пока она не будет разбужена вызовом notify() или notify_all(). После пробуждения условие снова получает свою блокировку, и этот метод возвращает True.

Обратите внимание, что задача может вернуться из этого вызова ошибочно, поэтому вызывающая сторона должна всегда перепроверять состояние и быть готовой к повторному wait(). По этой причине вы можете предпочесть использовать wait_for() вместо этого.

coroutine wait_for(predicate)

Подождите, пока предикат не станет истинным.

Предикат должен быть вызываемым элементом, результат которого будет интерпретироваться как булево значение. Метод будет повторять wait() до тех пор, пока предикат не будет оценен как true. Конечное значение является возвращаемым значением.

Семафор

class asyncio.Semaphore(value=1)

Объект семафора. Не является потокобезопасным.

Семафор управляет внутренним счетчиком, который декрементируется при каждом вызове acquire() и инкрементируется при каждом вызове release(). Счетчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какая-нибудь задача не вызовет release().

Необязательный аргумент value задает начальное значение внутреннего счетчика (1 по умолчанию). Если заданное значение меньше 0, возникает ошибка ValueError.

Изменено в версии 3.10: Удален параметр loop.

Предпочтительным способом использования семафора является оператор async with:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

что эквивалентно:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

Получение семафора.

Если внутренний счетчик больше нуля, уменьшите его на единицу и немедленно верните True. Если он равен нулю, дождитесь вызова release() и верните True.

locked()

Возвращает True, если семафор не может быть получен немедленно.

release()

Освобождает семафор, увеличивая внутренний счетчик на единицу. Может разбудить задачу, ожидающую получения семафора.

В отличие от BoundedSemaphore, Semaphore позволяет делать больше release() коллов, чем acquire() коллов.

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

Ограниченный объект семафора. Не является потокобезопасным.

Bounded Semaphore - это версия Semaphore, которая поднимает ValueError в release(), если увеличивает внутренний счетчик выше начального значения.

Изменено в версии 3.10: Удален параметр loop.

Барьер

class asyncio.Barrier(parties)

Объект барьера. Не является потокобезопасным.

Барьер - это простой примитив синхронизации, который позволяет блокировать выполнение до тех пор, пока определенное количество задач не будет ожидать его. Задачи могут ждать в методе wait() и будут блокироваться до тех пор, пока указанное количество задач не закончит ждать в wait(). В этот момент все ожидающие задачи одновременно разблокируются.

async with можно использовать как альтернативу ожиданию на wait().

Барьер можно использовать любое количество раз.

Пример:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Результат этого примера таков:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Added in version 3.11.

coroutine wait()

Пройти барьер. Когда все задачи, стоящие у барьера, вызовут эту функцию, все они будут разблокированы одновременно.

Когда ожидающая или заблокированная задача в барьере отменяется, эта задача выходит из барьера, который остается в том же состоянии. Если состояние барьера - «заполнение», количество ожидающих задач уменьшается на 1.

Возвращаемое значение - целое число в диапазоне от 0 до parties-1, разное для каждой задачи. Это может быть использовано для выбора задачи для выполнения особых действий, например:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

Этот метод может вызвать исключение BrokenBarrierError, если барьер будет сломан или сброшен во время ожидания задачи. Он может вызвать исключение CancelledError, если задача отменена.

coroutine reset()

Верните барьер в стандартное, пустое состояние. Все задачи, ожидающие его, получат исключение BrokenBarrierError.

Если барьер разрушен, возможно, лучше просто оставить его и создать новый.

coroutine abort()

Переведите барьер в состояние разрушения. Это приводит к тому, что все активные или будущие вызовы wait() завершаются с ошибкой BrokenBarrierError. Используйте это, например, если одна из задач должна прерваться, чтобы избежать бесконечного ожидания задач.

parties

Количество заданий, необходимых для прохождения барьера.

n_waiting

Количество заданий, ожидающих выполнения в барьере во время заполнения.

broken

Булево значение True, если барьер находится в разрушенном состоянии.

exception asyncio.BrokenBarrierError

Это исключение, являющееся подклассом RuntimeError, возникает, когда объект Barrier сбрасывается или разрушается.


Изменено в версии 3.9: Убрано получение блокировки с помощью оператора await lock или yield from lock и/или with (with await lock, with (yield from lock)). Вместо этого используйте async with lock.