multiprocessing — Параллелизм на основе процессов

Источник: Lib/multiprocessing/


Availability: не WASI, не iOS.

Этот модуль не работает или недоступен на платформах WebAssembly или iOS. Дополнительные сведения о доступности WASM см. в разделе Платформы WebAssembly; о доступности iOS - в разделе iOS.

Введение

multiprocessing - это пакет, поддерживающий порождение процессов с помощью API, аналогичного модулю threading. Пакет multiprocessing предлагает как локальный, так и удаленный параллелизм, эффективно обходя Global Interpreter Lock за счет использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing позволяет программисту полностью задействовать несколько процессоров на данной машине. Он работает как под POSIX, так и под Windows.

В модуле multiprocessing также появились API, не имеющие аналогов в модуле threading. Ярким примером этого является объект Pool, который предлагает удобное средство распараллеливания выполнения функции по нескольким входным значениям, распределяя входные данные по процессам (параллелизм данных). Следующий пример демонстрирует распространенную практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

выведет на стандартный вывод

[1, 4, 9]

См.также

concurrent.futures.ProcessPoolExecutor предлагает интерфейс более высокого уровня для передачи задач фоновому процессу, не блокируя выполнение вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool, интерфейс concurrent.futures API позволяет отделить отправку работы в пул базовых процессов от ожидания результатов.

Класс Process

В multiprocessing процессы порождаются путем создания объекта Process и последующего вызова его метода start(). В Process используется API из threading.Thread. Тривиальным примером многопроцессной программы является

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Чтобы показать отдельные идентификаторы процессов, приведем расширенный пример:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

О том, зачем нужна часть if __name__ == '__main__', смотрите в разделе Рекомендации по программированию.

Контексты и методы запуска

В зависимости от платформы, multiprocessing поддерживает три способа запуска процесса. К этим способам запуска относятся

spawn

Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс наследует только те ресурсы, которые необходимы для выполнения метода run() объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с помощью этого метода довольно медленный по сравнению с использованием fork или forkserver.

Доступно на платформах POSIX и Windows. По умолчанию в Windows и macOS.

fork

Родительский процесс использует os.fork() для форка интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное форкирование многопоточного процесса проблематично.

Доступно на POSIX-системах. В настоящее время используется по умолчанию в POSIX, за исключением macOS.

Примечание

В Python 3.14 метод запуска по умолчанию будет заменен на fork. Код, требующий fork, должен явно указать это через get_context() или set_start_method().

Изменено в версии 3.12: Если Python обнаружит, что ваш процесс имеет несколько потоков, функция os.fork(), которую вызывает этот метод запуска, вызовет ошибку DeprecationWarning. Используйте другой метод запуска. Более подробные объяснения см. в документации по os.fork().

forkserver

Когда программа запускается и выбирает метод запуска forkserver, порождается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Процесс fork server является однопоточным, если только системные библиотеки или предварительно загруженный импорт не порождают потоки в качестве побочного эффекта, поэтому в целом безопасно использовать os.fork(). Никаких лишних ресурсов не наследуется.

Доступен на POSIX-платформах, поддерживающих передачу файловых дескрипторов через Unix pipes, таких как Linux.

Изменено в версии 3.4: Добавлены spawn для всех POSIX-платформ и forkserver для некоторых POSIX-платформ. В Windows дочерние процессы больше не наследуют все ручки, наследуемые родителями.

Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к аварийному завершению подпроцесса, так как системные библиотеки macOS могут запускать потоки. См. bpo-33725.

На POSIX при использовании методов запуска spawn или forkserver также запускается процесс resource tracker, который отслеживает несвязанные именованные системные ресурсы (например, именованные семафоры или объекты SharedMemory), созданные процессами программы. Когда все процессы завершат свою работу, процесс отслеживания ресурсов отсоединит все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был убит сигналом, то могут остаться «просочившиеся» ресурсы. (Ни утечка семафоров, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система позволяет использовать только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое место в основной памяти).

Для выбора метода запуска используется символ set_start_method() в пункте if __name__ == '__main__' основного модуля. Например:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не должен использоваться более одного раза в программе.

В качестве альтернативы можно использовать get_context(), чтобы получить контекстный объект. Контекстные объекты имеют тот же API, что и модуль мультипроцессинга, и позволяют использовать несколько методов запуска в одной программе.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с помощью контекста fork, не могут быть переданы процессам, запущенным с помощью методов spawn или forkserver start.

Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context(), чтобы не вмешиваться в выбор пользователя библиотеки.

Предупреждение

Методы запуска 'spawn' и 'forkserver' обычно нельзя использовать с «замороженными» исполняемыми файлами (т. е. с двоичными файлами, созданными пакетами типа PyInstaller и cx_Freeze) на POSIX-системах. Метод запуска 'fork' может работать, если код не использует потоки.

Обмен объектами между процессами

multiprocessing поддерживает два типа каналов связи между процессами:

Количество

Класс Queue является почти клоном queue.Queue. Например:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Очереди безопасны для потоков и процессов.

Трубы

Функция Pipe() возвращает пару объектов соединения, соединенных трубой, которая по умолчанию является дуплексной (двусторонней). Например:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Два объекта соединения, возвращаемые Pipe(), представляют собой два конца трубы. Каждый объект соединения имеет методы send() и recv() (среди прочих). Обратите внимание, что данные в трубе могут быть повреждены, если два процесса (или потока) попытаются читать из или записывать в один и тот же конец трубы в одно и то же время. Конечно, нет никакого риска повреждения от процессов, использующих разные концы трубы в одно и то же время.

Синхронизация между процессами

multiprocessing содержит эквиваленты всех примитивов синхронизации из threading. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс печатает на стандартный вывод в одно и то же время:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без использования блокировки вывод различных процессов может смешаться.

Совместное использование состояния между процессами

Как уже говорилось выше, при параллельном программировании обычно лучше избегать использования общего состояния, насколько это возможно. Это особенно верно при использовании нескольких процессов.

Однако если вам действительно нужно использовать общие данные, то multiprocessing предоставляет несколько способов сделать это.

Общая память.

Данные могут храниться в общей карте памяти с помощью Value или Array. Например, следующий код

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

напечатает

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i', используемые при создании num и arr, являются типовыми кодами, используемыми модулем array: 'd' обозначает float двойной точности, а 'i' - знаковое целое число. Эти общие объекты будут безопасны для процессов и потоков.

Для большей гибкости в использовании общей памяти можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание произвольных объектов ctypes, выделяемых из общей памяти.

Серверный процесс

Объект менеджера, возвращаемый Manager(), управляет серверным процессом, который хранит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси.

Менеджер, возвращаемый Manager(), будет поддерживать типы list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value и Array. Например,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

напечатает

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, потому что их можно заставить поддерживать произвольные типы объектов. Кроме того, один менеджер может быть общим для процессов на разных компьютерах по сети. Однако они работают медленнее, чем при использовании общей памяти.

Использование резерва работников

Класс Pool представляет собой пул рабочих процессов. В нем есть методы, которые позволяют перекладывать задачи на рабочие процессы несколькими различными способами.

Например:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.

Примечание

Функциональность этого пакета требует, чтобы модуль __main__ был импортируемым дочерними модулями. Об этом говорится в Рекомендации по программированию, однако здесь стоит на это обратить внимание. Это означает, что некоторые примеры, такие как примеры multiprocessing.pool.Pool, не будут работать в интерактивном интерпретаторе. Например:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(Если вы попробуете это сделать, то на самом деле будет выведено три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется как-то остановить родительский процесс).

Ссылка

Пакет multiprocessing в основном повторяет API модуля threading.

Process и исключения

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Объекты Process представляют деятельность, которая выполняется в отдельном процессе. Класс Process имеет эквиваленты всех методов threading.Thread.

Конструктор всегда должен вызываться с аргументами в виде ключевых слов. group всегда должно быть None; оно существует исключительно для совместимости с threading.Thread. target - это вызываемый объект, который будет вызван методом run(). По умолчанию оно равно None, что означает, что ничего не вызывается. name - имя процесса (см. name для более подробной информации). args - кортеж аргументов для целевого вызова. kwargs - словарь аргументов с ключевыми словами для целевого вызова. Если указан, аргумент daemon, содержащий только ключевое слово, устанавливает флаг daemon процесса на True или False. Если None (по умолчанию), то этот флаг будет унаследован от создающего процесса.

