programing

Pandas DataFrame 적용() 모든 코어 사용?

padding 2023. 9. 25. 22:24
반응형

Pandas DataFrame 적용() 모든 코어 사용?

2017년 8월 현재 Pandas DataFame.apply()는 안타깝게도 여전히 단일 코어로 작업하는 것으로 제한되어 있습니다. 즉, 다중 코어 시스템은 실행 시 대부분의 컴퓨팅 시간을 낭비하게 됩니다.df.apply(myfunc, axis=1).

데이터 프레임에서 애플리케이션을 병렬로 실행하기 위해 어떻게 코어를 모두 사용할 수 있습니까?

패키지를 사용할 수 있습니다.

pip install swifter

설치된 종속성과의 버전 충돌을 피하기 위해 가상 환경에서 이 기능을 사용할 수도 있습니다.

할 수 .apply함수:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

함수가 벡터화되었는지 여부에 관계없이(위의 예에서와 같이) 함수를 병렬화하는 가장 효율적인 방법을 자동으로 파악합니다.

많은 예시와 성능 비교는 GitHub에서 이용할 수 있습니다.패키지는 현재 개발 중이므로 API가 변경될 수 있습니다.

또한 문자열 열에 대해서는 자동으로 작동하지 않습니다.줄을 사용할 때 스위퍼는 "단순한" 판다로 되돌아갑니다.apply평행하지 을 사용하도록 도 합니다dask는 성능 향상을 가져오지 않으며 데이터셋을 수동으로 분할하고 를 사용하여 병렬화하는 것이 좋습니다.

가장 간단한 방법은 Dask의 map_partitions를 사용하는 것입니다.이 수입품이 필요합니다(필요할 것입니다).pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

그리고 구문은

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(코어가 16개라면 30개가 적당한 파티션 개수라고 생각합니다.완성도를 높이기 위해 기계(16개 코어)의 차이를 측정했습니다.

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

팬더로부터 10배의 속도 증가를 주는 것은 파티션에 적용되는 dask에 적용됩니다.물론 벡터화할 수 있는 함수가 있다면 - 이 경우 함수 (y*(x**2+1))는 3중 벡터화되어 있지만 벡터화가 불가능한 것들이 많습니다.

해봐도 좋습니다.pandarallel대신: & 모든 CPU(On Linux & macOS)에서 하는 간단하고

  • 병렬화에는 비용(새로운 프로세스 인스턴스화, 공유 메모리를 통한 데이터 전송 등)이 들기 때문에 병렬화할 계산량이 충분히 많은 경우에만 병렬화가 효율적입니다.매우 적은 양의 데이터의 경우 병렬화를 사용하는 것이 항상 가치 있는 것은 아닙니다.
  • 적용되는 기능은 람다 기능이 아니어야 합니다.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

https://github.com/nalepae/pandarallel 를 참조하십시오.

네이티브 파이썬에 머무르려면:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

입니다 합니다.fcoldf

Dask에 대한 업데이트 답변을 원합니다.

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

10만 건의 기록을 남겼지만, 다스크는 없었습니다.

CPU 시간: 사용자 6분 32초, sys: 100ms, 총: 6분 32초 벽 시간: 6분 32초

Dask 사용 시:

CPU 시간: 사용자 5.19초, 시스템: 784ms, 총: 5.98초 월 시간: 1분 3초

모든 (물리적 또는 논리적) 코어를 사용하기 위해서는swifter그리고.pandarallel.

코어의 양(및 청킹 동작)을 설정할 수 있습니다.

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

으로()n_workers=-1합니다.) CPU 를합니다를 합니다.의 두 배)을 딩적 CPU배)우,mapply시스템의 다른 프로세스보다 멀티프로세싱 풀의 우선 순위를 정하기 위해 한 명의 추가 작업자를 생성합니다.

all your cores도 있습니다와 같이 가 느려질 수 즉, ( CPU CPU 를).

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

여기에 팬더가 적용되는 sklearn base transformer의 예가 있습니다.

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

자세한 내용은 https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8 을 참조하십시오.

원래 질문에 따라 전체 DataFrame에 적용할 수 있는 기본 Python 솔루션(numpy 포함)(단일 열에만 적용되는 것이 아님)

import numpy as np
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))

여기 또 다른 것은 잡립과 스키킷-런의 도우미 코드를 사용하는 것입니다.가벼운(이미 스키킷 러닝을 가지고 있는 경우), 잡리브는 해킹이 용이하기 때문에 작업을 더 잘 제어하고 싶다면 좋습니다.

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

:result = parallel_apply(my_dataframe, my_func)

대신에

df["new"] = df["old"].map(fun)

하다

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

제가 보기에 이것은 약간의 개선입니다.

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

작업이 매우 작은 경우 진행률 표시와 자동 배치를 받을 수 있습니다.

"어떻게 하면 모든 코어를 사용하여 데이터 프레임에서 병렬로 응용 프로그램을 실행할 수 있는가?"라는 질문이 있었기 때문에, 답은 다음과 같습니다.modin할 수 은 더 않습니다 모든 코어를 병렬로 실행할 수 있지만, 실시간은 더 좋지 않습니다.

https://github.com/modin-project/modin 를 참조하십시오.위에서 달립니다.dask아니면ray입니다." 라고 "Modin 1MB 에서 1TB 된 데이터프레임 입니다" 라고 .라고 합니다.pip3 install "modin"[ray]" 대 대 모딘 대 팬더는 6코어에서 12초 대 6초였습니다.

열해야 할 에는 해야 합니다..apply기능이 문제가 될 수도 있습니다.저 다를 .astype()열 이름을 기준으로 함수를 지정합니다.이 방법은 아마도 가장 효율적인 방법은 아니지만 목적을 충족하고 열 이름을 원래 이름으로 유지할 수 있습니다.

import multiprocessing as mp

def f(df):
    """ the function that you want to apply to each column """
    column_name = df.columns[0] # this is the same as the original column name
    # do something what you need to do to that column
    return df

# Here I just make a list of all the columns. If you don't use .to_frame() 
# it will pass series type instead of a dataframe

dfs = [df[column].to_frame() for column in df.columns]
with mp.Pool(mp.cpu_num) as pool:
    processed_df = pd.concat(pool.map(f, dfs), axis=1)

언급URL : https://stackoverflow.com/questions/45545110/make-pandas-dataframe-apply-use-all-cores

반응형