Throttling in Akka and Spray
When you want to limit the number of messages an actor receives, you can use the throttler in akka-contrib. It lets you cap the number of transactions per second (tps) and queues up the surplus. Here I'll describe another way: rejecting all surplus messages. This has the advantage that the requester knows it is sending too much and can act on that. Both methods have their advantages, and both have limits, since they still need resources to queue or reject the messages.
In Akka we can create an actor that passes messages on to the target actor, or rejects them when the specified tps is exceeded.
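import akka.actor.{Actor, ActorLogging, ActorRef}
import org.joda.time.DateTime // assumption: DateTime is Joda-Time's
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object ThrottleActor {
  object OneSecondLater // tick that resets the counter
  object Accepted       // reply: the message was passed on to the target
  object ExceededMaxTps // reply: the message was rejected
}

import ThrottleActor._

class ThrottleActor(target: ActorRef, maxTps: Int) extends Actor with ActorLogging {
  implicit val executionContext: ExecutionContext = context.dispatcher

  // Send ourselves a tick every second so the counter can be reset.
  private val tick =
    context.system.scheduler.schedule(1.second, 1.second, self, OneSecondLater)

  var messagesThisSecond: Int = 0

  // Stop the ticks when the actor stops.
  override def postStop(): Unit = tick.cancel()

  def receive = {
    case OneSecondLater =>
      log.info(s"OneSecondLater ${DateTime.now} $messagesThisSecond requests.")
      messagesThisSecond = 0
    case message if messagesThisSecond >= maxTps =>
      // Over the limit: tell the sender, but keep counting for the log.
      sender ! ExceededMaxTps
      messagesThisSecond += 1
      log.info(s"ExceededMaxTps ${DateTime.now} $messagesThisSecond requests.")
    case message =>
      // Within the limit: acknowledge and pass the message on.
      sender ! Accepted
      target ! message
      messagesThisSecond += 1
  }
}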
This returns an Accepted or ExceededMaxTps message to the sender.
In Spray we can write a directive that responds with HTTP status code 400 when you exceed the maximum.
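// Assumes the spray-routing directives are in scope, e.g. inside an HttpService.
import shapeless.HNil
import spray.routing.ValidationRejection

def throttle(maxTps: Int) = {
  // The current time, truncated to whole seconds.
  def currentSecond = DateTime.now.withMillisOfSecond(0)

  // State shared by all requests that pass through this directive instance.
  var measuredSecond = currentSecond
  var messagesThisSecond: Int = 0
  println("create throttler")

  extract { ctx =>
    val s = currentSecond
    println(s"throttle $s $measuredSecond")
    if (s == measuredSecond) {
      // Same second as the previous request: count it.
      messagesThisSecond += 1
      messagesThisSecond
    } else {
      // A new second has started: restart the count.
      measuredSecond = s
      messagesThisSecond = 1
      messagesThisSecond
    }
  }.flatMap[HNil] { m: Int =>
    if (m <= maxTps) pass
    else reject(ValidationRejection(s"Exceeded maximum tps! measured tps = $messagesThisSecond, only $maxTps allowed."))
  }
}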
This can then easily be used in our route.
val apiRoute: Route =
  path("robots") {
    throttle(1) {
      get { // with get we will return our current list of robots
        log.info("Building get route")
        complete {
          log.info("Executing get route")
          // complete will return the result in an appropriate format
          // With SprayJsonSupport it knows how to marshall a List to json
          // With RobotFormat it knows how to marshall Robot
          robots
        }
      } ~ post { // With post we will add a robot
        log.info("Building post route")
        handleWith { robot: Robot => // handleWith will unmarshall the input
          log.info("Executing post route")
          robots = robot :: robots
          robot // handleWith will also marshall the result. Here we simply return the new robot.
        }
      }
    }
  }
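The counting that the throttle directive does can also be looked at in isolation from Spray. The following is a minimal sketch, not the code from this post: FixedWindowCounter and tryAcquire are hypothetical names, and the current time is passed in as a parameter so the behaviour can be checked without waiting for real seconds to pass. Like the directive, it keeps counting requests that are rejected. It is synchronized as a precaution, in case it is ever called from more than one thread.

```scala
// Hypothetical helper (not part of the original code): counts messages
// per whole second and reports whether each one is within the limit.
final class FixedWindowCounter(maxTps: Int) {
  private var windowSecond: Long = Long.MinValue // second currently being measured
  private var count: Int = 0                     // messages seen in that second

  /** Counts one message arriving at `nowMillis`; true if it is within the limit. */
  def tryAcquire(nowMillis: Long): Boolean = synchronized {
    val second = Math.floorDiv(nowMillis, 1000L)
    if (second != windowSecond) {
      // A new second has started: restart the count.
      windowSecond = second
      count = 0
    }
    count += 1
    count <= maxTps
  }
}

object FixedWindowCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new FixedWindowCounter(maxTps = 1)
    println(counter.tryAcquire(0L))    // first request in second 0
    println(counter.tryAcquire(500L))  // second request in the same second
    println(counter.tryAcquire(1000L)) // first request in second 1
  }
}
```

With maxTps = 1, the second call within the same second is rejected, and the first call in the next second is accepted again.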
And to see it in action, I ran curl twice in quick succession.
tammo@merkel:~$ curl 127.0.0.1:8080/robots
[{
  "name": "R2D2",
  "color": "white",
  "amountOfArms": 0
}, {
  "name": "Asimo",
  "amountOfArms": 2
}]
tammo@merkel:~$ curl 127.0.0.1:8080/robots
Exceeded maximum tps! measured tps = 2, only 1 allowed.
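A rejection like this is what lets the requester act on the overload, for example by waiting and trying again. The following is a minimal sketch of that idea under a few assumptions: sendWithRetry is a hypothetical helper, Accepted and ExceededMaxTps are redefined locally so the snippet stands alone, and send stands in for whatever actually delivers the request (an actor ask or an HTTP call). It blocks with Thread.sleep to keep the example short, which you would not want to do inside an actor.

```scala
import scala.annotation.tailrec

object RetryOnReject {
  // Local stand-ins for the two replies of the ThrottleActor.
  sealed trait Reply
  case object Accepted extends Reply
  case object ExceededMaxTps extends Reply

  /**
   * Hypothetical helper: calls `send` until it is accepted or `maxAttempts`
   * is reached, sleeping `delayMillis` between attempts.
   * Returns the number of attempts used, or None if every attempt was rejected.
   */
  @tailrec
  def sendWithRetry(send: () => Reply,
                    maxAttempts: Int,
                    delayMillis: Long,
                    attempt: Int = 1): Option[Int] =
    send() match {
      case Accepted => Some(attempt)
      case ExceededMaxTps if attempt < maxAttempts =>
        Thread.sleep(delayMillis) // back off before trying again
        sendWithRetry(send, maxAttempts, delayMillis, attempt + 1)
      case ExceededMaxTps => None // give up
    }
}
```

A caller would pick the delay to match the throttle window; with a limit per second, waiting about a second before retrying is a reasonable starting point.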
The whole code can be found here.