2年前 (2022-08-15)  相关技术 |   抢沙发  1288 
文章评分 0 次,平均分 0.0
[收起] 文章目录

本篇文章以直观的方式解释结构化并发,并展示它如何在实践中简化并发程序!

什么是Structured-Concurrency结构化并发?

结构化并发是一个强大的概念,了解它对于充分利用协程的强大功能非常重要。

结构化并发

为了理解某件事并围绕该主题建立良好的直觉,从它试图首先解决的问题开始通常是有帮助的。因此,让我们考虑一个不使用结构化并发的简单示例程序,看看会出现什么问题。

下面的代码创建了三个并发任务,它们在后台单独的线程中运行。每个任务的任务是使用fetchData函数从远程服务中提取一组数据,并将结果数添加到全局sum

val sum = AtomicInteger(0)

// Create 3 concurrent tasks that compute the total sum in parallel
repeat(3) {
    CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }
}

// Do something useful in the meantime while the sum is being computed
Thread.sleep(2000)

// And then use the sum.
println("The final sum is: $sum")

你应该已经在上面的代码片段中发现了一些问题:

我们从不等待创建的任务(futures)完成工作后才使用最终总和sum。在某些情况下,fetchData调用可能需要比预期更长的时间,我们最终在不知不觉中使用了一个不完整的sum

如果其中一个期货失败了怎么办?由于网络问题,fetchData()调用可能引发异常。future的失败将被忽视,永远不会传播到外界。换句话说,我们可以泄漏异常!再次导致我们的sum在不知不觉中不正确。

最后但并非最不重要的一点:如果其中一个future无限期地陷入困境,永远不会结束,那该怎么办?假设fetchData调用由于死锁或实现中的错误而挂起,并且永远不会返回。在这种情况下,我们的未来和承载它的线程将永远在后台运行,消耗不必要的资源。我们可能永远不会意识到这一点,也无法关闭它。换句话说,我们可以泄漏线程

我们可以尝试通过添加一些样板代码来修复上述问题,以跟踪列表中的future,等待它们逐一完成,并转发将抛出的任何异常。

val sum = AtomicInteger(0)

// A list to keep track of the created futures
val futures = ArrayList<Future<Void>>()

repeat(3) {
    val future = CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }

    // Add the future to the list
    futures.add(future)
}

// Do something useful in the meantime in parallel
Thread.sleep(2000)

// Wait for all the futures and propagate exceptions
for (future in futures) {
    future.get()
}

// And then use the sum.
println("The final sum is: $sum")

我们在这里所做的实际上是“使我们的并发更加结构化”:我们没有在空中启动后台futures并忽略它们,而是在列表中跟踪它们并等待它们完成。这正是我们所说的结构化并发的意思:并发路径的开始和结束都是明确的

然而,这个样板代码不仅编写起来很麻烦,而且还远远不够完美:

如果for循环遇到的第一个future抛出异常怎么办?在这种情况下,for循环停止迭代并将异常传播到外部,而不等待剩余的futures。因此,如果其中一个线程挂起或需要不合理的长时间才能完成,我们仍然可能会泄漏线程!为了解决这种边缘情况,我们需要捕获异常并在将错误传播到外部之前取消仍在运行的剩余futures。这需要比我们编写的更复杂的样板代码。

我们没有处理用户取消。如果我们想让用户在中间取消求和计算,该怎么办?这样做需要更多的样板代码来监听取消信号,并将它们一个接一个地传播到我们推出的期货中。

总之,正确构造并发是复杂的,需要大量的样板代码才能使其在所有情况下都没有bug并100%安全。

好消息是:结构化并发已融入到协程库的设计中,我们不必编写任何样板代码来使用它并利用它!让我们看看这个!

协程的结构化并发

让我们使用协程重写代码,看看它是什么样子:

suspend fun distributedSum() {
    val sum = AtomicInteger(0)

    // The coroutine scope acts like a parent that keeps track of
    // all the child coroutines created inside it
    coroutineScope {
        // Create 3 coroutines that compute the total sum concurrently
        repeat(3) {
            launch {
                val data: Int = fetchDataAsync()
                sum.addAndGet(data)
            }
        }
    }

    println("The final sum is: $sum")
}

这段简短且极其简单的代码已经摆脱了上一节中提到的所有问题:它不能泄漏异常或协程,它永远不会打印不完整的和或错误的sum,并且已经可以取消!

感觉像魔术?让我们一步一步地把它打开,以了解它是如何实现的。

coroutineScope { // this: CoroutineScope (receiver)
    ...
}

我们首先调用coroutineScope函数,并给它一个kotlin lambda,在其中我们可以使用launch创建任意数量的后台协程。我们之所以能够在这个lambda中使用launch,是因为它的receiver1是一个CoroutineScope实例,其中launch被定义为一个扩展函数。

