• FastAPI CPU 密集型任务的处理办法

    FastAPI 经常号称是非常快的框架,因为它大量使用 async。对于一些 IO 密集型的任务,是有一些坑要处理的,不然整个 async 就被堵住了。这个处理还是比较简单的,按官网文档的说法,

    如果你的应用程序不需要与其他任何东西通信而等待其响应,请使用async def
    如果你不清楚,使用def就好

    那如果是 CPU 密集型的任务呢?这是工作中同事遇到的问题。

    def的办法就不行了——会把线程池堵住。

    下面 4 种写法,有 2 种都可以解决这个问题,都是进程池的方案。

    from concurrent.futures import ProcessPoolExecutor
    import asyncio
    import time
    
    from fastapi import FastAPI  
    import uvicorn
    
    
    app = FastAPI()  
    
    def fib(n: int) -> int:
        if n == 1 or n == 2:
            return 1
        return fib(n-1) + fib(n-2)
    
    def some_cpu_work() -> int:
        return fib(36)  # 大概1s
    
    
    # 通用配置:20个并发,同时请求
    
    @app.get("/test1")  
    async def test1():
        """ async定义+直接计算    排队 27s  CPU占用1核心 """
        data = some_cpu_work()
        print(data)
        return data  
    
    @app.get("/test2")  
    def test2():
        """ 普通定义+直接计算      自带线程池 依然27s  CPU占用1核心 """
        data = some_cpu_work()
        print(data)
        return data  
    
    #################################### 开一个进程池
    process_pool_executor = ProcessPoolExecutor(max_workers=4)
    
    @app.get("/test3")
    def test3():
        """ 普通定义+进程池   8s CPU占用4核心   """
        task = process_pool_executor.submit(some_cpu_work)
        data = task.result()
        print(data)
        return data
    
    @app.get("/test4")
    async def test4():
        """ async定义+交给进程池  8s CPU占用4核心  """
        data = await asyncio.get_running_loop().run_in_executor(process_pool_executor, some_cpu_work)
        print(data)
        return data
    
    if __name__ == "__main__":
        uvicorn.run(app, host="127.0.0.1", port=8000)

    具体来说,20 个请求同时请求,这 20 个请求在 4 种写法的结果如下:(其中第2种堵住了自带线程池的结果最出乎意料)

    阅读更多…
  • Python 多进程共享内存、NumPy 数组 | Sharing NumPy Array When Using Python Multiprocessing

    背景

    当前的项目需要对大型 numpy 数组进行各种运算(不是深度学习的那种运算),实践发现只开一个 python 进程时,只能使用一个 CPU 核心。所以考虑使用 multiprocessing 模块进行多进程运算。

    但是,问题也很明显:用的是 multiprocessing.pool,如果我的 pool 的 size 是 4,一个 GB 级的 ndarray 传给 pool,会复制 4 份到每一个子进程。这首先会在传输时花时间做相应的 pickle 和 unpickle 操作;更重要的是,这坨数据会在内存里复制 4 份——这直接导致能处理的最大数据大小缩小了四分之三。

    本文使用的 Python 版本为 3.6 / 3.7,Windows 系统。
    在 3.8 版本中,新加入了 multiprocessing.shared_memory 模块,应该能简化这个问题。但是目前为止,项目使用的部分包还不支持 3.8,所以仍需要在旧版本中解决这个问题。

    Value 与 Array

    在 multiprocessing 包中,提供了一些可共享的对象:Value、Array、RawValue 与 RawArray。基本上,前者没有 Raw 的,可以加锁以进行进程间同步,后面 Raw 的没有锁。项目中用到的 numpy 数组都是只读的,子进程只需要读不需要写,所以选择使用 RawArray。

    阅读更多…