最近,我有机会通过在FlowWithContext中添加未提供的节流操作符为Akka做出贡献,它可以在流中承载上下文而不必关心它,就像Kafka offset一样。这是学习Akka Streams的一些实现细节的一个很好的机会,下面是我学到的。
在我们开始之前,为什么我们需要throttle?
有很多情况下,我们可以使用throttle,我们使用throttle的主要原因是对访问外部资源(如API)进行速率限制,因此,当流量达到峰值时,我们不会对API进行DDoS攻击,从而产生大量错误,并通过重试杀死更多错误的服务。
那么,让我们先看看如何在没有Akka流的情况下进行节流。
无Akka Streams的速率限制
假设我们有一个名为processJob的方法,它访问外部资源,我们希望每秒只调用一次API。
for (i <- 1 to count) yield {
  Thread.sleep(1000)
  processJob(i)
}我们很容易看出这个解决方案的问题所在,
- 它将阻塞当前线程并浪费计算资源。
- 它不够精确,因为我们不知道这项工作需要多长时间。
Akka Streams中的throttle
在Akka Streams中,我们可以在它前面加一个throttle。
Source.tick(0 milliseconds, 10 milliseconds, ())
    .throttle(elements = 1, per = 1 second, maximumBurst = 1, mode = ThrottleMode.shaping)
    .map(i => processJob(i))
    .runWith(Sink.foreach(println))让我在这里补充解释一下,
