Azure Cosmos DBでAtomic Counterを実現する
Atomic Counter
一意性を保ったカウンタのこと(そのまま)。
あるキーに対してカウントが保存されているDBを考える。このDBに保存されているデータをカウントアップすることを普通に考えるなら、キーでSELECT、そのカウントを+1してデータUPDATEするという実装になるはず。
しかし、2つのクライアントが同時にデータを読み込み、同時に書き込みを行うようなタイミングが想定されるときには不整合が起きえてしまう。例えば、testというキーに1が保存されているときに、2つのクライアントが同時にカウントアップしようとした場合は期待する結果は3となる。しかし、読み取り時にどちらのクライアントも1を読み取り、カウントアップして2を書き込むと最終結果が2になってしまう。これが一意性が保たれていない状態。Atomic Counterはこのような場合でも3が記録されることを保証する。
Azure Cosmos DB
とにかく速くて世界中にスケールできるDB(雑)。AWSのDynamo DB、GCPのCloud Spannerに相当するサービス。 docs.microsoft.com
今回はDatabase id: AtomicCounterTest、Container id: Counter(Partition key: /key、Unique keys: /key)で作成した。ドキュメントはkeyとcountプロパティを持つ。
Cosmos DBでAtomic Counterの実現
Stored Procedureを使ったトランザクション管理
Atomic性を実現するためにトランザクション管理をする必要がある。そのままではAtomic性が保証されないので、Stored Procedureという機能を使うことで実現する。 SDKやCLIを使って登録する他、Azure Portalからも作ることもできる。
今回はspCountUp
という名前でStored Procedureを作ってみた。Stored ProcedureはJavaScriptで記述する。
実装は以下。js力に自信がないのでもっといい書き方あったら教えて下さい。
// keyのcountに+countUpNumStrして更新、なければ新規作成 function countUp(key, countUpNumStr) { const context = getContext(); const container = context.getCollection(); if (!key) throw "Error: key is null or empty. key=" + key; const countUpNum = Number(countUpNumStr); if (isNaN(countUpNum)) throw "Error: countUpNumStr is not Number. countUpNumNumStr" + countUpNumNumStr; // keyでSELECT const filterQuery = { 'query' : 'SELECT * FROM Counter c where c.key = @key', 'parameters' : [{'name':'@key', 'value':key}] }; // Query実行 const accept = container.queryDocuments(container.getSelfLink(), filterQuery, {}, function (err, items, responseOptions) { if (err) throw "Failed to read document: key=" + key; // なければ新規作成して終了 if (items.length != 1) { const document = {'key': key, 'count': countUpNum}; const accept2 = container.createDocument(container.getSelfLink(), document, {}, function (error, resource, options) { if (error) throw "Error: " + error.message; } ); if (!accept2) throw "Failed to create document: key=" + key; return; } // unique key制約により1つしか存在しないことが保証されている let document = items[0]; document.count += countUpNum; // 該当ドキュメントを更新 const accept2 = container.replaceDocument(document._self, document, function (err2, replacedDocument) { if (err2) throw "Error: " + err2.message; } ); if (!accept2) throw "Failed to update document: key=" + key; }); if (!accept) throw "Failed to read document: key=" + key; return; }
このStored Procedureはkey(更新したいkey。String)とcountUpNumStr(いくつカウントアップするか。String)という2つの入力パラメータを受け取る。keyでドキュメントを検索して、なければcountがcountUpNumStrのドキュメントを新規作成、あればそのcountを+countUpNumStrするという実装になっている。各処理で失敗したときはエラーを返す。context.getResponse().setBody()
を使えばレスポンスボディも変更できる。
入力パラメータは常にStringで渡ってくる。
Azure Portalから実行
作成したStored Procedure、spCountUp
をAzure Portal上からテストすることができる。
Stored Procedureを実行するときは、Stored Procedure内で定義した引数の他にPartition keyを指定する必要があるので注意。1つのStored Procedureで複数のPartition keyを参照することはできないらしい。図の例ではtestというkeyをもつドキュメントのcountを+3する。
Java SDKから実行
次にJava SDK経由で実行してみる。Scalaが好きなのでScalaで。
package cosmos_db import com.microsoft.azure.documentdb._ import scala.concurrent.{ ExecutionContext, Future } class CosmosDB { private val cosmosDBClient = new DocumentClient( "DATABASE_URI", "MASTER_KEY", connectionPolicy, ConsistencyLevel.Session ) private val connectionPolicy: ConnectionPolicy = { val retryOptions = new RetryOptions retryOptions.setMaxRetryAttemptsOnThrottledRequests(5) retryOptions.setMaxRetryWaitTimeInSeconds(30) val policy = new ConnectionPolicy policy.setConnectionMode(ConnectionMode.DirectHttps) policy.setMaxPoolSize(100) policy.setIdleConnectionTimeout(60) policy.setRetryOptions(retryOptions) policy } def executeStoredProcedure(storedProcedureLink: String, partitionKeyValue: String, args: Seq[String])( implicit ec: ExecutionContext ): Future[Unit] = { val requestOptions = new RequestOptions requestOptions.setPartitionKey(new PartitionKey(partitionKeyValue)) Future( cosmosDBClient.executeStoredProcedure(storedProcedureLink, requestOptions, args.toArray) ).map { _ => () }.recoverWith { case e: Throwable => Future.failed( new RuntimeException( s"failed to execute stored procedure($partitionKeyValue). arg: ${args.toString()}. message: ${e.getMessage}" ) ) } } }
マルチリージョンの場合は一貫性レベル(コードのConsistencyLevel
)をStrongにする必要があるかも。
使うときはこんな感じ。
import scala.concurrent.ExecutionContext.Implicits.global object CosmosDBTest { def main(args: Array[String]): Unit = { val cosmosDB = new CosmosDB val spPath = "dbs/AtomicCounterTest/colls/Counter/sprocs/spCountUp" cosmosDB.executeStoredProcedure(spPath, "testKey", Seq("testKey", "3")) } }
まとめ
Azure Cosmos DBでAtomic Counterを実現するためにStored Procedureを使った。Cosmos DBでトランザクション管理をするときには是非。
参考
ストアドプロシージャ、トリガー、および UDF を Azure Cosmos DB に記述する | Microsoft Docs
Azure Cosmos DB の JavaScript クエリ API を使用してストアドプロシージャとトリガーを記述する | Microsoft Docs