少し前にリリースされた Azure Cosmos DB v3 SDK の Bulk Support を試そうと思いつつ放置してたのですが、仕事で割と良い感じのお題が降ってきたのでこれを機に試してみました。
開発チームの人がブログで色々と紹介してくれているので、一通り目を通しておくと良い感じです。Medium の方も同じようなタイトルですが、内容は違うので両方読んでおいた方が良いです。
v3 SDK の Bulk Support の特徴は、クライアント作成時にオプションを有効化すれば、後は透過的に複数のリクエストをバッチに変換して投げてくれるところです。
これまでも Bulk Executor を使うと Cosmos DB の RU を最大限に使って項目の追加が行えていましたが、未だに .NET Core への対応はプレビューのままで改善が期待できなかったり、初期化周りで準備が色々と必要でライブラリとしては使い勝手が悪かったです。
Bulk Executor のドキュメントにも v3 SDK を勧めるメッセージが表示されています。
If you are using bulk executor, please see the latest version 3.x of the .NET SDK, which has bulk executor built into the SDK.
Azure Cosmos DB: Bulk executor .NET API, SDK & resources | Microsoft Docs
同じように Change Feed も v3 SDK に組み込まれているので、v3 を勧めるメッセージが出てきます。
If you are using change feed processor, please see the latest version 3.x of the .NET SDK, which has change feed built into the SDK.
Azure Cosmos DB: .NET Change Feed Processor API, SDK & resources | Microsoft Docs
将来的に Bulk Executor と Change Feed Processor は v3 / v4 を使うコードに変更する必要が出てくるはずです。ただし Azure Functions の Trigger に関しては事情が複雑です。
Bulk Support が公開されたのは v3.4.0 なので、それより新しいライブラリをインストールしておきます。v3 SDK はかなり開発ペースが速いのであっという間に古くなるので注意。
そもそも v3 SDK をまだ使っていない人はサクッとアップデートしておきましょう。DocumentClient
から v3 の CosmosClient
に移行すると API がシンプルになっています。
基本的な使い方については GA のタイミングでまとめておいたので参考にしてください。
説明はこのくらいにして実際にコードを書いて試していきます。今回は適当なデータを大量に Cosmos DB に投入するコードを v3 SDK と Bulk Executor の両方で書いて試してみました。
折角なので Cosmos DB 側は Autopilot を有効にしたデータベースを用意しておきました。
Autopilot もちゃんと検証しておこうと思っていましたが、何だかんだで後回しにしていたので丁度良かったです。上の設定では最低 2000 RU/s から最大 20000 RU/s まで自動的にスケールします。
まだプレビューだからかもしれないですが、上限が固定値だったりメトリクスが分かりにくいので今後に期待してます。あと RU の単価が 1.5 倍になるのはちょっと高い気がします。
v3 SDK の Bulk Support を使ったバージョン
先に新しい v3 SDK を使ったコードです。1 万件のデータを Cosmos DB に投入するだけの単純なコードですが、AllowBulkExecution
を設定するだけで内部的にバッチで処理されます。
コード的には Task.WhenAll
を使って複数 Task
の完了を待つようにしているだけです。この辺りのイディオムは C# では日常的に出てくるやつです。
class Program { static async Task Main(string[] args) { var connectionString = "AccountEndpoint=https://xxxx.documents.azure.com:443/;AccountKey=xxxx;"; var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions { AllowBulkExecution = true, SerializerOptions = new CosmosSerializationOptions { PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase } }); var container = cosmosClient.GetContainer("TestDB", "TestContainer"); var tasks = Enumerable.Range(1, 10000) .Select(x => new DemoItem { Id = Guid.NewGuid().ToString(), Name = $"kazuakix-{x}", CreatedOn = DateTime.Now, Pk = Guid.NewGuid().ToString() }) .Select(x => container.CreateItemAsync(x, new PartitionKey(x.Pk))); await Task.WhenAll(tasks); } }
Bulk Executor と同じように消費した RU を見ながら適切に処理を行っているので、Cosmos DB を使っているときによく見る 429 が出続けたりすることはありませんでした。
今回は Autopilot で 20000 RU/s までスケールするので 429 は発生しにくいですが、固定 RU/s で AllowBulkExecution
の設定をせずに実行すると、普通に 429 の連発になります。
古い Bulk Executor を使ったバージョン
そして今度は Bulk Executor を使ったコードです。処理自体は BulkImportAsync
を呼び出すだけなのでシンプルですが、実行するまでの準備が地味に複雑です。
リトライ設定などを手動で上書きしないといけないのはかなり分かりにくいですし、DocumentClient
ベースなので Database / Container の扱いが冗長です。
class Program { static async Task Main(string[] args) { var endpoint = new Uri("https://xxxx.documents.azure.com:443"); var accountKey = "xxxx"; var connectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }; var client = new DocumentClient(endpoint, accountKey, connectionPolicy); var documentCollection = await client.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri("TestDB", "TestContainer")); client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9; var bulkExecutor = new BulkExecutor(client, documentCollection.Resource); await bulkExecutor.InitializeAsync(); client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0; var documents = Enumerable.Range(1, 10000) .Select(x => new DemoItem { Id = Guid.NewGuid().ToString(), Name = $"kazuakix-{x}", CreatedOn = DateTime.Now, Pk = Guid.NewGuid().ToString() }); await bulkExecutor.BulkImportAsync(documents); } }
こちらも当然ながら 429 を連発することなく、スループットを見ながら最適な処理を行ってくれます。
それぞれのコードを実行してみたところ、処理時間はほぼ変わっていないので良い感じです。気になった点としては RU/s の消費が v3 SDK の方が多かったことですが、処理時間はほぼ同じなので問題ないです。
RU/s は時間課金なのでこういうケースの場合は上限まで使い切れた方が効率が良いです。書き込みデータサイズは同じはずなので、内部処理に違いが結構あるのかもしれません。
Azure Functions で v3 SDK を使う
前述したように Azure Functions の Cosmos DB Trigger は残念ながら v2 ベースなので、v3 の CosmosClient
を使うためには手動でのインストールが必要です。
一応内部的には v3 への移行を行っているらしいですが、まだ時間がかかりそうです。
I'm working on it, currently the blocker is on the new V3 estimator that does not have the same public surface. We are also working on trigger support for Cassandra and Mongo, but no ETA yet. Meanwhile if you need the V3 SDK you can pull it through DI https://t.co/EQqroEvuQO
— Matías Quaranta (@Ealsur) 2019年12月6日
とはいえ、v3 を使った方が圧倒的にコードが書きやすいので、Change Feed 以外は CosmosClient
を使って書いていくようにした方が良いです。