• Kotlin 使用 protobuf 的正确方法(Maven)

    背景

    有一个后端的 Java/Kotlin 项目需要与同事的模块通信,协议是 protobuf。

    这块我想用 Kotlin 来写,当然 Java 也不是不行。没想到踩了些坑,故以本文作记录。

    文档

    protobuf 官方是有 Kotlin 的教程的,但是并不全面,这是出现各种坑的根源。

    另外,我原以为 protobuf 对 Kotlin 的专门支持是多年前就有的(指的是支持 Kotlin 的各种方便写法),然而我后来才发现是2021年才有,见 Announcing Kotlin support for protocol buffers

    方法

    1. 从 .proto 生成 .java .kt

    首先找同事得到 .proto 文件,然后下载 protoc,解压后得到一个可执行文件。

    然后按照官方教程,在命令行中执行

    protoc --java_out=$DST_DIR --kotlin_out=$DST_DIR xxxxxx.proto

    注意--java_out--kotlin_out都是需要的,当前版本的 protoc 对 Kotlin 的支持是在原来 Java 生成的基础上多了 Kotlin 的增强。

    然后就得到了一些 .java .kt 文件。

    2. Maven

    把生成的文件放进项目中,可以发现编译是不能通过的,因为缺少 protobuf 相关库。我的项目是 Maven 的,所以在 pom.xml 中加上

    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-kotlin -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-kotlin</artifactId>
        <version>3.19.4</version>  <!-- version 要与之前 protoc 的版本对应 -->
    </dependency>
    

    这个 pom 的 artifactId (protobuf-kotlin) 是关键!!我看到的文档写的都是protobuf-java,就会导致生成的 .kt 里面的

    @kotlin.OptIn(com.google.protobuf.kotlin.OnlyForUseByGeneratedProtoCode::class)

    报找不到的错误,必须使用protobuf-kotlin

    3. 使用

    这里参照 Announcing Kotlin support for protocol buffers 举的例子,用了 Kotlin 之后,写法更简洁了。假设原来 Java 的是这样写:

    DiceSeries series = DiceSeries.newBuilder()
        .addRoll(DiceRoll.newBuilder()
            .setValue(5))
        .addRoll(DiceRoll.newBuilder()
            .setValue(20)
            .setNickname("critical hit"))
        .build()

    而 Kotlin 可以改成:

    val series = diceSeries {
      rolls = listOf(
        diceRoll { value = 5 },
        diceRoll {
          value = 20
          nickname = "critical hit"
        }
      )
    }

    至于如何序列化和反序列化,继续用刚才的例子,序列化是这样的:

    series.toByteArray()

    反序列化:

    DiceSeries.parseFrom(message)
  • GlobalScope.launch 与 CoroutineScope.launch 的区别?

    背景

    我想在一个 suspend fun 里面 launch 一个协程,应该怎么做?

    我用了GlobalScope.launch {},但是 IDE 给我标黄了,不建议我这样写,那应该怎么写呢?

    带着这个问题,我搜索并阅读了相关参考中的解答,记录为本文。

    解答

    CoroutineScope首先是一个接口,这个接口要求有一个CoroutineContext属性,相当于CoroutineScope给这个属性包了一层。具体来说,CoroutineScope是这样定义的:

    public interface CoroutineScope {
        /**
         * The context of this scope.
         * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
         * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
         *
         * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
         */
        public val coroutineContext: CoroutineContext
    }

    然后,CoroutineScope同样的名字,还是一个函数,我们代码中调用的就是这个函数。这个CoroutineScope函数会创建一个给定contextCoroutineScope。通过下面的代码我们还可以发现,该函数返回的 scope 的context里面肯定会带有一个Job,如果传入的context没有Job,函数会给你附送一个。

    /**
     * Creates a [CoroutineScope] that wraps the given coroutine [context].
     *
     * If the given [context] does not contain a [Job] element, then a default `Job()` is created.
     * This way, cancellation or failure of any child coroutine in this scope cancels all the other children,
     * just like inside [coroutineScope] block.
     */
    @Suppress("FunctionName")
    public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
        ContextScope(if (context[Job] != null)
                         context
                     else
                         context + Job()
    )
    
    
    internal class ContextScope(context: CoroutineContext) : CoroutineScope {
        override val coroutineContext: CoroutineContext = context
        // CoroutineScope is used intentionally for user-friendly representation
        override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
    }

    上面提到的context,它包括了一系列的参数,用来决定该协程将如何执行。这些参数主要有:

    • CoroutineDispatcher — 分配到哪个线程
    • Job — 控制协程的生命周期
    • CoroutineName — 协程的名称
    • CoroutineExceptionHandler — 处理未被捕获的异常

    最重要的两个参数当然是CoroutineDispatcherJob

    Dispatcher 主要有自带的Dispatchers.DefaultDispatchers.IODispatchers.Main(Android)。

    CPU密集型的任务用Dispatchers.Default;而网络、文件的 IO 就用Dispatchers.IO,大家都懂的。详情可看官方文档

    而 Job 则代表了创建出来的协程,launch()aysnc()会返回一个Job的实例。可以调用Job实例的isActiveisCancelled获取协程的状态,也可以调用它的cancel()等方法手动取消这个协程。

    GlobalScope是什么呢?可以看到,它继承了CoroutineScope,并且它的context是固定的EmptyCoroutineContext

    @DelicateCoroutinesApi
    public object GlobalScope : CoroutineScope {
        /**
         * Returns [EmptyCoroutineContext].
         */
        override val coroutineContext: CoroutineContext
            get() = EmptyCoroutineContext
    }

    那么回到最开始的问题,GlobalScope.launchCoroutineScope.launch的区别是什么?

    1. GlobalScope.launch {}会在顶层创建一个协程,跑在 Dispatchers.Default 所指定的线程中;
    2. GlobalScope.launch(Dispatchers.IO) {}会在顶层创建一个协程,跑在 Dispatchers.IO 对应的 IO 线程中;
    3. CoroutineScope(Dispatchers.IO).launch {}和第 2 个是一样的,也是在顶层创建,只是语法的区别;
    4. launch {}会沿用当前的 context,不在顶层,但本文的背景为“在一个 suspend fun 中创建协程”,在没有 scope 的情况下,是不能直接用launch {}的;
    5. CoroutineScope(currentCoroutineContext()).launch {}沿用当前的 context,且不在顶层;我在 GitHub 上搜了一下,挺少人这样写的

    上面提到的“在顶层”的意思是,不受 structured concurrency 的影响,即不会被父协程的 cancel() 取消,也不会被其他协程抛出的异常导致自己也被退出。

    对于一个在顶层被创建的协程,不用的时候记得 cancel(),否则它会在后台一直跑,直到里面的程序结束,这也是 IDE 把 GlobalScope 标黄的原因。

    相关参考

    测试代码

    // 最后附上我测试的代码,test()中有3种写法,在顶层创建的协程不会受RuntimeException的影响
    
    import kotlinx.coroutines.*
    import kotlin.concurrent.thread
    
    fun main() {
        thread(isDaemon = true) {
            runBlocking {
                test()
    
                delay(1000L)
                throw RuntimeException()
            }
        }
    
        Thread.sleep(5000L)
    }
    
    suspend fun test() {
    //    launch { // 不可以
        CoroutineScope(currentCoroutineContext()).launch(Dispatchers.IO) {
    //    CoroutineScope(Dispatchers.Default).launch {
    //    GlobalScope.launch {
            while (true) {
                println("${Thread.currentThread().name}, ${currentCoroutineContext()} inside")
                delay(1000L)
            }
        }
    }
  • 树状数组(Binary Indexed Tree / Fenwick Tree)学习与实现

    树状数组是一个能高效处理数组①更新、②求前缀和的数据结构。它提供了2 个方法,时间复杂度均为O(log n)

    1. update(index, delta):将 delta 加到数组的 index 位置
    2. prefix_sum(n):获取数组的前 n 个元素的和
      range_sum(start, end):获取数组从 [start, end] 的和,相当于 prefix_sum(end) – prefix_sum(start-1)

    如果只追求第 1 点,即快速修改数组,普通的线性数组可满足需求。但对于 range sum(),需要O(n)

    如果只追求第 2 点,即快速求 range sum,使用前缀数组的效果更好。但对于 add() 操作,则需要O(n),所以只适合更新较少的情况。

    树状数组则处于两者之间,适合数组又修改,又获取区间和的情景。

    思想

    树状数组的思想是怎样的呢?

    假设有一个数组 [1, 7, 3, 0, 5, 8, 3, 2, 6, 2, 1, 1, 4, 5],想求前 13 个元素的和。那么,

    13 = 23 + 22 + 20 = 8 + 4 + 1

    前 13 个数的和等于【前 8 个数的和】+【接下来 4 个数的和】+【接下来 1 个数的和】,即 range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13)。如果有一种方法,可以保存 range(1, 8)、range(9, 12)、range(13, 13),那么计算这个区间和就可以加快了。

    这里给出已经计算好的结果(即最下面的 array 层)。例如 array[8] 是 29,往上可以找到 29 对应的是 [1,8],即 range(1, 8) = array[8]。同理,range(9, 12) = array[12],range(13, 13) = array[13]。

    range(1, 13) = range(1, 8) + range(9, 12) + range(13, 13) = array[8] + array[12] + array[13]

    由此图可以发现,虽然它的英文是含有 Tree,中间的部分看起来也是树状的,但是最终用到的 array 是线性的数组(太好了,复杂程度大减)。

    那中间这 3 层是怎么来的呢?——需要从上到下,从左到右看。

    首先计算 [1, 1] 的和,然后计算 [1, 2] 的和,然后计算 [1, 4]、[1, 8] 的和,每次乘 2,直到越界([1, 16] 越界),这里分别算出来了1、8、11、29。

    然后是第二层,从空缺的位置继续,这里的“界”不是整个数组的最大值,而是所有上层中下一个非空缺的位置。计算 [3, 3] 的和,[3, 4] 不用算,因为越界了。然后计算 [5, 5] 的和,接下来是 [5, 6] 的和,[5, 8] 越界不用算。

    第三层也是类似,然后发现填完了。

    以上可以帮助理解 result 数组中各值的来源,实际建立时有更简洁的做法。至于为什么是这样定义,可以另外找找资料,我看起来这有点像“分形”的感觉。

    阅读更多…
  • 一图流领悟跳跃列表(Skip List),附 Python/Java/Kotlin 实现

    跳跃列表是一种随机数据结构。它使得包含 n 个元素的有序序列的查找、插入、删除操作的平均时间复杂度都是 O(log n)。(注意关键词有序,如果不需要有序,也不需要用到跳跃列表;数据量大时,时间复杂度退化到较慢的概率微乎其微)

    平均最差
    搜索O(log n)O(n)
    插入O(log n)O(n)
    删除O(log n)O(n)
    空间O(n)O(n log n)

    跳跃列表是通过维护一个多层链表实现的。每一层链表中的元素的数量,在统计上来说都比下面一层链表元素的数量更少。也就是说,上层疏,下层密,底层数据是完整的,上面的稀疏层作为索引——这就是链表的“二分查找”啊。

    一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。

    Wikipedia 的道理就讲到这里,我不希望把本文写得难懂。说好的一图流就能领悟呢?其实我有点标题党,本文不止一幅图,但是核心的图只有一幅,上图(来自 Wikipedia):

    请多次认真观看插入节点的全过程 gif。我看完之后,就觉得自己可以实现出来了(虽然后来实际开发调试了很多次)。

    例如想在上图中所示的跳跃列表中插入 80,首先要找到应该插入的位置。

    首先从最稀疏的层的 30 开始,把当前位置设置为顶层的 30。
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 30;
    80 比当前位置右边的 50 大,所以把当前位置右移到 50;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 50;
    80 比当前位置右边的 70 大,所以把当前位置右移到 70;
    80 比当前位置右边的 NIL 小,所以把当前位置移到下一层的 70;(当前位置已到达底层)
    之后用 80 不断与右边的节点比较大小,右移至合适的位置插入 80 节点。(底层插入完毕)
    接下来用随机决定是否把这个 80 提升到上面的层中,例如图中的提升概率是 50%(抛硬币),不断提升至硬币为反面为止。

    上面一段描述了 gif 中插入 80 的搜索和插入过程。那么,代码如何实现?右移和下移的逻辑很浅显,那么重点就在如何提升节点到上层的逻辑。

    阅读更多…
  • Kotlin 协程踩坑之 Redis 分布式锁

    本文不涉及技术细节,只是记录一点踩坑的过程和感受。

    背景

    项目中的两个不同的服务可能会同时修改一个资源的状态,决定使用 Redis 分布式锁。我这边的服务是用 Kotlin 写的,每当接收到一个 HTTP 请求,就会开一个协程来处理。在协程运行的过程中,会调用 Redisson 来获取一个锁,运行结束后(一般要十几秒),会释放锁。

    问题

    服务跑的时候发现,有时候,同时收到了两个请求,这两个请求会同时锁一个东西,然而它们竟然一起运行了,导致彼此之间产生了干扰。更奇怪的是,尝试复现的时候,有时候能复现,有时又不能。

    阅读更多…