本系列导航
FIX 协议开发(1):协议介绍及开发方案
FIX 协议开发(2):QuickFIX/J 入门
FIX 协议开发(3):QuickFIX/J 实战经验小结(本文)
代码结构
这里主要讲一下我们的思路,具体的代码不方便贴上来。
首先需要实现 Application,主要是其中的 fromApp() 需要 crack 不同类型的 Message。但是,不把具体业务逻辑写在 crack() 方法里,crack() 主要就是把 message 通过一个或多个队列传到外部。
收发信息的部分,从上一节可以看到,具体代码还是挺零碎的,所以做一层抽象会方便一点。
另一个重点是注意同步/异步:有些方法做成同步会方便一点,例如查询持仓/资金,又例如撤单【同步/异步都可以】;而下单则应做成异步。
Fix Session 持久化
程序启动时,会自动读入之前保存的状态。FIX 协议对序列号的要求严格,如果收到的序列号偏大或偏小,会导致需要额外的处理甚至 session 被断掉。因此,持久化非常重要,尤其是维护收发序列号。
以我方下一次发送的序列号 NextSenderMsgSeqNum 为例。Session.java
的sendRaw()
方法调用了persist()
方法,该方法中 MessageStore 实例把即将发送的消息持久化;该步骤完成后,MessageStore 实例接着执行incrNextSenderMsgSeqNum()
,把序列号保存下来。在persist()
完成之后,才真正把数据发送出去。
MessageStore 自带了几种实现,例如基于文件的FileStore
,放在内存里的MemoryStore
,我们参考MemoryStore
稍作修改,做了一个RedisStore
,感觉会灵活一点。
关于 session 的另一个话题是,如何手动控制一个 session,例如使 session logout、reset 等。
可以使用静态函数Session.lookupSession(sessionID)
来从 ID 获取对应的 session,接下来就可以调用 session.logon()、logout()、reset() 方法。后两个的区别,logout 是发送 FIX 的 logout 信号,然后断开连接,不影响序列号等内部状态,之后调用 logon() 即可回到正常使用的模式。reset() 会清除序列号等内部状态及其持久化,下次收发包会都从 1 开始。盘中就别用这个了。
如果不需要手动控制 session,而是每天自动从几点连到几点,那么只需要在参数中配置StartTime
和EndTime
即可。
异常处理
进行 QFJ 开发时,特别需要注意的是,如何捕捉“异常”。不像 API 调用,时刻都有 error code 字段。对方返回的消息中如有错误信息或特殊情况时,是需要针对性处理的。
例如查询持仓是空仓,服务器不可能什么都不返回,而会返回一个特殊的空仓标记。又例如各种字段填写不合法,报错信息很可能藏在可选的 Text 字段中。
接收业务信息后报错
主要有两个原因:
- 一是不符合 xml 的定义,在底层就会自动给对方发送拒绝消息“MsgType (35): 3 (REJECT)”,不会传到 Application 的应用层;
- 二是在应用层的 fromApp() 内 crack 信息时没有找到对应 crack 的方法,会发送“BusinessRejectReason (380): 3 (UNSUPPORTED MESSAGE TYPE)”给服务器。
解决的要点都在于我方的字典没有适配对方的字典(对方故意发错误的数据包除外)。参考上一篇的“自定义字段”一节,根据实际情况修改 xml,自行打造合适的 Message 库才是王道。
消息自动按类型分类、处理流程
对比自己构造数据包、自己解析收到的数据全 DIY 的实现方式, QFJ 框架已经为我们做了很多。如果好奇的话,数据包是如何解析,最终传给我们的 fromApp() 方法?先从源头,即 MINA 框架接收到数据包说起。
对方发来的数据包,首先经过 quickfix.mina.message.FIXMessageDecoder 解码,进行基础的校验,例如检测 FIX 协议头、获取消息体的长度、校验 checksum 等。
如果校验正常,InitiatorIoHandler/AcceptorIoHandler 调用其父类 Quickfix.mina.AbstractIoHandler 继续处理消息,包括但不限于:从 session 获取字典、解析 FIX 数据【通过调用quickfix.MessageUtils的函数】等。解析时,先提取 msgType,调用 session 对应的 messageFactory 创建 msgType 类型的 message 实例,接下来根据字典解析数据内容。
解析 message 完成后,InitiatorIoHandler/AcceptorIoHandler 把 message 传给 processMessage() 方法;执行 eventHandlingStrategy.onMessage(),该方法内把 message 塞到队列 queueTracker 中。
最后,队列的元素会被“QFJ Message Processor”线程取出,给到 quickfix.Session 的 next(Message) 方法。这个 next() 方法,里面还有对字段的校验,里面的 verify(message) 再进而校验登录状态、时间、seq num 等,最终调用我们写的 fromApp() 方法。
可见,从接收到数据,到我们实现的 Application,经历了很多很多层的调用。😅
这个流程还没有结束。
我们让自己的 Application 继承 quickfix.MessageCracker,从而拥有了 crack(),crack 是怎样根据不同实际类型的 message 调用对应的 onMessage() 的呢?
MessageCracker 在初始化时,执行 initialize() ,通过反射,找到所有 handler【也就是各种自己写的 onMessage()】,并存到一个 mapping。之后 fromApp() 调用 crack() 时,就通过传入的 message 提取其类型以及 mapping 找到对应的 handler,调用对应的 onMessage()。
如果觉得这个 crack 过程有点太“黑科技”,我们可以不用它,不继承 quickfix.MessageCracker,转而继承 quickfix.fix42.MessageCracker【按实际对应的版本】。点进去可以看到,全是各种 onMessage,这样就不涉及反射,都是 override 了。我喜欢 explicit 的方法。
性能
目前业务是用来接入券商。还没做过大规模的测试,但按当前测试数据估计每秒可发单 100 个以上,性能满足当前业务需求。