ravineport blog

日々の学びをつらつらと

Azure Cosmos DBでAtomic Counterを実現する

Atomic Counter

一意性を保ったカウンタのこと(そのまま)。

あるキーに対してカウントが保存されているDBを考える。このDBに保存されているデータをカウントアップすることを普通に考えるなら、キーでSELECT、そのカウントを+1してデータUPDATEするという実装になるはず。

しかし、2つのクライアントが同時にデータを読み込み、同時に書き込みを行うようなタイミングが想定されるときには不整合が起きえてしまう。例えば、testというキーに1が保存されているときに、2つのクライアントが同時にカウントアップしようとした場合は期待する結果は3となる。しかし、読み取り時にどちらのクライアントも1を読み取り、カウントアップして2を書き込むと最終結果が2になってしまう。これが一意性が保たれていない状態。Atomic Counterはこのような場合でも3が記録されることを保証する。

Azure Cosmos DB

とにかく速くて世界中にスケールできるDB(雑)。AWSDynamo DB、GCPのCloud Spannerに相当するサービス。 docs.microsoft.com

今回はDatabase id: AtomicCounterTest、Container id: Counter(Partition key: /key、Unique keys: /key)で作成した。ドキュメントはkeyとcountプロパティを持つ。

f:id:taniT91:20200517114702p:plain
今回作成したデータベースとコンテナ

Cosmos DBでAtomic Counterの実現

Stored Procedureを使ったトランザクション管理

Atomic性を実現するためにトランザクション管理をする必要がある。そのままではAtomic性が保証されないので、Stored Procedureという機能を使うことで実現する。 SDKCLIを使って登録する他、Azure Portalからも作ることもできる。

f:id:taniT91:20200517004152p:plain
Azure PortalからStored Procedureを作成

今回はspCountUpという名前でStored Procedureを作ってみた。Stored ProcedureはJavaScriptで記述する。

f:id:taniT91:20200517113248p:plain
Stored Procedureを実装してみた

実装は以下。js力に自信がないのでもっといい書き方あったら教えて下さい。

// keyのcountに+countUpNumStrして更新、なければ新規作成
function countUp(key, countUpNumStr) {
    const context = getContext();
    const container = context.getCollection();
    if (!key) throw "Error: key is null or empty. key=" + key;
    const countUpNum = Number(countUpNumStr);
    if (isNaN(countUpNum)) throw "Error: countUpNumStr is not Number. countUpNumNumStr" + countUpNumNumStr;

    // keyでSELECT
    const filterQuery =
    {
        'query' : 'SELECT * FROM Counter c where c.key = @key',
        'parameters' : [{'name':'@key', 'value':key}]
    };

    // Query実行
    const accept = container.queryDocuments(container.getSelfLink(), filterQuery, {},
        function (err, items, responseOptions) {
            if (err) throw "Failed to read document: key=" + key;

            // なければ新規作成して終了
            if (items.length != 1) {
                const document = {'key': key, 'count': countUpNum};
                const accept2 =  container.createDocument(container.getSelfLink(), document, {},
                    function (error, resource, options) {
                        if (error) throw "Error: " + error.message;
                    }
                );
                if (!accept2) throw "Failed to create document: key=" + key;
                return;
            }
            // unique key制約により1つしか存在しないことが保証されている
            let document = items[0];
            document.count += countUpNum;

            // 該当ドキュメントを更新
            const accept2 = container.replaceDocument(document._self, document,
                function (err2, replacedDocument) {
                    if (err2) throw "Error: " + err2.message;
                }
            );
            if (!accept2) throw "Failed to update document: key=" + key;
        });
    if (!accept) throw "Failed to read document: key=" + key;
    return;
}

このStored Procedureはkey(更新したいkey。String)とcountUpNumStr(いくつカウントアップするか。String)という2つの入力パラメータを受け取る。keyでドキュメントを検索して、なければcountがcountUpNumStrのドキュメントを新規作成、あればそのcountを+countUpNumStrするという実装になっている。各処理で失敗したときはエラーを返す。context.getResponse().setBody()を使えばレスポンスボディも変更できる。

入力パラメータは常にStringで渡ってくる。

Azure Portalから実行

作成したStored Procedure、spCountUpをAzure Portal上からテストすることができる。

f:id:taniT91:20200517120652p:plain
Azure Portalから実行

Stored Procedureを実行するときは、Stored Procedure内で定義した引数の他にPartition keyを指定する必要があるので注意。1つのStored Procedureで複数のPartition keyを参照することはできないらしい。図の例ではtestというkeyをもつドキュメントのcountを+3する。

