When you want to limit the number of messages an actor receives, you can use the throttler in akka-contrib. It lets you cap the maximum transactions per second (tps) and queues up the surplus. Here I'll describe another way: rejecting all surplus messages. This has the advantage that the requester knows it is sending too much and can act on that. Both methods have their advantages, and both have limits, since they still need resources to queue or reject the messages.

In Akka we can create an actor that passes messages through to the target actor, or rejects them when the specified tps is exceeded.

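import akka.actor.{Actor, ActorLogging, ActorRef}
import org.joda.time.DateTime // assumption: Joda-Time, judging by withMillisOfSecond in the directive further down
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object ThrottleActor {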
  object OneSecondLater
  object Accepted
  object ExceededMaxTps
}
import ThrottleActor._

class ThrottleActor (target: ActorRef, maxTps: Int) extends Actor with ActorLogging {
  implicit val executionContext: ExecutionContext = context.dispatcher
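  // Keep the handle so the recurring tick can be cancelled when the actor stops.
  private val tick = context.system.scheduler.schedule(1.second, 1.second, self, OneSecondLater)

  override def postStop(): Unit = tick.cancel()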

  var messagesThisSecond: Int = 0

  def receive = {
    case OneSecondLater =>
      log.info(s"OneSecondLater ${DateTime.now} $messagesThisSecond requests.")
      messagesThisSecond = 0
    case message if messagesThisSecond >= maxTps =>
      sender ! ExceededMaxTps
      messagesThisSecond += 1
      log.info(s"ExceededMaxTps ${DateTime.now} $messagesThisSecond requests.")
    case message =>
      sender ! Accepted
      target ! message
      messagesThisSecond += 1
  }
}
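
The counting rules are easier to check without a running actor system. The following is a minimal sketch, not part of the original code, that models the actor's counter as a plain immutable value. `ThrottleState`, `tick`, `onMessage` and `ThrottleStateDemo` are hypothetical names introduced here, and the reply objects are redefined locally so the snippet stands alone.

```scala
// Pure model of ThrottleActor's counter (hypothetical helper, not in the original post).
sealed trait Reply
case object Accepted extends Reply
case object ExceededMaxTps extends Reply

final case class ThrottleState(maxTps: Int, messagesThisSecond: Int = 0) {
  // Corresponds to receiving OneSecondLater: start a new window.
  def tick: ThrottleState = copy(messagesThisSecond = 0)

  // Corresponds to receiving any other message: every message is counted,
  // including the rejected ones, as in the actor above.
  def onMessage: (ThrottleState, Reply) = {
    val reply = if (messagesThisSecond >= maxTps) ExceededMaxTps else Accepted
    (copy(messagesThisSecond = messagesThisSecond + 1), reply)
  }
}

object ThrottleStateDemo {
  // Feed `n` messages through the state and collect the replies in order.
  def replies(start: ThrottleState, n: Int): (ThrottleState, List[Reply]) =
    (1 to n).foldLeft((start, List.empty[Reply])) { case ((state, acc), _) =>
      val (next, reply) = state.onMessage
      (next, acc :+ reply)
    }
}
```

With `maxTps = 2`, three messages in the same window produce Accepted, Accepted, ExceededMaxTps, and a `tick` makes the next message acceptable again.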

The ThrottleActor replies to the sender with either an Accepted or an ExceededMaxTps message. In Spray we can make a directive that responds with HTTP status code 400 when you exceed the maximum.

  def throttle(maxTps: Int) = {
    def currentSecond = DateTime.now.withMillisOfSecond(0)
    var measuredSecond = currentSecond
    var messagesThisSecond: Int = 0
    println("create throttler")

    extract { ctx =>
      val s = currentSecond
      println(s"throttle $s $measuredSecond")
      if (s == measuredSecond) {
        messagesThisSecond += 1
        messagesThisSecond
      } else {
        measuredSecond = s
        messagesThisSecond = 1
        messagesThisSecond
      }
    }.flatMap[HNil] { m: Int =>
      if (m <= maxTps) pass
      else reject(ValidationRejection(s"Exceeded maximum tps! measured tps = $m, only $maxTps allowed."))
    }
  }

The throttle directive can then easily be used in our route.

  val apiRoute: Route =
    path("robots") {
      throttle(1) {
        get { //with get we will return our current list of robots
          log.info("Building get route")
          complete {
            log.info("Executing get route")
            //complete will return the result in an appropriate format
            //With SprayJsonSupport it knows how to marshall a List to json
            //With RobotFormat it knows how to marshall Robot
            robots
          }
        } ~ post { //With post we will add a robot
          log.info("Building post route")
          handleWith { robot: Robot =>  //handleWith will unmarshall the input
            log.info("Executing post route")
            robots = robot :: robots
            robot //handleWith will also marshall the result. Here we simply return the new robot.
          }
        }
      }
    }

And to see it in action, I quickly ran curl twice.

tammo@merkel:~$ curl 127.0.0.1:8080/robots
[{
  "name": "R2D2",
  "color": "white",
  "amountOfArms": 0
}, {
  "name": "Asimo",
  "amountOfArms": 2
}]
tammo@merkel:~$ curl 127.0.0.1:8080/robots
Exceeded maximum tps! measured tps = 2, only 1 allowed.
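
The second request was rejected because it arrived within the same wall-clock second as the first. As a rough illustration, the directive's counting logic can be replayed with explicit timestamps. `SecondWindowCounter` and `record` are hypothetical names, and non-negative epoch milliseconds stand in for the `DateTime` values used in the directive.

```scala
// Sketch of the directive's per-second counter with the clock passed in explicitly.
final class SecondWindowCounter(maxTps: Int) {
  private var measuredSecond = Long.MinValue
  private var messagesThisSecond = 0

  // Counts one request at `nowMillis` and reports whether it is within the limit.
  def record(nowMillis: Long): Boolean = {
    val second = nowMillis / 1000 // truncate to whole seconds, like withMillisOfSecond(0)
    if (second == measuredSecond) messagesThisSecond += 1
    else { measuredSecond = second; messagesThisSecond = 1 }
    messagesThisSecond <= maxTps
  }
}

object SecondWindowCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new SecondWindowCounter(maxTps = 1)
    println(counter.record(10000L)) // first request in second 10: true
    println(counter.record(10400L)) // same second, over the limit: false
    println(counter.record(11050L)) // next second, counter restarts at 1: true
  }
}
```

Because the window is aligned to clock seconds, two requests at 10.9 s and 11.1 s would both pass even though they are only 200 ms apart.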

The whole code can be found here.
