• Kotlin 使用 protobuf 的正确方法(Maven)

    背景

    有一个后端的 Java/Kotlin 项目需要与同事的模块通信,协议是 protobuf。

    这块我想用 Kotlin 来写,当然 Java 也不是不行。没想到踩了些坑,故以本文作记录。

    文档

    protobuf 官方是有 Kotlin 的教程的,但是并不全面,这是出现各种坑的根源。

    另外,我原以为 protobuf 对 Kotlin 的专门支持是多年前就有的(指的是支持 Kotlin 的各种方便写法),然而我后来才发现是2021年才有,见 Announcing Kotlin support for protocol buffers

    方法

    1. 从 .proto 生成 .java .kt

    首先找同事得到 .proto 文件,然后下载 protoc,解压后得到一个可执行文件。

    然后按照官方教程,在命令行中执行

    protoc --java_out=$DST_DIR --kotlin_out=$DST_DIR xxxxxx.proto

    注意--java_out--kotlin_out都是需要的,当前版本的 protoc 对 Kotlin 的支持是在原来 Java 生成的基础上多了 Kotlin 的增强。

    然后就得到了一些 .java .kt 文件。

    2. Maven

    把生成的文件放进项目中,可以发现编译是不能通过的,因为缺少 protobuf 相关库。我的项目是 Maven 的,所以在 pom.xml 中加上

    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-kotlin -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-kotlin</artifactId>
        <version>3.19.4</version>  <!-- version 要与之前 protoc 的版本对应 -->
    </dependency>
    

    这个 pom 的 artifactId (protobuf-kotlin) 是关键!!我看到的文档写的都是protobuf-java,就会导致生成的 .kt 里面的

    @kotlin.OptIn(com.google.protobuf.kotlin.OnlyForUseByGeneratedProtoCode::class)

    报找不到的错误,必须使用protobuf-kotlin

    3. 使用

    这里参照 Announcing Kotlin support for protocol buffers 举的例子,用了 Kotlin 之后,写法更简洁了。假设原来 Java 的是这样写:

    DiceSeries series = DiceSeries.newBuilder()
        .addRoll(DiceRoll.newBuilder()
            .setValue(5))
        .addRoll(DiceRoll.newBuilder()
            .setValue(20)
            .setNickname("critical hit"))
        .build()

    而 Kotlin 可以改成:

    val series = diceSeries {
      rolls = listOf(
        diceRoll { value = 5 },
        diceRoll {
          value = 20
          nickname = "critical hit"
        }
      )
    }

    至于如何序列化和反序列化,继续用刚才的例子,序列化是这样的:

    series.toByteArray()

    反序列化:

    DiceSeries.parseFrom(message)
  • GlobalScope.launch 与 CoroutineScope.launch 的区别?

    背景

    我想在一个 suspend fun 里面 launch 一个协程,应该怎么做?

    我用了GlobalScope.launch {},但是 IDE 给我标黄了,不建议我这样写,那应该怎么写呢?

    带着这个问题,我搜索并阅读了相关参考中的解答,记录为本文。

    解答

    CoroutineScope首先是一个接口,这个接口要求有一个CoroutineContext属性,相当于CoroutineScope给这个属性包了一层。具体来说,CoroutineScope是这样定义的:

    public interface CoroutineScope {
        /**
         * The context of this scope.
         * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
         * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
         *
         * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
         */
        public val coroutineContext: CoroutineContext
    }

    然后,CoroutineScope同样的名字,还是一个函数,我们代码中调用的就是这个函数。这个CoroutineScope函数会创建一个给定contextCoroutineScope。通过下面的代码我们还可以发现,该函数返回的 scope 的context里面肯定会带有一个Job,如果传入的context没有Job,函数会给你附送一个。

    /**
     * Creates a [CoroutineScope] that wraps the given coroutine [context].
     *
     * If the given [context] does not contain a [Job] element, then a default `Job()` is created.
     * This way, cancellation or failure of any child coroutine in this scope cancels all the other children,
     * just like inside [coroutineScope] block.
     */
    @Suppress("FunctionName")
    public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
        ContextScope(if (context[Job] != null)
                         context
                     else
                         context + Job()
    )
    
    
    internal class ContextScope(context: CoroutineContext) : CoroutineScope {
        override val coroutineContext: CoroutineContext = context
        // CoroutineScope is used intentionally for user-friendly representation
        override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
    }

    上面提到的context,它包括了一系列的参数,用来决定该协程将如何执行。这些参数主要有:

    • CoroutineDispatcher — 分配到哪个线程
    • Job — 控制协程的生命周期
    • CoroutineName — 协程的名称
    • CoroutineExceptionHandler — 处理未被捕获的异常

    最重要的两个参数当然是CoroutineDispatcherJob

    Dispatcher 主要有自带的Dispatchers.DefaultDispatchers.IODispatchers.Main(Android)。

    CPU密集型的任务用Dispatchers.Default;而网络、文件的 IO 就用Dispatchers.IO,大家都懂的。详情可看官方文档

    而 Job 则代表了创建出来的协程,launch()aysnc()会返回一个Job的实例。可以调用Job实例的isActiveisCancelled获取协程的状态,也可以调用它的cancel()等方法手动取消这个协程。

    GlobalScope是什么呢?可以看到,它继承了CoroutineScope,并且它的context是固定的EmptyCoroutineContext

    @DelicateCoroutinesApi
    public object GlobalScope : CoroutineScope {
        /**
         * Returns [EmptyCoroutineContext].
         */
        override val coroutineContext: CoroutineContext
            get() = EmptyCoroutineContext
    }

    那么回到最开始的问题,GlobalScope.launchCoroutineScope.launch的区别是什么?

    1. GlobalScope.launch {}会在顶层创建一个协程,跑在 Dispatchers.Default 所指定的线程中;
    2. GlobalScope.launch(Dispatchers.IO) {}会在顶层创建一个协程,跑在 Dispatchers.IO 对应的 IO 线程中;
    3. CoroutineScope(Dispatchers.IO).launch {}和第 2 个是一样的,也是在顶层创建,只是语法的区别;
    4. launch {}会沿用当前的 context,不在顶层,但本文的背景为“在一个 suspend fun 中创建协程”,在没有 scope 的情况下,是不能直接用launch {}的;
    5. CoroutineScope(currentCoroutineContext()).launch {}沿用当前的 context,且不在顶层;我在 GitHub 上搜了一下,挺少人这样写的

    上面提到的“在顶层”的意思是,不受 structured concurrency 的影响,即不会被父协程的 cancel() 取消,也不会被其他协程抛出的异常导致自己也被退出。

    对于一个在顶层被创建的协程,不用的时候记得 cancel(),否则它会在后台一直跑,直到里面的程序结束,这也是 IDE 把 GlobalScope 标黄的原因。

    相关参考

    测试代码

    // 最后附上我测试的代码,test()中有3种写法,在顶层创建的协程不会受RuntimeException的影响
    
    import kotlinx.coroutines.*
    import kotlin.concurrent.thread
    
    fun main() {
        thread(isDaemon = true) {
            runBlocking {
                test()
    
                delay(1000L)
                throw RuntimeException()
            }
        }
    
        Thread.sleep(5000L)
    }
    
    suspend fun test() {
    //    launch { // 不可以
        CoroutineScope(currentCoroutineContext()).launch(Dispatchers.IO) {
    //    CoroutineScope(Dispatchers.Default).launch {
    //    GlobalScope.launch {
            while (true) {
                println("${Thread.currentThread().name}, ${currentCoroutineContext()} inside")
                delay(1000L)
            }
        }
    }
  • 使用 OpenConnect 代替 Cisco AnyConnect,避免路由表被锁定

    前言

    最近做一个项目,需要通过 VPN 连结到对方的服务器,对方用的是 Cisco AnyConnect。

    我们装上给的 AnyConnect 客户端,可以连上。

    问题

    后来,我们的系统跑不起来,发现了问题:我们的系统需要连接我们内网的服务 X,而这个服务 X 的 IP 地址被 AnyConnect 的路由表包含在其中了。查看路由表可以发现这个情况,把 VPN 一断开,服务 X 又能访问了,一连上马上又不行了。我们连 VPN 只需要访问对方的仅仅一个 IP,但是这 VPN 把一堆内网网段都路由了。

    我直接删了对应的路由表,再刷新路由表一看,那个条目怎么还在??

    后来找了好久,总算找到了元凶,就是 AnyConnect 本身。它就是会故意监视路由表,一旦发现被篡改就给你改回去。

    解决

    在刚才的链接中,有人就提出了办法:不用 AnyConnect,改用开源的 OpenConnect。安装很简单,Linux/Mac OS 都有现成的包可以通过命令行安装,Windows 也有对应的 GUI 版本。

    安装完成后,使用以下命令连接 VPN:

    # echo password | openconnect --background -u vpnuser \
    --servercert pin-sha256:xxxxxxxxxxxxxxxxxxxx \
    vpn.your.com

    连接建立后,可以查看路由表,发现路由表确实又被加了许多。

    但是没关系,sudo ip del xxxxxxxxxx,直接就能删掉。这样问题就解决了。

  • CTP Java API Linux/Windows x64 编译(SWIG 封装 C++ 动态库),并解决中文乱码问题

    前言

    网上有不少教程讲到 CTP API 的编译,但是我按照多数教程照着做都不太成功,只有一份是成功了。在此把过程记录下来,希望可以帮到更多的人。

    (2022年4月更新:补充 Windows 下的支持,写在文末)

    Linux

    1. 下载 CTP API

    在上期的官网下载,然后解压到某一个工作目录。我们只需要 Linux x64 的,工作目录的文件如下:

    error.dtd
    error.xml
    ThostFtdcMdApi.h
    ThostFtdcTraderApi.h
    ThostFtdcUserApiDataType.h
    ThostFtdcUserApiStruct.h
    thostmduserapi_se.so
    thosttraderapi_se.so
    阅读更多…
  • 禁用 Hibernate 的代理(proxy),获取其真实对象 / Disable Hibernate Proxy and Retrieve Real Entity Object

    Java 的各种框架还是很复杂的,不看文档或者不踩坑,都不知道原来框架还“偷偷地”做了一些别的事情——可能这就是一部分面试题的出处吧😂

    踩坑

    假设有一个 Model 叫做 Person,存到数据库里,框架用的是 Hibernate。代码中用 personDao.findById() 来从数据库中获取一个 Person 实例。项目中大概就是这么写的。

    问题来了:要把这个 Person 实例序列化,之后再反序列化。序列化的时候没发现什么问题,反序列化的时候报错了,告诉我这不是一个 Person,而是一个 Person$HibernateProxy$9cjcxRNr

    啥情况,代码明明写的是 Person person = personDao.findById(123); 怎么出来的不是 Person 而是 Person 后面加了一长串东西?

    解惑

    经过搜索,得知 Hibernate 使用 Proxy 实现延迟加载(Lazy Loading)功能。延迟加载,这个我懂,真正用到时才去查询。

    找到的资料还告诉我,被偷偷换成了 proxy 之后,还可能有别的坑:直接访问成员变量可能访问不到、用 IDE 断点调试时的行为和实际运行的行为不一致、instanceOf 的行为可能与预期不一致等。

    解决

    第一个办法,若不确定它是不是 proxy,可以强行让 Hibernate 给我们转回真正的实例,是就转,不是就原样返回:

    Object unproxiedEntity = Hibernate.unproxy(proxy); // 自己再做强制类型转换

    第二个办法,通过注解,不让 Hibernate 给这个 Model / Entity Class 做延迟加载:

    @Proxy(lazy = false)

    相关参考

  • 真 · Python 序列化/反序列化库:marshmallow

    前言

    本文通过实际项目的经历,经过搜索与比较,终于找到了好用的 Python 序列化库:marshmallow。
    它拥有类似 django 的语法,支持详细的自定义设置,适合各种各样的序列化/反序列化情景。

    问题

    我们都知道,Python 自带了一些序列化的库,例如 json、pickle、marshal 等。现在的问题是,要向一个 HTTP 服务器提交一个 POST 请求,附带一个 JSON 作为 payload,假设这个 payload 是这样的:

    {
      "name": "abc",
      "price": 1.23456,
      "date": "2021-06-26"
    }

    那么,它对应的 Python 定义是

    from dataclasses import dataclass
    import datetime
    
    @dataclass
    class Item:
        name: str
        price: float
        date: datetime.date

    所以,这个 POST 请求会是这样发送的:

    import json
    import requests
    
    item = Item('abc', Decimal('1.23456'), '2021-06-26')
    requests.post(url, data=json.dumps(item))

    这样一跑,马上给报错:TypeError: Object of type Item is not JSON serializable。嗯?????怎么报错了?才知道 Python 自带的 json 库不支持序列化一个自定义的 class,仅仅支持那几个 Python 内置的类,如 dict, str, float, list, bool。对此,网上是有一些解决方法,但其实都是妥协之举:如果服务器返回一个 {"name": "abc","price": 1.23456,"date":"2021-06-26"},能方便地反序列化成实例么?

    遇到这种问题,首先就想起“不要自己造轮子”的原则。Python 作为一门非常成熟的语言,就没有什么 pythonic 的解决方案?然后我去百度、Google,似乎还真没有,marshmallow 已经是我能找到的最好的了。

    手上还有 JVM 的项目,可以用 jackson/fastjson 等库,那才是方便啊,类型传进去后,正反序列化一气呵成。

    如果自己给这个类写一个 to_json() 方法,手动构造一个 dict 呢?想想还是不行,如果未来要修改这个类的属性,那么还得对应改。什么?你说连这个类都不要了,post 的时候直接现场构造一个 dict?这绝对是在给自己挖坑啊。

    所以,marshmallow 赶紧学起来。

    Marshmallow

    Schema

    最重要的,是定义 schema。对于刚才的 Item 类,对应的 schema 会这样写:

    @dataclass
    class Item:
        name: str
        price: Decimal
        date: datetime.date
    
    # 对于其他的所有 fields, 可以参考文档
    # https://marshmallow.readthedocs.io/en/stable/marshmallow.fields.html#api-fields
    
    from marshmallow import Schema, fields
    
    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Decimal()
        date = fields.Date()

    如果以前接触过 django,应该对这种写法很熟悉,不过不熟悉也没关系。

    序列化

    然后,就可以用 dump() 序列化了。

    item = Item('abc', Decimal('1.23456'), datetime.date(2021, 6, 26))
    
    schema = ItemSchema()
    result_obj = schema.dump(item)
    print(result_obj)
    
    # 会输出 {'price': Decimal('1.23456'), 'name': 'abc', 'date': '2021-06-26'}

    可以看到,在定义好的 schema 的帮助下,item 实例变成了一个 dict。

    如果想输出一个字符串呢?可以把 schema.dump 换成 schema.dumps(不过由于 price 是一个 Decimal 实例,而自带的 json 不支持 Decimal 会报错,可以考虑另外装一个 simplejson 库来解决。这个问题在后文会再讲到)

    反序列化

    使用 load() 进行反序列化:

    input_data = { 'name': 'abc', 'price': Decimal('1.23456'), 'date': '2021-06-26'}
    
    load_result = schema.load(input_data)
    print(load_result)
    
    # 输出 {'name': 'abc', 'date': datetime.date(2021, 6, 26), 'price': Decimal('1.23456')}

    注意到 date 属性变成了一个 Python 的 datetime.date 实例,这是我们希望的。

    这里只是反序列化成了一个 dict,那么怎样才能返回一个真正的 Item 实例?可以给 schema 添加一个返回 Item 的方法,并打上 post_load 的注解(装饰器)。

    from marshmallow import Schema, fields, post_load
    
    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Decimal()
        date = fields.Date()
    
        @post_load
        def make_item(self, data, **kwargs):
            return Item(**data)
    
    result_obj = schema.dump(item)
    print(result_obj)
    # 返回 Item(name='abc', price=Decimal('1.23456'), date=datetime.date(2021, 6, 26))

    可以,有点样子了。

    数据校验

    既然有了 schema,当然可以顺便在 load 时做校验了。例如输入的字段有没有多,有没有缺,属性的类型对不对,值的范围有没有超,哪些属性可以填 None(null)等等。这里不再赘述,见文档

    自定义字段

    Marshmallow 库自带的一些类型可能不够满足需求,例如想加一个 省份、身份证 什么的,用字符串的话好像感觉欠缺了一点校验。而刚才提到的 Decimal 比较难处理,我觉得也可以用自定义字段来解决。

    例如,可以用 Method 字段来自定义正反序列化时所使用的方法。

    class ItemSchema(Schema):
        name = fields.String()
        price = fields.Method('price_decimal_2_float', deserialize='float_2_decimal')
        date = fields.Date()
    
        @post_load
        def make_item(self, data, **kwargs):
            return Item(**data)
    
        def price_decimal_2_float(self, item: Item):
            return float(item.price)
    
        def float_2_decimal(self, float):
            return decimal.Decimal(str(float))

    这样就可以正常用 dumps(), loads() 了:

    result_str = schema.dumps(item)
    print(result_str)
    # {"date": "2021-06-26", "name": "abc", "price": 1.23456}
    
    input_str = '{"date": "2021-06-26", "name": "abc", "price": 1.23456}'
    load_result = schema.loads(input_str)
    print(load_result)
    # Item(name='abc', price=Decimal('1.23456'), date=datetime.date(2021, 6, 26))

    太棒了,输入的 JSON 的 date 和 price,在反序列化后,自动变成了 datetime.dateDecimal

    小结

    Marshmallow 的核心是 schema,数据类型、校验等都记录在 schema 中,从而支持复杂对象的序列化和反序列化。如果说它有什么缺点,那必须是它仍然需要专门定义一个 schema 才能使用。如果是 JVM 系列的 jackson 等库,可以直接使用 data class 作为 model/schema,额外的配置通过注解等方式引入,这样会少一个对应的 schema 类。marshmallow 的做法使类的数量双倍了。总体来说,它仍然是不造轮子的情况下的好选择。

    至于更深入的用法,还是需要参考官方文档,见文末。

    相关参考

  • 使反向代理后的 Flask 的 url_for() 使用 https

    这个问题困扰我多时,曾尝试多个方案未果,今天终于搜到了正确的解决方法,记录下来。

    我的网络拓扑: 外部 → docker的nginx → docker的gunicorn → flask

    问题

    Flask 代码中的 redirect(url_for('login')) 会跳回 http 的登录页面。

    我也不想给每个 url_for() 添加强制 https 的参数。

    解决

    from werkzeug.middleware.proxy_fix import ProxyFix
    from flask import Flask
    
    app = Flask(__name__)
    app.wsgi_app = ProxyFix(app.wsgi_app)

    相关参考

  • Redis 的 Python 客户端推荐:walrus

    最近在工作中需要在 Python 程序中读写 Redis。之前在自己的项目中,用的是 redis-py ——也是最广为人知的客户端。

    不过,它相当难用,几乎就是原生的 Redis 命令,在大一点的项目中,写一堆 Redis 命令,我估计是受不了的。而我在工作的其他 Java/JVM 项目里,用的是经过抽象封装后的库 Redisson,使用的体验就很舒服。

    那么,在 Python 环境,如果不用 redis-py,用什么库好呢,还是自己造轮子?

    在官网的客户端列表中可以找到,除了 redis-py,Python 另外还有 walrus 这个推荐的客户端。直接点进去,了解到它支持很 pythonic 风格的 Hash、List、Set、Sorted Set 等容器,足以满足我的使用需求,就用它了~

    附其代码样例:

    >>> h = db.Hash('charlie')
    >>> h['age'] = 31
    >>> print(h)
    <Hash "charlie": {'age': '31'}>
  • 解决 expecting SSH2_MSG_KEX_ECDH_REPLY 等一系列网络问题

    背景

    公司有多个内网,之间通过 VPN 互联。

    问题

    本地使用 Linux 电脑,进行软件开发等活动。遇到了各种各样奇异的网络问题,例如 SSH 不能连接内网的 git;开发的 Java 程序不能访问内网的数据库、zookeeper;远程桌面可以连接,但一旦复制粘贴文件,远程桌面就断开等。ping、telnet 连接对应端口,都是正常的。

    更奇怪的是,如果重启进入 Windows 系统,“SSH 不能连接内网的 git”就自动好了。

    一度怀疑是某些数据包触发了墙,然后整个连接被墙了。

    解决

    求助运维同事,一开始也排查不出来,后来在给 SSH 连接加上了详细的 debug 信息后,发现是”expecting SSH2_MSG_KEX_ECDH_REPLY”这步卡住了。

    上网一搜,就是 MTU 的问题。

    马上用 ifconfig 把 MTU 从 1500 改到 1400,结果问题依然存在。改为 1300,SSH 能联通了,其他问题也都没有了。

    后记

    MTU 感觉就是在学网络时才有提到的词,没想到被坑到了。很可能与公司加了一层 VPN 有关,毕竟平时 1500 MTU,还没出现过问题。