Java SDKから実行

次にJava SDK経由で実行してみる。Scalaが好きなのでScalaで。

package cosmos_db

import com.microsoft.azure.documentdb._
import scala.concurrent.{ ExecutionContext, Future }

class CosmosDB {
  private val cosmosDBClient = new DocumentClient(
    "DATABASE_URI",
    "MASTER_KEY",
    connectionPolicy,
    ConsistencyLevel.Session
  )

  private val connectionPolicy: ConnectionPolicy = {
    val retryOptions = new RetryOptions
    retryOptions.setMaxRetryAttemptsOnThrottledRequests(5)
    retryOptions.setMaxRetryWaitTimeInSeconds(30)

    val policy = new ConnectionPolicy
    policy.setConnectionMode(ConnectionMode.DirectHttps)
    policy.setMaxPoolSize(100)
    policy.setIdleConnectionTimeout(60)
    policy.setRetryOptions(retryOptions)
    policy
  }

  def executeStoredProcedure(storedProcedureLink: String, partitionKeyValue: String, args: Seq[String])(
    implicit ec: ExecutionContext
  ): Future[Unit] = {
    val requestOptions = new RequestOptions
    requestOptions.setPartitionKey(new PartitionKey(partitionKeyValue))
    Future(
      cosmosDBClient.executeStoredProcedure(storedProcedureLink, requestOptions, args.toArray)
    ).map { _ =>
      ()
    }.recoverWith {
      case e: Throwable =>
        Future.failed(
          new RuntimeException(
            s"failed to execute stored procedure($partitionKeyValue). arg: ${args.toString()}. message: ${e.getMessage}"
          )
        )
    }
  }
}

マルチリージョンの場合は一貫性レベル(コードのConsistencyLevel)をStrongにする必要があるかも。

使うときはこんな感じ。

import scala.concurrent.ExecutionContext.Implicits.global

object CosmosDBTest {
  def main(args: Array[String]): Unit = {
    val cosmosDB = new CosmosDB
    val spPath = "dbs/AtomicCounterTest/colls/Counter/sprocs/spCountUp"
    cosmosDB.executeStoredProcedure(spPath, "testKey", Seq("testKey", "3"))
  }
}

まとめ

Azure Cosmos DBでAtomic Counterを実現するためにStored Procedureを使った。Cosmos DBでトランザクション管理をするときには是非。

参考

ストアドプロシージャ、トリガー、および UDF を Azure Cosmos DB に記述する | Microsoft Docs

Azure Cosmos DB の JavaScript クエリ API を使用してストアドプロシージャとトリガーを記述する | Microsoft Docs

Akkaを使ってScalaのFutureにタイムアウトメソッドを生やす(あと注意点)

ScalaのFutureにはタイムアウトを設定するメソッドはない。
Await.result で設定時間待つことができるが、失敗した場合は例外がスローされてしまうのでTryなりで扱う必要があるし、なによりブロッキングするのでスレッドを無駄遣いしてしまっている。
そこで akka.pattern.after メソッドを使う。

akka.pattern.afterを使ったFutureのタイムアウト設定

こちらを参考にした stackoverflow.com

import akka.actor.ActorSystem

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, TimeoutException }

object Main {
  implicit class FutureHelper[T](val f: Future[T]) {
    def withTimeout(timeout: FiniteDuration)(implicit ec: ExecutionContext, system: ActorSystem): Future[T] = {
      val timeoutF = akka.pattern.after(timeout, system.scheduler) {
        Future.failed(new TimeoutException("timeout occurred!"))
      }
      Future.firstCompletedOf(Seq(f, timeoutF))
    }
  }
}

implicit classFuturewithTimeout というメソッドを生やしている。
akka.pattern.after メソッドを使って timeout 時間経つとFutureが失敗し TimeoutException が発生する。で、Future.firstCompletedOf メソッドで先に完了した方を結果として返すことでFutureのタイムアウト設定を実現している。

使い方はこんな感じ

import scala.concurrent.ExecutionContext.Implicits.global

implicit val actorSystem: ActorSystem = ActorSystem("ActorSystem")

// 1秒後にタイムアウトするFuture
val future1 = Future {
  println("1st future start at " + Thread.currentThread().getId)
  Thread.sleep(3000) // 3秒スリープ
  true
}.withTimeout(1.second)

