• CTP JAVA API Linux x64 编译(SWIG 封装 C++ 动态库)

    前言

    网上有不少教程讲到 CTP API 的编译,但是我按照多数教程照着做都不太成功。在此我把我照着做成功的过程记录下来,给自己留底,也希望可以帮到更多的人。

    运行环境

    CTP 服务端主要由 Java 编写,部署在 Linux 系统,本地运行也是 Linux 系统。所以 API 的编译也在 Linux。

    过程

    1. 下载 CTP API

    在上期的官网下载,然后解压。我们只需要 Linux x64 的,工作目录的文件如下:

    error.dtd
    error.xml
    ThostFtdcMdApi.h
    ThostFtdcTraderApi.h
    ThostFtdcUserApiDataType.h
    ThostFtdcUserApiStruct.h
    thostmduserapi_se.so
    thosttraderapi_se.so
    阅读更多…
  • 禁用 Hibernate 的代理(proxy),获取其真实对象 / Disable Hibernate Proxy and Retrieve Real Entity Object

    Java 的各种框架还是很复杂的,不看文档或者不踩坑,都不知道原来框架还“偷偷地”做了一些别的事情——可能这就是一部分面试题的出处吧😂

    踩坑

    假设有一个 Model 叫做 Person,存到数据库里,框架用的是 Hibernate。代码中用 personDao.findById() 来从数据库中获取一个 Person 实例。项目中大概就是这么写的。

    问题来了:要把这个 Person 实例序列化,之后再反序列化。序列化的时候没发现什么问题,反序列化的时候报错了,告诉我这不是一个 Person,而是一个 Person$HibernateProxy$9cjcxRNr

    啥情况,代码明明写的是 Person person = personDao.findById(123); 怎么出来的不是 Person 而是 Person 后面加了一长串东西?

    解惑

    经过搜索,得知 Hibernate 使用 Proxy 实现延迟加载(Lazy Loading)功能。延迟加载,这个我懂,真正用到时才去查询。

    找到的资料还告诉我,被偷偷换成了 proxy 之后,还可能有别的坑:直接访问成员变量可能访问不到、用 IDE 断点调试时的行为和实际运行的行为不一致、instanceOf 的行为可能与预期不一致等。

    解决

    第一个办法,若不确定它是不是 proxy,可以强行让 Hibernate 给我们转回真正的实例,是就转,不是就原样返回:

    Object unproxiedEntity = Hibernate.unproxy(proxy); // 自己再做强制类型转换

    第二个办法,通过注解,不让 Hibernate 给这个 Model / Entity Class 做延迟加载:

    @Proxy(lazy = false)

    相关参考

  • 真 · Python 序列化/反序列化库:marshmallow

    前言

    本文通过实际项目的经历,经过搜索与比较,终于找到了好用的 Python 序列化库:marshmallow。
    它拥有类似 django 的语法,支持详细的自定义设置,适合各种各样的序列化/反序列化情景。

    问题

    我们都知道,Python 自带了一些序列化的库,例如 json、pickle、marshal 等。现在的问题是,要向一个 HTTP 服务器提交一个 POST 请求,附带一个 JSON 作为 payload,假设这个 payload 是这样的:

    {
      "name": "abc",
      "price": 1.23456,
      "date": "2021-06-26"
    }

    那么,它对应的 Python 定义是

    from dataclasses import dataclass
    import datetime
    
    @dataclass
    class Item:
        name: str
        price: float
        date: datetime.date

    所以,这个 POST 请求会是这样发送的:

    import json
    import requests
    
    item = Item('abc', Decimal('1.23456'), '2021-06-26')
    requests.post(url, data=json.dumps(item))

    这样一跑,马上给报错:TypeError: Object of type Item is not JSON serializable。嗯?????怎么报错了?才知道 Python 自带的 json 库不支持序列化一个自定义的 class,仅仅支持那几个 Python 内置的类,如 dict, str, float, list, bool。对此,网上是有一些解决方法,但其实都是妥协之举:如果服务器返回一个 {"name": "abc","price": 1.23456,"date":"2021-06-26"},能方便地反序列化成实例么?

    遇到这种问题,首先就想起“不要自己造轮子”的原则。Python 作为一门非常成熟的语言,就没有什么 pythonic 的解决方案?然后我去百度、Google,似乎还真没有,marshmallow 已经是我能找到的最好的了。

    手上还有 JVM 的项目,可以用 jackson/fastjson 等库,那才是方便啊,类型传进去后,正反序列化一气呵成。

    如果自己给这个类写一个 to_json() 方法,手动构造一个 dict 呢?想想还是不行,如果未来要修改这个类的属性,那么还得对应改。什么?你说连这个类都不要了,post 的时候直接现场构造一个 dict?这绝对是在给自己挖坑啊。

    所以,marshmallow 赶紧学起来。

    Marshmallow

    Schema

    最重要的,是定义 schema。对于刚才的 Item 类,对应的 schema 会这样写:

    @dataclass
    class Item:
        name: str
        price: Decimal
        date: datetime.date
    
    # 对于其他的所有 fields, 可以参考文档
    # https://marshmallow.readthedocs.io/en/stable/marshmallow.fields.html#api-fields
    
    from marshmallow import Schema, fields
    
    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Decimal()
        date = fields.Date()

    如果以前接触过 django,应该对这种写法很熟悉,不过不熟悉也没关系。

    序列化

    然后,就可以用 dump() 序列化了。

    item = Item('abc', Decimal('1.23456'), datetime.date(2021, 6, 26))
    
    schema = ItemSchema()
    result_obj = schema.dump(item)
    print(result_obj)
    
    # 会输出 {'price': Decimal('1.23456'), 'name': 'abc', 'date': '2021-06-26'}

    可以看到,在定义好的 schema 的帮助下,item 实例变成了一个 dict。

    如果想输出一个字符串呢?可以把 schema.dump 换成 schema.dumps(不过由于 price 是一个 Decimal 实例,而自带的 json 不支持 Decimal 会报错,可以考虑另外装一个 simplejson 库来解决。这个问题在后文会再讲到)

    反序列化

    使用 load() 进行反序列化:

    input_data = { 'name': 'abc', 'price': Decimal('1.23456'), 'date': '2021-06-26'}
    
    load_result = schema.load(input_data)
    print(load_result)
    
    # 输出 {'name': 'abc', 'date': datetime.date(2021, 6, 26), 'price': Decimal('1.23456')}

    注意到 date 属性变成了一个 Python 的 datetime.date 实例,这是我们希望的。

    这里只是反序列化成了一个 dict,那么怎样才能返回一个真正的 Item 实例?可以给 schema 添加一个返回 Item 的方法,并打上 post_load 的注解(装饰器)。

    from marshmallow import Schema, fields, post_load
    
    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Decimal()
        date = fields.Date()
    
        @post_load
        def make_item(self, data, **kwargs):
            return Item(**data)
    
    result_obj = schema.dump(item)
    print(result_obj)
    # 返回 Item(name='abc', price=Decimal('1.23456'), date=datetime.date(2021, 6, 26))

    可以,有点样子了。

    数据校验

    既然有了 schema,当然可以顺便在 load 时做校验了。例如输入的字段有没有多,有没有缺,属性的类型对不对,值的范围有没有超,哪些属性可以填 None(null)等等。这里不再赘述,见文档

    自定义字段

    Marshmallow 库自带的一些类型可能不够满足需求,例如想加一个 省份、身份证 什么的,用字符串的话好像感觉欠缺了一点校验。而刚才提到的 Decimal 比较难处理,我觉得也可以用自定义字段来解决。

    例如,可以用 Method 字段来自定义正反序列化时所使用的方法。

    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Method('price_decimal_2_float', deserialize='float_2_decimal')
        date = fields.Date()
    
        @post_load
        def make_item(self, data, **kwargs):
            return Item(**data)
    
        def price_decimal_2_float(self, item: Item):
            return float(item.price)
    
        def float_2_decimal(self, float):
            return decimal.Decimal(str(float))

    这样就可以正常用 dumps(), loads() 了:

    result_str = schema.dumps(item)
    print(result_str)
    # {"date": "2021-06-26", "name": "abc", "price": 1.23456}
    
    input_str = '{"date": "2021-06-26", "name": "abc", "price": 1.23456}'
    load_result = schema.loads(input_str)
    print(load_result)
    # Item(name='abc', price=Decimal('1.23456'), date=datetime.date(2021, 6, 26))

    太棒了,输入的 JSON 的 date 和 price,在反序列化后,自动变成了 datetime.dateDecimal

    小结

    Marshmallow 的核心是 schema,数据类型、校验等都记录在 schema 中,从而支持复杂对象的序列化和反序列化。如果说它有什么缺点,那必须是它仍然需要专门定义一个 schema 才能使用。如果是 JVM 系列的 jackson 等库,可以直接使用 data class 作为 model/schema,额外的配置通过注解等方式引入,这样会少一个对应的 schema 类。marshmallow 的做法使类的数量双倍了。总体来说,它仍然是不造轮子的情况下的好选择。

    至于更深入的用法,还是需要参考官方文档,见文末。

    相关参考

  • 使反向代理后的 Flask 的 url_for() 使用 https

    这个问题困扰我多时,曾尝试多个方案未果,今天终于搜到了正确的解决方法,记录下来。

    我的网络拓扑: 外部 → docker的nginx → docker的gunicorn → flask

    问题

    Flask 代码中的 redirect(url_for('login')) 会跳回 http 的登录页面。

    我也不想给每个 url_for() 添加强制 https 的参数。

    解决

    from werkzeug.middleware.proxy_fix import ProxyFix
    from flask import Flask
    
    app = Flask(__name__)
    app.wsgi_app = ProxyFix(app.wsgi_app)

    相关参考

  • Redis 的 Python 客户端推荐:walrus

    最近在工作中需要在 Python 程序中读写 Redis。之前在自己的项目中,用的是 redis-py ——也是最广为人知的客户端。

    不过,它相当难用,几乎就是原生的 Redis 命令,在大一点的项目中,写一堆 Redis 命令,我估计是受不了的。而我在工作的其他 Java/JVM 项目里,用的是经过抽象封装后的库 Redisson,使用的体验就很舒服。

    那么,在 Python 环境,如果不用 redis-py,用什么库好呢,还是自己造轮子?

    在官网的客户端列表中可以找到,除了 redis-py,Python 另外还有 walrus 这个推荐的客户端。直接点进去,了解到它支持很 pythonic 风格的 Hash、List、Set、Sorted Set 等容器,足以满足我的使用需求,就用它了~

    附其代码样例:

    >>> h = db.Hash('charlie')
    >>> h['age'] = 31
    >>> print(h)
    <Hash "charlie": {'age': '31'}>
  • 解决 expecting SSH2_MSG_KEX_ECDH_REPLY 等一系列网络问题

    背景

    公司有多个内网,之间通过 VPN 互联。

    问题

    本地使用 Linux 电脑,进行软件开发等活动。遇到了各种各样奇异的网络问题,例如 SSH 不能连接内网的 git;开发的 Java 程序不能访问内网的数据库、zookeeper;远程桌面可以连接,但一旦复制粘贴文件,远程桌面就断开等。ping、telnet 连接对应端口,都是正常的。

    更奇怪的是,如果重启进入 Windows 系统,“SSH 不能连接内网的 git”就自动好了。

    一度怀疑是某些数据包触发了墙,然后整个连接被墙了。

    解决

    求助运维同事,一开始也排查不出来,后来在给 SSH 连接加上了详细的 debug 信息后,发现是”expecting SSH2_MSG_KEX_ECDH_REPLY”这步卡住了。

    上网一搜,就是 MTU 的问题。

    马上用 ifconfig 把 MTU 从 1500 改到 1400,结果问题依然存在。改为 1300,SSH 能联通了,其他问题也都没有了。

    后记

    MTU 感觉就是在学网络时才有提到的词,没想到被坑到了。很可能与公司加了一层 VPN 有关,毕竟平时 1500 MTU,还没出现过问题。

  • FIX 协议开发(3):QuickFIX/J 实战经验小结

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案
    FIX 协议开发(2):QuickFIX/J 入门
    FIX 协议开发(3):QuickFIX/J 实战经验小结(本文)

    代码结构

    这里主要讲一下我们的思路,具体的代码不方便贴上来。

    首先需要实现 Application,主要是其中的 fromApp(),需要 crack 不同类型的 Message。处理这些 Message 的共同点在于,不把业务逻辑写在 crack() 方法里,crack() 主要就是把 message 通过一个或多个队列传到外部。

    收发信息的部分,从上一节可以看到,具体代码还是挺零碎的,所以做一层抽象会方便一点。
    另一个重点是注意同步/异步:有些方法做成同步会方便一点,例如查询持仓/资金,又例如撤单;而下单则应做成异步。

    Fix Session 持久化

    程序启动时,会自动读入之前保存的状态。FIX 协议对序列号的要求严格,如果收到的序列号偏大或偏小,都需要额外的谨慎处理。因此,持久化非常重要,尤其是维护收发序列号。

    以我方下一次发送的序列号 NextSenderMsgSeqNum 为例。Session.javasendRaw()方法调用了persist()方法,该方法中 MessageStore 实例把即将发送的消息持久化;该步骤完成后,MessageStore 实例接着执行incrNextSenderMsgSeqNum(),把序列号保存下来。在persist()完成之后,才真正把数据发送出去。

    关于 session 的另一个话题是,如何手动控制一个 session,例如使 session logout、reset 等。

    可以使用静态函数Session.lookupSession(sessionID)来从 ID 获取对应的 session,接下来就可以调用 session.logon()、logout()、reset() 方法。后两个的区别,logout 是发送 FIX 的 logout 信号,然后断开连接,不影响序列号等内部状态,之后调用 logon() 即可回到正常使用的模式。reset() 会清除序列号等内部状态及其持久化,下次收发包会都从 1 开始。盘中就别用这个了。

    异常处理

    进行 QFJ 开发时,特别需要注意的是,如何捕捉“异常”。不像 API 调用,时刻都有 error code 字段。对方返回的消息中如有错误信息或特殊情况时,是需要针对性处理的。

    例如查询持仓是空仓,服务器不可能什么都不返回,而会返回一个特殊的空仓标记。又例如各种字段填写不合法,报错信息很可能藏在可选的 Text 字段中。

    接收业务信息后报错

    主要有两个原因:

    • 一是不符合 xml 的定义,在底层就会自动给对方发送拒绝消息“MsgType (35): 3 (REJECT)”,不会传到 Application 的应用层;
    • 二是在应用层的 fromApp() 内 crack 信息时没有找到对应 crack 的方法,会发送“BusinessRejectReason (380): 3 (UNSUPPORTED MESSAGE TYPE)”给服务器。

    解决的要点都在于我方的字典没有适配对方的字典(对方故意发错误的数据包除外)。参考上一篇的“自定义字段”一节,根据实际情况修改 xml,自行打造合适的 Message 库才是王道。

    消息自动按类型分类、处理流程

    对比自己构造数据包、自己解析收到的数据全 DIY 的实现方式, QFJ 框架已经为我们做了很多。如果好奇的话,数据包是如何解析,最终传给我们的 fromApp() 方法?先从源头,即 MINA 框架接收到数据包说起。

    对方发来的数据包,首先经过 quickfix.mina.message.FIXMessageDecoder 解码,进行基础的校验,例如检测 FIX 协议头、获取消息体的长度、校验 checksum 等。

    如果校验正常,InitiatorIoHandler/AcceptorIoHandler 调用其父类 Quickfix.mina.AbstractIoHandler 继续处理消息,包括但不限于:从 session 获取字典、解析 FIX 数据【通过调用quickfix.MessageUtils的函数】等。解析时,先提取 msgType,调用 session 对应的 messageFactory 创建 msgType 类型的 message 实例,接下来根据字典解析数据内容。

    解析 message 完成后,InitiatorIoHandler/AcceptorIoHandler 把 message 传给 processMessage() 方法;执行 eventHandlingStrategy.onMessage(),该方法内把 message 塞到队列 queueTracker 中。

    最后,队列的元素会被“QFJ Message Processor”线程取出,给到 quickfix.Session 的 next(Message) 方法。这个 next() 方法,里面还有对字段的校验,里面的 verify(message) 再进而校验登录状态、时间、seq num 等,最终调用我们写的 fromApp() 方法。

    可见,从接收到数据,到我们实现的 Application,经历了很多很多层的调用。😅

    这个流程还没有结束。

    我们让自己的 Application 继承 quickfix.MessageCracker,从而拥有了 crack(),crack 是怎样根据不同实际类型的 message 调用对应的 onMessage() 的呢?

    MessageCracker 在初始化时,执行 initialize() ,通过反射,找到所有 handler【也就是各种自己写的 onMessage()】,并存到一个 mapping。之后 fromApp() 调用 crack() 时,就通过传入的 message 提取其类型以及 mapping 找到对应的 handler,调用对应的 onMessage()。

    如果觉得这个 crack 过程有点太“黑科技”,我们可以不用它,不继承 quickfix.MessageCracker,转而继承 quickfix.fix42.MessageCracker【按实际对应的版本】。点进去可以看到,全是各种 onMessage,这样就不涉及反射,都是 override 了。我喜欢 explicit 的方法。

    性能

    目前业务是用来接入券商。还没做过大规模的测试,但按当前测试数据估计每秒可发单 100 个以上,性能满足当前业务需求。

    相关参考

    1. https://www.quickfixj.org/usermanual/2.1.0/
    2. https://blog.csdn.net/u011279740/category_1517545.html
  • FIX 协议开发(2):QuickFIX/J 入门

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案
    FIX 协议开发(2):QuickFIX/J 入门 (本文)
    FIX 协议开发(3):QuickFIX/J 实战经验小结

    安装

    QuickFIX/J 的官网是 https://www.quickfixj.org,其 GitHub 是 https://github.com/quickfix-j/quickfixj。撰写本文时,GitHub 上的版本是 2.2,官网上的文档给到了 2.1 的版本。所以这里选择安装 2.1,避免与文档不匹配。

    我们通过 Maven 来引入 quickfixj 库,见此页的“Maven Integration”部分,在 pom.xml 中添加对应 xml 片段(该网页中的xml引入的是2.0.0版本的库,或许是笔误,我将其改为2.1.0)即可:

    <!-- QuickFIX/J dependencies -->
    <dependency>
        <groupId>org.quickfixj</groupId>
        <artifactId>quickfixj-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- 选择所需版本的messages -->
    <dependency>
        <groupId>org.quickfixj</groupId>
        <artifactId>quickfixj-messages-fix42</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    阅读更多…
  • FIX 协议开发(1):协议介绍及开发方案

    本系列导航

    FIX 协议开发(1):协议介绍及开发方案(本文)
    FIX 协议开发(2):QuickFIX/J 入门
    FIX 协议开发(3):QuickFIX/J 实战经验小结

    本文背景

    公司因业务需要,准备接入 FIX 协议。在调研过程中,我发现中文的 FIX 协议相关资料不太多,准备边学边记录,预计会写 3 篇左右。

    FIX 协议简介

    FIX(Financial Information eXchange Protocol,金融信息交换协议)是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化进程,在各类参与者之间,包括投资经理、经纪人、买方、卖方建立起实时的电子化通讯协议。

    FIX协议的目标是把各类证券金融业务需求流程格式化,成为一个可用计算机语言描述的功能流程,并在每个业务功能接口上统一交换格式,方便各个功能模块的连接。

    消息格式

    FIX 协议消息均由多个 “key=value” 组成。其中 key 可以是协议规定的字段,或自定义字段。协议规定的key可查询 FIX 协议字典,【不同版本的 FIX 协议均有其字典,用于开发的库一般也有自带;也可参考第三方,如 wireshark】例如 8 代表 begin string,34 代表消息的序列号,52 代表时间戳等。自定义字段不与规定的 key 重复,供金融机构定制,开发时需要向对应金融机构获取其专有字段的字典。只要有了对应的字典,就可以读懂 FIX 数据包的内容。

    一般来说,一个 消息由“头部 + 消息体 + 尾部”构成。头部包含一些必要的字段,例如 BeginString (8)、BodyLength (9)、MsgType (35)、MsgSeqNum (34)、SenderCompID (49) 等,尾部包含的必要字段是 CheckSum (10)。

    FIX 登陆消息示例(假设”^”是分隔符):

    8=FIX.4.3^9=65^35=A^34=1^49=TESTACC^52=20130703-15:55:08.609^56=EXEC^98=0^108=30^10=225^

    对照字典可知,BeginString (8) 是 FIX.4.3;BodyLength (9) 是 65 字节;MsgType (35) 是 A,A 对应 logon 操作;MsgSeqNum (34) 是 1,即这是我方发送的第 1 个消息。

    有关更详细的协议介绍,可参考 https://blog.51cto.com/9291927/2536105

    开发方案

    不使用库

    由上面的示例可以发现,FIX 协议十分简单。可以不需要依赖第三方库,手动查字典构造消息
    8=FIX.4.3^9=65^35=A^34=1<省略>
    再通过标准的 socket 通信,即可完成交互。

    这个方案自由度最高,不依赖底层开发语言,但开发流程与查字典较为繁琐,后续维护也不太方便。

    Python 库 simplefix

    simplefix 是一个 FIX 协议的简易实现。它使用户可以方便地任意构造 FIX 消息,非常适合学习、测试协议。但这个库不包含任何网络收发、FIX 异常处理等功能模块。因此,开发 FIX 客户端时,我使用该库构造数据包,然后通过标准 socket 发送,再分析其网络底层交互。

    示例:发送 logon 消息的代码

    import socket
    import time
    
    import simplefix
    from simplefix.constants import (ENCRYPTMETHOD_NONE, MSGTYPE_LOGON,
                                     TAG_BEGINSTRING, TAG_CLIENTID,
                                     TAG_ENCRYPTMETHOD, TAG_HEARTBTINT,
                                     TAG_MSGSEQNUM, TAG_MSGTYPE, TAG_SENDER_COMPID,
                                     TAG_SENDING_TIME, TAG_TARGET_COMPID)
    
    HOST = '1.2.3.45'
    PORT = 9000
    CLIENT_ID = 12345678
    PASSWORD = 'mypassword'
    
    seq_num = 1  # 需维护msg sequence number,每次发送后加1
    
    
    if __name__ == "__main__":
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # 长连接
        sock.connect((HOST, PORT))
    
        # logon
        msg_logon = simplefix.FixMessage()
        msg_logon.append_pair(TAG_BEGINSTRING, 'FIX.4.2')
        msg_logon.append_pair(TAG_MSGSEQNUM, seq_num)  # 需维护msg sequence number,每次发送后加1
        msg_logon.append_pair(TAG_SENDER_COMPID, 'FIXTEST001')
        msg_logon.append_utc_timestamp(TAG_SENDING_TIME)
        msg_logon.append_pair(TAG_TARGET_COMPID, 'TESTENV')
    
        msg_logon.append_pair(TAG_MSGTYPE, MSGTYPE_LOGON)  # 类型
    
        msg_logon.append_pair(TAG_ENCRYPTMETHOD, ENCRYPTMETHOD_NONE)
        msg_logon.append_pair(TAG_HEARTBTINT, 30)
        msg_logon_buffer = msg_logon.encode()
    
        sock.send(msg_logon_buffer)
        print('seq', seq_num, 'sent')
        seq_num += 1
    
    
        time.sleep(1)
        sock.close()

    多平台 QuickFix 引擎

    QuickFix 是全功能的 FIX 开源引擎,目前很多 Fix 解决方案都是根据或参考 QuickFix 实现的。目前(2020年10月)它有 C++、Python、Java、.NET、Go 和 Ruby 共 6 种语言的实现/接口。

    根据我司的情况,选择其中的 Java 实现 QuickFIX/J 进行下一步开发。其使用方法,将在下一篇文章继续。