最近,我有机会通过在FlowWithContext
中添加未提供的节流操作符为Akka做出贡献,它可以在流中承载上下文而不必关心它,就像Kafka offset一样。这是学习Akka Streams的一些实现细节的一个很好的机会,下面是我学到的。
在我们开始之前,为什么我们需要throttle?
有很多情况下,我们可以使用throttle,我们使用throttle的主要原因是对访问外部资源(如API)进行速率限制,因此,当流量达到峰值时,我们不会对API进行DDoS攻击,从而产生大量错误,并通过重试杀死更多错误的服务。
那么,让我们先看看如何在没有Akka流的情况下进行节流。
无Akka Streams的速率限制
假设我们有一个名为processJob
的方法,它访问外部资源,我们希望每秒只调用一次API。
for (i <- 1 to count) yield {
Thread.sleep(1000)
processJob(i)
}
我们很容易看出这个解决方案的问题所在,
- 它将阻塞当前线程并浪费计算资源。
- 它不够精确,因为我们不知道这项工作需要多长时间。
Akka Streams中的throttle
在Akka Streams中,我们可以在它前面加一个throttle。
Source.tick(0 milliseconds, 10 milliseconds, ())
.throttle(elements = 1, per = 1 second, maximumBurst = 1, mode = ThrottleMode.shaping)
.map(i => processJob(i))
.runWith(Sink.foreach(println))
让我在这里补充解释一下,
throttle可以通过每个特定持续时间的消息量、每条消息的成本和突发性来限制速率(稍后解释)
它有两种策略。
- 成形:在发出消息之前暂停,以满足调节速率
- 强制执行:当上游速度快于节流速度时,异常情况下失败
它的内部工作原理
在了解了如何使用它之后,是时候开始有趣的部分了,阅读实现代码。
// ...
private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens)
// ...
override def onPush(): Unit = {
val elem = grab(in)
val cost = costCalculation(elem)
val delayNanos = tokenBucket.offer(cost)
if (delayNanos == 0L) push(out, elem)
else {
if (enforcing) failStage(new RateExceededException("Maximum throttle throughput exceeded."))
else {
currentElement = elem
scheduleOnce(timerName, delayNanos.nanos)
}
}
}
// ...
override protected def onTimer(key: Any): Unit = {
push(out, currentElement)
currentElement = null.asInstanceOf[T]
if (willStop) completeStage()
}
当输入端口现在有新元素时,将调用onPush()
。现在可以在端口上使用grab(in)
和/或调用 pull(in)
来获取该元素,以请求下一个元素。
在这里,我们可以看到逻辑从创建一个具有特定maxBurst
和nanoBetweentOkens
的令牌桶开始,该令牌桶由per.toNanos/cost
计算。
然后,当新消息传入时,Throttle
类通过向tokenBucket
提供消息的开销来计算延迟消息所需的时间。
最后,它将决定如果不需要延迟,是将消息推送到下一阶段,还是在延迟后安排推送。
令牌桶算法
通过throttle的scaladoc,我们可能知道:
Throttle实现令牌桶模型。存在一个具有给定令牌容量(突发大小或最大突发)的存储桶。代币以给定的速率落入桶中,可以“备用”以供以后使用,直到桶容量达到一定程度,以允许一些突发性。
那么,什么是令牌桶模型,我们可以在维基百科中找到它:
令牌桶是分组交换计算机网络和电信网络中使用的一种算法。在流量整形中,数据包被延迟,直到它们一致。
以下是源代码:
def offer(cost: Long): Long = {
if (cost < 0) throw new IllegalArgumentException("Cost must be non-negative")
val now = currentTime
val timeElapsed = now - lastUpdate
val tokensArrived =
// Was there even a tick since last time?
if (timeElapsed >= nanosBetweenTokens) {
// only one tick elapsed
if (timeElapsed < nanosBetweenTokens * 2) {
lastUpdate += nanosBetweenTokens
1
} else {
// Ok, no choice, do the slow integer division
val tokensArrived = timeElapsed / nanosBetweenTokens
lastUpdate += tokensArrived * nanosBetweenTokens
tokensArrived
}
} else 0
availableTokens = math.min(availableTokens + tokensArrived, capacity)
if (cost <= availableTokens) {
availableTokens -= cost
0
} else {
val remainingCost = cost - availableTokens
// Tokens always arrive at exact multiples of the token generation period, we must account for that
val timeSinceTokenArrival = now - lastUpdate
val delay = remainingCost * nanosBetweenTokens - timeSinceTokenArrival
availableTokens = 0
lastUpdate = now + delay
delay
}
}
代码非常简单,我们可以从代码中看到:
TokenBucket
不会存储实际的消息,即使它被称为“Bucket”,它只是计算每个传入消息的延迟Nanobetweentoken
将用于计算滴答声和延迟- 容量将决定有多少消息可以毫不延迟地继续,或者需要等待多少个滴答声
散列轮计时器算法
另一个有趣的部分是它如何在不阻塞工作线程的情况下实现调度器。
我们可以通过深入了解scheduleOnce
来了解它。
此调度器实现基于桶的旋转轮,如Netty的HashedWheelTimer,它以固定的滴答频率前进,并将在当前桶中找到的任务分派到各自的执行上下文。
@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {
var tick = startTick
var totalTick: Long = tick // tick count that doesn't wrap around, used for calculating sleep time
val wheel = Array.fill(WheelSize)(new TaskQueue)
private def clearAll(): immutable.Seq[TimerTask] = {
@tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = {
q.poll() match {
case null => acc
case x => collect(q, acc :+ x)
}
}
(0 until WheelSize).flatMap(i => collect(wheel(i), Vector.empty)) ++ collect(queue, Vector.empty)
}
@tailrec
private def checkQueue(time: Long): Unit = queue.pollNode() match {
case null => ()
case node =>
node.value.ticks match {
case 0 => node.value.executeTask()
case ticks =>
val futureTick = ((
time - start + // calculate the nanos since timer start
(ticks * tickNanos) + // adding the desired delay
tickNanos - 1 // rounding up
) / tickNanos).toInt // and converting to slot number
// tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations
// and the difference (offset) will be correct in any case
val offset = futureTick - tick
val bucket = futureTick & wheelMask
node.value.ticks = offset
wheel(bucket).addNode(node)
}
checkQueue(time)
}
override final def run(): Unit =
try nextTick()
catch {
case t: Throwable =>
log.error(t, "exception on LARS’ timer thread")
stopped.get match {
case null =>
val thread = threadFactory.newThread(this)
log.info("starting new LARS thread")
try thread.start()
catch {
case e: Throwable =>
log.error(e, "LARS cannot start new thread, ship’s going down!")
stopped.set(Promise.successful(Nil))
clearAll()
}
timerThread = thread
case p =>
assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
p.success(clearAll())
}
throw t
}
@tailrec final def nextTick(): Unit = {
val time = clock()
val sleepTime = start + (totalTick * tickNanos) - time
if (sleepTime > 0) {
// check the queue before taking a nap
checkQueue(time)
waitNanos(sleepTime)
} else {
val bucket = tick & wheelMask
val tasks = wheel(bucket)
val putBack = new TaskQueue
@tailrec def executeBucket(): Unit = tasks.pollNode() match {
case null => ()
case node =>
val task = node.value
if (!task.isCancelled) {
if (task.ticks >= WheelSize) {
task.ticks -= WheelSize
putBack.addNode(node)
} else task.executeTask()
}
executeBucket()
}
executeBucket()
wheel(bucket) = putBack
tick += 1
totalTick += 1
}
stopped.get match {
case null => nextTick()
case p =>
assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
p.success(clearAll())
}
}
})
timerThread.start()
}
代码很长,但我们仍然可以知道重要的部分是:
- 将创建一个
singleton TimerRead
来管理滴答声 TaskHolder
包含可运行任务和ExecutionContext
,将由TimerRead
执行,因此实际任务在特定ExecutionContext
中执行。
结论
有一种称为令牌桶的算法用于速率限制(流量整形)
还有另一种称为hashwheel
的算法用于调度
原文地址:https://xoyo24.me/2020/06/08/rate-limit-throttle-in-akka-streams/
除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2485.html
暂无评论