Cosmos DB には組み込みで RDB のようにシーケンス番号を生成する機能は用意されていないので、基本的には GUID / UUID を使うことになるのですが、稀にシーケンス番号が欲しくなることがあります。
RDB というか SQL Server の場合は以下のようにシンプルに生成できるので楽ですが、Cosmos DB ではトランザクションが用意されていないので実装に工夫が必要です。
偶にあるサンプルではストアドではトランザクションが効くのを利用しているケースもありますが、今の Cosmos DB では Partial Update がサポートされているため、サーバー側で値をインクリメントすることで安全にシーケンス番号を生成できます。
Partial Update を利用するとコンフリクトもサーバー側で自動的に解決してくれるのと、解決できないコンフリクトは SDK レベルでリトライも行われるため同時実行にも強いです。
サンプルコードは全て C# で書いていきますが、Partial Update が利用可能な SDK が用意された言語であれば、同じように利用できるはずです。SDK でサポートされている機能のまとめは以下を参照してください。
まずはシーケンス番号を保存するためのコンテナーとデータモデルを作成しますが、現在のシーケンス番号だけ保持すればよいので以下のようにシンプルなモデルで十分です。
Cosmos DB では id
は必須ですが、フォーマットは指定されていないのでシーケンス名などを入れておけば、複数のシーケンスを同時に生成出来るようになります。
public class Sequence { [JsonProperty("id")] public string Id { get; set; } [JsonProperty("value")] public long Value { get; set; } }
シーケンス番号を生成する前にドキュメントは生成しておきます。これは RDB でも事前にシーケンスの準備をする必要があるのと同じで、安全に生成するためにはなければ作るという処理を避ける必要があります。
現実的には ARM Template や Terraform を使ってリソースをデプロイするタイミングや、管理サイトでのオペレーション時など確実にアトミックに生成出来るタイミングで作るのが良いです。
ドキュメントを作成しておけば、後は Partial Update を使ってドキュメント内の value
プロパティをインクリメントしていくだけです。Cosmos DB SDK を使うと以下のようなコードで実現できます。
var connectionString = "<connection_string>"; var cosmosClient = new CosmosClient(connectionString); var container = cosmosClient.GetContainer("my-database", "my-sequence"); var operations = new[] { PatchOperation.Increment("/value", 1) }; var response = await container.PatchItemAsync<Sequence>("sample", new PartitionKey("sample"), operations); Console.WriteLine($"Seq = {response.Resource.Value}");
重要なのは PatchOperation.Increment
を呼び出している部分だけです。1 以外やマイナスも指定することも出来るので、様々な用途に対応出来るようになっています。
Partial Update が無い時代には Read して返ってきた値に 1 を足し、更に Replace を Optimistic Concurrency を使って呼び出す必要があったので、エラーハンドリングやコード自体も複雑でしたが Partial Update のおかげで非常に分かりやすいコードになっています。
このコードを実行すると、以下のようにサーバー側で value
プロパティへのインクリメントが行われて、その結果が返ってくることが確認出来ます。初回実行でしたので想定通り 1 が返ってきています。
Azure Portal からドキュメントを確認すると value
プロパティが現在のシーケンス番号になっています。
当然ながらサンプルコードを実行する度にインクリメントされた値が返ってきますので、この値がシーケンス番号になっていることが確認出来ます。
Partial Update を使うことで SDK とサーバー側でコンフリクトの解決まで行ってくれますが、その確認のために Task を使って同時に 10 件のリクエストを投げて正しくシーケンス番号が生成されるのか確認します。
サンプルコードとしては以下のように Task を使ってシンプルに同時実行しているだけです。
var tasks = new List<Task<ItemResponse<Sequence>>>(); for (var i = 0; i < 10; i++) { tasks.Add(container.PatchItemAsync<Sequence>("sample", new PartitionKey("sample"), operations)); } var response = await Task.WhenAll(tasks); foreach (var itemResponse in response) { Console.WriteLine($"Seq : {itemResponse.Resource.Value}, Consumed RU/s = {itemResponse.RequestCharge}"); } Console.WriteLine($"Next Seq : {response.Max(x => x.Resource.Value) + 1}");
このコードでは一緒に RU も表示するようにしているので、1 つのシーケンス番号を生成するのにかかった RU から秒間の最大生成数をある程度計算することが可能になります。
実行してみると、以下のような結果が返ってきました。大体 10 RU で 1 つのシーケンス番号が生成出来ていて、重複や抜けの発生なくリクエスト数分のシーケンス番号が生成されていることが確認出来ます。
デバッガーを確認するとコンフリクトは同時実行数が増えるほど発生しているのが確認出来ますが、SDK レベルで自動的にリトライが行われるため動作自体には影響しないようになっています。
今回のように id
以外でのアクセスを行わず、インデックスが全く必要ない場合にはポリシーを以下のように全てオフにすることで、処理に必要な RU を極限まで下げることが出来ます。
{ "indexingMode": "none", "automatic": false, "includedPaths": [], "excludedPaths": [] }
先ほどの結果は既にこのインデックスポリシーを反映した結果となっています。
Cosmos DB の Partial Update は RU の削減にはつながりませんが、アトミックな処理を簡単に実現出来るようになりますので、用途はかなり広いです。是非使っていきましょう。