По умолчанию в target не передается никаких аргументов. Аргумент args, который по умолчанию равен (), может быть использован для указания списка или кортежа аргументов, которые нужно передать target.

Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (Process.__init__()), прежде чем делать что-либо еще с процессом.

Изменено в версии 3.3: Добавлен параметр daemon.

run()

Метод, представляющий деятельность процесса.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный конструктору объекта в качестве целевого аргумента, если таковой имеется, с последовательными и ключевыми аргументами, взятыми из аргументов args и kwargs, соответственно.

Использование списка или кортежа в качестве аргумента args, передаваемого в Process, позволяет добиться того же эффекта.

Пример:

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

Запустите активность процесса.

Эта функция должна вызываться не более одного раза для каждого объекта процесса. Он организует вызов метода run() объекта в отдельном процессе.

join([timeout])

Если необязательный аргумент timeout равен None (по умолчанию), метод блокируется до тех пор, пока не завершится процесс, чей метод вызван join(). Если timeout - положительное число, метод блокирует не более timeout секунд. Обратите внимание, что метод возвращает None, если его процесс завершается или если метод завершает работу. Проверьте exitcode процесса, чтобы определить, завершился ли он.

К одному процессу можно присоединяться много раз.

Процесс не может присоединиться к самому себе, так как это приведет к тупику. Ошибкой является попытка присоединиться к процессу до того, как он был запущен.

name

Имя процесса. Имя - это строка, используемая только для идентификации. Оно не имеет никакой семантики. Одно и то же имя может быть присвоено нескольким процессам.

Начальное имя задается конструктором. Если конструктору не задано явное имя, то строится имя вида „Process-N1:N2:…:Nk“, где каждый Nk является N-м дочерним элементом своего родителя.

is_alive()

Возвращает, жив ли данный процесс.

Грубо говоря, объект процесса жив с момента возвращения метода start() до завершения дочернего процесса.

daemon

Флаг демона процесса, булево значение. Он должен быть установлен до вызова start().

Начальное значение наследуется от процесса создания.

Когда процесс завершается, он пытается завершить все свои дочерние демонические процессы.

Обратите внимание, что демоническому процессу не разрешается создавать дочерние процессы. В противном случае демонический процесс оставит свои дочерние процессы сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (и не присоединятся), если завершились не демонические процессы.

В дополнение к threading.Thread API, объекты Process также поддерживают следующие атрибуты и методы:

pid

Возвращает идентификатор процесса. До порождения процесса это будет None.

exitcode

Код завершения дочернего процесса. Он будет равен None, если процесс еще не завершился.

Если дочерний метод run() завершился нормально, код выхода будет равен 0. Если он завершился через sys.exit() с целочисленным аргументом N, код выхода будет равен N.

Если дочерняя программа завершилась из-за исключения, не пойманного в пределах run(), код выхода будет равен 1. Если он был завершен по сигналу N, код выхода будет отрицательным значением -N.

authkey

Ключ аутентификации процесса (байтовая строка).

При инициализации multiprocessing главному процессу присваивается случайная строка с помощью os.urandom().

Когда создается объект Process, он наследует ключ аутентификации своего родительского процесса, хотя его можно изменить, установив authkey в другую байтовую строку.

См. Ключи аутентификации.

sentinel

Числовой хэндл системного объекта, который станет «готовым» после завершения процесса.

Вы можете использовать это значение, если хотите ждать сразу несколько событий, используя multiprocessing.connection.wait(). В противном случае вызов join() будет более простым.

В Windows это дескриптор ОС, используемый с помощью API-вызовов семейства WaitForSingleObject и WaitForMultipleObjects. В POSIX это дескриптор файла, используемый с примитивами из модуля select.

Added in version 3.3.

terminate()

Завершить процесс. На POSIX для этого используется сигнал SIGTERM; на Windows - TerminateProcess(). Обратите внимание, что обработчики выхода, клаузулы finally и т. д. не будут выполнены.

Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут сиротами.

Предупреждение

Если этот метод используется, когда связанный процесс использует трубу или очередь, то труба или очередь могут быть повреждены и стать непригодными для использования другими процессами. Аналогично, если процесс получил блокировку или семафор и т. д., то его завершение может привести к тупику других процессов.

kill()

То же самое, что и terminate(), но с использованием сигнала SIGKILL на POSIX.

Added in version 3.7.

close()

Закрывает объект Process, освобождая все связанные с ним ресурсы. Если основной процесс все еще запущен, будет вызван сигнал ValueError. После успешного возврата close() большинство других методов и атрибутов объекта Process будут вызывать ValueError.

Added in version 3.7.

Обратите внимание, что методы start(), join(), is_alive(), terminate() и exitcode должны вызываться только тем процессом, который создал объект процесса.

Пример использования некоторых методов из Process:

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

Базовый класс всех исключений multiprocessing.

exception multiprocessing.BufferTooShort

Исключение, вызываемое Connection.recv_bytes_into(), когда предоставленный объект буфера слишком мал для прочитанного сообщения.

Если e является экземпляром BufferTooShort, то e.args[0] выдаст сообщение в виде байтовой строки.

exception multiprocessing.AuthenticationError

Возникает при ошибке аутентификации.

exception multiprocessing.TimeoutError

Вызывается методами с таймаутом, когда таймаут истекает.

Трубы и очереди

При использовании нескольких процессов для связи между ними обычно используется передача сообщений, что позволяет избежать использования таких примитивов синхронизации, как блокировки.

Для передачи сообщений можно использовать Pipe() (для соединения между двумя процессами) или очередь (которая позволяет использовать несколько производителей и потребителей).

Типы Queue, SimpleQueue и JoinableQueue представляют собой многопроизводные и многопотребительские очереди FIFO, созданные по образцу класса queue.Queue из стандартной библиотеки. Они отличаются тем, что в Queue отсутствуют методы task_done() и join(), появившиеся в классе queue.Queue в Python 2.5.

Если вы используете JoinableQueue, то вы должны вызывать JoinableQueue.task_done() для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.

Обратите внимание, что общую очередь можно создать и с помощью объекта-менеджера - см. раздел Менеджеры.

Примечание

multiprocessing использует обычные исключения queue.Empty и queue.Full, чтобы сигнализировать о тайм-ауте. Они недоступны в пространстве имен multiprocessing, поэтому их нужно импортировать из queue.

Примечание

Когда объект помещается в очередь, он пикируется, а фоновый поток позже сбрасывает пикированные данные в нижележащую трубу. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать никаких практических трудностей - если они вас действительно беспокоят, то вы можете использовать очередь, созданную с помощью manager.

  1. После помещения объекта в пустую очередь может произойти бесконечно малая задержка, прежде чем метод empty() очереди вернет False, а get_nowait() может вернуться, не поднимая queue.Empty.

  2. Если несколько процессов ставят объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, зачисленные в очередь одним и тем же процессом, всегда будут располагаться в ожидаемом порядке относительно друг друга.

Предупреждение

Если процесс будет убит с помощью Process.terminate() или os.kill(), в то время как он пытается использовать Queue, то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь в дальнейшем.

Предупреждение

Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и не использовал JoinableQueue.cancel_join_thread), то этот процесс не завершится, пока все буферизованные элементы не будут смыты в трубу.

Это означает, что если вы попытаетесь присоединиться к этому процессу, то можете получить тупик, если не будете уверены, что все элементы, которые были поставлены в очередь, были потреблены. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть на выходе, когда попытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. Рекомендации по программированию.

Пример использования очередей для межпроцессного взаимодействия приведен в Примеры.

multiprocessing.Pipe([duplex])

Возвращает пару (conn1, conn2) из Connection объектов, представляющих концы трубы.

Если duplex равен True (по умолчанию), то труба двунаправленная. Если duplex равно False, то труба будет однонаправленной: conn1 можно использовать только для приема сообщений, а conn2 - только для отправки.

