• Python 分布式任务队列 Celery 入门与使用

    最近工作的项目使我接触到了 Celery 这个任务队列。看了一下官方的文档,感觉设计得还挺 Pythonic,理念也非常简单易懂——类似生产者与消费者。在这里稍微总(fan)结(yi)一下 Celery 的使用方法。

    简介

    Celery 是一个分布式任务队列,网上也有说是分布式任务调度框架,这里我以官方文档的“Distributed Task Queue”为准。它简单、灵活、可靠,可以处理大量的大量的任务,其主要专注于实时处理,同时也支持计划任务。

    为什么要用任务队列?我的理解是,首先方便了任务的分发调度与管理,另外也使调用的过程变得异步(非常适合 web 请求)。

    名词解释

    • 任务队列(task queue):一种分发任务到不同的线程或机器的方法,其输入为一个任务(task)。
    • Worker:实际执行任务的进程,它不断检查任务队列中的新任务并执行。
    • Broker:客户端与 worker 通信的中介。客户端发送任务的消息到队列中,broker 把这条消息传递给一个 worker。

    入门

    如果不考虑进阶用法,5 分钟入门。

    安装

    首先安装 Celery 并选择 broker。其中 broker 主要支持 RabbitMQ 和 Redis。RabbitMQ 不需要额外依赖,直接执行pip install -U celery安装。 Redis 需要在安装 Celery 时安装附加的依赖:pip install -U "celery[redis]"

    RabbitMQ 更为适合生产环境,也稍微大型;Redis 更轻量级,但突然退出时可能丢失数据。为了稍微简单轻量,本文都用 Redis。(如何安装 broker 不在本文内讨论,docker 启动一个最为简单)

    新建 Celery 应用

    新建一个mytasks.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y

    接下来就可以启动 worker 了:(生产环境当然不会这样手动运行,而会把它作为后台程序运行)

    $ celery -A mytasks worker --loglevel=info
    
    # 如果不了解上面的命令用法,可查看命令帮助
    # celery help
    # celery worker --help

    调用 task

    在当前的目录,运行

    >>> from mytasks import add
    >>> add.delay(1, 2)  # 使用 delay() 来使worker调用这个task

    可以得到类似<AsyncResult: fd9cdbe3-bcb3-432a-8d46-67b41243cfed>的返回值,而不会返回 3;这个 3 在 worker 的控制台里可以看到:Task mytasks.add[fd9cdbe3-bcb3-432a-8d46-67b41243cfed] succeeded in 0.0003419s: 3

    保存结果

    默认情况下,结果是不保存的。如果想保存结果,需要指定 result backend,支持 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等。例如app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0'),调用之后就可以查询任务的状态及结果。

    >>> result = add.delay(1, 2)
    >>> result.ready()
    True
    >>> result.get(timeout=1)
    3

    参数配置

    简单的参数配置可以直接在代码中修改app.conf,例如:

    app.conf.task_serializer = 'json'

    对于大型一点的项目,最好专门做一个配置模块。先新建一个 celeryconfig.py:

    broker_url = 'pyamqp://xxxx'
    result_backend = 'rpc://xxxx'
    
    task_serializer = 'json'
    timezone = 'Europe/Oslo'
    enable_utc = True
    
    task_annotations = {
        'mytasks.add': {'rate_limit': '10/m'}
    }

    然后通过app.config_from_object('celeryconfig')导入。

    稍微深入

    Task

    Task 有很多选项可以填入,例如用@app.task(bind=True, default_retry_delay=30 * 60),可以修改任务失败后,等待重试的时间。

    关于任务的重试,我后来因工作需要,又深入阅读了文档。理想的目标是使一个任务可以自动重试,若重试一定次数仍失败,则发送通知。

    首先我看到了acks_late这个参数,它的意思是说一个 task 只有在执行成功后,才给队列 ack(移除)。我试了一下,似乎是不行的,fail 一次之后就没有然后了:

    # 是不行的,会被ack
    @app.task(acks_late=True)
    def add_may_fail_late_ack(x, y):
        if random.random() < 0.5:
            raise RuntimeError('unlucky')
        print('ok')  
        return x + y

    然后是autoretry_for=(XxxException,)参数。这个是最简单的自动重试写法,不需要修改原代码的逻辑,但不够灵活,对于简单的任务比较适用。

    最后是功能最全面的写法。首先定义一个自己的 Task,而不使用自带的 Task,因为 Task 可以提供一系列的回调函数(on_xxx)供自定义。例如我可以覆写on_failure方法,在任务超过一定重试次数仍失败时报警。然后是要注意两处地方:一是bind=True,对应的要把def add(x, y)改为def add(self, x, y);二是重试的操作是在业务逻辑手动触发的,且是通过 raise 的方式进行。代码大概是这样子:

    class MyTask(Task):
        def on_failure(self, exc, task_id, args, kwargs, einfo):  # einfo是完整的traceback
            print(f'on failure!!! name={self.name}, exc={exc}, task_id={task_id}, args={args}, kwargs={kwargs}')
    
    @app.task(base=MyTask, bind=True, default_retry_delay=5, max_retries=1)
    def add_may_fail_custom_retry(self: Task, x, y):
        try:
            if random.random() < 0.5:
                print('fail')
                raise RuntimeError('unlucky')
            print('ok')
            return x + y
        except RuntimeError as e:
            raise self.retry(exc=e)

    上述的代码在第一次遇到RuntimeError时,会等待 5s 重新执行,若仍然遇到RuntimeError(设置了max_retries=1),worker 才会抛出异常。此时会调用 on_failure(),把有用的信息记录下来,例如

    on failure!!! name=mytasks.add_may_fail_custom_retry, exc=unlucky, task_id=9ad47d43-7b7f-4d8d-a078-e54934f54d6e, args=[1, 7], kwargs={}

    这样就基本达成了预想的效果。其他有关 task 的具体内容,见Tasks文档

    调用 task

    前面用到的 delay() 方法是 apply_async() 的简化,但前者不支持传递执行的参数。举例来说,

    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
    # 等价于
    task.apply_async(args=(arg1, arg2), kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

    可见简化了许多。

    Countdown 参数可以设置任务至少(可能受 worker busy 或其他原因有所推迟)多少秒后执行;而 eta (estimated time of arrival) 参数可以设置任务至少(原因相同)在具体时刻之后执行:

    >>> result = add.apply_async((1, 2), countdown=5)  # 至少5秒后执行
    >>> result.get()  # 阻塞至任务完成
    
    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((1, 2), eta=tomorrow)

    一个任务由于种种原因,延迟太久了,我们可以把它设置为过期,支持输入秒数或一个 datetime:

    add.apply_async((10, 10), expires=60)  # 如果任务延迟超过60s,将不会被执行

    对于一个任务,还可以指定这个任务放到哪个队列中(routing),例如

    add.apply_async(queue='priority.high')

    使用 -Q 来给 worker 指定监听的队列:

    $ celery -A mytasks worker -l info -Q celery,priority.high

    像上面这样硬编码 add 的对应 queue 不是太好,更佳的方法是使用 configuration routers

    其他调用 task 的文档,见 Calling Tasks

    函数签名(signature)

    对于简单的 task 调用,使用 .delay() 或 .apply_async() 方法一般就已足够。但有时我们需要更高级的调用,例如把任务的返回值用作下一个任务的输入,如果把一系列任务写成串行,就很不推荐了。为此,可以通过函数签名来调用 tasks。

    下面给 add() 函数创建一个签名(signature):

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    >>> add.s(2, 2)  # 简化,但不能传入task的option,例如countdown
    tasks.add(2, 2)
    >>> sig = add.signature((2, 2), {'debug': True}, countdown=10)  # 完全版

    定义了签名后,就可以用sig.delay()来调用这个任务。

    签名的一个很重要的功能是它可以定义偏函数,类似 Python 的 functools.partial:

    >>> partial = add.s(2)          # 不完整的 signature
    >>> partial.delay(1)            # 1 + 2  注意这个1是填在前面的

    偏函数主要的应用场合是各种的原语(Primitives)。这些 primitives 主要包括 group、chain、chord、map、starmap、chunks 等。下面介绍其中几个的用法。

    group

    group 可以实现任务的并行:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in range(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    chain

    chain 可以按顺序执行任务,把前一个任务的结果作为接下来的任务的输入。注意偏函数的使用:

    >>> from celery import chain
    >>> result = chain(add.s('h', 'e'), add.s('llo'), add.s(' world'))()
    >>> result.get()
    'hello world'
    >>> (add.s('h', 'e') | add.s('llo') | add.s(' world'))().get()  # 也可以用 | 连接

    有关这一部分的更详细内容,见 Canvas: Designing Work-flows

    后台启动

    在实际环境中,celery 肯定是以后台服务的方式运行的。文档给出了 systemd、init.d、supervisor 等启动的方式。具体见 Daemonization

    定时任务

    定时运行任务的功能由 celery beat 完成。它按设定的周期/时间把任务发送到队列中,并被 worker 执行。对于一个集群的 worker,celery beat 应只有一个,否则任务会重复。

    mytasks.py改成下面所示:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10s执行一次
        sender.add_periodic_task(10.0, add.s('hello', ' world'), name='every 10s')
    
        # 按crontab的格式定期执行
        sender.add_periodic_task(
            crontab(hour='*', minute=5),
            add.s('it is', ' xx:05')
        )

    然后启动 beat:

    $ celery -A mytasks beat

    可以在 worker 看到每 10s 输出了一次 “hello world”。每个小时的 5 分,都会输出 “it is xx:05”

    关于定时任务,具体见 Periodic Tasks

    相关参考

    上面只是比较基本的用法。对于更多深入使用中遇到的问题,还是应该参考官网文档

  • Python 类型标注 小结

    众所周知,Python 是动态语言,函数调用不检查传入的变量类型,一个变量的类型现在是数字,下一秒可以变成字符串。这是它的特点(是优点,也是缺点)。

    IDE 的代码提示依赖于对变量类型的了解,它大大提升了我的编码体验。

    Python 3 之后,增加了多种类型标注的支持。本文主要是给自己备忘,哪一个版本可以用哪些类型标注。

    PEP 484

    def greeting(name: str) -> str:
        return 'Hello ' + name

    PEP 484 由 Python 3.5 开始支持。通过定义函数参数的类型与函数返回值的类型,可以覆盖大部分“自己写的代码”的情况。

    typing

    from typing import List
    Vector = List[float]
    
    def scale(scalar: float, vector: List[float]) -> Vector:
        return [scalar * num for num in vector]
    # 或
    def scale(scalar: float, vector: Vector) -> Vector:
        return [scalar * num for num in vector]
    
    new_vector = scale(2.0, [1.0, -4.2, 5.4])

    typing 模块也是由 Python 3.5 开始支持。增加了类似List<String>、类型别名、泛型等功能。

    PEP 526

    primes: List[int] = []
    
    captain: str  # Note: no initial value!
    
    class Starship:
        stats: ClassVar[Dict[str, int]] = {}
    
    some_value: int = other_3rd_party_function()

    PEP 526 由 Python 3.6 开始支持。它通过把变量类型写在等号前面,大幅增加了类型的存在感,并可以标注“别人写的代码”返回的类型。

    PEP 585

    PEP 585 由 Python 3.9 开始支持。之前from typing import List这样的写法,相当于给 list、dict、set 等类型分别复制了一个大写字母开头的类型,不太方便。所以在 3.9 版本,改为直接用小写的即可。

    其他

    Python 还增加了更多类似的支持,如 PEP 544、PEP 586、PEP 589、PEP 591 等。总的方向还是不变的,就是增加一些 feature 使动态的语言变得稍微“静态”一点,从而方便开发者、提高安全性。

  • Python 存储大量 NumPy Array 等数据的方案:HDF5

    对于序列化保存各种 array / data frame 等类型的数据,一直以来有各种各样的办法。例如我用过的,对于简单的一个 array,NumPy 有提供读写的方法;pandas 也有对应的 data frame 读写;而字符串/字典,可以变成 json 保存等。

    但是,如果数量多了,例如有 100 个 array,上面的方法就不太方便了。我比较懒,会把这些 array 放到一个 dict 里面,然后用 pickle 把这个 dict pickle下来——保存和读取都非常方便,而且兼容所有数据类型。

    后来,数据量多了之后,就发现 pickle 的方案也是有缺点的,就是性能不好(文末有初步的性能对比)。所以调研了一下后,选择了 HDF5。以前只是听过,没有用过,现在用了感觉不错,在下面稍微总结一下。

    目标用户

    无论是科学研究,还是各行各业,都有 HDF5 的身影。高效、跨平台、无上限,尤其适合数据量大的情景。见官网的 Who Uses HDF?

    安装

    HDF5 支持各种语言,Python 对应的库是 h5py。

    $ pip install h5py
     or
    $ conda install h5py  # Anaconda

    核心概念

    HDF5 里只有 2 种类型:datasetgroup
    – dataset 就像数组,类似 Python 的 list (一维或多维),或 NumPy 的 ndarray。dataset 的语法和 ndarray 类似。
    – group 就像 Python 的 dict,在我看来,它更像是带路径的文件夹。group 的语法和 dict 类似。

    就像是在一层一层的文件夹中,存放着不同的 dataset。记住以上两点,就🆗

    阅读更多…
  • 使用 Nginx+Gunicorn 部署 Flask,with venv+systemd

    记录一下我的部署过程。

    Flask

    文件为 /root/myproject/application.py,其中的 Flask 实例为

    app = Flask(__name__)

    Gunicorn

    /root/myproject/ 中新建一个虚拟环境 venv 并激活虚拟环境,使用 pip 安装 Flask 等模块。然后安装 gunicorn:

    pip install gunicorn

    装好之后,执行命令:

    gunicorn --bind 127.0.0.1:8000 application:app   # application为文件名 app为实例名

    http://127.0.0.1:8000 应该是可以访问的。(服务器可能需要做一下端口转发,不然就绑定 0.0.0.0)

    Systemd

    我希望服务器重启后,也可以自动启动 web server。

    新建 /usr/lib/systemd/system/gunicorn.service,内容如下:

    [Unit]
    Description=gunicorn daemon
    After=network.target
    
    [Service]
    WorkingDirectory=/root/myproject
    ExecStart=/root/myproject/venv/bin/gunicorn -w 1 --bind 127.0.0.1:8000 application:app
    PrivateTmp=true
    Environment=key=value
    
    [Install]
    WantedBy=multi-user.target

    然后执行 systemctl enable gunicorn,重启一下服务器,之后执行 systemctl status gunicorn 确认服务正常启动。这里备注一下“Environment=key=value”这一行,systemd 启动的服务是不带环境变量的,被这个坑了好久🤣。

    Nginx

    最后,我使用 nginx 进行转发,和实现 https 访问。修改 /etc/nginx/conf.d/default.conf

    server {
        listen       443 ssl;
        server_name  myproject;
    
        access_log  /var/log/nginx/access.log;
        error_log   /var/log/nginx/error.log;
    
        location / {
            proxy_pass http://127.0.0.1:8000;
            proxy_redirect     off;
            proxy_set_header   Host                $host:$server_port;
            proxy_set_header   X-Real-IP           $remote_addr;
            proxy_set_header   X-Forwarded-For     $proxy_add_x_forwarded_for;
            proxy_set_header   X-Forwarded-Proto   $scheme;
         }
    
        ssl_certificate     /path/yourssl.cer;
        ssl_certificate_key /path/yourssl.key;
        ssl_session_timeout  5m;
        ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
        ssl_protocols TLSv1.1 TLSv1.2;
        ssl_prefer_server_ciphers on;
    }

    最后,测试一下 https://server_ip 看看能不能访问。

  • Python 性能分析之每行耗时 line_profiler

    大家都知道,Python 的运算性能不是很强,所以才有了那么多用 C/C++ 来计算的第三方 Python 包,还有各种各样的加速实践。

    那么,应该加速哪些代码呢?我之前一般用自带的 cProfile,然而它的输出确实不是太好看,夹杂了非常多无用的信息。

    最近才发现了 line_profiler 这个第三方扩展,用起来比 cProfile 直观很多。

    安装

    pip install line-profiler

    安装需要编译器。如果在 Windows 平台,需要自行先安装 C++ 编译器。如果不想装麻烦的 VC++,可以转而在 这里 下载别人编译好的 .whl 安装包。在 Linux/Mac 上面就简单很多,编译环境肯定有的。 最近发现新版的已经不需要了,Windows 也有了编译好的包,可以直接安装。

    使用

    在需要 profile 的函数前,加上”@profile”,例如下面的 xxxxxx.py:

    @profile
    def main():
        l = [i for i in range(10000)]
        s = set(l)
    
        for _ in range(1000):
            if 9876 in l:
                pass
            if 9876 in s:
                pass
    
    if __name__ == '__main__':
        main()

    这个”@profile”只是一个标记,不是 Python 的语句,所以会导致代码不能直接运行,只能用专门的方法运行(下面有),这不是太方便(目前的版本是这样)。

    经过一点使用,了解到 @profile 的用法有一点限制,不可以对 class 打标签,但是可以打在 class 的方法上;子函数也可以用;并且可以同时 profile 多个函数 。

    然后,运行:

    kernprof -v -l xxxxxx.py

    我们就得到了结果:

    Wrote profile results to xxxxxx.py.lprof
    Timer unit: 1e-06 s
    
    Total time: 0.076552 s
    File: xxxxxx.py
    Function: main at line 2
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
         2                                           @profile
         3                                           def main():
         4         1        965.0    965.0      1.3      l = [i for i in range(10000)]
         5         1        792.0    792.0      1.0      s = set(l)
         6
         7      1001       1278.0      1.3      1.7      for _ in range(1000):
         8      1000      71133.0     71.1     92.9          if 9876 in l:
         9                                                       pass
        10      1000       1297.0      1.3      1.7          if 9876 in s:
        11      1000       1087.0      1.1      1.4              pass

    可以发现,第 8 行的地方,无论是每次运行(Per Hit),还是总耗时(% Time),都占用了大量的时间。所以就改为第 10 行的用法,马上快了几十倍。

    参考

    1. https://github.com/rkern/line_profiler
    2. https://github.com/pyutils/line_profiler 这个是新版本

  • Python 多进程共享内存、NumPy 数组 | Sharing NumPy Array When Using Python Multiprocessing

    背景

    当前的项目需要对大型 numpy 数组进行各种运算(不是深度学习的那种运算),实践发现只开一个 python 进程时,只能使用一个 CPU 核心。所以考虑使用 multiprocessing 模块进行多进程运算。

    但是,问题也很明显:用的是 multiprocessing.pool,如果我的 pool 的 size 是 4,一个 GB 级的 ndarray 传给 pool,会复制 4 份到每一个子进程。这首先会在传输时花时间做相应的 pickle 和 unpickle 操作;更重要的是,这坨数据会在内存里复制 4 份——这直接导致能处理的最大数据大小缩小了四分之三。

    本文使用的 Python 版本为 3.6 / 3.7,Windows 系统。
    在 3.8 版本中,新加入了 multiprocessing.shared_memory 模块,应该能简化这个问题。但是目前为止,项目使用的部分包还不支持 3.8,所以仍需要在旧版本中解决这个问题。

    Value 与 Array

    在 multiprocessing 包中,提供了一些可共享的对象:Value、Array、RawValue 与 RawArray。基本上,前者没有 Raw 的,可以加锁以进行进程间同步,后面 Raw 的没有锁。项目中用到的 numpy 数组都是只读的,子进程只需要读不需要写,所以选择使用 RawArray。

    阅读更多…
  • 用 Numba 加速你的 Python 代码,性能轻松大提升

    Numba 简介

    Numba 是 Python 的一个 JIT (just-in-time) 编译器,最适用于 NumPy 数组、函数,以及 Python 循环。基本上,用法就是给原来的 Python 函数加一个修饰器,当运行到经 Numba 修饰的函数时,它会被编译为机器码,之后再调用时,就能以机器码的速度来执行了。

    按我上手使用的经验来看,Numba 对原代码的改动不是太大,对能加速的部分,加速效果明显;对不支持的加速的 Python 语句/第三方库,可以选择不使用 numba 来规避。这是我选择 Numba 的原因。

    首先:应该编译(优化)什么?

    由于 Numba 本身的限制(稍后介绍),不能做到对整个程序完全的优化。实际上,也没必要这样做——只需要优化真正耗时间的部分即可。

    怎么找到真正耗时间的部分?除了靠直觉,还可以借用工具来分析,例如 Python 自带的 cProfile,还有 line_profiler 等,这里不再细讲。

    安装

    可以通过 conda 或 pip,一个命令安装:
    conda / pip install numba

    什么样的代码能加速?

    按照官方文档的示例代码,如果代码中含有很多数学运算、使用 NumPy,或者有大量 Python 的 for 循环(这可是 Python 性能大忌),那么 Numba 就能给你很好的效果。尤其是多重 for 循环,可以获得极大的加速

    大家都知道,给一个 np.ndarray 加 1 是很快的(向量化、广播),但是如果 for 遍历这个 array 的元素再每个加 1就会很慢(新手容易犯的小错误);但是这都没关系,有了 Numba 再 for 遍历元素加 1,和直接用 ndarray 加 1 的耗时是差不多的!

    再举个例子,下面这段代码,就能享受到 JIT:

    from numba import jit
    import numpy as np
    
    x = np.arange(100).reshape(10, 10)
    
    @jit(nopython=True)  # 设置为"nopython"模式 有更好的性能
    def go_fast(a):  # 第一次调用时会编译
        trace = 0
        for i in range(a.shape[0]):   # Numba likes loops
            trace += np.tanh(a[i, i]) # Numba likes NumPy functions
        return a + trace              # Numba likes NumPy broadcasting
    
    print(go_fast(x))

    但是,类似下面的代码,Numba 就没什么效果:

    from numba import jit
    import pandas as pd
    
    x = {'a': [1, 2, 3], 'b': [20, 30, 40]}
    
    @jit
    def use_pandas(a):  # 这个函数就加速不了
        df = pd.DataFrame.from_dict(a) # Numba 不支持 pd.DataFrame
        df += 1                        # Numba 也不支持这个
        return df.cov()                # 和这个
    
    print(use_pandas(x))

    总之,Numba 应付不了 pandas。以我的经验,需要先把 DataFrame 转成 np.ndarray,再输入给 Numba。

    要强制用 nopython 模式

    刚才有效果的代码中,@jit(nopython=True) 这里传入了 nopython 这个参数,而没什么效果的代码中,就没有这个参数。为什么呢?

    这是因为,@jit 实际上有两种模式,分为别 nopython 和 object 模式。只有 nopython 模式,才是能真正大幅加速的模式。而 nopython 模式只支持部分的 Python 和 NumPy 函数,如果运行时用到了不支持的函数/方法,程序就会崩掉 (例如刚才不能加速的例子如果加上 nopython 就会崩) 。如果不强制设定 nopython 模式,编译函数失败时,会回退到 object 模式,程序虽然不会崩,但却偏离了我们给它加速的本意。

    我既然用了 Numba,我就希望它能真正地发挥作用。所以选择强制开启 nopython ,如果不能加速,不如让它直接崩溃,我们再作对应修改。

    阅读更多…
  • 安利一个美股历史数据Python库:yfinance

    相比A股和港股,(免费的)美股的数据没有那么容易拿到,而适合Python的source/library就更少了。

    最近找到一个免费、轻量的Python库——yfinance。整个库只有几个文件,数据从yahoo下载,免费无限制。安装及使用教程见上面的链接。

    无需申请token,即装即用,和tushare一样方便,值得拥有。赶紧 pip install 一个吧。

    附上 github 上的一点使用文档:

    import yfinance as yf
    
    msft = yf.Ticker("MSFT")
    
    # get stock info
    msft.info
    
    # get historical market data
    hist = msft.history(period="max")
    
    # show actions (dividends, splits)
    msft.actions
    
    # show dividends
    msft.dividends
    
    # show splits
    msft.splits
    
    # show financials
    msft.financials
    msft.quarterly_financials
  • (PyTorch)使用 LSTM 预测时间序列(股票)

    前言

    经本文的评论指出,本文中的代码的原理可能有严重的问题。当作是学习 pytorch 的语法就好了,在修复之前不要用于学术用途。Don’t take it serious!能赚钱的算法都不会公开🤣

    目标

    学习使用 LSTM 来预测时间序列,本文中使用上证指数的收盘价。

    运行环境

    Python 3.5+, PyTorch 1.1.0, tushare

    数据获取与处理

    首先用 tushare 下载上证指数的K线数据,然后作标准化处理。

    import numpy as np
    import tushare as ts
    
    data_close = ts.get_k_data('000001', start='2018-01-01', index=True)['close'].values  # 获取上证指数从20180101开始的收盘价的np.ndarray
    data_close = data_close.astype('float32')  # 转换数据类型
    
    # 将价格标准化到0~1
    max_value = np.max(data_close)
    min_value = np.min(data_close)
    data_close = (data_close - min_value) / (max_value - min_value)
    原始数据:上证指数从2018-01-01到2019-05-24的收盘价(未标准化处理)

    把K线数据进行分割,每 DAYS_FOR_TRAIN 个收盘价对应 1 个未来的收盘价。例如K线为 [1,2,3,4,5], DAYS_FOR_TRAIN=3,那么将会生成2组数据:
    第1组的输入是 [1,2,3],对应输出 4;
    第2组的输入是 [2,3,4],对应输出 5。

    然后只使用前70%的数据用于训练,剩下的不用,用来与实际数据进行对比。

    DAYS_FOR_TRAIN = 10
    
    def create_dataset(data, days_for_train=5) -> (np.array, np.array):
        """
            根据给定的序列data,生成数据集
            
            数据集分为输入和输出,每一个输入的长度为days_for_train,每一个输出的长度为1。
            也就是说用days_for_train天的数据,对应下一天的数据。
    
            若给定序列的长度为d,将输出长度为(d-days_for_train+1)个输入/输出对
        """
        dataset_x, dataset_y= [], []
        for i in range(len(data)-days_for_train):
            _x = data[i:(i+days_for_train)]
            dataset_x.append(_x)
            dataset_y.append(data[i+days_for_train])
        return (np.array(dataset_x), np.array(dataset_y))
    
    dataset_x, dataset_y = create_dataset(data_close, DAYS_FOR_TRAIN)
    
    # 划分训练集和测试集,70%作为训练集
    train_size = int(len(dataset_x) * 0.7)
    
    train_x = dataset_x[:train_size]
    train_y = dataset_y[:train_size]
    
    # 将数据改变形状,RNN 读入的数据维度是 (seq_size, batch_size, feature_size)
    train_x = train_x.reshape(-1, 1, DAYS_FOR_TRAIN)
    train_y = train_y.reshape(-1, 1, 1)
    
    # 转为pytorch的tensor对象
    train_x = torch.from_numpy(train_x)
    train_y = torch.from_numpy(train_y)
    阅读更多…