• FIX 协议开发(1):协议介绍及开发方案

    本文背景

    公司因业务需要,准备接入 FIX 协议。在调研过程中,我发现中文的 FIX 协议相关资料不太多,准备边学边记录,预计会写 3 篇左右。

    FIX 协议简介

    FIX(Financial Information eXchange Protocol,金融信息交换协议)是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化进程,在各类参与者之间,包括投资经理、经纪人、买方、卖方建立起实时的电子化通讯协议。

    FIX协议的目标是把各类证券金融业务需求流程格式化,成为一个可用计算机语言描述的功能流程,并在每个业务功能接口上统一交换格式,方便各个功能模块的连接。

    消息格式

    FIX 协议消息均由多个 “key=value” 组成。其中 key 可以是协议规定的字段,或自定义字段。协议规定的key可查询 FIX 协议字典,【不同版本的 FIX 协议均有其字典,用于开发的库一般也有自带;也可参考第三方,如 wireshark】例如 8 代表 begin string,34 代表消息的序列号,52 代表时间戳等。自定义字段不与规定的 key 重复,供金融机构定制,开发时需要向对应金融机构获取其专有字段的字典。只要有了对应的字典,就可以读懂 FIX 数据包的内容。

    一般来说,一个 消息由“头部 + 消息体 + 尾部”构成。头部包含一些必要的字段,例如 BeginString (8)、BodyLength (9)、MsgType (35)、MsgSeqNum (34)、SenderCompID (49) 等,尾部包含的必要字段是 CheckSum (10)。

    FIX 登陆消息示例(假设”^”是分隔符):

    8=FIX.4.3^9=65^35=A^34=1^49=TESTACC^52=20130703-15:55:08.609^56=EXEC^98=0^108=30^10=225^

    对照字典可知,BeginString (8) 是 FIX.4.3;BodyLength (9) 是 65 字节;MsgType (35) 是 A,A 对应 logon 操作;MsgSeqNum (34) 是 1,即这是我方发送的第 1 个消息。

    有关更详细的协议介绍,可参考 https://blog.51cto.com/9291927/2536105

    开发方案

    不使用库

    由上面的示例可以发现,FIX 协议十分简单。可以不需要依赖第三方库,手动查字典构造消息
    8=FIX.4.3^9=65^35=A^34=1<省略>
    再通过标准的 socket 通信,即可完成交互。

    这个方案自由度最高,不依赖底层开发语言,但开发流程与查字典较为繁琐,后续维护也不太方便。

    Python 库 simplefix

    simplefix 是一个 FIX 协议的简易实现。它使用户可以方便地任意构造 FIX 消息,非常适合学习、测试协议。但这个库不包含任何网络收发、FIX 异常处理等功能模块。因此,开发 FIX 客户端时,我使用该库构造数据包,然后通过标准 socket 发送,再分析其网络底层交互。

    示例:发送 logon 消息的代码

    import socket
    import time
    
    import simplefix
    from simplefix.constants import (ENCRYPTMETHOD_NONE, MSGTYPE_LOGON,
                                     TAG_BEGINSTRING, TAG_CLIENTID,
                                     TAG_ENCRYPTMETHOD, TAG_HEARTBTINT,
                                     TAG_MSGSEQNUM, TAG_MSGTYPE, TAG_SENDER_COMPID,
                                     TAG_SENDING_TIME, TAG_TARGET_COMPID)
    
    HOST = '1.2.3.45'
    PORT = 9000
    CLIENT_ID = 12345678
    PASSWORD = 'mypassword'
    
    seq_num = 1  # 需维护msg sequence number,每次发送后加1
    
    
    if __name__ == "__main__":
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # 长连接
        sock.connect((HOST, PORT))
    
        # logon
        msg_logon = simplefix.FixMessage()
        msg_logon.append_pair(TAG_BEGINSTRING, 'FIX.4.2')
        msg_logon.append_pair(TAG_MSGSEQNUM, seq_num)  # 需维护msg sequence number,每次发送后加1
        msg_logon.append_pair(TAG_SENDER_COMPID, 'FIXTEST001')
        msg_logon.append_utc_timestamp(TAG_SENDING_TIME)
        msg_logon.append_pair(TAG_TARGET_COMPID, 'TESTENV')
    
        msg_logon.append_pair(TAG_MSGTYPE, MSGTYPE_LOGON)  # 类型
    
        msg_logon.append_pair(TAG_ENCRYPTMETHOD, ENCRYPTMETHOD_NONE)
        msg_logon.append_pair(TAG_HEARTBTINT, 30)
        msg_logon_buffer = msg_logon.encode()
    
        sock.send(msg_logon_buffer)
        print('seq', seq_num, 'sent')
        seq_num += 1
    
    
        time.sleep(1)
        sock.close()

    多平台 QuickFix 引擎

    QuickFix 是全功能的 FIX 开源引擎,目前很多 Fix 解决方案都是根据或参考 QuickFix 实现的。目前(2020年10月)它有 C++、Python、Java、.NET、Go 和 Ruby 共 6 种语言的实现/接口。

    根据我司的情况,选择其中的 Java 实现 QuickFIX/J 进行下一步开发。其使用方法,将在下一篇文章继续。

  • 树状数组(Binary Indexed Tree / Fenwick Tree)学习与实现

    树状数组是一个能高效处理数组①更新、②求前缀和的数据结构。它提供了2 个方法,时间复杂度均为O(log n)

    1. update(index, delta):将 delta 加到数组的 index 位置
    2. prefix_sum(n):获取数组的前 n 个元素的和
      range_sum(start, end):获取数组从 [start, end] 的和,相当于 prefix_sum(end) – prefix_sum(start-1)

    如果只追求第 1 点,即快速修改数组,普通的线性数组可满足需求。但对于 range sum(),需要O(n)

    如果只追求第 2 点,即快速求 range sum,使用前缀数组的效果更好。但对于 add() 操作,则需要O(n),所以只适合更新较少的情况。

    树状数组则处于两者之间,适合数组又修改,又获取区间和的情景。

    思想

    树状数组的思想是怎样的呢?

    假设有一个数组 [1, 7, 3, 0, 5, 8, 3, 2, 6, 2, 1, 1, 4, 5],想求前 13 个元素的和。那么,

    13 = 23 + 22 + 20 = 8 + 4 + 1

    前 13 个数的和等于【前 8 个数的和】+【接下来 4 个数的和】+【接下来 1 个数的和】,即 range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13)。如果有一种方法,可以保存 range(1, 8)、range(9, 12)、range(13, 13),那么计算这个区间和就可以加快了。

    这里给出已经计算好的结果(即最下面的 array 层)。例如 array[8] 是 29,往上可以找到 29 对应的是 [1,8],即 range(1, 8) = array[8]。同理,range(9, 12) = array[12],range(13, 13) = array[13]。

    range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13) = array[8] + array[12] + array[13]

    由此图可以发现,虽然它的英文是含有 Tree,中间的部分看起来也是树状的,但是最终用到的 array 是线性的数组(太好了,复杂程度大减)。

    那中间这 3 层是怎么来的呢?——需要从上到下,从左到右看。

    首先计算 [1, 1] 的和,然后计算 [1, 2] 的和,然后计算 [1, 4]、[1, 8] 的和,每次乘 2,直到越界([1, 16] 越界),这里分别算出来了1、8、11、29。

    然后是第二层,从空缺的位置继续,这里的“界”不是整个数组的最大值,而是所有上层中下一个非空缺的位置。计算 [3, 3] 的和,[3, 4] 不用算,因为越界了。然后计算 [5, 5] 的和,接下来是 [5, 6] 的和,[5, 8] 越界不用算。

    第三层也是类似,然后发现填完了。

    以上可以帮助理解 result 数组中各值的来源,实际建立时有更简洁的做法。至于为什么是这样定义,可以另外找找资料,我看起来这有点像“分形”的感觉。

    阅读更多…
  • 一图流领悟跳跃列表(Skip List),附 Python/Java/Kotlin 实现

    跳跃列表是一种随机数据结构。它使得包含 n 个元素的有序序列的查找、插入、删除操作的平均时间复杂度都是 O(log n)。(注意关键词有序,如果不需要有序,也不需要用到跳跃列表;数据量大时,时间复杂度退化到较慢的概率微乎其微)

    平均最差
    搜索O(log n)O(n)
    插入O(log n)O(n)
    删除O(log n)O(n)
    空间O(n)O(n log n)

    跳跃列表是通过维护一个多层链表实现的。每一层链表中的元素的数量,在统计上来说都比下面一层链表元素的数量更少。也就是说,上层疏,下层密,底层数据是完整的,上面的稀疏层作为索引——这就是链表的“二分查找”啊。

    一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。

    Wikipedia 的道理就讲到这里,我不希望把本文写得难懂。说好的一图流就能领悟呢?其实我有点标题党,本文不止一幅图,但是核心的图只有一幅,上图(来自 Wikipedia):

    请多次认真观看插入节点的全过程 gif。我看完之后,就觉得自己可以实现出来了(虽然后来实际开发调试了很多次)。

    例如想在上图中所示的跳跃列表中插入 80,首先要找到应该插入的位置。

    首先从最稀疏的层的 30 开始,把当前位置设置为顶层的 30。
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 30;
    80 比当前位置右边的 50 大,所以把当前位置右移到 50;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 50;
    80 比当前位置右边的 70 大,所以把当前位置右移到 70;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 70;(当前位置已到达底层)
    之后用 80 不断与右边的节点比较大小,右移至合适的位置插入 80 节点。(底层插入完毕)
    接下来用随机决定是否把这个 80 提升到上面的层中,例如图中的提升概率是 50%(抛硬币),不断提升至硬币为反面为止。

    上面一段描述了 gif 中插入 80 的搜索和插入过程。那么,代码如何实现?右移和下移的逻辑很浅显,那么重点就在如何提升节点到上层的逻辑。

    阅读更多…
  • Python 分布式任务队列 Celery 入门与使用

    最近工作的项目使我接触到了 Celery 这个任务队列。看了一下官方的文档,感觉设计得还挺 Pythonic,理念也非常简单易懂——类似生产者与消费者。在这里稍微总(fan)结(yi)一下 Celery 的使用方法。

    简介

    Celery 是一个分布式任务队列,网上也有说是分布式任务调度框架,这里我以官方文档的“Distributed Task Queue”为准。它简单、灵活、可靠,可以处理大量的大量的任务,其主要专注于实时处理,同时也支持计划任务。

    为什么要用任务队列?我的理解是,首先方便了任务的分发调度与管理,另外也使调用的过程变得异步(非常适合 web 请求)。

    名词解释

    • 任务队列(task queue):一种分发任务到不同的线程或机器的方法,其输入为一个任务(task)。
    • Worker:实际执行任务的进程,它不断检查任务队列中的新任务并执行。
    • Broker:客户端与 worker 通信的中介。客户端发送任务的消息到队列中,broker 把这条消息传递给一个 worker。

    入门

    如果不考虑进阶用法,5 分钟入门。

    安装

    首先安装 Celery 并选择 broker。其中 broker 主要支持 RabbitMQ 和 Redis。RabbitMQ 不需要额外依赖,直接pip install -U celery安装。 Redis 需要在安装 Celery 时安装附加的依赖:pip install -U "celery[redis]"

    RabbitMQ 更为适合生产环境,也稍微大型;Redis 更轻量级,但突然退出时可能丢失数据。为了稍微简单轻量,本文都用 Redis。(如何安装 broker 不在此讨论,docker 启动一个最为简单)

    新建 Celery 应用

    新建一个mytasks.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y

    接下来就可以启动 worker 了:(生产环境当然不会这样手动运行,而会把它作为后台程序运行)

    $ celery -A mytasks worker --loglevel=info
    
    # 如果不了解上面的命令用法,可查看命令帮助
    # celery help
    # celery worker --help

    调用 task

    在当前的目录,运行

    >>> from mytasks import add
    >>> add.delay(1, 2)  # 使用 delay() 来调用这个 task

    可以得到类似<AsyncResult: fd9cdbe3-bcb3-432a-8d46-67b41243cfed>的返回值,而不会返回 3;这个 3 在 worker 的控制台里可以看到:Task mytasks.add[fd9cdbe3-bcb3-432a-8d46-67b41243cfed] succeeded in 0.0003419s: 3

    保存结果

    默认情况下,结果是不保存的。如果想保存结果,需要指定 result backend,支持 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等。例如app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0'),调用之后就可以查询任务的状态及结果。

    >>> result = add.delay(1, 2)
    >>> result.ready()
    True
    >>> result.get(timeout=1)
    3

    参数配置

    简单的参数配置可以直接修改app.conf,例如:

    app.conf.task_serializer = 'json'

    对于大型一点的项目,最好专门做一个配置模块。先新建一个 celeryconfig.py:

    broker_url = 'pyamqp://xxxx'
    result_backend = 'rpc://xxxx'
    
    task_serializer = 'json'
    timezone = 'Europe/Oslo'
    enable_utc = True
    
    task_annotations = {
        'mytasks.add': {'rate_limit': '10/m'}
    }

    然后通过app.config_from_object('celeryconfig')导入。

    稍微深入

    Task

    Task 有很多选项可以填入,例如@app.task(bind=True, default_retry_delay=30 * 60)。具体及其余内容见Tasks文档

    调用 task

    前面用到的 delay() 方法是 apply_async() 的简化,但前者不支持传递执行的参数。举例来说,

    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
    # 等价于
    task.apply_async(args=(arg1, arg2), kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

    可见简化了许多。

    Countdown 参数可以设置任务至少(可能受 worker busy 或其他原因有所推迟)多少秒后执行;而 eta (estimated time of arrival) 参数可以设置任务至少(原因相同)在具体时刻之后执行:

    >>> result = add.apply_async((1, 2), countdown=5)  # 至少5秒后执行
    >>> result.get()  # 阻塞至任务完成
    
    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((1, 2), eta=tomorrow)

    一个任务由于种种原因,延迟太久了,我们可以把它设置为过期,支持输入秒数或一个 datetime:

    add.apply_async((10, 10), expires=60)  # 如果任务延迟超过60s,将不会被执行

    对于一个任务,还可以指定这个任务放到哪个队列中(routing),例如

    add.apply_async(queue='priority.high')

    使用 -Q 来给 worker 指定监听的队列:

    $ celery -A mytasks worker -l info -Q celery,priority.high

    像上面这样硬编码 add 的对应 queue 不是太好,更佳的方法是使用 configuration routers

    其他调用 task 的文档,见 Calling Tasks

    函数签名(signature)

    对于简单的 task 调用,使用 .delay() 或 .apply_async() 方法一般就已足够。但有时我们需要更高级的调用,例如把任务的返回值用作下一个任务的输入,如果把一系列任务写成串行,就很不推荐了。为此,可以通过函数签名来调用 tasks。

    下面给 add() 函数创建一个签名(signature):

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    >>> add.s(2, 2)  # 简化,但不能传入task的option,例如countdown
    tasks.add(2, 2)
    >>> sig = add.signature((2, 2), {'debug': True}, countdown=10)  # 完全版

    定义了签名后,就可以用sig.delay()来调用这个任务。

    签名的一个很重要的功能是它可以定义偏函数,类似 Python 的 functools.partial:

    >>> partial = add.s(2)          # 不完整的 signature
    >>> partial.delay(1)            # 1 + 2  注意这个1是填在前面的

    偏函数主要的应用场合是各种的原语(Primitives)。这些 primitives 主要包括 group、chain、chord、map、starmap、chunks 等。下面介绍其中几个的用法。

    group

    group 可以实现任务的并行:

    >>> from celery import group
    >>> res = group(add.s(i, i) for i in range(10))()
    >>> res.get(timeout=1)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    chain

    chain 可以按顺序执行任务,把前一个任务的结果作为接下来的任务的输入。注意偏函数的使用:

    >>> from celery import chain
    >>> result = chain(add.s('h', 'e'), add.s('llo'), add.s(' world'))()
    >>> result.get()
    'hello world'
    >>> (add.s('h', 'e') | add.s('llo') | add.s(' world'))().get()  # 也可以用 | 连接

    有关这一部分的更详细内容,见 Canvas: Designing Work-flows

    后台启动

    在实际环境中,celery 肯定是以后台服务的方式运行的。文档给出了 systemd、init.d、supervisor 等启动的方式。具体见 Daemonization

    定时任务

    定时运行任务的功能由 celery beat 完成。它按设定的周期/时间把任务发送到队列中,并被 worker 执行。对于一个集群的 worker,celery beat 应只有一个,否则任务会重复。

    mytasks.py改成下面所示:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10s执行一次
        sender.add_periodic_task(10.0, add.s('hello', ' world'), name='every 10s')
    
        # 按crontab的格式定期执行
        sender.add_periodic_task(
            crontab(hour='*', minute=5),
            add.s('it is', ' xx:05')
        )

    然后启动 beat:

    $ celery -A mytasks beat

    可以在 worker 看到每 10s 输出了一次 “hello world”。每个小时的 5 分,都会输出 “it is xx:05”

    关于定时任务,具体见 Periodic Tasks

    相关参考

    上面只是比较基本的用法。对于更多深入使用中遇到的问题,还是应该参考官网文档

  • 京东方高分辨率QLED重大突破:颠覆AMOLED的下一代显示技术

    从京东方官方获悉,近日,京东方集团中央研究院关于高分辨率、全彩量子点发光二极管(QLED)的研究取得重大突破,实现了分辨率500ppi、色域114%NTSC的全彩QLED器件,技术指标全球领先。相关研究论文在国际权威学术期刊Nano Research上发表。

    据了解,量子点发光二极管(QLED)被普遍认为是继主动矩阵有机发光二极管(AMOLED)之后更具颠覆性的下一代显示技术。

    京东方500PPI 全彩QLED器件点亮效果及子像素排布
    阅读更多…
  • Google AdSense PIN 码,终于收到

    前后加起来超过5个月,终于收到了来自 AdSense 的 PIN 码。记录一下过程。

    我总共申请寄了 4 次。前面 3 次大概是 12 或 1 、2、3 月,都没有收到,当时觉得可能是疫情导致交通中断了。最后一次到了 4 月,实在没有办法,我把原来英文的地址改为中文,最后试了一次。

    终于,在 5 月底,收到了来自马来西亚(不是美国)的 AdSense 信件。

    信的地址,除了“CHINA”是英文的,其他都是中文。所以不用担心填了中文,国外的邮政看不懂,CHINA 总知道吧。我反而觉得是不是中国邮政看不懂英文(虽然基本都是拼音),所以之前一直都没有收到……

  • Python 类型标注 小结

    众所周知,Python 是动态语言,函数调用不检查传入的变量类型,一个变量的类型现在是数字,下一秒可以变成字符串。这是它的特点(优点/缺点)。

    IDE 的代码提示依赖于对变量类型的了解,它大大提升了我的编码体验。

    Python 3 之后,增加了多种类型标注的支持。本文主要是给自己备忘,哪一个版本可以用哪些类型标注。

    PEP 484

    def greeting(name: str) -> str:
        return 'Hello ' + name

    PEP 484 由 Python 3.5 开始支持。通过定义函数参数的类型与函数返回值的类型,可以覆盖大部分“自己写的代码”的情况。

    typing

    from typing import List
    Vector = List[float]
    
    def scale(scalar: float, vector: List[float]) -> Vector:
        return [scalar * num for num in vector]
    # 或
    def scale(scalar: float, vector: Vector) -> Vector:
        return [scalar * num for num in vector]
    
    new_vector = scale(2.0, [1.0, -4.2, 5.4])

    typing 模块也是由 Python 3.5 开始支持。增加了类似List<String>、类型别名、泛型等功能。

    PEP 526

    primes: List[int] = []
    
    captain: str  # Note: no initial value!
    
    class Starship:
        stats: ClassVar[Dict[str, int]] = {}
    
    some_value: int = other_3rd_party_function()

    PEP 526 由 Python 3.6 开始支持。它通过把变量类型写在等号前面,大幅增加了存在感,并可以处理“别人写的代码”。

    其他

    Python 3.8 还增加了更多类似的支持,如 PEP 544、PEP 586、PEP 589、PEP 591。由于没有用过 3.8,这里就不介绍了。总的方向还是不变的,就是增加一些 feature 使动态的语言变得稍微“静态”一点,从而方便开发者、提高安全性。

  • Python 存储大量 NumPy Array 等数据的方案:HDF5

    对于序列化保存各种 array / data frame 等类型的数据,一直以来有各种各样的办法。例如我用过的,对于简单的一个 array,NumPy 有提供读写的方法;pandas 也有对应的 data frame 读写;而字符串/字典,可以变成 json 保存等。

    但是,如果数量多了,例如有 100 个 array,上面的方法就不太方便了。我比较懒,会把这些 array 放到一个 dict 里面,然后用 pickle 把这个 dict pickle下来——保存和读取都非常方便,而且兼容所有数据类型。

    后来,数据量多了之后,就发现 pickle 的方案也是有缺点的,就是性能不好(文末有初步的性能对比)。所以调研了一下后,选择了 HDF5。以前只是听过,没有用过,现在用了感觉不错,在下面稍微总结一下。

    目标用户

    无论是科学研究,还是各行各业,都有 HDF5 的身影。高效、跨平台、无上限,尤其适合数据量大的情景。见官网的 Who Uses HDF?

    安装

    HDF5 支持各种语言,Python 对应的库是 h5py。

    $ pip install h5py
     or
    $ conda install h5py  # Anaconda

    核心概念

    HDF5 里只有 2 种类型:datasetgroup
    – dataset 就像数组,类似 Python 的 list (一维或多维),或 NumPy 的 ndarray。dataset 的语法和 ndarray 类似。
    – group 就像 Python 的 dict,在我看来,它更像是带路径的文件夹。group 的语法和 dict 类似。

    就像是在一层一层的文件夹中,存放着不同的 dataset。记住以上两点,就🆗

    阅读更多…
  • Kotlin 协程踩坑之 Redis 分布式锁

    本文不涉及技术细节,只是记录一点踩坑的过程和感受。

    背景

    项目中的两个不同的服务可能会同时修改一个资源的状态,决定使用 Redis 分布式锁。我这边的服务是用 Kotlin 写的,每当接收到一个 HTTP 请求,就会开一个协程来处理。在协程运行的过程中,会调用 Redisson 来获取一个锁,运行结束后(一般要十几秒),会释放锁。

    问题

    服务跑的时候发现,有时候,同时收到了两个请求,这两个请求会同时锁一个东西,然而它们竟然一起运行了,导致彼此之间产生了干扰。更奇怪的是,尝试复现的时候,有时候能复现,有时又不能。

    阅读更多…