multiprocessing
— Параллелизм на основе процессов¶
Источник: Lib/multiprocessing/
Availability: не WASI, не iOS.
Этот модуль не работает или недоступен на платформах WebAssembly или iOS. Дополнительные сведения о доступности WASM см. в разделе Платформы WebAssembly; о доступности iOS - в разделе iOS.
Введение¶
multiprocessing
- это пакет, поддерживающий порождение процессов с помощью API, аналогичного модулю threading
. Пакет multiprocessing
предлагает как локальный, так и удаленный параллелизм, эффективно обходя Global Interpreter Lock за счет использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing
позволяет программисту полностью задействовать несколько процессоров на данной машине. Он работает как под POSIX, так и под Windows.
В модуле multiprocessing
также появились API, не имеющие аналогов в модуле threading
. Ярким примером этого является объект Pool
, который предлагает удобное средство распараллеливания выполнения функции по нескольким входным значениям, распределяя входные данные по процессам (параллелизм данных). Следующий пример демонстрирует распространенную практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
выведет на стандартный вывод
[1, 4, 9]
См.также
concurrent.futures.ProcessPoolExecutor
предлагает интерфейс более высокого уровня для передачи задач фоновому процессу, не блокируя выполнение вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool
, интерфейс concurrent.futures
API позволяет отделить отправку работы в пул базовых процессов от ожидания результатов.
Класс Process
¶
В multiprocessing
процессы порождаются путем создания объекта Process
и последующего вызова его метода start()
. В Process
используется API из threading.Thread
. Тривиальным примером многопроцессной программы является
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Чтобы показать отдельные идентификаторы процессов, приведем расширенный пример:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
О том, зачем нужна часть if __name__ == '__main__'
, смотрите в разделе Рекомендации по программированию.
Контексты и методы запуска¶
В зависимости от платформы, multiprocessing
поддерживает три способа запуска процесса. К этим способам запуска относятся
- spawn
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс наследует только те ресурсы, которые необходимы для выполнения метода
run()
объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с помощью этого метода довольно медленный по сравнению с использованием fork или forkserver.Доступно на платформах POSIX и Windows. По умолчанию в Windows и macOS.
- fork
Родительский процесс использует
os.fork()
для форка интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное форкирование многопоточного процесса проблематично.Доступно на POSIX-системах. В настоящее время используется по умолчанию в POSIX, за исключением macOS.
Примечание
В Python 3.14 метод запуска по умолчанию будет заменен на fork. Код, требующий fork, должен явно указать это через
get_context()
илиset_start_method()
.Изменено в версии 3.12: Если Python обнаружит, что ваш процесс имеет несколько потоков, функция
os.fork()
, которую вызывает этот метод запуска, вызовет ошибкуDeprecationWarning
. Используйте другой метод запуска. Более подробные объяснения см. в документации поos.fork()
.- forkserver
Когда программа запускается и выбирает метод запуска forkserver, порождается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Процесс fork server является однопоточным, если только системные библиотеки или предварительно загруженный импорт не порождают потоки в качестве побочного эффекта, поэтому в целом безопасно использовать
os.fork()
. Никаких лишних ресурсов не наследуется.Доступен на POSIX-платформах, поддерживающих передачу файловых дескрипторов через Unix pipes, таких как Linux.
Изменено в версии 3.4: Добавлены spawn для всех POSIX-платформ и forkserver для некоторых POSIX-платформ. В Windows дочерние процессы больше не наследуют все ручки, наследуемые родителями.
Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к аварийному завершению подпроцесса, так как системные библиотеки macOS могут запускать потоки. См. bpo-33725.
На POSIX при использовании методов запуска spawn или forkserver также запускается процесс resource tracker, который отслеживает несвязанные именованные системные ресурсы (например, именованные семафоры или объекты SharedMemory
), созданные процессами программы. Когда все процессы завершат свою работу, процесс отслеживания ресурсов отсоединит все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был убит сигналом, то могут остаться «просочившиеся» ресурсы. (Ни утечка семафоров, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система позволяет использовать только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое место в основной памяти).
Для выбора метода запуска используется символ set_start_method()
в пункте if __name__ == '__main__'
основного модуля. Например:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не должен использоваться более одного раза в программе.
В качестве альтернативы можно использовать get_context()
, чтобы получить контекстный объект. Контекстные объекты имеют тот же API, что и модуль мультипроцессинга, и позволяют использовать несколько методов запуска в одной программе.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с помощью контекста fork, не могут быть переданы процессам, запущенным с помощью методов spawn или forkserver start.
Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context()
, чтобы не вмешиваться в выбор пользователя библиотеки.
Предупреждение
Методы запуска 'spawn'
и 'forkserver'
обычно нельзя использовать с «замороженными» исполняемыми файлами (т. е. с двоичными файлами, созданными пакетами типа PyInstaller и cx_Freeze) на POSIX-системах. Метод запуска 'fork'
может работать, если код не использует потоки.
Обмен объектами между процессами¶
multiprocessing
поддерживает два типа каналов связи между процессами:
Количество
Класс
Queue
является почти клономqueue.Queue
. Например:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Очереди безопасны для потоков и процессов.
Трубы
Функция
Pipe()
возвращает пару объектов соединения, соединенных трубой, которая по умолчанию является дуплексной (двусторонней). Например:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Два объекта соединения, возвращаемые
Pipe()
, представляют собой два конца трубы. Каждый объект соединения имеет методыsend()
иrecv()
(среди прочих). Обратите внимание, что данные в трубе могут быть повреждены, если два процесса (или потока) попытаются читать из или записывать в один и тот же конец трубы в одно и то же время. Конечно, нет никакого риска повреждения от процессов, использующих разные концы трубы в одно и то же время.
Синхронизация между процессами¶
multiprocessing
содержит эквиваленты всех примитивов синхронизации из threading
. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс печатает на стандартный вывод в одно и то же время:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без использования блокировки вывод различных процессов может смешаться.
Использование резерва работников¶
Класс Pool
представляет собой пул рабочих процессов. В нем есть методы, которые позволяют перекладывать задачи на рабочие процессы несколькими различными способами.
Например:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.
Примечание
Функциональность этого пакета требует, чтобы модуль __main__
был импортируемым дочерними модулями. Об этом говорится в Рекомендации по программированию, однако здесь стоит на это обратить внимание. Это означает, что некоторые примеры, такие как примеры multiprocessing.pool.Pool
, не будут работать в интерактивном интерпретаторе. Например:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(Если вы попробуете это сделать, то на самом деле будет выведено три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется как-то остановить родительский процесс).
Ссылка¶
Пакет multiprocessing
в основном повторяет API модуля threading
.
Process
и исключения¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Объекты Process представляют деятельность, которая выполняется в отдельном процессе. Класс
Process
имеет эквиваленты всех методовthreading.Thread
.Конструктор всегда должен вызываться с аргументами в виде ключевых слов. group всегда должно быть
None
; оно существует исключительно для совместимости сthreading.Thread
. target - это вызываемый объект, который будет вызван методомrun()
. По умолчанию оно равноNone
, что означает, что ничего не вызывается. name - имя процесса (см.name
для более подробной информации). args - кортеж аргументов для целевого вызова. kwargs - словарь аргументов с ключевыми словами для целевого вызова. Если указан, аргумент daemon, содержащий только ключевое слово, устанавливает флагdaemon
процесса наTrue
илиFalse
. ЕслиNone
(по умолчанию), то этот флаг будет унаследован от создающего процесса.По умолчанию в target не передается никаких аргументов. Аргумент args, который по умолчанию равен
()
, может быть использован для указания списка или кортежа аргументов, которые нужно передать target.Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (
Process.__init__()
), прежде чем делать что-либо еще с процессом.Изменено в версии 3.3: Добавлен параметр daemon.
- run()¶
Метод, представляющий деятельность процесса.
Вы можете переопределить этот метод в подклассе. Стандартный метод
run()
вызывает вызываемый объект, переданный конструктору объекта в качестве целевого аргумента, если таковой имеется, с последовательными и ключевыми аргументами, взятыми из аргументов args и kwargs, соответственно.Использование списка или кортежа в качестве аргумента args, передаваемого в
Process
, позволяет добиться того же эффекта.Пример:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()¶
Запустите активность процесса.
Эта функция должна вызываться не более одного раза для каждого объекта процесса. Он организует вызов метода
run()
объекта в отдельном процессе.
- join([timeout])¶
Если необязательный аргумент timeout равен
None
(по умолчанию), метод блокируется до тех пор, пока не завершится процесс, чей метод вызванjoin()
. Если timeout - положительное число, метод блокирует не более timeout секунд. Обратите внимание, что метод возвращаетNone
, если его процесс завершается или если метод завершает работу. Проверьтеexitcode
процесса, чтобы определить, завершился ли он.К одному процессу можно присоединяться много раз.
Процесс не может присоединиться к самому себе, так как это приведет к тупику. Ошибкой является попытка присоединиться к процессу до того, как он был запущен.
- name¶
Имя процесса. Имя - это строка, используемая только для идентификации. Оно не имеет никакой семантики. Одно и то же имя может быть присвоено нескольким процессам.
Начальное имя задается конструктором. Если конструктору не задано явное имя, то строится имя вида „Process-N1:N2:…:Nk“, где каждый Nk является N-м дочерним элементом своего родителя.
- is_alive()¶
Возвращает, жив ли данный процесс.
Грубо говоря, объект процесса жив с момента возвращения метода
start()
до завершения дочернего процесса.
- daemon¶
Флаг демона процесса, булево значение. Он должен быть установлен до вызова
start()
.Начальное значение наследуется от процесса создания.
Когда процесс завершается, он пытается завершить все свои дочерние демонические процессы.
Обратите внимание, что демоническому процессу не разрешается создавать дочерние процессы. В противном случае демонический процесс оставит свои дочерние процессы сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (и не присоединятся), если завершились не демонические процессы.
В дополнение к
threading.Thread
API, объектыProcess
также поддерживают следующие атрибуты и методы:- pid¶
Возвращает идентификатор процесса. До порождения процесса это будет
None
.
- exitcode¶
Код завершения дочернего процесса. Он будет равен
None
, если процесс еще не завершился.Если дочерний метод
run()
завершился нормально, код выхода будет равен 0. Если он завершился черезsys.exit()
с целочисленным аргументом N, код выхода будет равен N.Если дочерняя программа завершилась из-за исключения, не пойманного в пределах
run()
, код выхода будет равен 1. Если он был завершен по сигналу N, код выхода будет отрицательным значением -N.
- authkey¶
Ключ аутентификации процесса (байтовая строка).
При инициализации
multiprocessing
главному процессу присваивается случайная строка с помощьюos.urandom()
.Когда создается объект
Process
, он наследует ключ аутентификации своего родительского процесса, хотя его можно изменить, установивauthkey
в другую байтовую строку.См. Ключи аутентификации.
- sentinel¶
Числовой хэндл системного объекта, который станет «готовым» после завершения процесса.
Вы можете использовать это значение, если хотите ждать сразу несколько событий, используя
multiprocessing.connection.wait()
. В противном случае вызовjoin()
будет более простым.В Windows это дескриптор ОС, используемый с помощью API-вызовов семейства
WaitForSingleObject
иWaitForMultipleObjects
. В POSIX это дескриптор файла, используемый с примитивами из модуляselect
.Added in version 3.3.
- terminate()¶
Завершить процесс. На POSIX для этого используется сигнал
SIGTERM
; на Windows -TerminateProcess()
. Обратите внимание, что обработчики выхода, клаузулы finally и т. д. не будут выполнены.Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут сиротами.
Предупреждение
Если этот метод используется, когда связанный процесс использует трубу или очередь, то труба или очередь могут быть повреждены и стать непригодными для использования другими процессами. Аналогично, если процесс получил блокировку или семафор и т. д., то его завершение может привести к тупику других процессов.
- kill()¶
То же самое, что и
terminate()
, но с использованием сигналаSIGKILL
на POSIX.Added in version 3.7.
- close()¶
Закрывает объект
Process
, освобождая все связанные с ним ресурсы. Если основной процесс все еще запущен, будет вызван сигналValueError
. После успешного возвратаclose()
большинство других методов и атрибутов объектаProcess
будут вызыватьValueError
.Added in version 3.7.
Обратите внимание, что методы
start()
,join()
,is_alive()
,terminate()
иexitcode
должны вызываться только тем процессом, который создал объект процесса.Пример использования некоторых методов из
Process
:>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
Базовый класс всех исключений
multiprocessing
.
- exception multiprocessing.BufferTooShort¶
Исключение, вызываемое
Connection.recv_bytes_into()
, когда предоставленный объект буфера слишком мал для прочитанного сообщения.Если
e
является экземпляромBufferTooShort
, тоe.args[0]
выдаст сообщение в виде байтовой строки.
- exception multiprocessing.AuthenticationError¶
Возникает при ошибке аутентификации.
- exception multiprocessing.TimeoutError¶
Вызывается методами с таймаутом, когда таймаут истекает.
Трубы и очереди¶
При использовании нескольких процессов для связи между ними обычно используется передача сообщений, что позволяет избежать использования таких примитивов синхронизации, как блокировки.
Для передачи сообщений можно использовать Pipe()
(для соединения между двумя процессами) или очередь (которая позволяет использовать несколько производителей и потребителей).
Типы Queue
, SimpleQueue
и JoinableQueue
представляют собой многопроизводные и многопотребительские очереди FIFO, созданные по образцу класса queue.Queue
из стандартной библиотеки. Они отличаются тем, что в Queue
отсутствуют методы task_done()
и join()
, появившиеся в классе queue.Queue
в Python 2.5.
Если вы используете JoinableQueue
, то вы должны вызывать JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.
Обратите внимание, что общую очередь можно создать и с помощью объекта-менеджера - см. раздел Менеджеры.
Примечание
multiprocessing
использует обычные исключения queue.Empty
и queue.Full
, чтобы сигнализировать о тайм-ауте. Они недоступны в пространстве имен multiprocessing
, поэтому их нужно импортировать из queue
.
Примечание
Когда объект помещается в очередь, он пикируется, а фоновый поток позже сбрасывает пикированные данные в нижележащую трубу. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать никаких практических трудностей - если они вас действительно беспокоят, то вы можете использовать очередь, созданную с помощью manager.
После помещения объекта в пустую очередь может произойти бесконечно малая задержка, прежде чем метод
empty()
очереди вернетFalse
, аget_nowait()
может вернуться, не поднимаяqueue.Empty
.Если несколько процессов ставят объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, зачисленные в очередь одним и тем же процессом, всегда будут располагаться в ожидаемом порядке относительно друг друга.
Предупреждение
Если процесс будет убит с помощью Process.terminate()
или os.kill()
, в то время как он пытается использовать Queue
, то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь в дальнейшем.
Предупреждение
Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и не использовал JoinableQueue.cancel_join_thread
), то этот процесс не завершится, пока все буферизованные элементы не будут смыты в трубу.
Это означает, что если вы попытаетесь присоединиться к этому процессу, то можете получить тупик, если не будете уверены, что все элементы, которые были поставлены в очередь, были потреблены. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть на выходе, когда попытается присоединиться ко всем своим недемоническим дочерним процессам.
Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. Рекомендации по программированию.
Пример использования очередей для межпроцессного взаимодействия приведен в Примеры.
- multiprocessing.Pipe([duplex])¶
Возвращает пару
(conn1, conn2)
изConnection
объектов, представляющих концы трубы.Если duplex равен
True
(по умолчанию), то труба двунаправленная. Если duplex равноFalse
, то труба будет однонаправленной:conn1
можно использовать только для приема сообщений, аconn2
- только для отправки.
- class multiprocessing.Queue([maxsize])¶
Возвращает общую очередь процессов, реализованную с помощью трубы и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток-фидер, который передает объекты из буфера в трубу.
Обычные исключения
queue.Empty
иqueue.Full
из модуляqueue
стандартной библиотеки поднимаются, чтобы сигнализировать о тайм-ауте.В
Queue
реализованы все методыqueue.Queue
, кромеtask_done()
иjoin()
.- qsize()¶
Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.
Обратите внимание, что это может привести к появлению
NotImplementedError
на таких платформах, как macOS, гдеsem_getvalue()
не реализована.
- empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
- full()¶
Возвращает
True
, если очередь заполнена,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
- put(obj[, block[, timeout]])¶
Помещает объект в очередь. Если необязательный аргумент block равен
True
(по умолчанию), а timeout равенNone
(по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключениеqueue.Full
, если в течение этого времени свободный слот не был доступен. В противном случае (block равноFalse
), если свободный слот доступен немедленно, поместите элемент в очередь, иначе вызовите исключениеqueue.Full
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, вместо
AssertionError
поднимаетсяValueError
.
- put_nowait(obj)¶
Эквивалент
put(obj, False)
.
- get([block[, timeout]])¶
Удаляет и возвращает элемент из очереди. Если опциональные args block равны
True
(по умолчанию) и timeout равныNone
(по умолчанию), то при необходимости блокируется до тех пор, пока элемент не будет доступен. Если timeout - положительное число, блокировка длится не более timeout секунд и вызывает исключениеqueue.Empty
, если в течение этого времени элемент не был доступен. В противном случае (блок равенFalse
), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеqueue.Empty
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, вместо
OSError
поднимаетсяValueError
.
- get_nowait()¶
Эквивалент
get(False)
.
В
multiprocessing.Queue
есть несколько дополнительных методов, которых нет вqueue.Queue
. Эти методы обычно не нужны для большинства кода:- close()¶
Указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится после того, как выгрузит все буферизованные данные в трубу. Это вызывается автоматически при сборке очереди.
- join_thread()¶
Присоединиться к фоновому потоку. Может использоваться только после вызова
close()
. Он блокирует выполнение до выхода из фонового потока, гарантируя, что все данные в буфере были смыты в трубу.По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать
cancel_join_thread()
, чтобы заставитьjoin_thread()
ничего не делать.
- cancel_join_thread()¶
Предотвратите блокировку
join_thread()
. В частности, это предотвращает автоматическое присоединение к фоновому потоку при выходе из процесса - см.join_thread()
.Более подходящим названием для этого метода может быть
allow_exit_without_flush()
. Он, скорее всего, приведет к потере выстроенных данных, и вам почти наверняка не понадобится его использовать. Он действительно нужен только в том случае, если вам нужно, чтобы текущий процесс немедленно завершился, не дожидаясь смыва выстроенных данных в базовую трубу, и вас не волнует потеря данных.
Примечание
Для работы этого класса необходима работающая реализация общего семафора в операционной системе хоста. При отсутствии таковой функциональность данного класса будет отключена, а попытки инстанцировать
Queue
приведут к появлениюImportError
. Дополнительную информацию см. в разделе bpo-3770. То же самое справедливо для любого из специализированных типов очередей, перечисленных ниже.
- class multiprocessing.SimpleQueue¶
Это упрощенный тип
Queue
, очень близкий к заблокированномуPipe
.- close()¶
Закрыть очередь: освободить внутренние ресурсы.
Очередь не должна больше использоваться после ее закрытия. Например, методы
get()
,put()
иempty()
больше не должны вызываться.Added in version 3.9.
- empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае.
- get()¶
Удаление и возврат элемента из очереди.
- put(item)¶
Поместите элемент в очередь.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue
, подклассQueue
, - это очередь, которая дополнительно имеет методыtask_done()
иjoin()
.- task_done()¶
Указывает, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого
get()
, использованного для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если в данный момент
join()
блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает ошибку
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
- join()¶
Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается каждый раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда потребитель вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()
разблокируется.
Разное¶
- multiprocessing.active_children()¶
Возвращает список всех живых дочерних процессов текущего процесса.
Вызов этой функции имеет побочный эффект «присоединения» к процессам, которые уже завершились.
- multiprocessing.cpu_count()¶
Возвращает количество центральных процессоров в системе.
Это число не эквивалентно количеству процессоров, которые может использовать текущий процесс. Число используемых процессоров можно получить с помощью
os.process_cpu_count()
(илиlen(os.sched_getaffinity(0))
).Если количество процессоров не может быть определено, выдается сообщение
NotImplementedError
.См.также
Изменено в версии 3.13: Возвращаемое значение также можно переопределить с помощью флага
-X cpu_count
илиPYTHON_CPU_COUNT
, поскольку это всего лишь обертка вокруг APIos
cpu count.
- multiprocessing.current_process()¶
Возвращает объект
Process
, соответствующий текущему процессу.Аналог
threading.current_thread()
.
- multiprocessing.parent_process()¶
Возвращает объект
Process
, соответствующий родительскому процессуcurrent_process()
. Для главного процессаparent_process
будетNone
.Added in version 3.8.
- multiprocessing.freeze_support()¶
Добавьте поддержку для случаев, когда программа, использующая
multiprocessing
, была заморожена для создания исполняемого файла Windows. (Проверено с помощью py2exe, PyInstaller и cx_Freeze).Вызывать эту функцию нужно сразу после строки
if __name__ == '__main__'
основного модуля. Например:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Если строка
freeze_support()
опущена, то попытка запустить замороженный исполняемый файл приведет к появлениюRuntimeError
.Вызов
freeze_support()
не имеет никакого эффекта при вызове в любой операционной системе, отличной от Windows. Кроме того, если модуль нормально выполняется интерпретатором Python под Windows (программа не была заморожена), то вызовfreeze_support()
не имеет никакого эффекта.
- multiprocessing.get_all_start_methods()¶
Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможные методы запуска:
'fork'
,'spawn'
и'forkserver'
. Не все платформы поддерживают все методы. См. раздел Контексты и методы запуска.Added in version 3.4.
- multiprocessing.get_context(method=None)¶
Возвращает объект контекста, имеющий те же атрибуты, что и модуль
multiprocessing
.Если method имеет значение
None
, то возвращается контекст по умолчанию. В противном случае method должен быть'fork'
,'spawn'
,'forkserver'
. Если указанный метод запуска недоступен, возникает сообщениеValueError
. См. Контексты и методы запуска.Added in version 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Возвращает имя метода запуска, используемого для запуска процессов.
Если метод запуска не был зафиксирован и allow_none равно false, то метод запуска фиксируется на значении по умолчанию и возвращается имя. Если метод запуска не был зафиксирован и allow_none равно true, то возвращается
None
.Возвращаемое значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
. См. Контексты и методы запуска.Added in version 3.4.
Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, так как он может привести к сбоям в работе подпроцесса. См. bpo-33725.
- multiprocessing.set_executable(executable)¶
Устанавливает путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется
sys.executable
). Встраивателям, вероятно, потребуется сделать что-то вродеset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
прежде чем они смогут создавать дочерние процессы.
Изменено в версии 3.4: Теперь поддерживается на POSIX, когда используется метод запуска
'spawn'
.Изменено в версии 3.11: Принимает path-like object.
- multiprocessing.set_forkserver_preload(module_names)¶
Задает список имен модулей, которые главный процесс форк-сервера должен попытаться импортировать, чтобы их уже импортированное состояние было унаследовано форк-процессами. Любые
ImportError
при этом молча игнорируются. Это может быть использовано для повышения производительности, чтобы избежать повторной работы в каждом процессе.Чтобы эта функция работала, она должна быть вызвана до запуска процесса forkserver (до создания
Pool
или запускаProcess
).Имеет смысл только при использовании метода запуска
'forkserver'
. См. Контексты и методы запуска.Added in version 3.4.
- multiprocessing.set_start_method(method, force=False)¶
Устанавливает метод, который должен использоваться для запуска дочерних процессов. Аргумент method может быть
'fork'
,'spawn'
или'forkserver'
. Вызывает ошибкуRuntimeError
, если метод запуска уже был задан и force не равенTrue
. Если method равноNone
и force равноTrue
, то метод запуска будет установлен наNone
. Если method равноNone
и force равноFalse
, то контекст устанавливается на контекст по умолчанию.Обратите внимание, что эта функция должна вызываться не более одного раза, и она должна быть защищена внутри
if __name__ == '__main__'
в главном модуле.См. Контексты и методы запуска.
Added in version 3.4.
Примечание
multiprocessing
не содержит аналогов threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
или threading.local
.
Объекты соединения¶
Объекты соединений позволяют отправлять и получать picklable объекты или строки. Их можно рассматривать как ориентированные на сообщения подключенные сокеты.
Объекты соединения обычно создаются с помощью Pipe
- см. также Слушатели и клиенты.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Отправьте на другой конец соединения объект, который должен быть прочитан с помощью
recv()
.Объект должен быть picklable. Очень большие пикули (примерно 32 Мб+, хотя это зависит от ОС) могут вызвать исключение
ValueError
.
- recv()¶
Возвращает объект, отправленный с другого конца соединения с помощью
send()
. Блокируется до тех пор, пока не будет что получать. ВызываетEOFError
, если нечего принимать и другой конец был закрыт.
- fileno()¶
Возвращает дескриптор файла или дескриптор, используемый соединением.
- close()¶
Закройте соединение.
Эта функция вызывается автоматически, когда соединение собирается в мусор.
- poll([timeout])¶
Возвращает, имеются ли данные, доступные для чтения.
Если timeout не указан, то возврат происходит немедленно. Если timeout - это число, то оно задает максимальное время блокировки в секундах. Если timeout имеет значение
None
, то используется бесконечный таймаут.Обратите внимание, что с помощью
multiprocessing.connection.wait()
можно опрашивать сразу несколько объектов соединения.
- send_bytes(buffer[, offset[, size]])¶
Отправка байтовых данных из bytes-like object в виде полного сообщения.
Если задано offset, то данные будут прочитаны из этой позиции в буфере. Если задан size, то из буфера будет прочитано столько байт, сколько указано. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение
ValueError
.
- recv_bytes([maxlength])¶
Возвращает полное сообщение с байтовыми данными, отправленное с другого конца соединения, в виде строки. Блокирует до тех пор, пока не будет что получать. Вызывает
EOFError
, если нечего принимать и другой конец соединения закрыт.Если указано maxlength и сообщение длиннее maxlength, то будет поднят
OSError
и соединение больше не будет доступно для чтения.
- recv_bytes_into(buffer[, offset])¶
Считывает в буфер полное сообщение с байтовыми данными, отправленное с другого конца соединения, и возвращает количество байтов в сообщении. Блокирует до тех пор, пока не будет что получать. Вызывает
EOFError
, если нечего принимать и другой конец соединения был закрыт.buffer должен быть записываемым bytes-like object. Если указано offset, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом, меньшим, чем длина буфера (в байтах).
Если буфер слишком короткий, то возникает исключение
BufferTooShort
, а полное сообщение доступно какe.args[0]
, гдеe
- экземпляр исключения.
Изменено в версии 3.3: Сами объекты соединений теперь можно передавать между процессами с помощью
Connection.send()
иConnection.recv()
.Объекты соединения также теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста.
__enter__()
возвращает объект соединения, а__exit__()
вызываетclose()
.
Например:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Предупреждение
Метод Connection.recv()
автоматически распаковывает полученные данные, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.
Поэтому, если объект соединения не был создан с помощью Pipe()
, вы должны использовать методы recv()
и send()
только после выполнения какой-либо аутентификации. См. раздел Ключи аутентификации.
Предупреждение
Если процесс будет убит во время попытки чтения или записи в трубу, то данные в трубе, скорее всего, будут испорчены, поскольку невозможно будет определить, где находятся границы сообщений.
Примитивы синхронизации¶
Как правило, примитивы синхронизации не так необходимы в многопроцессной программе, как в многопоточной. См. документацию по модулю threading
.
Обратите внимание, что примитивы синхронизации можно создавать и с помощью объекта-менеджера - см. раздел Менеджеры.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Объект барьера: клон
threading.Barrier
.Added in version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Ограниченный объект семафора: близкий аналог
threading.BoundedSemaphore
.Единственное отличие от близкого аналога: первый аргумент метода
acquire
называется block, как и в случае сLock.acquire()
.Примечание
В macOS это неотличимо от
Semaphore
, посколькуsem_getvalue()
на этой платформе не реализовано.
- class multiprocessing.Condition([lock])¶
Переменная условия: псевдоним для
threading.Condition
.Если указан lock, то это должен быть объект
Lock
илиRLock
изmultiprocessing
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
- class multiprocessing.Event¶
Клон
threading.Event
.
- class multiprocessing.Lock¶
Объект нерекурсивной блокировки: близкий аналог
threading.Lock
. После того как процесс или поток приобрел блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока блокировка не будет освобождена; любой процесс или поток может освободить ее. Концепции и поведениеthreading.Lock
применительно к потокам воспроизводятся здесь вmultiprocessing.Lock
применительно к процессам или потокам, за исключением отмеченного.Обратите внимание, что
Lock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.Lock
, инициализированный контекстом по умолчанию.Lock
поддерживает протокол context manager и поэтому может использоваться в утвержденияхwith
.- acquire(block=True, timeout=None)¶
Получение блокировки, блокирующей или неблокирующей.
Если аргумент block установлен в
True
(по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не окажется в разблокированном состоянии, затем установит ее в состояние locked и вернетTrue
. Обратите внимание, что имя этого первого аргумента отличается от имени аргументаthreading.Lock.acquire()
.Если аргумент block установлен в
False
, вызов метода не блокируется. Если замок в данный момент находится в заблокированном состоянии, вернитеFalse
; в противном случае переведите замок в заблокированное состояние и вернитеTrue
.При вызове с положительным значением timeout с плавающей точкой, блокировка выполняется не более чем на количество секунд, указанное timeout, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны timeout, равному нулю. Вызовы со значением timeout, равным
None
(по умолчанию), устанавливают период тайм-аута бесконечным. Обратите внимание, что обработка отрицательных значений или значенийNone
для timeout отличается от реализованного поведения вthreading.Lock.acquire()
. Аргумент timeout не имеет практических последствий, если аргумент block имеет значениеFalse
и поэтому игнорируется. ВозвращаетTrue
, если блокировка была получена, илиFalse
, если истекло время тайм-аута.
- release()¶
Освободить блокировку. Эта функция может быть вызвана из любого процесса или потока, а не только из процесса или потока, который первоначально получил блокировку.
Поведение такое же, как и в
threading.Lock.release()
, за исключением того, что при вызове на разблокированной блокировке возникает ошибкаValueError
.
- class multiprocessing.RLock¶
Объект рекурсивной блокировки: близкий аналог
threading.RLock
. Рекурсивная блокировка должна быть освобождена процессом или потоком, который ее приобрел. После того как процесс или поток приобрел рекурсивную блокировку, этот же процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освободить ее один раз за каждый раз, когда она была получена.Обратите внимание, что
RLock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.RLock
, инициализированный контекстом по умолчанию.RLock
поддерживает протокол context manager и поэтому может использоваться в утвержденияхwith
.- acquire(block=True, timeout=None)¶
Получение блокировки, блокирующей или неблокирующей.
При вызове с аргументом block, установленным в
True
, блокировка происходит до тех пор, пока блокировка не окажется в разблокированном состоянии (не будет принадлежать ни одному процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток получает право собственности на блокировку (если оно еще не принадлежит ему), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значениеTrue
. Обратите внимание, что в поведении этого первого аргумента есть несколько различий по сравнению с реализациейthreading.RLock.acquire()
, начиная с названия самого аргумента.При вызове с аргументом block, установленным в
False
, блокировка не производится. Если блокировка уже была приобретена (и, следовательно, принадлежит) другим процессом или потоком, текущий процесс или поток не принимает права собственности и уровень рекурсии в блокировке не изменяется, что приводит к возврату значенияFalse
. Если блокировка находится в разблокированном состоянии, текущий процесс или поток получает право собственности, а уровень рекурсии увеличивается, в результате чего возвращается значениеTrue
.Использование и поведение аргумента timeout такие же, как и в
Lock.acquire()
. Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных вthreading.RLock.acquire()
.
- release()¶
Освободите блокировку, уменьшив уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите ровно одному из них продолжить выполнение. Если после декремента уровень рекурсии все еще ненулевой, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.
Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. Если этот метод вызывается процессом или потоком, не являющимся владельцем, или если блокировка находится в разблокированном (не принадлежащем владельцу) состоянии, то возникает исключение
AssertionError
. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения вthreading.RLock.release()
.
- class multiprocessing.Semaphore([value])¶
Объект семафора: близкий аналог
threading.Semaphore
.Единственное отличие от близкого аналога: первый аргумент метода
acquire
называется block, как и в случае сLock.acquire()
.
Примечание
В macOS функция sem_timedwait
не поддерживается, поэтому вызов acquire()
с таймаутом будет эмулировать поведение этой функции с помощью спящего цикла.
Примечание
Если сигнал SIGINT, генерируемый Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
или Condition.wait()
, то этот вызов будет немедленно прерван и будет поднят KeyboardInterrupt
.
Это отличается от поведения threading
, где SIGINT будет игнорироваться, пока выполняются эквивалентные блокирующие вызовы.
Примечание
Некоторые функции этого пакета требуют наличия в операционной системе хоста работающей реализации общего семафора. Без нее модуль multiprocessing.synchronize
будет отключен, а попытки импортировать его приведут к ошибке ImportError
. Дополнительную информацию см. в разделе bpo-3770.
Менеджеры¶
Менеджеры обеспечивают способ создания данных, которые могут совместно использоваться различными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект менеджера управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.
- multiprocessing.Manager()¶
Возвращает запущенный объект
SyncManager
, который может быть использован для совместного использования объектов между процессами. Возвращаемый объект менеджера соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.
Процессы менеджера будут завершены, как только они будут собраны или их родительский процесс завершится. Классы менеджеров определены в модуле multiprocessing.managers
:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Создайте объект BaseManager.
После создания необходимо вызвать
start()
илиget_server().serve_forever()
, чтобы убедиться, что объект менеджера ссылается на запущенный процесс менеджера.address - это адрес, по которому процесс менеджера прослушивает новые соединения. Если address равен
None
, то выбирается произвольный адрес.authkey - это ключ аутентификации, который будет использоваться для проверки валидности входящих соединений с серверным процессом. Если authkey равен
None
, то используетсяcurrent_process().authkey
. В противном случае используется authkey, который должен быть байтовой строкой.сериализатор должен быть
'pickle'
(используйте сериализациюpickle
) или'xmlrpclib'
(используйте сериализациюxmlrpc.client
).ctx - объект контекста, или
None
(использование текущего контекста). См. функциюget_context()
.shutdown_timeout - это таймаут в секундах, используемый для ожидания завершения процесса, используемого менеджером, в методе
shutdown()
. Если время завершения истекло, процесс завершается. Если время завершения процесса также истекло, процесс будет убит.Изменено в версии 3.11: Добавлен параметр shutdown_timeout.
- start([initializer[, initargs]])¶
Запуск подпроцесса для запуска менеджера. Если initializer не
None
, то при запуске подпроцесс будет вызыватьinitializer(*initargs)
.
- get_server()¶
Возвращает объект
Server
, представляющий реальный сервер под управлением менеджера. ОбъектServer
поддерживает методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
У
Server
дополнительно есть атрибутaddress
.
- connect()¶
Подключите локальный объект менеджера к удаленному процессу менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Остановка процесса, используемого менеджером. Эта функция доступна только в том случае, если для запуска процесса сервера использовался
start()
.Эту функцию можно вызывать несколько раз.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе менеджера.
typeid - это «идентификатор типа», который используется для идентификации конкретного типа разделяемого объекта. Это должна быть строка.
callable - вызываемый объект, используемый для создания объектов для данного идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода
connect()
, или если аргумент create_method равенFalse
, то можно оставить значениеNone
.proxytype - это подкласс
BaseProxy
, который используется для создания прокси для общих объектов с данным typeid. ЕслиNone
, то класс прокси создается автоматически.exposed используется для указания последовательности имен методов, доступ к которым должен быть разрешен прокси для данного typeid с помощью
BaseProxy._callmethod()
. (Если exposed равенNone
, то вместо него используетсяproxytype._exposed_
, если он существует.) В случае, когда список exposed не указан, будут доступны все «публичные методы» разделяемого объекта. (Здесь «публичный метод» означает любой атрибут, у которого есть метод__call__()
и имя которого не начинается с'_'
).method_to_typeid - это отображение, используемое для указания возвращаемого типа тех открытых методов, которые должны возвращать прокси. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid имеет значение
None
, то вместо него используетсяproxytype._method_to_typeid_
, если оно существует). Если имя метода не является ключом этого отображения или если отображение имеет значениеNone
, то объект, возвращаемый методом, будет скопирован по значению.create_method определяет, должен ли быть создан метод с именем typeid, который может быть использован для указания серверному процессу создать новый общий объект и вернуть для него прокси. По умолчанию это значение равно
True
.
У экземпляров
BaseManager
также есть одно свойство, доступное только для чтения:- address¶
Адрес, используемый менеджером.
Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекстом - см. Типы менеджеров контекста.
__enter__()
запускает процесс сервера (если он еще не запущен), а затем возвращает объект менеджера.__exit__()
вызываетshutdown()
.В предыдущих версиях
__enter__()
не запускал серверный процесс менеджера, если он еще не был запущен.
- class multiprocessing.managers.SyncManager¶
Подкласс
BaseManager
, который может быть использован для синхронизации процессов. Объекты этого типа возвращаютсяmultiprocessing.Manager()
.Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. К ним, в частности, относятся общие списки и словари.
- Barrier(parties[, action[, timeout]])¶
Создайте общий объект
threading.Barrier
и верните для него прокси.Added in version 3.3.
- BoundedSemaphore([value])¶
Создайте общий объект
threading.BoundedSemaphore
и верните для него прокси.
- Condition([lock])¶
Создайте общий объект
threading.Condition
и верните для него прокси.Если указан lock, то это должен быть прокси для объекта
threading.Lock
илиthreading.RLock
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
- Event()¶
Создайте общий объект
threading.Event
и верните для него прокси.
- Lock()¶
Создайте общий объект
threading.Lock
и верните для него прокси.
- Queue([maxsize])¶
Создайте общий объект
queue.Queue
и верните для него прокси.
- RLock()¶
Создайте общий объект
threading.RLock
и верните для него прокси.
- Semaphore([value])¶
Создайте общий объект
threading.Semaphore
и верните для него прокси.
- Array(typecode, sequence)¶
Создайте массив и верните для него прокси.
- Value(typecode, value)¶
Создайте объект с атрибутом
value
, доступным для записи, и верните для него прокси.
Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий объект-контейнер, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться
SyncManager
.
- class multiprocessing.managers.Namespace¶
Тип, который может регистрироваться с
SyncManager
.Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. Его представление показывает значения его атрибутов.
Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с
'_'
, будет атрибутом прокси, а не атрибутом референта:>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Индивидуальные менеджеры¶
Для создания собственного менеджера создается подкласс BaseManager
и используется метод класса register()
для регистрации новых типов или вызываемых элементов в классе менеджера. Например:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Использование удаленного менеджера¶
Можно запустить сервер менеджера на одной машине, а клиенты будут использовать его с других машин (при условии, что соответствующие брандмауэры позволяют это сделать).
Выполнение следующих команд создает сервер для одной общей очереди, к которой могут обращаться удаленные клиенты:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Его может использовать и другой клиент:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальные процессы также могут получить доступ к этой очереди, используя код, приведенный выше на клиенте, для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Прокси-объекты¶
Прокси - это объект, который ссылается на общий объект, который живет (предположительно) в другом процессе. Считается, что общий объект является референтом прокси. Несколько прокси-объектов могут иметь одного и того же референта.
У прокси-объекта есть методы, которые вызывают соответствующие методы его референта (хотя не все методы референта обязательно будут доступны через прокси). Таким образом, прокси-объект можно использовать так же, как и его референт:
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Обратите внимание, что применение str()
к прокси вернет представление референта, в то время как применение repr()
вернет представление прокси.
Важной особенностью прокси-объектов является то, что их можно передавать между процессами. Как таковой, референт может содержать Прокси-объекты. Это позволяет вложить их в управляемые списки, дикты и другие Прокси-объекты:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Аналогично, прокси dict и list могут быть вложены друг в друга:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Если в референте содержатся стандартные (непрокси) объекты list
или dict
, модификации этих изменяемых значений не будут передаваться через менеджер, поскольку прокси не имеет возможности узнать, когда изменяются содержащиеся в нем значения. Однако сохранение значения в контейнерном прокси (что вызывает срабатывание __setitem__
на прокси-объекте) передается через менеджер, и поэтому для эффективного изменения такого элемента можно переназначить измененное значение контейнерному прокси:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Такой подход, возможно, менее удобен, чем использование вложенных Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.
Примечание
Типы прокси в multiprocessing
не поддерживают сравнение по значению. Так, например, мы имеем:
>>> manager.list([1,2,3]) == [1,2,3]
False
При сравнении следует просто использовать копию референта.
- class multiprocessing.managers.BaseProxy¶
Прокси-объекты являются экземплярами подклассов
BaseProxy
.- _callmethod(methodname[, args[, kwds]])¶
Вызывает и возвращает результат метода референта прокси.
Если
proxy
- это прокси, чьим референтом являетсяobj
, то выражениеproxy._callmethod(methodname, args, kwds)
оценит выражение
getattr(obj, methodname)(*args, **kwds)
в процессе работы менеджера.
Возвращаемое значение будет копией результата вызова или прокси для нового общего объекта - см. документацию для аргумента method_to_typeid в
BaseManager.register()
.Если при вызове возникло исключение, то оно повторно вызывается командой
_callmethod()
. Если в процессе менеджера возникло какое-то другое исключение, то оно преобразуется в исключениеRemoteError
и вызывается_callmethod()
.В частности, обратите внимание, что если имя метода methodname не было раскрыто, то возникнет исключение.
Пример использования
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
Возвращает копию референта.
Если референт является непиклируемым, это вызовет исключение.
- __repr__()¶
Возвращает представление объекта прокси.
- __str__()¶
Возвращает представление референта.
Очистка¶
Прокси-объект использует обратный вызов weakref, поэтому, когда его собирают, он удаляет себя из менеджера, которому принадлежит его референт.
Общий объект удаляется из процесса менеджера, когда на него больше не ссылается ни один прокси.
Пулы процессов¶
С помощью класса Pool
можно создать пул процессов, которые будут выполнять переданные им задания.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Объект пула процессов, который управляет пулом рабочих процессов, на которые могут быть отправлены задания. Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и имеет параллельную реализацию карты.
processes - это количество рабочих процессов, которые необходимо использовать. Если processes равно
None
, то используется число, возвращаемоеos.process_cpu_count()
.Если initializer не
None
, то каждый рабочий процесс при запуске будет вызыватьinitializer(*initargs)
.maxtasksperchild - это количество задач, которые может выполнить рабочий процесс, прежде чем он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild равно
None
, что означает, что рабочие процессы будут жить столько же, сколько и пул.context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции
multiprocessing.Pool()
или методаPool()
объекта контекста. В обоих случаях context задается соответствующим образом.Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул.
Предупреждение
Объекты
multiprocessing.pool
имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любым другим ресурсом), используя пул в качестве менеджера контекста или вызываяclose()
иterminate()
вручную. Невыполнение этого требования может привести к зависанию процесса при завершении.Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что финализатор пула будет вызван (см.
object.__del__()
для получения дополнительной информации).Изменено в версии 3.2: Добавлен параметр maxtasksperchild.
Изменено в версии 3.4: Добавлен параметр context.
Изменено в версии 3.13: процессы по умолчанию используют
os.process_cpu_count()
, а неos.cpu_count()
.Примечание
Рабочие процессы в пуле
Pool
обычно живут в течение всего срока действия очереди работ пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т. д.) способ освобождения ресурсов, удерживаемых рабочими процессами, заключается в том, чтобы позволить рабочему процессу в пуле завершить только определенный объем работы перед выходом, очисткой и порождением нового процесса взамен старого. Аргумент maxtasksperchild вPool
открывает эту возможность конечному пользователю.- apply(func[, args[, kwds]])¶
Вызовите func с аргументами args и аргументами ключевых слов kwds. Он блокирует работу до тех пор, пока не будет готов результат. Учитывая эту блокировку,
apply_async()
лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих пула.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
Вариант метода
apply()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат будет готов, к нему будет применен callback, если только вызов не завершился неудачей, в этом случае вместо него будет применен error_callback.
Если указан error_callback, то это должен быть вызываемый элемент, принимающий один аргумент. Если целевая функция не работает, то вызывается error_callback с экземпляром исключения.
Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.
- map(func, iterable[, chunksize])¶
Параллельный аналог встроенной функции
map()
(поддерживает только один аргумент iterable, для множественных итераций см.starmap()
). Она блокируется до тех пор, пока не будет готов результат.Этот метод разбивает итерируемую таблицу на несколько фрагментов, которые он передает в пул процессов как отдельные задачи. Размер (приблизительный) этих кусков можно задать, установив chunksize в целое положительное число.
Обратите внимание, что для очень длинных итераций это может привести к большим затратам памяти. Для большей эффективности рассмотрите возможность использования
imap()
илиimap_unordered()
с явной опцией chunksize.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Вариант метода
map()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат будет готов, к нему будет применен callback, если только вызов не завершился неудачей, в этом случае вместо него будет применен error_callback.
Если указан error_callback, то это должен быть вызываемый элемент, принимающий один аргумент. Если целевая функция не работает, то вызывается error_callback с экземпляром исключения.
Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.
- imap(func, iterable[, chunksize])¶
Более ленивая версия
map()
.Аргумент chunksize аналогичен тому, который используется в методе
map()
. Для очень длинных итераций использование большого значения chunksize может ускорить выполнение задания на много, чем использование значения по умолчанию1
.Также если chunksize равен
1
, то методnext()
итератора, возвращаемого методомimap()
, имеет необязательный параметр timeout:next(timeout)
поднимет сообщениеmultiprocessing.TimeoutError
, если результат не может быть возвращен в течение timeout секунд.
- imap_unordered(func, iterable[, chunksize])¶
То же самое, что и
imap()
, за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно «правильный»).
- starmap(func, iterable[, chunksize])¶
Как и
map()
, за исключением того, что элементы iterable должны быть итерациями, которые распаковываются как аргументы.Отсюда следует, что цитата из
[(1,2), (3, 4)]
приводит к[func(1,2), func(3,4)]
.Added in version 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Комбинация
starmap()
иmap_async()
, которая выполняет итерацию по iterable итерируемых объектов и вызывает func с распакованными итерируемыми объектами. Возвращает объект result.Added in version 3.3.
- close()¶
Предотвращает дальнейшее поступление заданий в пул. Как только все задания будут выполнены, рабочие процессы завершатся.
- terminate()¶
Немедленно останавливает рабочие процессы, не завершая невыполненную работу. Когда объект пула будет собран,
terminate()
будет вызван немедленно.
- join()¶
Дождитесь завершения рабочих процессов. Перед использованием
join()
необходимо вызватьclose()
илиterminate()
.
Изменено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста.
__enter__()
возвращает объект пула, а__exit__()
вызываетterminate()
.
- class multiprocessing.pool.AsyncResult¶
Класс результата, полученного с помощью
Pool.apply_async()
иPool.map_async()
.- get([timeout])¶
Верните результат, когда он будет получен. Если timeout не равно
None
и результат не получен в течение timeout секунд, то вызываетсяmultiprocessing.TimeoutError
. Если удаленный вызов вызвал исключение, то это исключение будет повторно вызваноget()
.
- wait([timeout])¶
Подождите, пока результат не будет доступен или пока не пройдет timeout секунд.
- ready()¶
Возвращает, завершился ли вызов.
- successful()¶
Возвращает, завершился ли вызов без возникновения исключения. Возвращает
ValueError
, если результат не готов.Изменено в версии 3.7: Если результат не готов, вместо
AssertionError
выдаетсяValueError
.
Следующий пример демонстрирует использование пула:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Слушатели и клиенты¶
Обычно передача сообщений между процессами осуществляется с помощью очередей или с использованием объектов Connection
, возвращаемых Pipe()
.
Однако модуль multiprocessing.connection
обеспечивает дополнительную гибкость. В основном он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными трубами Windows. Он также поддерживает аутентификацию digest с помощью модуля hmac
и опрос нескольких соединений одновременно.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.
Если ответ совпадает с дайджестом сообщения, использующим authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае выдается сообщение
AuthenticationError
.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Получите сообщение, вычислите его дайджест, используя authkey в качестве ключа, а затем отправьте дайджест обратно.
Если приветственное сообщение не было получено, то будет вызвано сообщение
AuthenticationError
.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Попытка установить соединение со слушателем, использующим адрес address, с возвратом
Connection
.Тип соединения определяется аргументом family, но его можно не указывать, так как он обычно определяется по формату address. (См. Форматы адресов)
Если задан authkey, а не
None
, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Если authkey равенNone
, аутентификация не выполняется. При неудачной аутентификации выдается сообщениеAuthenticationError
. См. Ключи аутентификации.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Обертка для связанного сокета или именованной трубы Windows, которая «слушает» соединения.
address - это адрес, который будет использоваться связанным сокетом или именованной трубой объекта-слушателя.
Примечание
Если используется адрес „0.0.0.0“, он не будет подключаемой конечной точкой в Windows. Если вам нужна подключаемая конечная точка, используйте „127.0.0.1“.
family - это тип используемого сокета (или именованной трубы). Это может быть одна из строк
'AF_INET'
(для сокета TCP),'AF_UNIX'
(для сокета домена Unix) или'AF_PIPE'
(для именованной трубы Windows). Из них только первая гарантированно доступна. Если family имеет значениеNone
, то семейство определяется по формату address. Если address также имеет значениеNone
, то выбирается значение по умолчанию. По умолчанию выбирается семейство, которое считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если семейство равно'AF_UNIX'
, а адрес -None
, то сокет будет создан в частной временной директории, созданной с помощьюtempfile.mkstemp()
.Если объект слушателя использует сокет, то backlog (по умолчанию 1) передается в метод
listen()
сокета после его привязки.Если задан authkey, а не
None
, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Если authkey равенNone
, аутентификация не выполняется. При неудачной аутентификации выдается сообщениеAuthenticationError
. См. Ключи аутентификации.- accept()¶
Принимает соединение на связанном сокете или именованной трубе объекта слушателя и возвращает объект
Connection
. Если попытка аутентификации была предпринята и не увенчалась успехом, то возвращаетсяAuthenticationError
.
- close()¶
Закрывает связанный сокет или именованную трубу объекта слушателя. Эта функция вызывается автоматически при сборке мусора в слушателе. Однако рекомендуется вызывать его явно.
Объекты слушателей имеют следующие свойства, доступные только для чтения:
- address¶
Адрес, который используется объектом Listener.
- last_accepted¶
Адрес, с которого было получено последнее принятое соединение. Если этот адрес недоступен, то он равен
None
.
Изменено в версии 3.3: Объекты слушателей теперь поддерживают протокол управления контекстом - см. Типы менеджеров контекста.
__enter__()
возвращает объект слушателя, а__exit__()
вызываетclose()
.
- multiprocessing.connection.wait(object_list, timeout=None)¶
Подождите, пока объект из object_list будет готов. Возвращает список объектов из object_list, которые готовы. Если timeout имеет значение float, то вызов блокируется не более чем на столько секунд. Если timeout равен
None
, то вызов будет блокироваться неограниченное время. Отрицательный таймаут эквивалентен нулевому таймауту.Как в POSIX, так и в Windows, объект может появиться в object_list, если он
читаемый объект
Connection
;подключенный и читаемый объект
socket.socket
; или
Объект соединения или сокета готов, когда из него можно читать данные или другой конец закрыт.
POSIX:
wait(object_list, timeout)
почти эквивалентенselect.select(object_list, [], [], timeout)
. Разница в том, что, еслиselect.select()
прерывается сигналом, он может поднятьOSError
с номером ошибкиEINTR
, тогда какwait()
этого не сделает.Windows: Элемент в object_list должен быть либо целочисленным хэндлом, который является ожидаемым (согласно определению, используемому в документации к Win32-функции
WaitForMultipleObjects()
), либо объектом с методомfileno()
, который возвращает хэндл сокета или хэндл трубы. (Обратите внимание, что дескрипторы труб и дескрипторы сокетов не являются ожидающими дескрипторами).Added in version 3.3.
Примеры.
Следующий код сервера создает слушателя, который использует 'secret password'
в качестве ключа аутентификации. Затем он ожидает соединения и отправляет некоторые данные клиенту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Следующий код подключается к серверу и получает от него некоторые данные:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Следующий код использует wait()
для ожидания сообщений от нескольких процессов одновременно:
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Форматы адресов¶
Адрес
'AF_INET'
- это кортеж вида(hostname, port)
, где hostname - строка, а port - целое число.Адрес
'AF_UNIX'
- это строка, представляющая имя файла в файловой системе.Адрес
'AF_PIPE'
- это строка видаr'\\.\pipe\PipeName'
. Чтобы использоватьClient()
для подключения к именованной трубе на удаленном компьютере с именем ServerName, нужно использовать адрес видаr'\\ServerName\pipe\PipeName'
.
Обратите внимание, что любая строка, начинающаяся с двух обратных косых черточек, по умолчанию считается адресом 'AF_PIPE'
, а не 'AF_UNIX'
.
Ключи аутентификации¶
При использовании Connection.recv
полученные данные автоматически распаковываются. К сожалению, распикировка данных из ненадежного источника представляет собой риск для безопасности. Поэтому Listener
и Client()
используют модуль hmac
для обеспечения аутентификации по дайджесту.
Ключ аутентификации - это байтовая строка, которую можно представить как пароль: после установления соединения оба конца потребуют доказательства того, что другому известен ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, не включает в себя отправку ключа по соединению).
Если аутентификация запрошена, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey
(см. Process
). Это значение будет автоматически унаследовано любым объектом Process
, который создаст текущий процесс. Это означает, что (по умолчанию) все процессы многопроцессной программы будут иметь единый ключ аутентификации, который можно использовать при установке соединений между собой.
Подходящие ключи аутентификации можно также сгенерировать с помощью os.urandom()
.
Ведение журнала¶
Имеется некоторая поддержка протоколирования. Обратите внимание, однако, что пакет logging
не использует общие блокировки процессов, поэтому возможно (в зависимости от типа обработчика) смешивание сообщений от разных процессов.
- multiprocessing.get_logger()¶
Возвращает регистратор, используемый
multiprocessing
. При необходимости будет создан новый.При первом создании этот регистратор имеет уровень
logging.NOTSET
и не имеет обработчика по умолчанию. Сообщения, отправленные в этот логгер, по умолчанию не будут передаваться в корневой логгер.Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любая другая настройка регистратора не наследуется.
- multiprocessing.log_to_stderr(level=None)¶
Эта функция выполняет вызов
get_logger()
, но помимо возврата логгера, созданного get_logger, она добавляет обработчик, который отправляет вывод вsys.stderr
, используя формат'[%(levelname)s/%(processName)s] %(message)s'
. Вы можете изменитьlevelname
логгера, передав аргументlevel
.
Ниже приведен пример сеанса с включенным протоколированием:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Полную таблицу уровней протоколирования см. в модуле logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
повторяет API multiprocessing
, но является не более чем оберткой для модуля threading
.
В частности, функция Pool
, предоставляемая multiprocessing.dummy
, возвращает экземпляр ThreadPool
, который является подклассом Pool
, поддерживающим все те же вызовы методов, но использующим пул рабочих потоков, а не рабочих процессов.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Объект пула потоков, управляющий пулом рабочих потоков, на которые могут быть отправлены задания. Экземпляры
ThreadPool
полностью совместимы по интерфейсу с экземплярамиPool
, и их ресурсами также необходимо правильно управлять, либо используя пул в качестве менеджера контекста, либо вызываяclose()
иterminate()
вручную.processes - это количество рабочих потоков, которые необходимо использовать. Если processes равно
None
, то используется число, возвращаемоеos.process_cpu_count()
.Если initializer не
None
, то каждый рабочий процесс при запуске будет вызыватьinitializer(*initargs)
.В отличие от
Pool
, maxtasksperchild и context не могут быть предоставлены.Примечание
У
ThreadPool
тот же интерфейс, что и уPool
, который был разработан для пула процессов и появился до появления модуляconcurrent.futures
. Поэтому он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления статуса асинхронных заданий,AsyncResult
, который не понимается другими библиотеками.Пользователи, как правило, предпочитают использовать
concurrent.futures.ThreadPoolExecutor
, который имеет более простой интерфейс, который с самого начала был разработан с учетом потоков, и который возвращает экземплярыconcurrent.futures.Future
, совместимые со многими другими библиотеками, включаяasyncio
.
Рекомендации по программированию¶
Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing
.
Все методы запуска¶
Следующее относится ко всем методам запуска.
Избегайте общего состояния
По возможности нужно стараться избегать перемещения больших объемов данных между процессами.
Вероятно, лучше всего использовать очереди или трубы для связи между процессами, а не примитивы синхронизации нижнего уровня.
Маринованность
Убедитесь, что аргументы методов прокси являются picklable.
Безопасность прокси-серверов
Не используйте прокси-объект более чем в одном потоке, если вы не защитили его блокировкой.
(Никогда не возникает проблем с различными процессами, использующими один и тот же прокси).
Объединение зомби-процессов
В POSIX, когда процесс завершается, но к нему не присоединились, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда начинается новый процесс (или вызывается
active_children()
), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызовProcess.is_alive
завершенного процесса присоединит его. Тем не менее, вероятно, хорошей практикой является явное присоединение ко всем процессам, которые вы запускаете.
Лучше наследовать, чем отбирать/не отбирать
При использовании методов запуска spawn или forkserver многие типы из
multiprocessing
должны быть picklable, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать передачи общих объектов другим процессам с помощью труб или очередей. Вместо этого следует организовать программу таким образом, чтобы процесс, которому нужен доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.
Избегайте завершения процессов
Использование метода
Process.terminate
для остановки процесса может привести к тому, что все общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые процессом в данный момент, станут нерабочими или недоступными для других процессов.Поэтому, вероятно, лучше использовать
Process.terminate
только для процессов, которые никогда не используют общие ресурсы.
Присоединение к процессам, использующим очереди
Имейте в виду, что процесс, поместивший элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком-«фидером» в базовую трубу. (Чтобы избежать такого поведения, дочерний процесс может вызвать метод
Queue.cancel_join_thread
очереди).Это означает, что при использовании очереди необходимо убедиться, что все элементы, которые были поставлены в очередь, в конечном итоге будут удалены до того, как процесс будет завершен. В противном случае вы не сможете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут присоединяться автоматически.
Пример, который приведет к тупику, следующий:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Исправление заключается в том, чтобы поменять местами две последние строки (или просто удалить строку
p.join()
).
Явная передача ресурсов дочерним процессам
В POSIX при использовании метода запуска fork дочерний процесс может использовать общий ресурс, созданный в родительском процессе с помощью глобального ресурса. Однако лучше передать объект в качестве аргумента в конструктор дочернего процесса.
Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что пока дочерний процесс жив, объект не будет собран в родительском процессе. Это может быть важно, если при сборке объекта в родительском процессе освобождается какой-то ресурс.
Так, например,
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()следует переписать как
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерегайтесь заменять sys.stdin
«файлоподобным объектом».
multiprocessing
первоначально безоговорочно назывался:os.close(sys.stdin.fileno())в методе
multiprocessing.Process._bootstrap()
- это приводило к проблемам с процессами-в-процессах. Это было изменено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Это решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке плохого дескриптора файла, но создает потенциальную опасность для приложений, которые заменяют
sys.stdin()
на «файлоподобный объект» с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовутclose()
на этом файлоподобном объекте, это может привести к тому, что одни и те же данные будут выгружены в объект несколько раз, что приведет к повреждению.Если вы напишете файлоподобный объект и реализуете собственное кэширование, вы можете сделать его fork-safe, сохраняя pid при каждом добавлении в кэш и удаляя кэш при изменении pid. Например:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДополнительную информацию см. в разделах bpo-5155, bpo-5313 и bpo-5331.
Методы запуска spawn и forkserver¶
Есть несколько дополнительных ограничений, которые не распространяются на метод запуска fork.
Более высокая маринованность
Убедитесь, что все аргументы метода
Process.__init__()
являются picklable. Также, если вы подклассProcess
, то убедитесь, что экземпляры будут picklable при вызове методаProcess.start
.
Глобальные переменные
Имейте в виду, что если код, запущенный в дочернем процессе, попытается получить доступ к глобальной переменной, то значение, которое он увидит (если оно есть), может не совпадать со значением в родительском процессе в момент вызова
Process.start
.Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают никаких проблем.
Безопасный импорт главного модуля
Убедитесь, что главный модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непредвиденных побочных эффектов (например, запуска нового процесса).
Например, при использовании метода запуска spawn или forkserver запуск следующего модуля завершится с ошибкой
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Вместо этого следует защитить «точку входа» программы, используя
if __name__ == '__main__':
следующим образом:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Строку
freeze_support()
можно опустить, если программа будет выполняться в обычном режиме, а не в замороженном).Это позволяет вновь создаваемому интерпретатору Python безопасно импортировать модуль и затем запустить его функцию
foo()
.Аналогичные ограничения действуют, если пул или менеджер создаются в главном модуле.
Примеры¶
Демонстрация того, как создавать и использовать настраиваемые менеджеры и прокси:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Использование Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Пример, показывающий, как использовать очереди для подачи заданий на набор рабочих процессов и сбора результатов:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()