본문 바로가기

카테고리 없음

Python : 병렬성 처리 - futures 모듈 가이드: 개념, 장단점, 사용법 및 예제

 futures는 동시성을 쉽게 관리하기 위한 고수준 API로, 특히 멀티쓰레딩(ThreadPoolExecutor)과 멀티프로세싱(ProcessPoolExecutor)를 통해 여러 작업을 비동기적으로 실행할 수 있게 해줍니다.  멀티쓰레딩 / 멀티프로세싱 API 를 통일 시켜 동일한 API를 제공하기 때문에 사용하기가 매우 쉽습니다.  실행중인 작업 취소/완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 등을 쉽게 작성할 수 있습니다.  JavaScript의 Promise 개념과 동일하다고 볼 수 있습니다.

 

futures 개요 및 장단점

futures 모듈은 Python의 concurrent.futures에 포함되어 있으며, 비동기 실행을 간편하게 관리할 수 있는 기능을 제공합니다. 주로 비동기 작업이 필요할 때 유용하며, 특히 멀티스레드 또는 멀티프로세싱을 활용할 수 있는 API를 제공합니다.

  • ThreadPoolExecutor: I/O 바운드 작업에 적합합니다.
  • ProcessPoolExecutor: CPU 바운드 작업에 적합합니다.

 

 futures의 장점

  • 간단한 동시성 처리: 복잡한 동시성 코드를 간단한 API로 처리할 수 있습니다.
  • Future 객체로 작업 상태 추적: Future 객체는 작업 완료 상태와 결과를 확인할 수 있게 해줍니다.
  • Context Manager 지원: with 문을 사용해 자원을 자동으로 관리합니다.

 

futures의 단점

  • 스레드와 프로세스의 제약: 스레드가 병렬 작업에서 성능 이점을 크게 주지는 않습니다(Python의 GIL로 인한 제한). CPU 바운드 작업에서는 멀티프로세싱이 필요하지만, 메모리 사용량이 증가할 수 있습니다.
  • 디버깅 어려움: 비동기 작업이 많아질수록 예외 추적이 어려워질 수 있습니다.

 

사용 시 주의사항

  • 자원 관리: 많은 스레드 또는 프로세스를 생성하면 오버헤드가 발생할 수 있습니다. 필요할 때만 생성하세요.
  • GIL(Global Interpreter Lock): 두 개 이상의 쓰래드가 동시 수행될 때 하나의 자원을 두 쓰래드가 동시에 엑세스하려는 경우 동기화 문제가 발생할 수 있는데, 이를 방지하기 위해 GIL이 수행됩니다. GIL이 수행되면 리소스 전체에 Lock이 걸리게되므로 성능 상에 큰 걸림돌이 됩니다. 따라서 이를 회피하기 위해서는 멀티프로세싱-ProcessPoolExecutor 사용이 더 적합합니다.
  • • Exception Handling: 예외가 발생할 수 있는 Future 작업에서는 적절한 예외 처리 구문을 추가해야 합니다.

 

예제 코드

map 함수를 사용하여 쓰래드 수행 결과 보기

 

# 주어진 숫자들의 누적합을 동시에 구하기
import time
from concurrent import futures

WORK_LIST=[10000,100000,1000000,10000000]

def sum_generator(n):
    return sum( n for n in range(1,n+1))

def main():
    # 사용할 쓰래드 수를 계산한다. max=10
    worker = min(10, len(WORK_LIST))

    # 시작시간
    start_tm = time.time()

    #쓰래드 수행
    with futures.ThreadPoolExecutor(max_workers=worker) as executor:
        result = executor.map(sum_generator, WORK_LIST)

    #수행시간계산
    end_tm = time.time() - start_tm

    #결과 출력
    msg = '\n Result -> {} Time: {:.2f}s'
    print(msg,list(result), end_tm)

if __name__ == '__main__':
    main()

# Result -> [50005000, 5000050000, 500000500000, 50000005000000] Time: 10.21s

 

 위 예제는 멀티쓰레딩으로 작성된 코드입니다만, ThreadPoolExecutor를 ProcessPoolExecutor 로 바꾸면 멀티프로세싱 모델로 쉽게 바꿀 수 있습니다.  

 또한, 위 예제에 사용된 executor.map 함수는 모든 작업들이 끝나야 결과를 받고 다음 작업이 가능합니다. 

 

wait 함수를 사용하여 쓰래드 수행 결과 보기

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST=[10000,100000,1000000,10000000]

def sum_generator(n):
    return sum( n for n in range(1,n+1))