throttle可以通过每个特定持续时间的消息量、每条消息的成本和突发性来限制速率(稍后解释)
它有两种策略。
- 成形:在发出消息之前暂停,以满足调节速率
- 强制执行:当上游速度快于节流速度时,异常情况下失败
它的内部工作原理
在了解了如何使用它之后,是时候开始有趣的部分了,阅读实现代码。
// ...
private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens)
// ...
override def onPush(): Unit = {
  val elem = grab(in)
  val cost = costCalculation(elem)
  val delayNanos = tokenBucket.offer(cost)
  if (delayNanos == 0L) push(out, elem)
  else {
    if (enforcing) failStage(new RateExceededException("Maximum throttle throughput exceeded."))
    else {
      currentElement = elem
      scheduleOnce(timerName, delayNanos.nanos)
    }
  }
}
// ...
override protected def onTimer(key: Any): Unit = {
  push(out, currentElement)
  currentElement = null.asInstanceOf[T]
  if (willStop) completeStage()
}当输入端口现在有新元素时,将调用onPush()。现在可以在端口上使用grab(in)和/或调用 pull(in)来获取该元素,以请求下一个元素。
在这里,我们可以看到逻辑从创建一个具有特定maxBurst和nanoBetweentOkens的令牌桶开始,该令牌桶由per.toNanos/cost计算。
然后,当新消息传入时,Throttle类通过向tokenBucket提供消息的开销来计算延迟消息所需的时间。
最后,它将决定如果不需要延迟,是将消息推送到下一阶段,还是在延迟后安排推送。
令牌桶算法
通过throttle的scaladoc,我们可能知道:
Throttle实现令牌桶模型。存在一个具有给定令牌容量(突发大小或最大突发)的存储桶。代币以给定的速率落入桶中,可以“备用”以供以后使用,直到桶容量达到一定程度,以允许一些突发性。
那么,什么是令牌桶模型,我们可以在维基百科中找到它:
令牌桶是分组交换计算机网络和电信网络中使用的一种算法。在流量整形中,数据包被延迟,直到它们一致。
以下是源代码:
def offer(cost: Long): Long = {
  if (cost < 0) throw new IllegalArgumentException("Cost must be non-negative")
  val now = currentTime
  val timeElapsed = now - lastUpdate
  val tokensArrived =
    // Was there even a tick since last time?
    if (timeElapsed >= nanosBetweenTokens) {
      // only one tick elapsed
      if (timeElapsed < nanosBetweenTokens * 2) {
        lastUpdate += nanosBetweenTokens
        1
      } else {
        // Ok, no choice, do the slow integer division
        val tokensArrived = timeElapsed / nanosBetweenTokens
        lastUpdate += tokensArrived * nanosBetweenTokens
        tokensArrived
      }
    } else 0
  availableTokens = math.min(availableTokens + tokensArrived, capacity)
  if (cost <= availableTokens) {
    availableTokens -= cost
    0
  } else {
    val remainingCost = cost - availableTokens
    // Tokens always arrive at exact multiples of the token generation period, we must account for that
    val timeSinceTokenArrival = now - lastUpdate
    val delay = remainingCost * nanosBetweenTokens - timeSinceTokenArrival
    availableTokens = 0
    lastUpdate = now + delay
    delay
  }
}代码非常简单,我们可以从代码中看到:
- TokenBucket不会存储实际的消息,即使它被称为“Bucket”,它只是计算每个传入消息的延迟
- Nanobetweentoken将用于计算滴答声和延迟
- 容量将决定有多少消息可以毫不延迟地继续,或者需要等待多少个滴答声
散列轮计时器算法
另一个有趣的部分是它如何在不阻塞工作线程的情况下实现调度器。
我们可以通过深入了解scheduleOnce来了解它。
此调度器实现基于桶的旋转轮,如Netty的HashedWheelTimer,它以固定的滴答频率前进,并将在当前桶中找到的任务分派到各自的执行上下文。
@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {
    var tick = startTick
    var totalTick: Long = tick // tick count that doesn't wrap around, used for calculating sleep time
    val wheel = Array.fill(WheelSize)(new TaskQueue)
    private def clearAll(): immutable.Seq[TimerTask] = {
      @tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = {
        q.poll() match {
          case null => acc
          case x    => collect(q, acc :+ x)
        }
      }
      (0 until WheelSize).flatMap(i => collect(wheel(i), Vector.empty)) ++ collect(queue, Vector.empty)
    }
    @tailrec
    private def checkQueue(time: Long): Unit = queue.pollNode() match {
      case null => ()
      case node =>
        node.value.ticks match {
          case 0 => node.value.executeTask()
          case ticks =>
            val futureTick = ((
              time - start + // calculate the nanos since timer start
              (ticks * tickNanos) + // adding the desired delay
              tickNanos - 1 // rounding up
            ) / tickNanos).toInt // and converting to slot number
            // tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations
            // and the difference (offset) will be correct in any case
            val offset = futureTick - tick
            val bucket = futureTick & wheelMask
            node.value.ticks = offset
            wheel(bucket).addNode(node)
        }
        checkQueue(time)
    }
    override final def run(): Unit =
      try nextTick()
      catch {
        case t: Throwable =>
          log.error(t, "exception on LARS’ timer thread")
          stopped.get match {
            case null =>
              val thread = threadFactory.newThread(this)
              log.info("starting new LARS thread")
              try thread.start()
              catch {
                case e: Throwable =>
                  log.error(e, "LARS cannot start new thread, ship’s going down!")
                  stopped.set(Promise.successful(Nil))
                  clearAll()
              }
              timerThread = thread
            case p =>
              assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
              p.success(clearAll())
          }
          throw t
      }
    @tailrec final def nextTick(): Unit = {
      val time = clock()
      val sleepTime = start + (totalTick * tickNanos) - time
      if (sleepTime > 0) {
        // check the queue before taking a nap
        checkQueue(time)
        waitNanos(sleepTime)
      } else {
        val bucket = tick & wheelMask
        val tasks = wheel(bucket)
        val putBack = new TaskQueue
        @tailrec def executeBucket(): Unit = tasks.pollNode() match {
          case null => ()
          case node =>
            val task = node.value
            if (!task.isCancelled) {
              if (task.ticks >= WheelSize) {
                task.ticks -= WheelSize
                putBack.addNode(node)
              } else task.executeTask()
            }
            executeBucket()
        }
        executeBucket()
        wheel(bucket) = putBack
        tick += 1
        totalTick += 1
      }
      stopped.get match {
        case null => nextTick()
        case p =>
          assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
          p.success(clearAll())
      }
    }
  })
  timerThread.start()
}代码很长,但我们仍然可以知道重要的部分是:
- 将创建一个singleton TimerRead来管理滴答声
- TaskHolder包含可运行任务和- ExecutionContext,将由- TimerRead执行,因此实际任务在特定- ExecutionContext中执行。
结论
有一种称为令牌桶的算法用于速率限制(流量整形)
还有另一种称为hashwheel的算法用于调度
原文地址:https://xoyo24.me/2020/06/08/rate-limit-throttle-in-akka-streams/
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2485.html

 
								 
								 
								 
								 在这个努力程度如此低下的时代,还轮不到比拼天赋。静下心来,just do it
在这个努力程度如此低下的时代,还轮不到比拼天赋。静下心来,just do it

暂无评论