• FIX 协议开发(3):QuickFIX/J 实战经验小结

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案
    FIX 协议开发(2):QuickFIX/J 入门
    FIX 协议开发(3):QuickFIX/J 实战经验小结(本文)

    代码结构

    这里主要讲一下我们的思路,具体的代码不方便贴上来。

    首先需要实现 Application,主要是其中的 fromApp() 需要 crack 不同类型的 Message。但是,不把具体业务逻辑写在 crack() 方法里,crack() 主要就是把 message 通过一个或多个队列传到外部。

    收发信息的部分,从上一节可以看到,具体代码还是挺零碎的,所以做一层抽象会方便一点。
    另一个重点是注意同步/异步:有些方法做成同步会方便一点,例如查询持仓/资金,又例如撤单【同步/异步都可以】;而下单则应做成异步。

    Fix Session 持久化

    程序启动时,会自动读入之前保存的状态。FIX 协议对序列号的要求严格,如果收到的序列号偏大或偏小,会导致需要额外的处理甚至 session 被断掉。因此,持久化非常重要,尤其是维护收发序列号。

    以我方下一次发送的序列号 NextSenderMsgSeqNum 为例。Session.javasendRaw()方法调用了persist()方法,该方法中 MessageStore 实例把即将发送的消息持久化;该步骤完成后,MessageStore 实例接着执行incrNextSenderMsgSeqNum(),把序列号保存下来。在persist()完成之后,才真正把数据发送出去。

    MessageStore 自带了几种实现,例如基于文件的FileStore,放在内存里的MemoryStore,我们参考MemoryStore稍作修改,做了一个RedisStore,感觉会灵活一点。

    关于 session 的另一个话题是,如何手动控制一个 session,例如使 session logout、reset 等。

    可以使用静态函数Session.lookupSession(sessionID)来从 ID 获取对应的 session,接下来就可以调用 session.logon()、logout()、reset() 方法。后两个的区别,logout 是发送 FIX 的 logout 信号,然后断开连接,不影响序列号等内部状态,之后调用 logon() 即可回到正常使用的模式。reset() 会清除序列号等内部状态及其持久化,下次收发包会都从 1 开始。盘中就别用这个了。

    如果不需要手动控制 session,而是每天自动从几点连到几点,那么只需要在参数中配置StartTimeEndTime即可。

    异常处理

    进行 QFJ 开发时,特别需要注意的是,如何捕捉“异常”。不像 API 调用,时刻都有 error code 字段。对方返回的消息中如有错误信息或特殊情况时,是需要针对性处理的。

    例如查询持仓是空仓,服务器不可能什么都不返回,而会返回一个特殊的空仓标记。又例如各种字段填写不合法,报错信息很可能藏在可选的 Text 字段中。

    接收业务信息后报错

    主要有两个原因:

    • 一是不符合 xml 的定义,在底层就会自动给对方发送拒绝消息“MsgType (35): 3 (REJECT)”,不会传到 Application 的应用层;
    • 二是在应用层的 fromApp() 内 crack 信息时没有找到对应 crack 的方法,会发送“BusinessRejectReason (380): 3 (UNSUPPORTED MESSAGE TYPE)”给服务器。

    解决的要点都在于我方的字典没有适配对方的字典(对方故意发错误的数据包除外)。参考上一篇的“自定义字段”一节,根据实际情况修改 xml,自行打造合适的 Message 库才是王道。

    消息自动按类型分类、处理流程

    对比自己构造数据包、自己解析收到的数据全 DIY 的实现方式, QFJ 框架已经为我们做了很多。如果好奇的话,数据包是如何解析,最终传给我们的 fromApp() 方法?先从源头,即 MINA 框架接收到数据包说起。

    对方发来的数据包,首先经过 quickfix.mina.message.FIXMessageDecoder 解码,进行基础的校验,例如检测 FIX 协议头、获取消息体的长度、校验 checksum 等。

    如果校验正常,InitiatorIoHandler/AcceptorIoHandler 调用其父类 Quickfix.mina.AbstractIoHandler 继续处理消息,包括但不限于:从 session 获取字典、解析 FIX 数据【通过调用quickfix.MessageUtils的函数】等。解析时,先提取 msgType,调用 session 对应的 messageFactory 创建 msgType 类型的 message 实例,接下来根据字典解析数据内容。

    解析 message 完成后,InitiatorIoHandler/AcceptorIoHandler 把 message 传给 processMessage() 方法;执行 eventHandlingStrategy.onMessage(),该方法内把 message 塞到队列 queueTracker 中。

    最后,队列的元素会被“QFJ Message Processor”线程取出,给到 quickfix.Session 的 next(Message) 方法。这个 next() 方法,里面还有对字段的校验,里面的 verify(message) 再进而校验登录状态、时间、seq num 等,最终调用我们写的 fromApp() 方法。

    可见,从接收到数据,到我们实现的 Application,经历了很多很多层的调用。😅

    这个流程还没有结束。

    我们让自己的 Application 继承 quickfix.MessageCracker,从而拥有了 crack(),crack 是怎样根据不同实际类型的 message 调用对应的 onMessage() 的呢?

    MessageCracker 在初始化时,执行 initialize() ,通过反射,找到所有 handler【也就是各种自己写的 onMessage()】,并存到一个 mapping。之后 fromApp() 调用 crack() 时,就通过传入的 message 提取其类型以及 mapping 找到对应的 handler,调用对应的 onMessage()。

    如果觉得这个 crack 过程有点太“黑科技”,我们可以不用它,不继承 quickfix.MessageCracker,转而继承 quickfix.fix42.MessageCracker【按实际对应的版本】。点进去可以看到,全是各种 onMessage,这样就不涉及反射,都是 override 了。我喜欢 explicit 的方法。

    性能

    目前业务是用来接入券商。还没做过大规模的测试,但按当前测试数据估计每秒可发单 100 个以上,性能满足当前业务需求。

    相关参考

    1. https://www.quickfixj.org/usermanual/2.1.0/
    2. https://blog.csdn.net/u011279740/category_1517545.html
  • FIX 协议开发(2):QuickFIX/J 入门

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案
    FIX 协议开发(2):QuickFIX/J 入门 (本文)
    FIX 协议开发(3):QuickFIX/J 实战经验小结

    安装

    QuickFIX/J 的官网是 https://www.quickfixj.org,其 GitHub 是 https://github.com/quickfix-j/quickfixj。撰写本文时,GitHub 上的版本是 2.2,官网上的文档给到了 2.1 的版本。所以这里选择安装 2.1,避免与文档不匹配。

    我们通过 Maven 来引入 quickfixj 库,见此页的“Maven Integration”部分,在 pom.xml 中添加对应 xml 片段(该网页中的xml引入的是2.0.0版本的库,或许是笔误,我将其改为2.1.0)即可:

    <!-- QuickFIX/J dependencies -->
    <dependency>
        <groupId>org.quickfixj</groupId>
        <artifactId>quickfixj-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- 选择所需版本的messages -->
    <dependency>
        <groupId>org.quickfixj</groupId>
        <artifactId>quickfixj-messages-fix42</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    阅读更多…
  • FIX 协议开发(1):协议介绍及开发方案

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案(本文)
    FIX 协议开发(2):QuickFIX/J 入门
    FIX 协议开发(3):QuickFIX/J 实战经验小结

    本文背景

    公司因业务需要,准备接入 FIX 协议。在调研过程中,我发现中文的 FIX 协议相关资料不太多,准备边学边记录,预计会写 3 篇左右。

    FIX 协议简介

    FIX(Financial Information eXchange Protocol,金融信息交换协议)是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化进程,在各类参与者之间,包括投资经理、经纪人、买方、卖方建立起实时的电子化通讯协议。

    FIX协议的目标是把各类证券金融业务需求流程格式化,成为一个可用计算机语言描述的功能流程,并在每个业务功能接口上统一交换格式,方便各个功能模块的连接。

    消息格式

    FIX 协议消息均由多个 “key=value” 组成。其中 key 可以是协议规定的字段,或自定义字段。协议规定的key可查询 FIX 协议字典,【不同版本的 FIX 协议均有其字典,用于开发的库一般也有自带;也可参考第三方,如 wireshark】例如 8 代表 begin string,34 代表消息的序列号,52 代表时间戳等。自定义字段不与规定的 key 重复,供金融机构定制,开发时需要向对应金融机构获取其专有字段的字典。只要有了对应的字典,就可以读懂 FIX 数据包的内容。

    一般来说,一个 消息由“头部 + 消息体 + 尾部”构成。头部包含一些必要的字段,例如 BeginString (8)、BodyLength (9)、MsgType (35)、MsgSeqNum (34)、SenderCompID (49) 等,尾部包含的必要字段是 CheckSum (10)。

    FIX 登陆消息示例(假设”^”是分隔符):

    8=FIX.4.3^9=65^35=A^34=1^49=TESTACC^52=20130703-15:55:08.609^56=EXEC^98=0^108=30^10=225^

    对照字典可知,BeginString (8) 是 FIX.4.3;BodyLength (9) 是 65 字节;MsgType (35) 是 A,A 对应 logon 操作;MsgSeqNum (34) 是 1,即这是我方发送的第 1 个消息。

    有关更详细的协议介绍,可参考 https://blog.51cto.com/9291927/2536105

    开发方案

    不使用库

    由上面的示例可以发现,FIX 协议十分简单。可以不需要依赖第三方库,手动查字典构造消息
    8=FIX.4.3^9=65^35=A^34=1<省略>
    再通过标准的 socket 通信,即可完成交互。

    这个方案自由度最高,不依赖底层开发语言,但开发流程与查字典较为繁琐,后续维护也不太方便。

    Python 库 simplefix

    simplefix 是一个 FIX 协议的简易实现。它使用户可以方便地任意构造 FIX 消息,非常适合学习、测试协议。但这个库不包含任何网络收发、FIX 异常处理等功能模块。因此,开发 FIX 客户端时,我使用该库构造数据包,然后通过标准 socket 发送,再分析其网络底层交互。

    示例:发送 logon 消息的代码

    import socket
    import time
    
    import simplefix
    from simplefix.constants import (ENCRYPTMETHOD_NONE, MSGTYPE_LOGON,
                                     TAG_BEGINSTRING, TAG_CLIENTID,
                                     TAG_ENCRYPTMETHOD, TAG_HEARTBTINT,
                                     TAG_MSGSEQNUM, TAG_MSGTYPE, TAG_SENDER_COMPID,
                                     TAG_SENDING_TIME, TAG_TARGET_COMPID)
    
    HOST = '1.2.3.45'
    PORT = 9000
    CLIENT_ID = 12345678
    PASSWORD = 'mypassword'
    
    seq_num = 1  # 需维护msg sequence number,每次发送后加1
    
    
    if __name__ == "__main__":
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # 长连接
        sock.connect((HOST, PORT))
    
        # logon
        msg_logon = simplefix.FixMessage()
        msg_logon.append_pair(TAG_BEGINSTRING, 'FIX.4.2')
        msg_logon.append_pair(TAG_MSGSEQNUM, seq_num)  # 需维护msg sequence number,每次发送后加1
        msg_logon.append_pair(TAG_SENDER_COMPID, 'FIXTEST001')
        msg_logon.append_utc_timestamp(TAG_SENDING_TIME)
        msg_logon.append_pair(TAG_TARGET_COMPID, 'TESTENV')
    
        msg_logon.append_pair(TAG_MSGTYPE, MSGTYPE_LOGON)  # 类型
    
        msg_logon.append_pair(TAG_ENCRYPTMETHOD, ENCRYPTMETHOD_NONE)
        msg_logon.append_pair(TAG_HEARTBTINT, 30)
        msg_logon_buffer = msg_logon.encode()
    
        sock.send(msg_logon_buffer)
        print('seq', seq_num, 'sent')
        seq_num += 1
    
    
        time.sleep(1)
        sock.close()

    多平台 QuickFix 引擎

    QuickFix 是全功能的 FIX 开源引擎,目前很多 Fix 解决方案都是根据或参考 QuickFix 实现的。目前(2020年10月)它有 C++、Python、Java、.NET、Go 和 Ruby 共 6 种语言的实现/接口。

    根据我司的情况,选择其中的 Java 实现 QuickFIX/J 进行下一步开发。其使用方法,将在下一篇文章继续。

  • 树状数组(Binary Indexed Tree / Fenwick Tree)学习与实现

    树状数组是一个能高效处理数组①更新、②求前缀和的数据结构。它提供了2 个方法,时间复杂度均为O(log n)

    1. update(index, delta):将 delta 加到数组的 index 位置
    2. prefix_sum(n):获取数组的前 n 个元素的和
      range_sum(start, end):获取数组从 [start, end] 的和,相当于 prefix_sum(end) – prefix_sum(start-1)

    如果只追求第 1 点,即快速修改数组,普通的线性数组可满足需求。但对于 range sum(),需要O(n)

    如果只追求第 2 点,即快速求 range sum,使用前缀数组的效果更好。但对于 add() 操作,则需要O(n),所以只适合更新较少的情况。

    树状数组则处于两者之间,适合数组又修改,又获取区间和的情景。

    思想

    树状数组的思想是怎样的呢?

    假设有一个数组 [1, 7, 3, 0, 5, 8, 3, 2, 6, 2, 1, 1, 4, 5],想求前 13 个元素的和。那么,

    13 = 23 + 22 + 20 = 8 + 4 + 1

    前 13 个数的和等于【前 8 个数的和】+【接下来 4 个数的和】+【接下来 1 个数的和】,即 range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13)。如果有一种方法,可以保存 range(1, 8)、range(9, 12)、range(13, 13),那么计算这个区间和就可以加快了。

    这里给出已经计算好的结果(即最下面的 array 层)。例如 array[8] 是 29,往上可以找到 29 对应的是 [1,8],即 range(1, 8) = array[8]。同理,range(9, 12) = array[12],range(13, 13) = array[13]。

    range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13) = array[8] + array[12] + array[13]

    由此图可以发现,虽然它的英文是含有 Tree,中间的部分看起来也是树状的,但是最终用到的 array 是线性的数组(太好了,复杂程度大减)。

    那中间这 3 层是怎么来的呢?——需要从上到下,从左到右看。

    首先计算 [1, 1] 的和,然后计算 [1, 2] 的和,然后计算 [1, 4]、[1, 8] 的和,每次乘 2,直到越界([1, 16] 越界),这里分别算出来了1、8、11、29。

    然后是第二层,从空缺的位置继续,这里的“界”不是整个数组的最大值,而是所有上层中下一个非空缺的位置。计算 [3, 3] 的和,[3, 4] 不用算,因为越界了。然后计算 [5, 5] 的和,接下来是 [5, 6] 的和,[5, 8] 越界不用算。

    第三层也是类似,然后发现填完了。

    以上可以帮助理解 result 数组中各值的来源,实际建立时有更简洁的做法。至于为什么是这样定义,可以另外找找资料,我看起来这有点像“分形”的感觉。

    阅读更多…
  • 一图流领悟跳跃列表(Skip List),附 Python/Java/Kotlin 实现

    跳跃列表是一种随机数据结构。它使得包含 n 个元素的有序序列的查找、插入、删除操作的平均时间复杂度都是 O(log n)。(注意关键词有序,如果不需要有序,也不需要用到跳跃列表;数据量大时,时间复杂度退化到较慢的概率微乎其微)

    平均最差
    搜索O(log n)O(n)
    插入O(log n)O(n)
    删除O(log n)O(n)
    空间O(n)O(n log n)

    跳跃列表是通过维护一个多层链表实现的。每一层链表中的元素的数量,在统计上来说都比下面一层链表元素的数量更少。也就是说,上层疏,下层密,底层数据是完整的,上面的稀疏层作为索引——这就是链表的“二分查找”啊。

    一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。

    Wikipedia 的道理就讲到这里,我不希望把本文写得难懂。说好的一图流就能领悟呢?其实我有点标题党,本文不止一幅图,但是核心的图只有一幅,上图(来自 Wikipedia):

    请多次认真观看插入节点的全过程 gif。我看完之后,就觉得自己可以实现出来了(虽然后来实际开发调试了很多次)。

    例如想在上图中所示的跳跃列表中插入 80,首先要找到应该插入的位置。

    首先从最稀疏的层的 30 开始,把当前位置设置为顶层的 30。
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 30;
    80 比当前位置右边的 50 大,所以把当前位置右移到 50;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 50;
    80 比当前位置右边的 70 大,所以把当前位置右移到 70;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 70;(当前位置已到达底层)
    之后用 80 不断与右边的节点比较大小,右移至合适的位置插入 80 节点。(底层插入完毕)
    接下来用随机决定是否把这个 80 提升到上面的层中,例如图中的提升概率是 50%(抛硬币),不断提升至硬币为反面为止。

    上面一段描述了 gif 中插入 80 的搜索和插入过程。那么,代码如何实现?右移和下移的逻辑很浅显,那么重点就在如何提升节点到上层的逻辑。

    阅读更多…
  • Python 分布式任务队列 Celery 入门与使用

    最近工作的项目使我接触到了 Celery 这个任务队列。看了一下官方的文档,感觉设计得还挺 Pythonic,理念也非常简单易懂——类似生产者与消费者。在这里稍微总(fan)结(yi)一下 Celery 的使用方法。

    简介

    Celery 是一个分布式任务队列,网上也有说是分布式任务调度框架,这里我以官方文档的“Distributed Task Queue”为准。它简单、灵活、可靠,可以处理大量的大量的任务,其主要专注于实时处理,同时也支持计划任务。

    为什么要用任务队列?我的理解是,首先方便了任务的分发调度与管理,另外也使调用的过程变得异步(非常适合 web 请求)。

    名词解释

    • 任务队列(task queue):一种分发任务到不同的线程或机器的方法,其输入为一个任务(task)。
    • Worker:实际执行任务的进程,它不断检查任务队列中的新任务并执行。
    • Broker:客户端与 worker 通信的中介。客户端发送任务的消息到队列中,broker 把这条消息传递给一个 worker。

    入门

    如果不考虑进阶用法,5 分钟入门。

    安装

    首先安装 Celery 并选择 broker。其中 broker 主要支持 RabbitMQ 和 Redis。RabbitMQ 不需要额外依赖,直接执行pip install -U celery安装。 Redis 需要在安装 Celery 时安装附加的依赖:pip install -U "celery[redis]"

    RabbitMQ 更为适合生产环境,也稍微大型;Redis 更轻量级,但突然退出时可能丢失数据。为了稍微简单轻量,本文都用 Redis。(如何安装 broker 不在本文内讨论,docker 启动一个最为简单)

    新建 Celery 应用

    新建一个mytasks.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y

    接下来就可以启动 worker 了:(生产环境当然不会这样手动运行,而会把它作为后台程序运行)

    $ celery -A mytasks worker --loglevel=info
    
    # 如果不了解上面的命令用法,可查看命令帮助
    # celery help
    # celery worker --help

    调用 task

    在当前的目录,运行

    >>> from mytasks import add
    >>> add.delay(1, 2)  # 使用 delay() 来使worker调用这个task

    可以得到类似<AsyncResult: fd9cdbe3-bcb3-432a-8d46-67b41243cfed>的返回值,而不会返回 3;这个 3 在 worker 的控制台里可以看到:Task mytasks.add[fd9cdbe3-bcb3-432a-8d46-67b41243cfed] succeeded in 0.0003419s: 3

    保存结果

    默认情况下,结果是不保存的。如果想保存结果,需要指定 result backend,支持 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等。例如app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0'),调用之后就可以查询任务的状态及结果。

    >>> result = add.delay(1, 2)
    >>> result.ready()
    True
    >>> result.get(timeout=1)
    3

    参数配置

    简单的参数配置可以直接在代码中修改app.conf,例如:

    app.conf.task_serializer = 'json'

    对于大型一点的项目,最好专门做一个配置模块。先新建一个 celeryconfig.py:

    broker_url = 'pyamqp://xxxx'
    result_backend = 'rpc://xxxx'
    
    task_serializer = 'json'
    timezone = 'Europe/Oslo'
    enable_utc = True
    
    task_annotations = {
        'mytasks.add': {'rate_limit': '10/m'}
    }

    然后通过app.config_from_object('celeryconfig')导入。

    稍微深入

    Task

    Task 有很多选项可以填入,例如用@app.task(bind=True, default_retry_delay=30 * 60),可以修改任务失败后,等待重试的时间。

    关于任务的重试,我后来因工作需要,又深入阅读了文档。理想的目标是使一个任务可以自动重试,若重试一定次数仍失败,则发送通知。

    首先我看到了acks_late这个参数,它的意思是说一个 task 只有在执行成功后,才给队列 ack(移除)。我试了一下,似乎是不行的,fail 一次之后就没有然后了:

    # 是不行的,会被ack
    @app.task(acks_late=True)
    def add_may_fail_late_ack(x, y):
        if random.random() < 0.5:
            raise RuntimeError('unlucky')
        print('ok')  
        return x + y

    然后是autoretry_for=(XxxException,)参数。这个是最简单的自动重试写法,不需要修改原代码的逻辑,但不够灵活,对于简单的任务比较适用。

    最后是功能最全面的写法。首先定义一个自己的 Task,而不使用自带的 Task,因为 Task 可以提供一系列的回调函数(on_xxx)供自定义。例如我可以覆写on_failure方法,在任务超过一定重试次数仍失败时报警。然后是要注意两处地方:一是bind=True,对应的要把def add(x, y)改为def add(self, x, y);二是重试的操作是在业务逻辑手动触发的,且是通过 raise 的方式进行。代码大概是这样子:

    class MyTask(Task):
        def on_failure(self, exc, task_id, args, kwargs, einfo):  # einfo是完整的traceback
            print(f'on failure!!! name={self.name}, exc={exc}, task_id={task_id}, args={args}, kwargs={kwargs}')
    
    @app.task(base=MyTask, bind=True, default_retry_delay=5, max_retries=1)
    def add_may_fail_custom_retry(self: Task, x, y):
        try:
            if random.random() < 0.5:
                print('fail')
                raise RuntimeError('unlucky')
            print('ok')
            return x + y
        except RuntimeError as e:
            raise self.retry(exc=e)

    上述的代码在第一次遇到RuntimeError时,会等待 5s 重新执行,若仍然遇到RuntimeError(设置了max_retries=1),worker 才会抛出异常。此时会调用 on_failure(),把有用的信息记录下来,例如

    on failure!!! name=mytasks.add_may_fail_custom_retry, exc=unlucky, task_id=9ad47d43-7b7f-4d8d-a078-e54934f54d6e, args=[1, 7], kwargs={}

    这样就基本达成了预想的效果。其他有关 task 的具体内容,见Tasks文档

    调用 task

    前面用到的 delay() 方法是 apply_async() 的简化,但前者不支持传递执行的参数。举例来说,

    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
    # 等价于
    task.apply_async(args=(arg1, arg2), kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

    可见简化了许多。

    Countdown 参数可以设置任务至少(可能受 worker busy 或其他原因有所推迟)多少秒后执行;而 eta (estimated time of arrival) 参数可以设置任务至少(原因相同)在具体时刻之后执行:

    >>> result = add.apply_async((1, 2), countdown=5)  # 至少5秒后执行
    >>> result.get()  # 阻塞至任务完成
    
    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((1, 2), eta=tomorrow)

    一个任务由于种种原因,延迟太久了,我们可以把它设置为过期,支持输入秒数或一个 datetime:

    add.apply_async((10, 10), expires=60)  # 如果任务延迟超过60s,将不会被执行

    对于一个任务,还可以指定这个任务放到哪个队列中(routing),例如

    add.apply_async(queue='priority.high')

    使用 -Q 来给 worker 指定监听的队列:

    $ celery -A mytasks worker -l info -Q celery,priority.high

    像上面这样硬编码 add 的对应 queue 不是太好,更佳的方法是使用 configuration routers

    其他调用 task 的文档,见 Calling Tasks

    函数签名(signature)

    对于简单的 task 调用,使用 .delay() 或 .apply_async() 方法一般就已足够。但有时我们需要更高级的调用,例如把任务的返回值用作下一个任务的输入,如果把一系列任务写成串行,就很不推荐了。为此,可以通过函数签名来调用 tasks。

    下面给 add() 函数创建一个签名(signature):

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    >>> add.s(2, 2)  # 简化,但不能传入task的option,例如countdown
    tasks.add(2, 2)
    >>> sig = add.signature((2, 2), {'debug': True}, countdown=10)  # 完全版

    定义了签名后,就可以用sig.delay()来调用这个任务。

    签名的一个很重要的功能是它可以定义偏函数,类似 Python 的 functools.partial:

    >>> partial = add.s(2)          # 不完整的 signature
    >>> partial.delay(1)            # 1 + 2  注意这个1是填在前面的

    偏函数主要的应用场合是各种的原语(Primitives)。这些 primitives 主要包括 group、chain、chord、map、starmap、chunks 等。下面介绍其中几个的用法。

    group

    group 可以实现任务的并行:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in range(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    chain

    chain 可以按顺序执行任务,把前一个任务的结果作为接下来的任务的输入。注意偏函数的使用:

    >>> from celery import chain
    >>> result = chain(add.s('h', 'e'), add.s('llo'), add.s(' world'))()
    >>> result.get()
    'hello world'
    >>> (add.s('h', 'e') | add.s('llo') | add.s(' world'))().get()  # 也可以用 | 连接

    有关这一部分的更详细内容,见 Canvas: Designing Work-flows

    后台启动

    在实际环境中,celery 肯定是以后台服务的方式运行的。文档给出了 systemd、init.d、supervisor 等启动的方式。具体见 Daemonization

    定时任务

    定时运行任务的功能由 celery beat 完成。它按设定的周期/时间把任务发送到队列中,并被 worker 执行。对于一个集群的 worker,celery beat 应只有一个,否则任务会重复。

    mytasks.py改成下面所示:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10s执行一次
        sender.add_periodic_task(10.0, add.s('hello', ' world'), name='every 10s')
    
        # 按crontab的格式定期执行
        sender.add_periodic_task(
            crontab(hour='*', minute=5),
            add.s('it is', ' xx:05')
        )

    然后启动 beat:

    $ celery -A mytasks beat

    可以在 worker 看到每 10s 输出了一次 “hello world”。每个小时的 5 分,都会输出 “it is xx:05”

    关于定时任务,具体见 Periodic Tasks

    相关参考

    上面只是比较基本的用法。对于更多深入使用中遇到的问题,还是应该参考官网文档

  • Python 类型标注 小结

    众所周知,Python 是动态语言,函数调用不检查传入的变量类型,一个变量的类型现在是数字,下一秒可以变成字符串。这是它的特点(是优点,也是缺点)。

    IDE 的代码提示依赖于对变量类型的了解,它大大提升了我的编码体验。

    Python 3 之后,增加了多种类型标注的支持。本文主要是给自己备忘,哪一个版本可以用哪些类型标注。

    PEP 484

    def greeting(name: str) -> str:
        return 'Hello ' + name

    PEP 484 由 Python 3.5 开始支持。通过定义函数参数的类型与函数返回值的类型,可以覆盖大部分“自己写的代码”的情况。

    typing

    from typing import List
    Vector = List[float]
    
    def scale(scalar: float, vector: List[float]) -> Vector:
        return [scalar * num for num in vector]
    # 或
    def scale(scalar: float, vector: Vector) -> Vector:
        return [scalar * num for num in vector]
    
    new_vector = scale(2.0, [1.0, -4.2, 5.4])

    typing 模块也是由 Python 3.5 开始支持。增加了类似List<String>、类型别名、泛型等功能。

    PEP 526

    primes: List[int] = []
    
    captain: str  # Note: no initial value!
    
    class Starship:
        stats: ClassVar[Dict[str, int]] = {}
    
    some_value: int = other_3rd_party_function()

    PEP 526 由 Python 3.6 开始支持。它通过把变量类型写在等号前面,大幅增加了类型的存在感,并可以标注“别人写的代码”返回的类型。

    PEP 585

    PEP 585 由 Python 3.9 开始支持。之前from typing import List这样的写法,相当于给 list、dict、set 等类型分别复制了一个大写字母开头的类型,不太方便。所以在 3.9 版本,改为直接用小写的即可。

    其他

    Python 还增加了更多类似的支持,如 PEP 544、PEP 586、PEP 589、PEP 591 等。总的方向还是不变的,就是增加一些 feature 使动态的语言变得稍微“静态”一点,从而方便开发者、提高安全性。

  • Python 存储大量 NumPy Array 等数据的方案:HDF5

    对于序列化保存各种 array / data frame 等类型的数据,一直以来有各种各样的办法。例如我用过的,对于简单的一个 array,NumPy 有提供读写的方法;pandas 也有对应的 data frame 读写;而字符串/字典,可以变成 json 保存等。

    但是,如果数量多了,例如有 100 个 array,上面的方法就不太方便了。我比较懒,会把这些 array 放到一个 dict 里面,然后用 pickle 把这个 dict pickle下来——保存和读取都非常方便,而且兼容所有数据类型。

    后来,数据量多了之后,就发现 pickle 的方案也是有缺点的,就是性能不好(文末有初步的性能对比)。所以调研了一下后,选择了 HDF5。以前只是听过,没有用过,现在用了感觉不错,在下面稍微总结一下。

    目标用户

    无论是科学研究,还是各行各业,都有 HDF5 的身影。高效、跨平台、无上限,尤其适合数据量大的情景。见官网的 Who Uses HDF?

    安装

    HDF5 支持各种语言,Python 对应的库是 h5py。

    $ pip install h5py
     or
    $ conda install h5py  # Anaconda

    核心概念

    HDF5 里只有 2 种类型:datasetgroup
    – dataset 就像数组,类似 Python 的 list (一维或多维),或 NumPy 的 ndarray。dataset 的语法和 ndarray 类似。
    – group 就像 Python 的 dict,在我看来,它更像是带路径的文件夹。group 的语法和 dict 类似。

    就像是在一层一层的文件夹中,存放着不同的 dataset。记住以上两点,就🆗

    阅读更多…
  • Kotlin 协程踩坑之 Redis 分布式锁

    本文不涉及技术细节,只是记录一点踩坑的过程和感受。

    背景

    项目中的两个不同的服务可能会同时修改一个资源的状态,决定使用 Redis 分布式锁。我这边的服务是用 Kotlin 写的,每当接收到一个 HTTP 请求,就会开一个协程来处理。在协程运行的过程中,会调用 Redisson 来获取一个锁,运行结束后(一般要十几秒),会释放锁。

    问题

    服务跑的时候发现,有时候,同时收到了两个请求,这两个请求会同时锁一个东西,然而它们竟然一起运行了,导致彼此之间产生了干扰。更奇怪的是,尝试复现的时候,有时候能复现,有时又不能。

    阅读更多…