3年前 (2021-10-29)  相关技术 |   抢沙发  443 
文章评分 1 次,平均分 5.0
[收起] 文章目录

最近,我有机会通过在FlowWithContext中添加未提供的节流操作符为Akka做出贡献,它可以在流中承载上下文而不必关心它,就像Kafka offset一样。这是学习Akka Streams的一些实现细节的一个很好的机会,下面是我学到的。

在我们开始之前,为什么我们需要throttle?

有很多情况下,我们可以使用throttle,我们使用throttle的主要原因是对访问外部资源(如API)进行速率限制,因此,当流量达到峰值时,我们不会对API进行DDoS攻击,从而产生大量错误,并通过重试杀死更多错误的服务。

那么,让我们先看看如何在没有Akka流的情况下进行节流。

Akka Streams的速率限制

假设我们有一个名为processJob的方法,它访问外部资源,我们希望每秒只调用一次API。

for (i <- 1 to count) yield {
  Thread.sleep(1000)
  processJob(i)
}

我们很容易看出这个解决方案的问题所在,

  • 它将阻塞当前线程并浪费计算资源。
  • 它不够精确,因为我们不知道这项工作需要多长时间。

Akka Streams中的throttle

在Akka Streams中,我们可以在它前面加一个throttle。

Source.tick(0 milliseconds, 10 milliseconds, ())
    .throttle(elements = 1, per = 1 second, maximumBurst = 1, mode = ThrottleMode.shaping)
    .map(i => processJob(i))
    .runWith(Sink.foreach(println))

让我在这里补充解释一下,

throttle可以通过每个特定持续时间的消息量、每条消息的成本和突发性来限制速率(稍后解释)

它有两种策略。

  • 成形:在发出消息之前暂停,以满足调节速率
  • 强制执行:当上游速度快于节流速度时,异常情况下失败

它的内部工作原理

在了解了如何使用它之后,是时候开始有趣的部分了,阅读实现代码。

// ...
private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens)
// ...
override def onPush(): Unit = {
  val elem = grab(in)
  val cost = costCalculation(elem)
  val delayNanos = tokenBucket.offer(cost)

  if (delayNanos == 0L) push(out, elem)
  else {
    if (enforcing) failStage(new RateExceededException("Maximum throttle throughput exceeded."))
    else {
      currentElement = elem
      scheduleOnce(timerName, delayNanos.nanos)
    }
  }
}
// ...
override protected def onTimer(key: Any): Unit = {
  push(out, currentElement)
  currentElement = null.asInstanceOf[T]
  if (willStop) completeStage()
}

当输入端口现在有新元素时,将调用onPush()。现在可以在端口上使用grab(in)和/或调用 pull(in)来获取该元素,以请求下一个元素。

在这里,我们可以看到逻辑从创建一个具有特定maxBurstnanoBetweentOkens的令牌桶开始,该令牌桶由per.toNanos/cost计算。

然后,当新消息传入时,Throttle类通过向tokenBucket提供消息的开销来计算延迟消息所需的时间。

最后,它将决定如果不需要延迟,是将消息推送到下一阶段,还是在延迟后安排推送。

令牌桶算法

通过throttle的scaladoc,我们可能知道:

Throttle实现令牌桶模型。存在一个具有给定令牌容量(突发大小或最大突发)的存储桶。代币以给定的速率落入桶中,可以“备用”以供以后使用,直到桶容量达到一定程度,以允许一些突发性。

那么,什么是令牌桶模型,我们可以在维基百科中找到它:

令牌桶是分组交换计算机网络和电信网络中使用的一种算法。在流量整形中,数据包被延迟,直到它们一致。

以下是源代码:

def offer(cost: Long): Long = {
  if (cost < 0) throw new IllegalArgumentException("Cost must be non-negative")

  val now = currentTime
  val timeElapsed = now - lastUpdate

  val tokensArrived =
    // Was there even a tick since last time?
    if (timeElapsed >= nanosBetweenTokens) {
      // only one tick elapsed
      if (timeElapsed < nanosBetweenTokens * 2) {
        lastUpdate += nanosBetweenTokens
        1
      } else {
        // Ok, no choice, do the slow integer division
        val tokensArrived = timeElapsed / nanosBetweenTokens
        lastUpdate += tokensArrived * nanosBetweenTokens
        tokensArrived
      }
    } else 0

  availableTokens = math.min(availableTokens + tokensArrived, capacity)

  if (cost <= availableTokens) {
    availableTokens -= cost
    0
  } else {
    val remainingCost = cost - availableTokens
    // Tokens always arrive at exact multiples of the token generation period, we must account for that
    val timeSinceTokenArrival = now - lastUpdate
    val delay = remainingCost * nanosBetweenTokens - timeSinceTokenArrival
    availableTokens = 0
    lastUpdate = now + delay
    delay
  }
}

