2022 年も仕事で Cosmos DB と Change Feed を無限に使ってきたので、2022 年最後の日のブログはそれに相応しい Cosmos DB の話題で締めたいと思います。*1
何気なく仕事で Cosmos DB と Change Feed を使っていると、時々 Change Feed は変更履歴を別で持っていないのに読み出せるのかという質問を貰います。Change Feed の特徴として Container にデータが残っている限り、最初から順序保証された状態でリプレイ可能という特徴がありますが、どのように実装されているのかやはり気になるのは Cosmos DB 使いとして一度は通る道です。
そのヒントは Change Feed が使っている REST API にあります。以下のドキュメントにある LSN がそれに該当します。日本語では論理シーケンス番号と訳されています。
この API から分かることは、Cosmos DB は項目毎に LSN を持っていて Change Feed はその順番に読み出しているだけなので、LSN がある(= データが残っている)限りリプレイ可能ということです。
そして Change Feed を読み出す側が、処理の成功時にどの LSN まで読みだしたかを記録しておけば、処理が失敗したとしても続きから再度読み出せるという仕組みです。その LSN を保存しているのが Change Feed Processor では leases と呼ばれるものです。
誰もが一度は leases Container の中身を見たことがあると思いますが、最新の Cosmos DB Extension v4 を使うと以下のようなフォーマットのデータが保存されています。
FeedRange
は Partition Key の範囲を持っている内部形式、すなわち物理パーティションの情報なので今回は無視します。重要なのは ContinuationToken
になり、この値が読み出し済みの LSN そのものです。物理パーティションの情報を同時に持っているということは、LSN は物理パーティション単位で採番されているということになります。
LSN についての更なる情報は Change Feed のドキュメントに少しだけ存在します。LSN 自体は実体はトランザクション ID で全ての項目が持っていますが、実際に API で返ってくるのは Change Feed だけで、同じ LSN の値を複数の項目が持つ可能性があることも判明します。
それでは実際に Change Feed を読みだして、項目毎に LSN を持っているか確認してみます。Change Feed Processor や CosmosDBTrigger
を使うと読み出しタイミングの制御が面倒なので、今回のサンプルでは Pull Model を使って実装しています。
LSN は _lsn
という JSON プロパティで渡されるので、適当にマッピングしてあげればよいです。
var iterator = container.GetChangeFeedIterator<MyItemWithLSN>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental); var items = await iterator.ReadNextAsync(); foreach (var item in items) { Console.WriteLine($"Id = {item.Id}, LSN = {item.Lsn}"); } public class MyItemWithLSN { [JsonProperty("id")] public string Id { get; set; } [JsonProperty("_lsn")] public long Lsn { get; set; } }
このサンプルでは先頭から Change Feed を読み出すようにしています。Change Feed は開始時間指定も出来るので、実際には DateTime.MinValue
を使っているだけです。
実行してみると、以下のように Change Feed の応答には LSN が含まれていることが確認出来ます。データ自体は 5 件しかないですが、LSN は書き込みが発生する度にインクリメントされるので大きな値になりますし、値も連続していないことが分かります。
そして実行した後の ContinuationToken
を確認してみると、以下のような JSON になっていることが分かります。パッと見で State
の中に最後に読みだした LSN が含まれていることも確認出来ます。
{ "V": 2, "Rid": "mpJxAK7Vee4=", "Continuation": [ { "FeedRange": { "type": "Effective Partition Key Range", "value": { "min": "", "max": "FF" } }, "State": { "type": "continuation", "value": "\"486\"" } } ] }
余談ですが、Change Feed の推定機能は leases に保存されている LSN と最新の LSN の差を計算しているだけなので、同一項目への更新を繰り返していると異常に大きな値が返ってくる可能性があります。
あくまでも推定機能なので問題はないですが、返ってきた件数に頼り切るのは良くないです。*2
同じ LSN を持つ場合とトランザクション
ここまでの検証で Change Feed は LSN を使ってデータを読みだしていることが分かりましたが、ドキュメントにあった同じ LSN を持った項目の存在が気になるため追加で確認していきます。
LSN はトランザクション ID ということなのでストアドを使うと再現できそうですが、用意するのが非常に手間なので Transactional Batch を利用して試してみます。
Transactional Batch は Partition Key の指定が必要になるので、同一論理パーティションのデータである必要があります。サンプルでは以下のようにシンプルに 2 つの項目を追加するようにします。
var batch = container.CreateTransactionalBatch(new PartitionKey("test")); batch.CreateItem(new MyItem { Id = Guid.NewGuid().ToString(), Pk = "test" }) .CreateItem(new MyItem { Id = Guid.NewGuid().ToString(), Pk = "test" }); await batch.ExecuteAsync();
このサンプルを実行して Change Feed を使ってデータを読みだしてみると、以下のように同一の LSN とタイムスタンプを持った項目が完成しました。当然ながら LSN が同じなのでどちらが先に書き込まれたか判断は付きませんが、同一トランザクションで書き込まれたデータなので問題ないはずです。
ここで気になるのは、Cosmos DB のタイムスタンプ(_ts
)は秒単位の精度しか持っていないということです。Cosmos DB に複数から同時に書き込めば同じタイムスタンプを持ったデータが多く出来る可能性がありますが、タイムスタンプでソートしていれば非常に荒い順序保証しか実現できないことになります。
同時に書き込みリクエストを実行するとタイムスタンプは同一になりますが、LSN が異なっていれば読み出し時の順序が確定するはずなので、以下のようなコードを使って試してみました。
var task1 = container.CreateItemAsync(new MyItem { Id = Guid.NewGuid().ToString(), Pk = "test" }); var task2 = container.CreateItemAsync(new MyItem { Id = Guid.NewGuid().ToString(), Pk = "test" }); await Task.WhenAll(task1, task2);
実行してみると予定通り同一タイムスタンプとなりましたが、LSN は異なっていました。これは書き込みを同時に行っても、リクエストが別だとトランザクションも別になるというシンプルな話です。
従って Cosmos DB の Change Feed はタイムスタンプや独自の値でソートする必要なしに、高い精度で実際に書き込みされた順序保証が実現されています。ちなみに Bulk を有効化すると同じ Partition Key を持っていると同一トランザクションで処理されるようになるので、その点は注意が必要です。
Change Feed とトランザクションの関係
ここまでの検証で同一の LSN を持つ可能性は十分あることが分かりましたが、Change Feed を使って読み出す際にはトランザクション単位とドキュメントに記載されているため、さらに追加で検証していきます。
以下のようなデータを持つ Container を使っていきますが、前半は Transactional Batch を使って同一 LSN を持つデータで、後半は非同期処理を使って書き込んだデータです。
このようなデータに対して Change Feed を使って 5 件分だけ読み取るように指定すると、どうなるのかというのが検証の趣旨です。読み出し件数は PageSizeHint
を使って指定します。
var options = new ChangeFeedRequestOptions { PageSizeHint = 5 }; var iterator = container.GetChangeFeedIterator<MyItem>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental, options); var items = await iterator.ReadNextAsync(); foreach (var item in items) { Console.WriteLine($"Id = {item.Id}, Ts = {item.Ts}, LSN = {item.Lsn}"); }
同一トランザクションで書き込まれたデータは、読み出し時にも分割されないことが期待されますが、実行してみると以下のように同一 LSN を持つデータは分割して読み出すことなく、LSN の切れ目で読み込まれていることが分かります。
ぶっちゃけ PageSizeHint
というプロパティ名がネタバレ感ありますね。あくまでも Hint なので、指定した件数がキッチリ取れることは保証されていないという話でした。
同一 LSN を持つデータは同じリクエスト内で取れるというのは、順序保証の観点からも重要ですね。実に Cosmos DB と Change Feed はよく出来ているなと実感します。
Consistency Levels に使われている LSN の話
ここまで LSN と Change Feed の話でしたが、LSN は Cosmos DB の Consistency 周りでも使われています。Geo Redundancy や Quorum の書き込み時に LSN を使って完了したかを判断しているようですが、一番わかりやすいのは Session Consistency です。
Cosmos DB の Session Consistency は一番選択されていますが、その肝となる Session Token には LSN が含まれています。以下のようなコードを書くと Session Token を確認出来ます。
var response = await container.ReadItemAsync<MyItem>("<id>", new PartitionKey("<pk>")); Console.WriteLine($"Session Token = {response.Headers.Session}");
以下は実行してみた例ですが、一目で LSN が含まれていることが分かりますね。現在のセッションで書き込んだ LSN を保持することで、読み込み時にはその LSN より大きいデータが必要だと判断できます。
実際には Session Token には物理パーティションの情報や、Geo Redundancy を有効にしている場合はリージョン毎の情報を持つように設計されています。この辺りの情報は以下のドキュメントで紹介されています。
自分は知らなかったのですが、アプリケーションで Direct Mode を有効化している場合には、Strong と Bounded Staleness は SDK レベルで実装されているようです。当然ながらそこでは LSN が使われています。
少し前から Direct Mode 向けの実装が GitHub で公開されるようになったようです。こういった SDK のコメントから Cosmos DB の内部実装が透けて見えるのは非常に面白いですね。
明らかに読む人を選ぶというか、俺得なエントリとなりましたが、Cosmos DB と Change Feed は期待した通りの挙動になっていることが証明できたので満足です。