future1 onComplete {
  case Success(value) => println("future1 done: " + value)
  case Failure(exception) => println("future1 failed: " + exception)
}

結果

1st future start at 17
future1 failed: java.util.concurrent.TimeoutException: timeout occurred!

ちゃんとタイムアウトしているヽ(´ー`)ノ

注意点

implicitで渡すExecutionContextのスレッドプールが枯渇していたときは期待する動作をしない。

implicit val actorSystem: ActorSystem = ActorSystem("ActorSystem")
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) // スレッド数1のスレッドプール作成

// 1秒後にタイムアウトするはずのFuture
val future1 = Future {
  println("1st future start at " + Thread.currentThread().getId)
  Thread.sleep(3000) // 3秒スリープ
  true
}.withTimeout(1.second)

future1 onComplete {
  case Success(value) => println("future1 done: " + value)
  case Failure(exception) => println("future1 failed: " + exception)
}

結果

1st future start at 17
future1 done: true

Why?

3秒スリープする Future が生成され、スレッドを取得して実行

withTimeout メソッド内の timeoutF はスレッドを取得できず実行待ち(今回作ったExecutionContextはスレッドプールのサイズが1のため)

timeoutF がスレッドを取得できるのは3秒スリープする Future が完了してスレッドがプールに戻されてから

future1 が完了してしまう

今回は極端な例だが実際のアプリケーションでも起こりうる(というか起こってしまった)ので気をつけたい。

Gatlingのexec { session => ???}, jsonPathではまったこと

Gatlingとは?

https://gatling.io/

負荷テストのためのツール。Scalaで書くことができる。うれしい。

ここではGatlingを書いていくなかではまったことを書いていく。 

1. exec { session => ???} ではリクエストは投げられない

execメソッドの基本的な使い方は以下の通り。

exec {
  http("テストリクエスト")
    .get("/users") // /usersにGETリクエストを投げる
}

しかし、以下の場合は実際に/usersにリクエストは投げられない。

exec { session =>
  http("テストリクエスト")
    .get("/users") // 実行時にリクエストが投げられない!
 session // Session型を返すようにしないとコンパイルエラー
}

execにsessionを受け取る関数を渡す場合は、リクエストを投げる処理は書かないようにする。

なぜこのようなことをしようとしたかというと、http("テストリクエスト").get("/users")を関数にしてかつSessionに保存された値をリクエストパラメータとして使おうとしたため。 次のようなことをしようとした。

def usersRequest(session: Session): HttpRequestBuilder = {
  http("テストリクエスト")
    .get("/users")
    .queryParam("id", session.get("userId"))
}

exec { session =>
  usersRequest(session) // リクエストは投げられない
  session
}

この場合は以下のようにすればよい

def usersRequest(): HttpRequestBuilder = {
  http("テストリクエスト")
    .get("/users")
    .queryParam("id", "${userId}")
}

exec {
  usersRequest(session)
}

sessionを渡していないのにどこかに保存されているのであろうsessionの内容にアクセスできてしまうことに個人的には気持ち悪さを感じるが、そこはGatlingのお作法ということで。

2. レスポンスJSONからjsonPathメソッドを使って値を取り出すときは、一度Sessionに保存すること

レスポンスJSONが期待するものかどうかを確認するには以下のように書く。

http("request")
...
.check(jsonPath("$..result").exists) // 返ってきたJSONが{"result": ...}であるかどうかチェック

resultキーに対する値(Int型)を取り出してなにかしら処理をしたい場合に以下のように書いたがうまく値を取り出せなかった。

http("request")
...
.check {
  val result = jsonPath("$..result").ofType[Int] // resultに期待した値が入らない
}

値を取り出したい場合は一度Sessionに保存することでうまくいった。

http("request")
...
.check {
  jsonPath("$..result").ofType[Int].find.saveAs("result")
}

// ↑のあと次のように取り出せる
exec { session =>
  val result = session("result").as[Int]
}

がんばればjsonPathから値を取り出すこともやろうと思えばできるらしい。

参考:scala - How do I retrieve a value from a JSON string using gatling JsonPath.query? - Stack Overflow

import com.fasterxml.jackson.databind.ObjectMapper

http("request")
  .check {
    val json = """{"id":"derp"}"""
    val parsed = new ObjectMapper().readValue(json, classOf[Object])
    
    // to retrieve the ids as a list:
    val ids = JsonPath.query("$.id", parsed).right.get.toList
    
    // the only "id" result of the query
    val id = ids.head
}