• FastAPI CPU 密集型任务的处理办法

    FastAPI 经常号称是非常快的框架,因为它大量使用 async。对于一些 IO 密集型的任务,是有一些坑要处理的,不然整个 async 就被堵住了。这个处理还是比较简单的,按官网文档的说法,

    如果你的应用程序不需要与其他任何东西通信而等待其响应,请使用async def
    如果你不清楚,使用def就好

    那如果是 CPU 密集型的任务呢?这是工作中同事遇到的问题。

    def的办法就不行了——会把线程池堵住。

    下面 4 种写法,有 2 种都可以解决这个问题,都是进程池的方案。

    from concurrent.futures import ProcessPoolExecutor
    import asyncio
    import time
    
    from fastapi import FastAPI  
    import uvicorn
    
    
    app = FastAPI()  
    
    def fib(n: int) -> int:
        if n == 1 or n == 2:
            return 1
        return fib(n-1) + fib(n-2)
    
    def some_cpu_work() -> int:
        return fib(36)  # 大概1s
    
    
    # 通用配置:20个并发,同时请求
    
    @app.get("/test1")  
    async def test1():
        """ async定义+直接计算    排队 27s  CPU占用1核心 """
        data = some_cpu_work()
        print(data)
        return data  
    
    @app.get("/test2")  
    def test2():
        """ 普通定义+直接计算      自带线程池 依然27s  CPU占用1核心 """
        data = some_cpu_work()
        print(data)
        return data  
    
    #################################### 开一个进程池
    process_pool_executor = ProcessPoolExecutor(max_workers=4)
    
    @app.get("/test3")
    def test3():
        """ 普通定义+进程池   8s CPU占用4核心   """
        task = process_pool_executor.submit(some_cpu_work)
        data = task.result()
        print(data)
        return data
    
    @app.get("/test4")
    async def test4():
        """ async定义+交给进程池  8s CPU占用4核心  """
        data = await asyncio.get_running_loop().run_in_executor(process_pool_executor, some_cpu_work)
        print(data)
        return data
    
    if __name__ == "__main__":
        uvicorn.run(app, host="127.0.0.1", port=8000)

    具体来说,20 个请求同时请求,这 20 个请求在 4 种写法的结果如下:(其中第2种堵住了自带线程池的结果最出乎意料)

    阅读更多…