代码非常简单,我们可以从代码中看到:

  • TokenBucket不会存储实际的消息,即使它被称为“Bucket”,它只是计算每个传入消息的延迟
  • Nanobetweentoken将用于计算滴答声和延迟
  • 容量将决定有多少消息可以毫不延迟地继续,或者需要等待多少个滴答声

散列轮计时器算法

另一个有趣的部分是它如何在不阻塞工作线程的情况下实现调度器。

我们可以通过深入了解scheduleOnce来了解它。

此调度器实现基于桶的旋转轮,如Netty的HashedWheelTimer,它以固定的滴答频率前进,并将在当前桶中找到的任务分派到各自的执行上下文。

@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {

    var tick = startTick
    var totalTick: Long = tick // tick count that doesn't wrap around, used for calculating sleep time
    val wheel = Array.fill(WheelSize)(new TaskQueue)

    private def clearAll(): immutable.Seq[TimerTask] = {
      @tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = {
        q.poll() match {
          case null => acc
          case x    => collect(q, acc :+ x)
        }
      }
      (0 until WheelSize).flatMap(i => collect(wheel(i), Vector.empty)) ++ collect(queue, Vector.empty)
    }

    @tailrec
    private def checkQueue(time: Long): Unit = queue.pollNode() match {
      case null => ()
      case node =>
        node.value.ticks match {
          case 0 => node.value.executeTask()
          case ticks =>
            val futureTick = ((
              time - start + // calculate the nanos since timer start
              (ticks * tickNanos) + // adding the desired delay
              tickNanos - 1 // rounding up
            ) / tickNanos).toInt // and converting to slot number
            // tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations
            // and the difference (offset) will be correct in any case
            val offset = futureTick - tick
            val bucket = futureTick & wheelMask
            node.value.ticks = offset
            wheel(bucket).addNode(node)
        }
        checkQueue(time)
    }

    override final def run(): Unit =
      try nextTick()
      catch {
        case t: Throwable =>
          log.error(t, "exception on LARS’ timer thread")
          stopped.get match {
            case null =>
              val thread = threadFactory.newThread(this)
              log.info("starting new LARS thread")
              try thread.start()
              catch {
                case e: Throwable =>
                  log.error(e, "LARS cannot start new thread, ship’s going down!")
                  stopped.set(Promise.successful(Nil))
                  clearAll()
              }
              timerThread = thread
            case p =>
              assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
              p.success(clearAll())
          }
          throw t
      }

    @tailrec final def nextTick(): Unit = {
      val time = clock()
      val sleepTime = start + (totalTick * tickNanos) - time

      if (sleepTime > 0) {
        // check the queue before taking a nap
        checkQueue(time)
        waitNanos(sleepTime)
      } else {
        val bucket = tick & wheelMask
        val tasks = wheel(bucket)
        val putBack = new TaskQueue

        @tailrec def executeBucket(): Unit = tasks.pollNode() match {
          case null => ()
          case node =>
            val task = node.value
            if (!task.isCancelled) {
              if (task.ticks >= WheelSize) {
                task.ticks -= WheelSize
                putBack.addNode(node)
              } else task.executeTask()
            }
            executeBucket()
        }
        executeBucket()
        wheel(bucket) = putBack

        tick += 1
        totalTick += 1
      }
      stopped.get match {
        case null => nextTick()
        case p =>
          assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
          p.success(clearAll())
      }
    }
  })

  timerThread.start()
}

代码很长,但我们仍然可以知道重要的部分是:

  • 将创建一个singleton TimerRead来管理滴答声
  • TaskHolder包含可运行任务和ExecutionContext,将由TimerRead执行,因此实际任务在特定ExecutionContext中执行。

结论

有一种称为令牌桶的算法用于速率限制(流量整形)

还有另一种称为hashwheel的算法用于调度

原文地址:https://xoyo24.me/2020/06/08/rate-limit-throttle-in-akka-streams/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2485.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册