ravineport blog

日々の学びをつらつらと

Akkaを使ってScalaのFutureにタイムアウトメソッドを生やす(あと注意点)

ScalaのFutureにはタイムアウトを設定するメソッドはない。
Await.result で設定時間待つことができるが、失敗した場合は例外がスローされてしまうのでTryなりで扱う必要があるし、なによりブロッキングするのでスレッドを無駄遣いしてしまっている。
そこで akka.pattern.after メソッドを使う。

akka.pattern.afterを使ったFutureのタイムアウト設定

こちらを参考にした stackoverflow.com

import akka.actor.ActorSystem

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, TimeoutException }

object Main {
  implicit class FutureHelper[T](val f: Future[T]) {
    def withTimeout(timeout: FiniteDuration)(implicit ec: ExecutionContext, system: ActorSystem): Future[T] = {
      val timeoutF = akka.pattern.after(timeout, system.scheduler) {
        Future.failed(new TimeoutException("timeout occurred!"))
      }
      Future.firstCompletedOf(Seq(f, timeoutF))
    }
  }
}

implicit classFuturewithTimeout というメソッドを生やしている。
akka.pattern.after メソッドを使って timeout 時間経つとFutureが失敗し TimeoutException が発生する。で、Future.firstCompletedOf メソッドで先に完了した方を結果として返すことでFutureのタイムアウト設定を実現している。

使い方はこんな感じ

import scala.concurrent.ExecutionContext.Implicits.global

implicit val actorSystem: ActorSystem = ActorSystem("ActorSystem")

// 1秒後にタイムアウトするFuture
val future1 = Future {
  println("1st future start at " + Thread.currentThread().getId)
  Thread.sleep(3000) // 3秒スリープ
  true
}.withTimeout(1.second)

future1 onComplete {
  case Success(value) => println("future1 done: " + value)
  case Failure(exception) => println("future1 failed: " + exception)
}

結果

1st future start at 17
future1 failed: java.util.concurrent.TimeoutException: timeout occurred!

ちゃんとタイムアウトしているヽ(´ー`)ノ

注意点

implicitで渡すExecutionContextのスレッドプールが枯渇していたときは期待する動作をしない。

implicit val actorSystem: ActorSystem = ActorSystem("ActorSystem")
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) // スレッド数1のスレッドプール作成

// 1秒後にタイムアウトするはずのFuture
val future1 = Future {
  println("1st future start at " + Thread.currentThread().getId)
  Thread.sleep(3000) // 3秒スリープ
  true
}.withTimeout(1.second)

future1 onComplete {
  case Success(value) => println("future1 done: " + value)
  case Failure(exception) => println("future1 failed: " + exception)
}

結果

1st future start at 17
future1 done: true

Why?

3秒スリープする Future が生成され、スレッドを取得して実行

withTimeout メソッド内の timeoutF はスレッドを取得できず実行待ち(今回作ったExecutionContextはスレッドプールのサイズが1のため)

timeoutF がスレッドを取得できるのは3秒スリープする Future が完了してスレッドがプールに戻されてから

future1 が完了してしまう

今回は極端な例だが実際のアプリケーションでも起こりうる(というか起こってしまった)ので気をつけたい。