coroutineScope { // this: CoroutineScope
    repeat(3) {
        // equivelent to: this.launch
        launch {
            ...
        }
    }
}

在此块中创建的任何后台协程都作为子项附加到此协程作用域实例,该实例跟踪其完成和失败。这类似于我们试图通过手动跟踪列表中的可完成future来实现的。但CoroutineScope没有使用列表,而是使用了一种更奇特的分层数据结构,称为Job。我们将在这篇文章后面更深入地研究这个数据结构。

以下是此设置带来的好处:

  • 我们不需要自己跟踪后台协程,协程作用域本身已经这样做了,并且协程作用库调用将挂起,直到它们全部完成或失败。因此,我们确信,在这个调用之后,我们所有的后台协程都完成了;然后我们可以自信地使用计算出的总和sum
  • 如果任何后台协程因某种原因失败,则该异常将被协程作用域捕获并传播到外部。但首先,它负责取消仍在运行的其余协程2,从而确保在传播异常后,没有任何后台协程仍处于挂起状态。

我们的后台协程现在有一个由协程作用域块定义的明确的生命周期,超过这个生命周期就不可能泄漏任何协程或异常。这就是结构化并发的核心思想:“每次我们的控件分裂成多个并发路径时,我们都会确保它们再次连接起来”。因此,我们的程序立即变得更安全,更容易推理。

协同工作范围和Jobs层次结构

让我们仔细看看launch coroutine builder函数的签名:

public fun CoroutineScope.launch(...): Job

启动功能需要一个协程镜才能使用。但为什么呢?

原因是为了防止在没有父协同程实例跟踪的情况下在空中创建后台协同程。所有协程生成器函数都是这样,这就是Kotlin强制结构化并发的方式。

协程Scope实例使用称为Job的分层数据结构跟踪其子协程。您可以将作业视为一个简单的树:

  • 存储子作业实例以跟踪它们(它们本身可以有其他嵌套的子作业实例)。
  • 并且具有状态(active, completed等)。

什么是Structured-Concurrency结构化并发?

协程生成器函数(如launch)也创建自己的协程Scope实例作为其lambda块的接收器,从而可以创建嵌套的协程!这使得能够构建并发运行的后台协程的复杂层次结构,而不会泄漏其中的任何一个。

coroutineScope { // this: CoroutineScope
    launch(CoroutineName("Coroutine A")) { // this: CoroutineScope (nested CoroutineScope with its own Job)
        launch(CoroutineName("Coroutine A.1")) { ... }
        launch(CoroutineName("Coroutine A.2")) { ... }
    }

    launch(CoroutineName("Coroutine B")) { ... }
}

上面的代码将创建以下层次结构:

什么是Structured-Concurrency结构化并发?

在协程A和协程B完成之前,协程Scope调用将挂起并不会完成。另一方面,协程A在协程A.1和协程A.2完成之前不会完成。如果协程A.1由于某种原因失败,那么协程A将作为一个整体失败,将异常传播到父作业,导致协程作用域本身失败。在此过程中,仍在此层次结构中运行的任何协程都将被取消。

CoroutineScope和CoroutineContext之间的差异

一个可能会让很多Kotliners感到困惑的方面是协程文本和协程范围之间的差异。

如果您查看CoroutineScope的源代码,您将发现以下内容:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

那我们为什么需要一个CoroutineScope呢?为什么不直接使用CoroutineContext呢?

CoroutineContext只是一个包含0个或多个上下文元素的数据结构。它就像一个HashMap,作为一个容器,协程及其调度器可以在任何时间点访问它,从中检索有用的信息,如调度器、协程的名称等。

另一方面,协程镜就像“协程的管理者和跟踪器”。它是可以使用定义为此类扩展方法(如launch)的协程生成器函数创建后台协程的实体。

为了能够完成创建此类协程并跟踪它们的工作,协程范围基本上需要两件事:

  • 一个作业实例,用于跟踪子协程(以强制结构化并发)。
  • 我们希望将一些可选的协程上下文元素传播到在此范围内创建的所有子协程(例如,调度程序)。

事实证明,通过具有单个coroutineContext属性可以满足上述两个要求:

coroutineContext属性可以存储作业实例,该作业实例将用作此CoroutineScope创建的子coroutines的父级。所有实例化协程作用域的函数都将创建这样一个作业并将其放在那里。

它还将保存我们希望传播到子协同程的任何附加上下文元素。

结论

结构化并发使您的并发密集型程序更安全、更容易理解。它将您从所有样板代码中解放出来,否则您将需要编写这些样板代码来同步后台任务,并确保不会泄漏任何后台任务。

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2729.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册