ravineport blog

日々の学びをつらつらと

Azure Cosmos DBでAtomic Counterを実現する

Atomic Counter

一意性を保ったカウンタのこと(そのまま)。

あるキーに対してカウントが保存されているDBを考える。このDBに保存されているデータをカウントアップすることを普通に考えるなら、キーでSELECT、そのカウントを+1してデータUPDATEするという実装になるはず。

しかし、2つのクライアントが同時にデータを読み込み、同時に書き込みを行うようなタイミングが想定されるときには不整合が起きえてしまう。例えば、testというキーに1が保存されているときに、2つのクライアントが同時にカウントアップしようとした場合は期待する結果は3となる。しかし、読み取り時にどちらのクライアントも1を読み取り、カウントアップして2を書き込むと最終結果が2になってしまう。これが一意性が保たれていない状態。Atomic Counterはこのような場合でも3が記録されることを保証する。

Azure Cosmos DB

とにかく速くて世界中にスケールできるDB(雑)。AWSDynamo DB、GCPのCloud Spannerに相当するサービス。 docs.microsoft.com

今回はDatabase id: AtomicCounterTest、Container id: Counter(Partition key: /key、Unique keys: /key)で作成した。ドキュメントはkeyとcountプロパティを持つ。

f:id:taniT91:20200517114702p:plain
今回作成したデータベースとコンテナ

Cosmos DBでAtomic Counterの実現

Stored Procedureを使ったトランザクション管理

Atomic性を実現するためにトランザクション管理をする必要がある。そのままではAtomic性が保証されないので、Stored Procedureという機能を使うことで実現する。 SDKCLIを使って登録する他、Azure Portalからも作ることもできる。

f:id:taniT91:20200517004152p:plain
Azure PortalからStored Procedureを作成

今回はspCountUpという名前でStored Procedureを作ってみた。Stored ProcedureはJavaScriptで記述する。

f:id:taniT91:20200517113248p:plain
Stored Procedureを実装してみた

実装は以下。js力に自信がないのでもっといい書き方あったら教えて下さい。

// keyのcountに+countUpNumStrして更新、なければ新規作成
function countUp(key, countUpNumStr) {
    const context = getContext();
    const container = context.getCollection();
    if (!key) throw "Error: key is null or empty. key=" + key;
    const countUpNum = Number(countUpNumStr);
    if (isNaN(countUpNum)) throw "Error: countUpNumStr is not Number. countUpNumNumStr" + countUpNumNumStr;

    // keyでSELECT
    const filterQuery =
    {
        'query' : 'SELECT * FROM Counter c where c.key = @key',
        'parameters' : [{'name':'@key', 'value':key}]
    };

    // Query実行
    const accept = container.queryDocuments(container.getSelfLink(), filterQuery, {},
        function (err, items, responseOptions) {
            if (err) throw "Failed to read document: key=" + key;

            // なければ新規作成して終了
            if (items.length != 1) {
                const document = {'key': key, 'count': countUpNum};
                const accept2 =  container.createDocument(container.getSelfLink(), document, {},
                    function (error, resource, options) {
                        if (error) throw "Error: " + error.message;
                    }
                );
                if (!accept2) throw "Failed to create document: key=" + key;
                return;
            }
            // unique key制約により1つしか存在しないことが保証されている
            let document = items[0];
            document.count += countUpNum;

            // 該当ドキュメントを更新
            const accept2 = container.replaceDocument(document._self, document,
                function (err2, replacedDocument) {
                    if (err2) throw "Error: " + err2.message;
                }
            );
            if (!accept2) throw "Failed to update document: key=" + key;
        });
    if (!accept) throw "Failed to read document: key=" + key;
    return;
}

このStored Procedureはkey(更新したいkey。String)とcountUpNumStr(いくつカウントアップするか。String)という2つの入力パラメータを受け取る。keyでドキュメントを検索して、なければcountがcountUpNumStrのドキュメントを新規作成、あればそのcountを+countUpNumStrするという実装になっている。各処理で失敗したときはエラーを返す。context.getResponse().setBody()を使えばレスポンスボディも変更できる。

入力パラメータは常にStringで渡ってくる。

Azure Portalから実行

作成したStored Procedure、spCountUpをAzure Portal上からテストすることができる。

f:id:taniT91:20200517120652p:plain
Azure Portalから実行

Stored Procedureを実行するときは、Stored Procedure内で定義した引数の他にPartition keyを指定する必要があるので注意。1つのStored Procedureで複数のPartition keyを参照することはできないらしい。図の例ではtestというkeyをもつドキュメントのcountを+3する。

Java SDKから実行

次にJava SDK経由で実行してみる。Scalaが好きなのでScalaで。

package cosmos_db

import com.microsoft.azure.documentdb._
import scala.concurrent.{ ExecutionContext, Future }

class CosmosDB {
  private val cosmosDBClient = new DocumentClient(
    "DATABASE_URI",
    "MASTER_KEY",
    connectionPolicy,
    ConsistencyLevel.Session
  )

  private val connectionPolicy: ConnectionPolicy = {
    val retryOptions = new RetryOptions
    retryOptions.setMaxRetryAttemptsOnThrottledRequests(5)
    retryOptions.setMaxRetryWaitTimeInSeconds(30)

    val policy = new ConnectionPolicy
    policy.setConnectionMode(ConnectionMode.DirectHttps)
    policy.setMaxPoolSize(100)
    policy.setIdleConnectionTimeout(60)
    policy.setRetryOptions(retryOptions)
    policy
  }

  def executeStoredProcedure(storedProcedureLink: String, partitionKeyValue: String, args: Seq[String])(
    implicit ec: ExecutionContext
  ): Future[Unit] = {
    val requestOptions = new RequestOptions
    requestOptions.setPartitionKey(new PartitionKey(partitionKeyValue))
    Future(
      cosmosDBClient.executeStoredProcedure(storedProcedureLink, requestOptions, args.toArray)
    ).map { _ =>
      ()
    }.recoverWith {
      case e: Throwable =>
        Future.failed(
          new RuntimeException(
            s"failed to execute stored procedure($partitionKeyValue). arg: ${args.toString()}. message: ${e.getMessage}"
          )
        )
    }
  }
}

マルチリージョンの場合は一貫性レベル(コードのConsistencyLevel)をStrongにする必要があるかも。

使うときはこんな感じ。

import scala.concurrent.ExecutionContext.Implicits.global

object CosmosDBTest {
  def main(args: Array[String]): Unit = {
    val cosmosDB = new CosmosDB
    val spPath = "dbs/AtomicCounterTest/colls/Counter/sprocs/spCountUp"
    cosmosDB.executeStoredProcedure(spPath, "testKey", Seq("testKey", "3"))
  }
}

まとめ

Azure Cosmos DBでAtomic Counterを実現するためにStored Procedureを使った。Cosmos DBでトランザクション管理をするときには是非。

参考

ストアドプロシージャ、トリガー、および UDF を Azure Cosmos DB に記述する | Microsoft Docs

Azure Cosmos DB の JavaScript クエリ API を使用してストアドプロシージャとトリガーを記述する | Microsoft Docs