source

다중 처리를 위해 공유 메모리에서 numpy 배열 사용

ittop 2023. 8. 25. 23:58
반응형

다중 처리를 위해 공유 메모리에서 numpy 배열 사용

멀티프로세싱 모듈과 함께 사용할 수 있도록 공유 메모리에 numpy 배열을 사용하고 싶습니다.문제는 단순히 ctypes 배열이 아닌 numpy 배열처럼 사용하는 것입니다.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

이렇게 하면 다음과 같은 출력이 생성됩니다.

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

어레이는 예를 들어 actypes 방식으로 액세스할 수 있습니다.arr[i]말이 되는군요.그러나 numpy 배열이 아니므로 다음과 같은 작업을 수행할 수 없습니다.-1*arr또는arr.sum()해결책은 ctypes 배열을 numpy 배열로 변환하는 것이라고 생각합니다.하지만 (이 작업을 할 수 없는 것 외에도) 더 이상 공유되지 않을 것이라고 생각합니다.

일반적인 문제에 대한 표준 해결책이 있을 것으로 보입니다.

@unutbu의 (더 이상 사용할 수 없음) 및 @Henry Gomersall의 답변에 추가합니다.사용할 수 있습니다.shared_arr.get_lock()필요할 때 액세스 동기화:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

동기화된 액세스가 필요하지 않거나 자체 잠금을 만드는 경우mp.Array()불필요합니다.사용할 수 있습니다.mp.sharedctypes.RawArray이 경우에는

Array객체가 있습니다.get_obj()버퍼 인터페이스를 제공하는 ctypes 배열을 반환하는 연결된 메서드입니다.저는 다음이 효과가 있을 것이라고 생각합니다...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

실행 시 다음의 첫 번째 요소가 출력됩니다.a현재 10.0이며, 보여줍니다.a그리고.b두 개의 뷰만 동일한 메모리에 저장됩니다.

멀티프로세서가 여전히 안전한지 확인하기 위해, 저는 당신이 그것을 사용해야 한다고 믿습니다.acquire그리고.release에 존재하는 방법Array물건,a안전하게 액세스할 수 있도록 잠금 기능이 내장되어 있습니다(멀티프로세서 모듈 전문가는 아니지만).

이미 제시된 답변은 좋지만 다음 두 가지 조건을 충족할 경우 이 문제를 훨씬 쉽게 해결할 수 있습니다.

  1. POSIX 호환 운영 체제(예: Linux, Mac OSX)에 있는 경우
  2. 하위 프로세스에는 공유 어레이에 대한 읽기 전용 액세스 권한이 필요합니다.

이 경우 하위 프로세스는 포크를 사용하여 생성되므로 변수를 명시적으로 공유할 필요가 없습니다.분기된 자식은 부모의 메모리 공간을 자동으로 공유합니다.파이썬 멀티프로세싱의 맥락에서, 이것은 모든 모듈 수준 변수를 공유한다는 것을 의미합니다. 이것은 당신이 당신의 자식 프로세스나 당신이 호출하는 함수에 명시적으로 전달하는 인수에는 적용되지 않습니다.multiprocessing.Pool그 정도.

간단한 예:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

POSIX 공유 메모리를 사용하여 파이썬 인터프리터 간에 numpy 배열을 공유하는 작은 파이썬 모듈을 작성했습니다.아마 당신은 그것을 편리하게 찾을 수 있을 것입니다.

https://pypi.python.org/pypi/SharedArray

작동 방식은 다음과 같습니다.

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

당신은 할 수 .sharedmem모듈: https://bitbucket.org/cleemesser/numpy-sharedmem

그러면 여기 원래 코드가 있습니다. 이번에는 NumPy 배열처럼 작동하는 공유 메모리를 사용합니다(NumPy를 호출하는 추가적인 마지막 문장 참고).sum()함수):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()

Python 3.8+에는 표준 라이브러리가 있습니다.

# np_sharing.py
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple

import numpy as np


def create_np_array_from_shared_mem(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
) -> np.ndarray:
    arr = np.frombuffer(shared_mem.buf, dtype=shared_data_dtype)
    arr = arr.reshape(shared_data_shape)
    return arr


def child_process(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
):
    """Logic to be executed by the child process"""
    arr = create_np_array_from_shared_mem(shared_mem, shared_data_dtype, shared_data_shape)
    arr[0, 0] = -arr[0, 0]  # modify the array backed by shared memory


def main():
    """Logic to be executed by the parent process"""

    # Data to be shared:
    data_to_share = np.random.rand(10, 10)

    SHARED_DATA_DTYPE = data_to_share.dtype
    SHARED_DATA_SHAPE = data_to_share.shape
    SHARED_DATA_NBYTES = data_to_share.nbytes

    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=SHARED_DATA_NBYTES)

        arr = create_np_array_from_shared_mem(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
        arr[:] = data_to_share  # load the data into shared memory

        print(f"The [0,0] element of arr is {arr[0,0]}")  # before

        # Run child process:
        p = Process(target=child_process, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE))
        p.start()
        p.join()

        print(f"The [0,0] element of arr is {arr[0,0]}")  # after

        del arr  # delete np array so the shared memory can be deallocated


if __name__ == "__main__":
    main()

스크립트 실행:

$ python3.10 np_sharing.py
The [0,0] element of arr is 0.262091705529628
The [0,0] element of arr is -0.262091705529628

서로 다른 프로세스의 어레이는 동일한 기본 메모리 버퍼를 공유하므로 표준 주의 사항, 즉 레이스 조건이 적용됩니다.

언급URL : https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing

반응형