def main():
    worker = min(10, len(WORK_LIST))
    futures_list = []

    start_tm = time.time()

    with ThreadPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            #future 반환
            future = executor.submit(sum_generator,work)
            #스케줄링 된 것이지 수행된 것은 아님.
            futures_list.append(future)
            #스케쥴링 확인
            print(f'Scheduled for {work} : {future}')

        # #wait 사용
        result = wait(futures_list,timeout=7)
        # 성공한 작업보기
        print('Completed Tasks: ' + str(result.done))
        # 실패한 작업보기
        print('Uncompleted Tasks: ' + str(result.not_done))

    end_tm = time.time() - start_tm
    #결과값 보기
    msg = '\n Result -> {} Time: {:.2f}s'
    print(msg.format([future.result() for future in result.done], end_tm))

if __name__ == '__main__':
    main()

# result
# Scheduled for 10000 : <Future at 0x10490dad0 state=pending>
# Scheduled for 100000 : <Future at 0x104735610 state=running>
# Scheduled for 1000000 : <Future at 0x10490f5d0 state=pending>
# Scheduled for 10000000 : <Future at 0x10490fed0 state=running>
# Completed Tasks: {<Future at 0x104735610 state=finished returned int>, <Future at 0x10490f5d0 state=finished returned int>, <Future at 0x10490dad0 state=finished returned int>}
# Uncompleted Tasks: {<Future at 0x10490fed0 state=running>}

#  Result -> [5000050000, 500000500000, 50005000] Time: 9.95s

 

wait 함수는 아래와 같이 사용합니다.

wait( fs , timeout=None, return_when=ALL_COMPLETED )

 

future 인스턴스들이 완료될때까지 기다립니다, 하지만 시간제한과 종료 조건이 같이 주어집니다. 종료 조건은 아래와 같습니다. 

FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED(default)

 

예제에서 처럼 완료된 task와 미완료되 task 들을 별도의 셋으로 반환하기 때문에 별도의 처리가 가능합니다.

 

as_completed 를 사용하여 쓰래드 수행 결과 보기

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

WORK_LIST=[10000,100000,1000000,10000000]

def sum_generator(n):
    return sum( n for n in range(1,n+1))

def main():
    worker = min(10, len(WORK_LIST))   

    start_tm = time.time()

    with ThreadPoolExecutor(max_workers=worker) as executor:
        futures_list = [executor.submit(sum_generator,work) for work in WORK_LIST]        
        #as_completed 사용
        for future in as_completed(futures_list):
            #빨리 처리된 task부터 반환
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            
            print(f'Done: {done}, result: {result}')
            print(f'Cancelled: {cancelled}')

    end_tm = time.time() - start_tm
    
    # 소요시간 보기
    msg = '\n Result -> Time: {:.2f}s'
    print(msg.format(end_tm))

if __name__ == '__main__':
    main()

# Result
# Done: True, result: 50005000
# Cancelled: <bound method Future.cancelled of <Future at 0x101f96e50 state=finished returned int>>
# Done: True, result: 5000050000
# Cancelled: <bound method Future.cancelled of <Future at 0x101fa04d0 state=finished returned int>>
# Done: True, result: 500000500000
# Cancelled: <bound method Future.cancelled of <Future at 0x101f97150 state=finished returned int>>
# Done: True, result: 50000005000000
# Cancelled: <bound method Future.cancelled of <Future at 0x101fa1050 state=finished returned int>>

# Result -> Time: 10.07s

 

as_completed 함수는 아래와 같이 사용합니다.

as_completed( fs , timeout=None )

 

wait 함수와는 다르게  먼저 완료된 future 결과값을 먼저 반환합니다.  선택적으로 timeout 키워드를 줄 수 있습니다.  이 경우 시간 내에 처리되지 못한 future 에 대해서 TimeoutError 를 발생 시킵니다. 

 

결론 

Python의 concurrent.futures 모듈은 동시성 작업을 간편하게 처리할 수 있도록 도와주는 강력한 도구입니다. ThreadPoolExecutorProcessPoolExecutor를 통해 I/O 바운드와 CPU 바운드 작업을 효율적으로 분리하고 관리할 수 있어, 코드의 성능을 높이고 응답성을 향상시킬 수 있습니다.

 

하지만 모든 경우에 적합한 것은 아닙니다. ThreadPoolExecutor는 GIL로 인해 CPU 바운드 작업에서는 성능 이점이 제한적이며, ProcessPoolExecutor는 멀티프로세싱의 메모리 오버헤드를 고려해야 합니다. 따라서 적절한 동시성 도구를 선택하고 예외 처리와 자원 관리에 주의를 기울여야 합니다.

 

올바르게 사용한다면 concurrent.futures는 Python에서 간단하고 효율적으로 동시성 프로그램을 작성하는 데 큰 도움이 될 것입니다.