しばやん雑記

Azure Serverless とメイドさんが大好きなフリーランスのプログラマーのブログ

Azure Cosmos DB v3 SDK の Bulk Support がとても使い勝手が良かった

少し前にリリースされた Azure Cosmos DB v3 SDK の Bulk Support を試そうと思いつつ放置してたのですが、仕事で割と良い感じのお題が降ってきたのでこれを機に試してみました。

開発チームの人がブログで色々と紹介してくれているので、一通り目を通しておくと良い感じです。Medium の方も同じようなタイトルですが、内容は違うので両方読んでおいた方が良いです。

v3 SDK の Bulk Support の特徴は、クライアント作成時にオプションを有効化すれば、後は透過的に複数のリクエストをバッチに変換して投げてくれるところです。

これまでも Bulk Executor を使うと Cosmos DB の RU を最大限に使って項目の追加が行えていましたが、未だに .NET Core への対応はプレビューのままで改善が期待できなかったり、初期化周りで準備が色々と必要でライブラリとしては使い勝手が悪かったです。

Bulk Executor のドキュメントにも v3 SDK を勧めるメッセージが表示されています。

If you are using bulk executor, please see the latest version 3.x of the .NET SDK, which has bulk executor built into the SDK.

Azure Cosmos DB: Bulk executor .NET API, SDK & resources | Microsoft Docs

同じように Change Feed も v3 SDK に組み込まれているので、v3 を勧めるメッセージが出てきます。

If you are using change feed processor, please see the latest version 3.x of the .NET SDK, which has change feed built into the SDK.

Azure Cosmos DB: .NET Change Feed Processor API, SDK & resources | Microsoft Docs

将来的に Bulk Executor と Change Feed Processor は v3 / v4 を使うコードに変更する必要が出てくるはずです。ただし Azure Functions の Trigger に関しては事情が複雑です。

Bulk Support が公開されたのは v3.4.0 なので、それより新しいライブラリをインストールしておきます。v3 SDK はかなり開発ペースが速いのであっという間に古くなるので注意。

そもそも v3 SDK をまだ使っていない人はサクッとアップデートしておきましょう。DocumentClient から v3 の CosmosClient に移行すると API がシンプルになっています。

基本的な使い方については GA のタイミングでまとめておいたので参考にしてください。

説明はこのくらいにして実際にコードを書いて試していきます。今回は適当なデータを大量に Cosmos DB に投入するコードを v3 SDK と Bulk Executor の両方で書いて試してみました。

折角なので Cosmos DB 側は Autopilot を有効にしたデータベースを用意しておきました。

f:id:shiba-yan:20191212192701p:plain

Autopilot もちゃんと検証しておこうと思っていましたが、何だかんだで後回しにしていたので丁度良かったです。上の設定では最低 2000 RU/s から最大 20000 RU/s まで自動的にスケールします。

まだプレビューだからかもしれないですが、上限が固定値だったりメトリクスが分かりにくいので今後に期待してます。あと RU の単価が 1.5 倍になるのはちょっと高い気がします。

v3 SDK の Bulk Support を使ったバージョン

先に新しい v3 SDK を使ったコードです。1 万件のデータを Cosmos DB に投入するだけの単純なコードですが、AllowBulkExecution を設定するだけで内部的にバッチで処理されます。

コード的には Task.WhenAll を使って複数 Task の完了を待つようにしているだけです。この辺りのイディオムは C# では日常的に出てくるやつです。

class Program
{
    static async Task Main(string[] args)
    {
        var connectionString = "AccountEndpoint=https://xxxx.documents.azure.com:443/;AccountKey=xxxx;";

        var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
        {
            AllowBulkExecution = true,
            SerializerOptions = new CosmosSerializationOptions
            {
                PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
            }
        });

        var container = cosmosClient.GetContainer("TestDB", "TestContainer");

        var tasks = Enumerable.Range(1, 10000)
                              .Select(x => new DemoItem
                              {
                                  Id = Guid.NewGuid().ToString(),
                                  Name = $"kazuakix-{x}",
                                  CreatedOn = DateTime.Now,
                                  Pk = Guid.NewGuid().ToString()
                              })
                              .Select(x => container.CreateItemAsync(x, new PartitionKey(x.Pk)));

        await Task.WhenAll(tasks);
    }
}

Bulk Executor と同じように消費した RU を見ながら適切に処理を行っているので、Cosmos DB を使っているときによく見る 429 が出続けたりすることはありませんでした。

今回は Autopilot で 20000 RU/s までスケールするので 429 は発生しにくいですが、固定 RU/s で AllowBulkExecution の設定をせずに実行すると、普通に 429 の連発になります。

古い Bulk Executor を使ったバージョン

そして今度は Bulk Executor を使ったコードです。処理自体は BulkImportAsync を呼び出すだけなのでシンプルですが、実行するまでの準備が地味に複雑です。

リトライ設定などを手動で上書きしないといけないのはかなり分かりにくいですし、DocumentClient ベースなので Database / Container の扱いが冗長です。

class Program
{
    static async Task Main(string[] args)
    {
        var endpoint = new Uri("https://xxxx.documents.azure.com:443");
        var accountKey = "xxxx";

        var connectionPolicy = new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };

        var client = new DocumentClient(endpoint, accountKey, connectionPolicy);

        var documentCollection = await client.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri("TestDB", "TestContainer"));

        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        var bulkExecutor = new BulkExecutor(client, documentCollection.Resource);

        await bulkExecutor.InitializeAsync();

        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        var documents = Enumerable.Range(1, 10000)
                                  .Select(x => new DemoItem
                                  {
                                      Id = Guid.NewGuid().ToString(),
                                      Name = $"kazuakix-{x}",
                                      CreatedOn = DateTime.Now,
                                      Pk = Guid.NewGuid().ToString()
                                  });

        await bulkExecutor.BulkImportAsync(documents);
    }
}

こちらも当然ながら 429 を連発することなく、スループットを見ながら最適な処理を行ってくれます。

それぞれのコードを実行してみたところ、処理時間はほぼ変わっていないので良い感じです。気になった点としては RU/s の消費が v3 SDK の方が多かったことですが、処理時間はほぼ同じなので問題ないです。

f:id:shiba-yan:20191213180637p:plain

RU/s は時間課金なのでこういうケースの場合は上限まで使い切れた方が効率が良いです。書き込みデータサイズは同じはずなので、内部処理に違いが結構あるのかもしれません。

Azure Functions で v3 SDK を使う

前述したように Azure Functions の Cosmos DB Trigger は残念ながら v2 ベースなので、v3 の CosmosClient を使うためには手動でのインストールが必要です。

一応内部的には v3 への移行を行っているらしいですが、まだ時間がかかりそうです。

とはいえ、v3 を使った方が圧倒的にコードが書きやすいので、Change Feed 以外は CosmosClient を使って書いていくようにした方が良いです。