class multiprocessing.Queue([maxsize])

Возвращает общую очередь процессов, реализованную с помощью трубы и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток-фидер, который передает объекты из буфера в трубу.

Обычные исключения queue.Empty и queue.Full из модуля queue стандартной библиотеки поднимаются, чтобы сигнализировать о тайм-ауте.

В Queue реализованы все методы queue.Queue, кроме task_done() и join().

qsize()

Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.

Обратите внимание, что это может привести к появлению NotImplementedError на таких платформах, как macOS, где sem_getvalue() не реализована.

empty()

Возвращает True, если очередь пуста, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

full()

Возвращает True, если очередь заполнена, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

put(obj[, block[, timeout]])

Помещает объект в очередь. Если необязательный аргумент block равен True (по умолчанию), а timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключение queue.Full, если в течение этого времени свободный слот не был доступен. В противном случае (block равно False), если свободный слот доступен немедленно, поместите элемент в очередь, иначе вызовите исключение queue.Full (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, вместо AssertionError поднимается ValueError.

put_nowait(obj)

Эквивалент put(obj, False).

get([block[, timeout]])

Удаляет и возвращает элемент из очереди. Если опциональные args block равны True (по умолчанию) и timeout равны None (по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключение queue.Empty, если в течение этого времени элемент не был доступен. В противном случае (блок равен False), возвращает элемент, если он доступен немедленно, иначе вызывает исключение queue.Empty (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, вместо OSError поднимается ValueError.

get_nowait()

Эквивалент get(False).

В multiprocessing.Queue есть несколько дополнительных методов, которых нет в queue.Queue. Эти методы обычно не нужны для большинства кода:

close()

Указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится после того, как выгрузит все буферизованные данные в трубу. Это вызывается автоматически при сборке очереди.

join_thread()

Присоединиться к фоновому потоку. Может использоваться только после вызова close(). Он блокирует выполнение до выхода из фонового потока, гарантируя, что все данные в буфере были смыты в трубу.

По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать cancel_join_thread(), чтобы заставить join_thread() ничего не делать.

cancel_join_thread()

Предотвратите блокировку join_thread(). В частности, это предотвращает автоматическое присоединение к фоновому потоку при выходе из процесса - см. join_thread().

Более подходящим названием для этого метода может быть allow_exit_without_flush(). Он, скорее всего, приведет к потере выстроенных данных, и вам почти наверняка не понадобится его использовать. Он действительно нужен только в том случае, если вам нужно, чтобы текущий процесс немедленно завершился, не дожидаясь смыва выстроенных данных в базовую трубу, и вас не волнует потеря данных.

Примечание

Для работы этого класса необходима работающая реализация общего семафора в операционной системе хоста. При отсутствии таковой функциональность данного класса будет отключена, а попытки инстанцировать Queue приведут к появлению ImportError. Дополнительную информацию см. в разделе bpo-3770. То же самое справедливо для любого из специализированных типов очередей, перечисленных ниже.

class multiprocessing.SimpleQueue

Это упрощенный тип Queue, очень близкий к заблокированному Pipe.

close()

Закрыть очередь: освободить внутренние ресурсы.

Очередь не должна больше использоваться после ее закрытия. Например, методы get(), put() и empty() больше не должны вызываться.

Added in version 3.9.

empty()

Возвращает True, если очередь пуста, False в противном случае.

get()

Удаление и возврат элемента из очереди.

put(item)

Поместите элемент в очередь.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, подкласс Queue, - это очередь, которая дополнительно имеет методы task_done() и join().

task_done()

Указывает, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого get(), использованного для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если в данный момент join() блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Вызывает ошибку ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

join()

Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается каждый раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда потребитель вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля, join() разблокируется.

Разное

multiprocessing.active_children()

Возвращает список всех живых дочерних процессов текущего процесса.

Вызов этой функции имеет побочный эффект «присоединения» к процессам, которые уже завершились.

multiprocessing.cpu_count()

Возвращает количество центральных процессоров в системе.

Это число не эквивалентно количеству процессоров, которые может использовать текущий процесс. Число используемых процессоров можно получить с помощью os.process_cpu_count() (или len(os.sched_getaffinity(0))).

Если количество процессоров не может быть определено, выдается сообщение NotImplementedError.

Изменено в версии 3.13: Возвращаемое значение также можно переопределить с помощью флага -X cpu_count или PYTHON_CPU_COUNT, поскольку это всего лишь обертка вокруг API os cpu count.

multiprocessing.current_process()

Возвращает объект Process, соответствующий текущему процессу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Возвращает объект Process, соответствующий родительскому процессу current_process(). Для главного процесса parent_process будет None.

Added in version 3.8.

multiprocessing.freeze_support()

Добавьте поддержку для случаев, когда программа, использующая multiprocessing, была заморожена для создания исполняемого файла Windows. (Проверено с помощью py2exe, PyInstaller и cx_Freeze).

Вызывать эту функцию нужно сразу после строки if __name__ == '__main__' основного модуля. Например:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Если строка freeze_support() опущена, то попытка запустить замороженный исполняемый файл приведет к появлению RuntimeError.

Вызов freeze_support() не имеет никакого эффекта при вызове в любой операционной системе, отличной от Windows. Кроме того, если модуль нормально выполняется интерпретатором Python под Windows (программа не была заморожена), то вызов freeze_support() не имеет никакого эффекта.

multiprocessing.get_all_start_methods()

Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможные методы запуска: 'fork', 'spawn' и 'forkserver'. Не все платформы поддерживают все методы. См. раздел Контексты и методы запуска.

Added in version 3.4.

multiprocessing.get_context(method=None)

Возвращает объект контекста, имеющий те же атрибуты, что и модуль multiprocessing.

Если method имеет значение None, то возвращается контекст по умолчанию. В противном случае method должен быть 'fork', 'spawn', 'forkserver'. Если указанный метод запуска недоступен, возникает сообщение ValueError. См. Контексты и методы запуска.

Added in version 3.4.

multiprocessing.get_start_method(allow_none=False)

Возвращает имя метода запуска, используемого для запуска процессов.

Если метод запуска не был зафиксирован и allow_none равно false, то метод запуска фиксируется на значении по умолчанию и возвращается имя. Если метод запуска не был зафиксирован и allow_none равно true, то возвращается None.

Возвращаемое значение может быть 'fork', 'spawn', 'forkserver' или None. См. Контексты и методы запуска.

Added in version 3.4.

Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, так как он может привести к сбоям в работе подпроцесса. См. bpo-33725.

multiprocessing.set_executable(executable)

Устанавливает путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется sys.executable). Встраивателям, вероятно, потребуется сделать что-то вроде

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

прежде чем они смогут создавать дочерние процессы.

Изменено в версии 3.4: Теперь поддерживается на POSIX, когда используется метод запуска 'spawn'.

Изменено в версии 3.11: Принимает path-like object.

multiprocessing.set_forkserver_preload(module_names)

Задает список имен модулей, которые главный процесс форк-сервера должен попытаться импортировать, чтобы их уже импортированное состояние было унаследовано форк-процессами. Любые ImportError при этом молча игнорируются. Это может быть использовано для повышения производительности, чтобы избежать повторной работы в каждом процессе.

Чтобы эта функция работала, она должна быть вызвана до запуска процесса forkserver (до создания Pool или запуска Process).

Имеет смысл только при использовании метода запуска 'forkserver'. См. Контексты и методы запуска.

Added in version 3.4.

multiprocessing.set_start_method(method, force=False)

Устанавливает метод, который должен использоваться для запуска дочерних процессов. Аргумент method может быть 'fork', 'spawn' или 'forkserver'. Вызывает ошибку RuntimeError, если метод запуска уже был задан и force не равен True. Если method равно None и force равно True, то метод запуска будет установлен на None. Если method равно None и force равно False, то контекст устанавливается на контекст по умолчанию.

Обратите внимание, что эта функция должна вызываться не более одного раза, и она должна быть защищена внутри if __name__ == '__main__' в главном модуле.

См. Контексты и методы запуска.

Added in version 3.4.

Объекты соединения

Объекты соединений позволяют отправлять и получать picklable объекты или строки. Их можно рассматривать как ориентированные на сообщения подключенные сокеты.

Объекты соединения обычно создаются с помощью Pipe - см. также Слушатели и клиенты.

class multiprocessing.connection.Connection
send(obj)

Отправьте на другой конец соединения объект, который должен быть прочитан с помощью recv().

Объект должен быть picklable. Очень большие пикули (примерно 32 Мб+, хотя это зависит от ОС) могут вызвать исключение ValueError.

recv()

Возвращает объект, отправленный с другого конца соединения с помощью send(). Блокируется до тех пор, пока не будет что получать. Вызывает EOFError, если нечего принимать и другой конец был закрыт.

fileno()

Возвращает дескриптор файла или дескриптор, используемый соединением.

close()

Закройте соединение.

Эта функция вызывается автоматически, когда соединение собирается в мусор.

poll([timeout])

Возвращает, имеются ли данные, доступные для чтения.

Если timeout не указан, то возврат происходит немедленно. Если timeout - это число, то оно задает максимальное время блокировки в секундах. Если timeout имеет значение None, то используется бесконечный таймаут.

Обратите внимание, что с помощью multiprocessing.connection.wait() можно опрашивать сразу несколько объектов соединения.

send_bytes(buffer[, offset[, size]])

Отправка байтовых данных из bytes-like object в виде полного сообщения.

Если задано offset, то данные будут прочитаны из этой позиции в буфере. Если задан size, то из буфера будет прочитано столько байт, сколько указано. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение ValueError.

recv_bytes([maxlength])

Возвращает полное сообщение с байтовыми данными, отправленное с другого конца соединения, в виде строки. Блокирует до тех пор, пока не будет что получать. Вызывает EOFError, если нечего принимать и другой конец соединения закрыт.

Если указано maxlength и сообщение длиннее maxlength, то будет поднят OSError и соединение больше не будет доступно для чтения.

Изменено в версии 3.3: Раньше эта функция поднимала IOError, которая теперь является псевдонимом OSError.

recv_bytes_into(buffer[, offset])

Считывает в буфер полное сообщение с байтовыми данными, отправленное с другого конца соединения, и возвращает количество байтов в сообщении. Блокирует до тех пор, пока не будет что получать. Вызывает EOFError, если нечего принимать и другой конец соединения был закрыт.

buffer должен быть записываемым bytes-like object. Если указано offset, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом, меньшим, чем длина буфера (в байтах).

Если буфер слишком короткий, то возникает исключение BufferTooShort, а полное сообщение доступно как e.args[0], где e - экземпляр исключения.

Изменено в версии 3.3: Сами объекты соединений теперь можно передавать между процессами с помощью Connection.send() и Connection.recv().

Объекты соединения также теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста. __enter__() возвращает объект соединения, а __exit__() вызывает close().

Например:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Предупреждение

Метод Connection.recv() автоматически распаковывает полученные данные, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.

Поэтому, если объект соединения не был создан с помощью Pipe(), вы должны использовать методы recv() и send() только после выполнения какой-либо аутентификации. См. раздел Ключи аутентификации.

Предупреждение

Если процесс будет убит во время попытки чтения или записи в трубу, то данные в трубе, скорее всего, будут испорчены, поскольку невозможно будет определить, где находятся границы сообщений.

Примитивы синхронизации

Как правило, примитивы синхронизации не так необходимы в многопроцессной программе, как в многопоточной. См. документацию по модулю threading.

Обратите внимание, что примитивы синхронизации можно создавать и с помощью объекта-менеджера - см. раздел Менеджеры.

class multiprocessing.Barrier(parties[, action[, timeout]])

Объект барьера: клон threading.Barrier.

Added in version 3.3.

class multiprocessing.BoundedSemaphore([value])

Ограниченный объект семафора: близкий аналог threading.BoundedSemaphore.

Единственное отличие от близкого аналога: первый аргумент метода acquire называется block, как и в случае с Lock.acquire().

Примечание

В macOS это неотличимо от Semaphore, поскольку sem_getvalue() на этой платформе не реализовано.

class multiprocessing.Condition([lock])

Переменная условия: псевдоним для threading.Condition.

Если указан lock, то это должен быть объект Lock или RLock из multiprocessing.

Изменено в версии 3.3: Был добавлен метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Объект нерекурсивной блокировки: близкий аналог threading.Lock. После того как процесс или поток приобрел блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока блокировка не будет освобождена; любой процесс или поток может освободить ее. Концепции и поведение threading.Lock применительно к потокам воспроизводятся здесь в multiprocessing.Lock применительно к процессам или потокам, за исключением отмеченного.

Обратите внимание, что Lock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.Lock, инициализированный контекстом по умолчанию.

Lock поддерживает протокол context manager и поэтому может использоваться в утверждениях with.

acquire(block=True, timeout=None)

Получение блокировки, блокирующей или неблокирующей.

Если аргумент block установлен в True (по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не окажется в разблокированном состоянии, затем установит ее в состояние locked и вернет True. Обратите внимание, что имя этого первого аргумента отличается от имени аргумента threading.Lock.acquire().

Если аргумент block установлен в False, вызов метода не блокируется. Если замок в данный момент находится в заблокированном состоянии, верните False; в противном случае переведите замок в заблокированное состояние и верните True.

При вызове с положительным значением timeout с плавающей точкой, блокировка выполняется не более чем на количество секунд, указанное timeout, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны timeout, равному нулю. Вызовы со значением timeout, равным None (по умолчанию), устанавливают период тайм-аута бесконечным. Обратите внимание, что обработка отрицательных значений или значений None для timeout отличается от реализованного поведения в threading.Lock.acquire(). Аргумент timeout не имеет практических последствий, если аргумент block имеет значение False и поэтому игнорируется. Возвращает True, если блокировка была получена, или False, если истекло время тайм-аута.

release()

Освободить блокировку. Эта функция может быть вызвана из любого процесса или потока, а не только из процесса или потока, который первоначально получил блокировку.

Поведение такое же, как и в threading.Lock.release(), за исключением того, что при вызове на разблокированной блокировке возникает ошибка ValueError.

class multiprocessing.RLock

Объект рекурсивной блокировки: близкий аналог threading.RLock. Рекурсивная блокировка должна быть освобождена процессом или потоком, который ее приобрел. После того как процесс или поток приобрел рекурсивную блокировку, этот же процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освободить ее один раз за каждый раз, когда она была получена.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.RLock, инициализированный контекстом по умолчанию.

RLock поддерживает протокол context manager и поэтому может использоваться в утверждениях with.

acquire(block=True, timeout=None)

Получение блокировки, блокирующей или неблокирующей.

При вызове с аргументом block, установленным в True, блокировка происходит до тех пор, пока блокировка не окажется в разблокированном состоянии (не будет принадлежать ни одному процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток получает право собственности на блокировку (если оно еще не принадлежит ему), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значение True. Обратите внимание, что в поведении этого первого аргумента есть несколько различий по сравнению с реализацией threading.RLock.acquire(), начиная с названия самого аргумента.

При вызове с аргументом block, установленным в False, блокировка не производится. Если блокировка уже была приобретена (и, следовательно, принадлежит) другим процессом или потоком, текущий процесс или поток не принимает права собственности и уровень рекурсии в блокировке не изменяется, что приводит к возврату значения False. Если блокировка находится в разблокированном состоянии, текущий процесс или поток получает право собственности, а уровень рекурсии увеличивается, в результате чего возвращается значение True.

Использование и поведение аргумента timeout такие же, как и в Lock.acquire(). Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных в threading.RLock.acquire().

release()

Освободите блокировку, уменьшив уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите ровно одному из них продолжить выполнение. Если после декремента уровень рекурсии все еще ненулевой, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.

Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. Если этот метод вызывается процессом или потоком, не являющимся владельцем, или если блокировка находится в разблокированном (не принадлежащем владельцу) состоянии, то возникает исключение AssertionError. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения в threading.RLock.release().

class multiprocessing.Semaphore([value])

Объект семафора: близкий аналог threading.Semaphore.

Единственное отличие от близкого аналога: первый аргумент метода acquire называется block, как и в случае с Lock.acquire().

Примечание

В macOS функция sem_timedwait не поддерживается, поэтому вызов acquire() с таймаутом будет эмулировать поведение этой функции с помощью спящего цикла.

Примечание

Если сигнал SIGINT, генерируемый Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() или Condition.wait(), то этот вызов будет немедленно прерван и будет поднят KeyboardInterrupt.

Это отличается от поведения threading, где SIGINT будет игнорироваться, пока выполняются эквивалентные блокирующие вызовы.

Примечание

Некоторые функции этого пакета требуют наличия в операционной системе хоста работающей реализации общего семафора. Без нее модуль multiprocessing.synchronize будет отключен, а попытки импортировать его приведут к ошибке ImportError. Дополнительную информацию см. в разделе bpo-3770.

Общие ctypes Объекты

Можно создавать общие объекты, используя общую память, которая может быть унаследована дочерними процессами.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Возвращает объект ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение представляет собой синхронизированную обертку для объекта. Доступ к самому объекту можно получить через атрибут value объекта Value.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. *args передается конструктору типа.

Если lock - это True (по умолчанию), то для синхронизации доступа к значению создается новый объект рекурсивной блокировки. Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Операции типа +=, включающие чтение и запись, не являются атомарными. Поэтому, например, если вы хотите атомарно увеличить общее значение, недостаточно просто сделать

counter.value += 1

Предполагая, что связанная блокировка является рекурсивной (а по умолчанию это так), вы можете вместо этого сделать

with counter.get_lock():
    counter.value += 1

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение является синхронизированной оберткой для массива.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. Если size_or_initializer - целое число, то оно определяет длину массива, и массив будет изначально обнулен. В противном случае size_or_initializer - это последовательность, которая используется для инициализации массива и длина которой определяет длину массива.

Если lock - это True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из общей памяти, которые могут быть унаследованы дочерними процессами.

Примечание

Хотя можно хранить указатель в общей памяти, помните, что он будет ссылаться на место в адресном пространстве конкретного процесса. Однако в контексте второго процесса указатель, скорее всего, будет недействительным, и попытка разыменовать указатель из второго процесса может привести к аварийному завершению.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Возвращает массив ctypes, выделенный из общей памяти.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. Если size_or_initializer - целое число, то оно определяет длину массива, и массив будет изначально обнулен. В противном случае size_or_initializer - это последовательность, которая используется для инициализации массива и длина которой определяет длину массива.

Обратите внимание, что установка и получение элемента потенциально неатомарны - используйте вместо этого Array(), чтобы обеспечить автоматическую синхронизацию доступа с помощью блокировки.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Возвращает объект ctypes, выделенный из общей памяти.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. *args передается конструктору типа.

Обратите внимание, что установка и получение значения потенциально неатомарны - используйте вместо этого Value(), чтобы обеспечить автоматическую синхронизацию доступа с помощью блокировки.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк - см. документацию для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

То же самое, что и RawArray(), за исключением того, что в зависимости от значения lock вместо необработанного массива ctypes может быть возвращена обёртка синхронизации, безопасная для процесса.

Если lock - это True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

То же самое, что и RawValue(), за исключением того, что в зависимости от значения lock вместо необработанного объекта ctypes может быть возвращена обёртка синхронизации, безопасная для процесса.

Если lock - это True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.copy(obj)

Возвращает объект ctypes, выделенный из общей памяти, который является копией объекта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Возвращает безопасный для процесса объект-обертку для объекта ctypes, который использует lock для синхронизации доступа. Если lock равен None (по умолчанию), то автоматически создается объект multiprocessing.RLock.

Синхронизированная обертка будет иметь два метода в дополнение к методам объекта, который она обертывает: get_obj() возвращает обернутый объект и get_lock() возвращает объект блокировки, используемый для синхронизации.

Обратите внимание, что доступ к объекту ctypes через обертку может быть намного медленнее, чем доступ к необработанному объекту ctypes.

Изменено в версии 3.5: Синхронизированные объекты поддерживают протокол context manager.

В таблице ниже приводится сравнение синтаксиса создания общих объектов ctypes из общей памяти с обычным синтаксисом ctypes. (В таблице MyStruct является некоторым подклассом ctypes.Structure).

ctypes

sharedctypes с использованием типа

общие типы с использованием типового кода

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(„d“, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(„h“, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(„i“, (9, 2, 8))

Ниже приведен пример, в котором несколько объектов ctypes изменяются дочерним процессом:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

На печать выводятся следующие результаты:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджеры

Менеджеры обеспечивают способ создания данных, которые могут совместно использоваться различными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект менеджера управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.

multiprocessing.Manager()

Возвращает запущенный объект SyncManager, который может быть использован для совместного использования объектов между процессами. Возвращаемый объект менеджера соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.

Процессы менеджера будут завершены, как только они будут собраны или их родительский процесс завершится. Классы менеджеров определены в модуле multiprocessing.managers:

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Создайте объект BaseManager.

После создания необходимо вызвать start() или get_server().serve_forever(), чтобы убедиться, что объект менеджера ссылается на запущенный процесс менеджера.

address - это адрес, по которому процесс менеджера прослушивает новые соединения. Если address равен None, то выбирается произвольный адрес.

authkey - это ключ аутентификации, который будет использоваться для проверки валидности входящих соединений с серверным процессом. Если authkey равен None, то используется current_process().authkey. В противном случае используется authkey, который должен быть байтовой строкой.

сериализатор должен быть 'pickle' (используйте сериализацию pickle) или 'xmlrpclib' (используйте сериализацию xmlrpc.client).

ctx - объект контекста, или None (использование текущего контекста). См. функцию get_context().

shutdown_timeout - это таймаут в секундах, используемый для ожидания завершения процесса, используемого менеджером, в методе shutdown(). Если время завершения истекло, процесс завершается. Если время завершения процесса также истекло, процесс будет убит.

Изменено в версии 3.11: Добавлен параметр shutdown_timeout.

start([initializer[, initargs]])

Запуск подпроцесса для запуска менеджера. Если initializer не None, то при запуске подпроцесс будет вызывать initializer(*initargs).

get_server()

Возвращает объект Server, представляющий реальный сервер под управлением менеджера. Объект Server поддерживает метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

У Server дополнительно есть атрибут address.

connect()

Подключите локальный объект менеджера к удаленному процессу менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Остановка процесса, используемого менеджером. Эта функция доступна только в том случае, если для запуска процесса сервера использовался start().

Эту функцию можно вызывать несколько раз.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе менеджера.

typeid - это «идентификатор типа», который используется для идентификации конкретного типа разделяемого объекта. Это должна быть строка.

callable - вызываемый объект, используемый для создания объектов для данного идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода connect(), или если аргумент create_method равен False, то можно оставить значение None.

proxytype - это подкласс BaseProxy, который используется для создания прокси для общих объектов с данным typeid. Если None, то класс прокси создается автоматически.

exposed используется для указания последовательности имен методов, доступ к которым должен быть разрешен прокси для данного typeid с помощью BaseProxy._callmethod(). (Если exposed равен None, то вместо него используется proxytype._exposed_, если он существует.) В случае, когда список exposed не указан, будут доступны все «публичные методы» разделяемого объекта. (Здесь «публичный метод» означает любой атрибут, у которого есть метод __call__() и имя которого не начинается с '_').

method_to_typeid - это отображение, используемое для указания возвращаемого типа тех открытых методов, которые должны возвращать прокси. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid имеет значение None, то вместо него используется proxytype._method_to_typeid_, если оно существует). Если имя метода не является ключом этого отображения или если отображение имеет значение None, то объект, возвращаемый методом, будет скопирован по значению.

create_method определяет, должен ли быть создан метод с именем typeid, который может быть использован для указания серверному процессу создать новый общий объект и вернуть для него прокси. По умолчанию это значение равно True.

У экземпляров BaseManager также есть одно свойство, доступное только для чтения:

address

Адрес, используемый менеджером.

Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекстом - см. Типы менеджеров контекста. __enter__() запускает процесс сервера (если он еще не запущен), а затем возвращает объект менеджера. __exit__() вызывает shutdown().

В предыдущих версиях __enter__() не запускал серверный процесс менеджера, если он еще не был запущен.

class multiprocessing.managers.SyncManager

Подкласс BaseManager, который может быть использован для синхронизации процессов. Объекты этого типа возвращаются multiprocessing.Manager().

Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. К ним, в частности, относятся общие списки и словари.

Barrier(parties[, action[, timeout]])

Создайте общий объект threading.Barrier и верните для него прокси.

Added in version 3.3.

BoundedSemaphore([value])

Создайте общий объект threading.BoundedSemaphore и верните для него прокси.

Condition([lock])

Создайте общий объект threading.Condition и верните для него прокси.

Если указан lock, то это должен быть прокси для объекта threading.Lock или threading.RLock.

Изменено в версии 3.3: Был добавлен метод wait_for().

Event()

Создайте общий объект threading.Event и верните для него прокси.

Lock()

Создайте общий объект threading.Lock и верните для него прокси.

Namespace()

Создайте общий объект Namespace и верните для него прокси.

Queue([maxsize])

Создайте общий объект queue.Queue и верните для него прокси.

RLock()

Создайте общий объект threading.RLock и верните для него прокси.

Semaphore([value])

Создайте общий объект threading.Semaphore и верните для него прокси.

Array(typecode, sequence)

Создайте массив и верните для него прокси.

Value(typecode, value)

Создайте объект с атрибутом value, доступным для записи, и верните для него прокси.

dict()
dict(mapping)
dict(sequence)

Создайте общий объект dict и верните для него прокси.

list()
list(sequence)

Создайте общий объект list и верните для него прокси.

Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий объект-контейнер, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.

class multiprocessing.managers.Namespace

Тип, который может регистрироваться с SyncManager.

Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. Его представление показывает значения его атрибутов.

Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с '_', будет атрибутом прокси, а не атрибутом референта:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Индивидуальные менеджеры

Для создания собственного менеджера создается подкласс BaseManager и используется метод класса register() для регистрации новых типов или вызываемых элементов в классе менеджера. Например:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Использование удаленного менеджера

Можно запустить сервер менеджера на одной машине, а клиенты будут использовать его с других машин (при условии, что соответствующие брандмауэры позволяют это сделать).

Выполнение следующих команд создает сервер для одной общей очереди, к которой могут обращаться удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Его может использовать и другой клиент:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальные процессы также могут получить доступ к этой очереди, используя код, приведенный выше на клиенте, для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Прокси-объекты

Прокси - это объект, который ссылается на общий объект, который живет (предположительно) в другом процессе. Считается, что общий объект является референтом прокси. Несколько прокси-объектов могут иметь одного и того же референта.

У прокси-объекта есть методы, которые вызывают соответствующие методы его референта (хотя не все методы референта обязательно будут доступны через прокси). Таким образом, прокси-объект можно использовать так же, как и его референт:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Обратите внимание, что применение str() к прокси вернет представление референта, в то время как применение repr() вернет представление прокси.

Важной особенностью прокси-объектов является то, что их можно передавать между процессами. Как таковой, референт может содержать Прокси-объекты. Это позволяет вложить их в управляемые списки, дикты и другие Прокси-объекты:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Аналогично, прокси dict и list могут быть вложены друг в друга:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Если в референте содержатся стандартные (непрокси) объекты list или dict, модификации этих изменяемых значений не будут передаваться через менеджер, поскольку прокси не имеет возможности узнать, когда изменяются содержащиеся в нем значения. Однако сохранение значения в контейнерном прокси (что вызывает срабатывание __setitem__ на прокси-объекте) передается через менеджер, и поэтому для эффективного изменения такого элемента можно переназначить измененное значение контейнерному прокси:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Такой подход, возможно, менее удобен, чем использование вложенных Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.

Примечание

Типы прокси в multiprocessing не поддерживают сравнение по значению. Так, например, мы имеем:

>>> manager.list([1,2,3]) == [1,2,3]
False

При сравнении следует просто использовать копию референта.

class multiprocessing.managers.BaseProxy

Прокси-объекты являются экземплярами подклассов BaseProxy.

_callmethod(methodname[, args[, kwds]])

Вызывает и возвращает результат метода референта прокси.

Если proxy - это прокси, чьим референтом является obj, то выражение

proxy._callmethod(methodname, args, kwds)

оценит выражение

getattr(obj, methodname)(*args, **kwds)

в процессе работы менеджера.

Возвращаемое значение будет копией результата вызова или прокси для нового общего объекта - см. документацию для аргумента method_to_typeid в BaseManager.register().

Если при вызове возникло исключение, то оно повторно вызывается командой _callmethod(). Если в процессе менеджера возникло какое-то другое исключение, то оно преобразуется в исключение RemoteError и вызывается _callmethod().

В частности, обратите внимание, что если имя метода methodname не было раскрыто, то возникнет исключение.

Пример использования _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Возвращает копию референта.

Если референт является непиклируемым, это вызовет исключение.

__repr__()

Возвращает представление объекта прокси.

__str__()

Возвращает представление референта.

Очистка

Прокси-объект использует обратный вызов weakref, поэтому, когда его собирают, он удаляет себя из менеджера, которому принадлежит его референт.

Общий объект удаляется из процесса менеджера, когда на него больше не ссылается ни один прокси.

Пулы процессов

С помощью класса Pool можно создать пул процессов, которые будут выполнять переданные им задания.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Объект пула процессов, который управляет пулом рабочих процессов, на которые могут быть отправлены задания. Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и имеет параллельную реализацию карты.

processes - это количество рабочих процессов, которые необходимо использовать. Если processes равно None, то используется число, возвращаемое os.process_cpu_count().

Если initializer не None, то каждый рабочий процесс при запуске будет вызывать initializer(*initargs).

maxtasksperchild - это количество задач, которые может выполнить рабочий процесс, прежде чем он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild равно None, что означает, что рабочие процессы будут жить столько же, сколько и пул.

context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции multiprocessing.Pool() или метода Pool() объекта контекста. В обоих случаях context задается соответствующим образом.

Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул.

Предупреждение

Объекты multiprocessing.pool имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любым другим ресурсом), используя пул в качестве менеджера контекста или вызывая close() и terminate() вручную. Невыполнение этого требования может привести к зависанию процесса при завершении.

Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что финализатор пула будет вызван (см. object.__del__() для получения дополнительной информации).

Изменено в версии 3.2: Добавлен параметр maxtasksperchild.

Изменено в версии 3.4: Добавлен параметр context.

Изменено в версии 3.13: процессы по умолчанию используют os.process_cpu_count(), а не os.cpu_count().

Примечание

Рабочие процессы в пуле Pool обычно живут в течение всего срока действия очереди работ пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т. д.) способ освобождения ресурсов, удерживаемых рабочими процессами, заключается в том, чтобы позволить рабочему процессу в пуле завершить только определенный объем работы перед выходом, очисткой и порождением нового процесса взамен старого. Аргумент maxtasksperchild в Pool открывает эту возможность конечному пользователю.

apply(func[, args[, kwds]])

Вызовите func с аргументами args и аргументами ключевых слов kwds. Он блокирует работу до тех пор, пока не будет готов результат. Учитывая эту блокировку, apply_async() лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих пула.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Вариант метода apply(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат будет готов, к нему будет применен callback, если только вызов не завершился неудачей, в этом случае вместо него будет применен error_callback.

Если указан error_callback, то это должен быть вызываемый элемент, принимающий один аргумент. Если целевая функция не работает, то вызывается error_callback с экземпляром исключения.

Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.

map(func, iterable[, chunksize])

Параллельный аналог встроенной функции map() (поддерживает только один аргумент iterable, для множественных итераций см. starmap()). Она блокируется до тех пор, пока не будет готов результат.

Этот метод разбивает итерируемую таблицу на несколько фрагментов, которые он передает в пул процессов как отдельные задачи. Размер (приблизительный) этих кусков можно задать, установив chunksize в целое положительное число.

Обратите внимание, что для очень длинных итераций это может привести к большим затратам памяти. Для большей эффективности рассмотрите возможность использования imap() или imap_unordered() с явной опцией chunksize.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Вариант метода map(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат будет готов, к нему будет применен callback, если только вызов не завершился неудачей, в этом случае вместо него будет применен error_callback.

Если указан error_callback, то это должен быть вызываемый элемент, принимающий один аргумент. Если целевая функция не работает, то вызывается error_callback с экземпляром исключения.

Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.

imap(func, iterable[, chunksize])

Более ленивая версия map().

Аргумент chunksize аналогичен тому, который используется в методе map(). Для очень длинных итераций использование большого значения chunksize может ускорить выполнение задания на много, чем использование значения по умолчанию 1.

Также если chunksize равен 1, то метод next() итератора, возвращаемого методом imap(), имеет необязательный параметр timeout: next(timeout) поднимет сообщение multiprocessing.TimeoutError, если результат не может быть возвращен в течение timeout секунд.

imap_unordered(func, iterable[, chunksize])

То же самое, что и imap(), за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно «правильный»).

starmap(func, iterable[, chunksize])

Как и map(), за исключением того, что элементы iterable должны быть итерациями, которые распаковываются как аргументы.

Отсюда следует, что цитата из [(1,2), (3, 4)] приводит к [func(1,2), func(3,4)].

Added in version 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбинация starmap() и map_async(), которая выполняет итерацию по iterable итерируемых объектов и вызывает func с распакованными итерируемыми объектами. Возвращает объект result.

Added in version 3.3.

close()

Предотвращает дальнейшее поступление заданий в пул. Как только все задания будут выполнены, рабочие процессы завершатся.

terminate()

Немедленно останавливает рабочие процессы, не завершая невыполненную работу. Когда объект пула будет собран, terminate() будет вызван немедленно.

join()

Дождитесь завершения рабочих процессов. Перед использованием join() необходимо вызвать close() или terminate().

Изменено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста. __enter__() возвращает объект пула, а __exit__() вызывает terminate().

class multiprocessing.pool.AsyncResult

Класс результата, полученного с помощью Pool.apply_async() и Pool.map_async().

get([timeout])

Верните результат, когда он будет получен. Если timeout не равно None и результат не получен в течение timeout секунд, то вызывается multiprocessing.TimeoutError. Если удаленный вызов вызвал исключение, то это исключение будет повторно вызвано get().

wait([timeout])

Подождите, пока результат не будет доступен или пока не пройдет timeout секунд.

ready()

Возвращает, завершился ли вызов.

successful()

Возвращает, завершился ли вызов без возникновения исключения. Возвращает ValueError, если результат не готов.

Изменено в версии 3.7: Если результат не готов, вместо AssertionError выдается ValueError.

Следующий пример демонстрирует использование пула:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Слушатели и клиенты

Обычно передача сообщений между процессами осуществляется с помощью очередей или с использованием объектов Connection, возвращаемых Pipe().

Однако модуль multiprocessing.connection обеспечивает дополнительную гибкость. В основном он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными трубами Windows. Он также поддерживает аутентификацию digest с помощью модуля hmac и опрос нескольких соединений одновременно.

multiprocessing.connection.deliver_challenge(connection, authkey)

Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.

Если ответ совпадает с дайджестом сообщения, использующим authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае выдается сообщение AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Получите сообщение, вычислите его дайджест, используя authkey в качестве ключа, а затем отправьте дайджест обратно.

Если приветственное сообщение не было получено, то будет вызвано сообщение AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Попытка установить соединение со слушателем, использующим адрес address, с возвратом Connection.

Тип соединения определяется аргументом family, но его можно не указывать, так как он обычно определяется по формату address. (См. Форматы адресов)

Если задан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Если authkey равен None, аутентификация не выполняется. При неудачной аутентификации выдается сообщение AuthenticationError. См. Ключи аутентификации.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Обертка для связанного сокета или именованной трубы Windows, которая «слушает» соединения.

address - это адрес, который будет использоваться связанным сокетом или именованной трубой объекта-слушателя.

Примечание

Если используется адрес „0.0.0.0“, он не будет подключаемой конечной точкой в Windows. Если вам нужна подключаемая конечная точка, используйте „127.0.0.1“.

family - это тип используемого сокета (или именованной трубы). Это может быть одна из строк 'AF_INET' (для сокета TCP), 'AF_UNIX' (для сокета домена Unix) или 'AF_PIPE' (для именованной трубы Windows). Из них только первая гарантированно доступна. Если family имеет значение None, то семейство определяется по формату address. Если address также имеет значение None, то выбирается значение по умолчанию. По умолчанию выбирается семейство, которое считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если семейство равно 'AF_UNIX', а адрес - None, то сокет будет создан в частной временной директории, созданной с помощью tempfile.mkstemp().

Если объект слушателя использует сокет, то backlog (по умолчанию 1) передается в метод listen() сокета после его привязки.

Если задан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Если authkey равен None, аутентификация не выполняется. При неудачной аутентификации выдается сообщение AuthenticationError. См. Ключи аутентификации.

accept()

Принимает соединение на связанном сокете или именованной трубе объекта слушателя и возвращает объект Connection. Если попытка аутентификации была предпринята и не увенчалась успехом, то возвращается AuthenticationError.

close()

Закрывает связанный сокет или именованную трубу объекта слушателя. Эта функция вызывается автоматически при сборке мусора в слушателе. Однако рекомендуется вызывать его явно.

Объекты слушателей имеют следующие свойства, доступные только для чтения:

address

Адрес, который используется объектом Listener.

last_accepted

Адрес, с которого было получено последнее принятое соединение. Если этот адрес недоступен, то он равен None.

Изменено в версии 3.3: Объекты слушателей теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста. __enter__() возвращает объект слушателя, а __exit__() вызывает close().

multiprocessing.connection.wait(object_list, timeout=None)

Подождите, пока объект из object_list будет готов. Возвращает список объектов из object_list, которые готовы. Если timeout имеет значение float, то вызов блокируется не более чем на столько секунд. Если timeout равен None, то вызов будет блокироваться неограниченное время. Отрицательный таймаут эквивалентен нулевому таймауту.

Как в POSIX, так и в Windows, объект может появиться в object_list, если он

Объект соединения или сокета готов, когда из него можно читать данные или другой конец закрыт.

POSIX: wait(object_list, timeout) почти эквивалентен select.select(object_list, [], [], timeout). Разница в том, что, если select.select() прерывается сигналом, он может поднять OSError с номером ошибки EINTR, тогда как wait() этого не сделает.

Windows: Элемент в object_list должен быть либо целочисленным хэндлом, который является ожидаемым (согласно определению, используемому в документации к Win32-функции WaitForMultipleObjects()), либо объектом с методом fileno(), который возвращает хэндл сокета или хэндл трубы. (Обратите внимание, что дескрипторы труб и дескрипторы сокетов не являются ожидающими дескрипторами).

Added in version 3.3.

Примеры.

Следующий код сервера создает слушателя, который использует 'secret password' в качестве ключа аутентификации. Затем он ожидает соединения и отправляет некоторые данные клиенту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Следующий код подключается к серверу и получает от него некоторые данные:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Следующий код использует wait() для ожидания сообщений от нескольких процессов одновременно:

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Форматы адресов

  • Адрес 'AF_INET' - это кортеж вида (hostname, port), где hostname - строка, а port - целое число.

  • Адрес 'AF_UNIX' - это строка, представляющая имя файла в файловой системе.

  • Адрес 'AF_PIPE' - это строка вида r'\\.\pipe\PipeName'. Чтобы использовать Client() для подключения к именованной трубе на удаленном компьютере с именем ServerName, нужно использовать адрес вида r'\\ServerName\pipe\PipeName'.

Обратите внимание, что любая строка, начинающаяся с двух обратных косых черточек, по умолчанию считается адресом 'AF_PIPE', а не 'AF_UNIX'.

Ключи аутентификации

При использовании Connection.recv полученные данные автоматически распаковываются. К сожалению, распикировка данных из ненадежного источника представляет собой риск для безопасности. Поэтому Listener и Client() используют модуль hmac для обеспечения аутентификации по дайджесту.

Ключ аутентификации - это байтовая строка, которую можно представить как пароль: после установления соединения оба конца потребуют доказательства того, что другому известен ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, не включает в себя отправку ключа по соединению).

Если аутентификация запрошена, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey (см. Process). Это значение будет автоматически унаследовано любым объектом Process, который создаст текущий процесс. Это означает, что (по умолчанию) все процессы многопроцессной программы будут иметь единый ключ аутентификации, который можно использовать при установке соединений между собой.

Подходящие ключи аутентификации можно также сгенерировать с помощью os.urandom().

Ведение журнала

Имеется некоторая поддержка протоколирования. Обратите внимание, однако, что пакет logging не использует общие блокировки процессов, поэтому возможно (в зависимости от типа обработчика) смешивание сообщений от разных процессов.

multiprocessing.get_logger()

Возвращает регистратор, используемый multiprocessing. При необходимости будет создан новый.

При первом создании этот регистратор имеет уровень logging.NOTSET и не имеет обработчика по умолчанию. Сообщения, отправленные в этот логгер, по умолчанию не будут передаваться в корневой логгер.

Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любая другая настройка регистратора не наследуется.

multiprocessing.log_to_stderr(level=None)

Эта функция выполняет вызов get_logger(), но помимо возврата логгера, созданного get_logger, она добавляет обработчик, который отправляет вывод в sys.stderr, используя формат '[%(levelname)s/%(processName)s] %(message)s'. Вы можете изменить levelname логгера, передав аргумент level.

Ниже приведен пример сеанса с включенным протоколированием:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Полную таблицу уровней протоколирования см. в модуле logging.

Модуль multiprocessing.dummy

multiprocessing.dummy повторяет API multiprocessing, но является не более чем оберткой для модуля threading.

В частности, функция Pool, предоставляемая multiprocessing.dummy, возвращает экземпляр ThreadPool, который является подклассом Pool, поддерживающим все те же вызовы методов, но использующим пул рабочих потоков, а не рабочих процессов.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Объект пула потоков, управляющий пулом рабочих потоков, на которые могут быть отправлены задания. Экземпляры ThreadPool полностью совместимы по интерфейсу с экземплярами Pool, и их ресурсами также необходимо правильно управлять, либо используя пул в качестве менеджера контекста, либо вызывая close() и terminate() вручную.

processes - это количество рабочих потоков, которые необходимо использовать. Если processes равно None, то используется число, возвращаемое os.process_cpu_count().

Если initializer не None, то каждый рабочий процесс при запуске будет вызывать initializer(*initargs).

В отличие от Pool, maxtasksperchild и context не могут быть предоставлены.

Примечание

У ThreadPool тот же интерфейс, что и у Pool, который был разработан для пула процессов и появился до появления модуля concurrent.futures. Поэтому он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления статуса асинхронных заданий, AsyncResult, который не понимается другими библиотеками.

Пользователи, как правило, предпочитают использовать concurrent.futures.ThreadPoolExecutor, который имеет более простой интерфейс, который с самого начала был разработан с учетом потоков, и который возвращает экземпляры concurrent.futures.Future, совместимые со многими другими библиотеками, включая asyncio.

Рекомендации по программированию

Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing.

Все методы запуска

Следующее относится ко всем методам запуска.

Избегайте общего состояния

По возможности нужно стараться избегать перемещения больших объемов данных между процессами.

Вероятно, лучше всего использовать очереди или трубы для связи между процессами, а не примитивы синхронизации нижнего уровня.

Маринованность

Убедитесь, что аргументы методов прокси являются picklable.

Безопасность прокси-серверов

Не используйте прокси-объект более чем в одном потоке, если вы не защитили его блокировкой.

(Никогда не возникает проблем с различными процессами, использующими один и тот же прокси).

Объединение зомби-процессов

В POSIX, когда процесс завершается, но к нему не присоединились, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда начинается новый процесс (или вызывается active_children()), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызов Process.is_alive завершенного процесса присоединит его. Тем не менее, вероятно, хорошей практикой является явное присоединение ко всем процессам, которые вы запускаете.

Лучше наследовать, чем отбирать/не отбирать

При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть picklable, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать передачи общих объектов другим процессам с помощью труб или очередей. Вместо этого следует организовать программу таким образом, чтобы процесс, которому нужен доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.

Избегайте завершения процессов

Использование метода Process.terminate для остановки процесса может привести к тому, что все общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые процессом в данный момент, станут нерабочими или недоступными для других процессов.

Поэтому, вероятно, лучше использовать Process.terminate только для процессов, которые никогда не используют общие ресурсы.

Присоединение к процессам, использующим очереди

Имейте в виду, что процесс, поместивший элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком-«фидером» в базовую трубу. (Чтобы избежать такого поведения, дочерний процесс может вызвать метод Queue.cancel_join_thread очереди).

Это означает, что при использовании очереди необходимо убедиться, что все элементы, которые были поставлены в очередь, в конечном итоге будут удалены до того, как процесс будет завершен. В противном случае вы не сможете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут присоединяться автоматически.

Пример, который приведет к тупику, следующий:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Исправление заключается в том, чтобы поменять местами две последние строки (или просто удалить строку p.join()).

Явная передача ресурсов дочерним процессам

В POSIX при использовании метода запуска fork дочерний процесс может использовать общий ресурс, созданный в родительском процессе с помощью глобального ресурса. Однако лучше передать объект в качестве аргумента в конструктор дочернего процесса.

Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что пока дочерний процесс жив, объект не будет собран в родительском процессе. Это может быть важно, если при сборке объекта в родительском процессе освобождается какой-то ресурс.

Так, например,

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

следует переписать как

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерегайтесь заменять sys.stdin «файлоподобным объектом».

multiprocessing первоначально безоговорочно назывался:

os.close(sys.stdin.fileno())

в методе multiprocessing.Process._bootstrap() - это приводило к проблемам с процессами-в-процессах. Это было изменено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Это решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке плохого дескриптора файла, но создает потенциальную опасность для приложений, которые заменяют sys.stdin() на «файлоподобный объект» с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовут close() на этом файлоподобном объекте, это может привести к тому, что одни и те же данные будут выгружены в объект несколько раз, что приведет к повреждению.

Если вы напишете файлоподобный объект и реализуете собственное кэширование, вы можете сделать его fork-safe, сохраняя pid при каждом добавлении в кэш и удаляя кэш при изменении pid. Например:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Дополнительную информацию см. в разделах bpo-5155, bpo-5313 и bpo-5331.

Методы запуска spawn и forkserver

Есть несколько дополнительных ограничений, которые не распространяются на метод запуска fork.

Более высокая маринованность

Убедитесь, что все аргументы метода Process.__init__() являются picklable. Также, если вы подкласс Process, то убедитесь, что экземпляры будут picklable при вызове метода Process.start.

Глобальные переменные

Имейте в виду, что если код, запущенный в дочернем процессе, попытается получить доступ к глобальной переменной, то значение, которое он увидит (если оно есть), может не совпадать со значением в родительском процессе в момент вызова Process.start.

Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают никаких проблем.

Безопасный импорт главного модуля

Убедитесь, что главный модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непредвиденных побочных эффектов (например, запуска нового процесса).

Например, при использовании метода запуска spawn или forkserver запуск следующего модуля завершится с ошибкой RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Вместо этого следует защитить «точку входа» программы, используя if __name__ == '__main__': следующим образом:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Строку freeze_support() можно опустить, если программа будет выполняться в обычном режиме, а не в замороженном).

Это позволяет вновь создаваемому интерпретатору Python безопасно импортировать модуль и затем запустить его функцию foo().

Аналогичные ограничения действуют, если пул или менеджер создаются в главном модуле.

Примеры

Демонстрация того, как создавать и использовать настраиваемые менеджеры и прокси:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Использование Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Пример, показывающий, как использовать очереди для подачи заданий на набор рабочих процессов и сбора результатов:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()