しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

Azure Cosmos DB .NET SDK v3 GA 記念チートシート

5 月の Build で月末 GA が発表されていた Azure Cosmos DB の .NET SDK v3 ですが、昨日ついに正式版がリリースされました。特に Public Preview の時からは API が大きく変わっているので注意です。

元は JavaScript SDK に近い API でしたが、途中で大幅に変更されています。

そして Cosmos DB チームがセマンティックバージョンをミスったようで、プレビュー版の NuGet パッケージをインストールしていると、正式版が更新対象として出てこないので更に注意が必要です。

新しい v3 SDK の特徴としては .NET Standard 2.0 ベースになったことでしょう。これで、これまでのように .NET Framework 向けと .NET Core 向けで別々のパッケージを使う必要がなくなります。

他にも Stream ベースの API が追加されたり、LINQ への対応も行われました。

設計が凄くイマイチで拡張性が皆無だった v2 からさっさと移行したいので、よく使うパターンについて正式版向けにメモを残しておきます。足りないものは後で追加するかもしれません。

これぐらいあれば v2 からスムーズに移行出来そうな気がします。v3 から追加された機能は、v2 にはなかった拡張性を提供してくれるものがあるのでかなり嬉しい部分です。

基本的な Cosmos DB の操作

CosmosClient の作成

// Direct mode を使う CosmosClient を作る
var cosmosClient = new CosmosClient("AccountEndpoint=https://**.documents.azure.com:443/;AccountKey=**;", new CosmosClientOptions
{
    ConnectionMode = ConnectionMode.Direct
});

// Builder を利用して CosmosClient を作る
//var cosmosClient = new CosmosClientBuilder("AccountEndpoint=https://**.documents.azure.com:443/;AccountKey=**;")
//                   .UseConnectionModeDirect()
//                   .Build();

// 操作対象の Container を取得
var container = cosmosClient.GetContainer("TestDB", "Users");

アイテムの作成

// 新しいアイテムを用意する
var user = new User
{
    Id = Guid.NewGuid().ToString(),
    Name = "kazuakix",
    Age = 50,
    Prefecture = "wakayama"
};

// アイテムを新規作成
await container.CreateItemAsync(user);

// Upsert も同じように実行できる
//await container.UpsertItemAsync(user);

アイテムの更新(入れ替え)

// アイテムのプロパティを更新
user.Age = 60;

// Id を明示的に指定する
await container.ReplaceItemAsync(user, user.Id);

アイテムの削除

// 削除時は PartitonKey を明示的に指定する、型引数に注意
await container.DeleteItemAsync<User>(user.Id, new PartitionKey(user.Prefecture));

アイテムの読み込み

// 読み込み時は PartitionKey を明示的に指定する(Preview の時から大きく変わっているので注意)
var response = await container.ReadItemAsync<User>(user.Id, new PartitionKey(user.Prefecture));

Console.WriteLine($"{response.Resource.Name},{response.Resource.Age}");

クエリを実行する

全体的に FeedIterator<T> を使ってデータを読み込むようになっています。

SQL を利用したクエリの実行

// PartitionKey を指定しないと Cross-Partition Query になるので注意
var queryRequestOptions = new QueryRequestOptions { PartitionKey = new PartitionKey("wakayama") };

// SQL を使ってアイテムを読み込む
var iterator = container.GetItemQueryIterator<User>("SELECT * FROM c WHERE c.age > 70", requestOptions: queryRequestOptions);

// SQL にパラメータを渡す場合は QueryDefinition を使う(パラメータの型には注意)
//var query = new QueryDefinition("SELECT * FROM c WHERE c.age > @age").WithParameter("@age", 70);

// QueryDefinition を使ってアイテムを読み込む
//var iterator = container.GetItemQueryIterator<User>(query, requestOptions: queryRequestOptions);

do
{
    // 結果セットを取得する
    var result = await iterator.ReadNextAsync();

    foreach (var item in result)
    {
        Console.WriteLine($"{item.Name},{item.Age}");
    }

    // 続きがあれば繰り返す
} while (iterator.HasMoreResults);

LINQ を利用したクエリの実行

// PartitionKey を指定しないと Cross-Partition Query になるので注意
var queryRequestOptions = new QueryRequestOptions { PartitionKey = new PartitionKey("wakayama") };

// 同期的にアイテムを読み込む (allowSynchronousQueryExecution = true は必須)
var result = container.GetItemLinqQueryable<User>(allowSynchronousQueryExecution: true, requestOptions: queryRequestOptions)
                      .Where(x => x.Age > 70)
                      .ToArray();

foreach (var item in result)
{
    Console.WriteLine($"{item.Name},{item.Age}");
}

// 非同期の場合は FeedIterator<T> に変換してから取得する
var iterator = container.GetItemLinqQueryable<User>(requestOptions: queryRequestOptions)
                        .Where(x => x.Age > 70)
                        .ToFeedIterator();

do
{
    // 結果セットを取得する
    var result = await iterator.ReadNextAsync();

    foreach (var item in result)
    {
        Console.WriteLine($"{item.Name},{item.Age}");
    }

    // 続きがあれば繰り返す
} while (iterator.HasMoreResults);

v3 で追加された機能

RequestHandler の追加

v3 から独自の RequestHandler を追加して、Cosmos DB へのリクエスト前後に処理を追加できるようになりました。HttpClient で言うところの DelegatingHandler に相当する機能です。

public class MyCosmosRequestHandler : RequestHandler
{
    public override async Task<ResponseMessage> SendAsync(RequestMessage request, CancellationToken cancellationToken)
    {
        var response = await base.SendAsync(request, cancellationToken);

        Console.WriteLine($"RequestCharge: {response.Headers.RequestCharge}");

        return response;
    }
}

上のサンプルのように RequestHandler を追加すると、消費された RUs を簡単に Application Insights に送信して可視化といったことも出来るはずです。

作成した RequestHandler は Builder を使ったパターンだと設定が楽です。

var cosmosClient = new CosmosClientBuilder("AccountEndpoint=https://**.documents.azure.com:443/;AccountKey=**;")
                   .WithConnectionModeDirect()
                   .AddCustomHandlers(new MyCosmosRequestHandler())
                   .Build();

これで独自の RequestHandler を使う CosmosClient が作成出来ます。v2 から比べると格段に便利です。

JsonSerializer の入れ替え

Cosmos DB で面倒なのが camelCase にしないと id がそもそも認識されないことです。これまではモデルが持つプロパティ全てを小文字で定義するか、属性で別名を付けてきたかと思います。

v3 SDK からは Serializer の拡張が行えるので、自動で camelCase にする Serializer を作れば解決します。

public class MyCosmosJsonSerializer : CosmosSerializer
{
    private static readonly Encoding DefaultEncoding = new UTF8Encoding(false, true);
    private static readonly JsonSerializer Serializer = new JsonSerializer()
    {
        NullValueHandling = NullValueHandling.Ignore,
        // 自動で camelCase に変換するように設定する
        ContractResolver = new CamelCasePropertyNamesContractResolver()
    };

    public override T FromStream<T>(Stream stream)
    {
        using (stream)
        {
            if (typeof(Stream).IsAssignableFrom(typeof(T)))
            {
                return (T)(object)(stream);
            }

            using (StreamReader sr = new StreamReader(stream))
            {
                using (JsonTextReader jsonTextReader = new JsonTextReader(sr))
                {
                    return MyCosmosJsonSerializer.Serializer.Deserialize<T>(jsonTextReader);
                }
            }
        }
    }

    public override Stream ToStream<T>(T input)
    {
        MemoryStream streamPayload = new MemoryStream();
        using (StreamWriter streamWriter = new StreamWriter(streamPayload, encoding: MyCosmosJsonSerializer.DefaultEncoding, bufferSize: 1024, leaveOpen: true))
        {
            using (JsonWriter writer = new JsonTextWriter(streamWriter))
            {
                writer.Formatting = Formatting.None;
                MyCosmosJsonSerializer.Serializer.Serialize(writer, input);
                writer.Flush();
                streamWriter.Flush();
            }
        }

        streamPayload.Position = 0;
        return streamPayload;
    }
}

上の実装はデフォルトの Serializer をそのまま持ってきて、camelCase にする設定だけ追加したものです。

Serializer も Builder を使った方が分かりやすく設定出来るのでお勧めです。

var cosmosClient = new CosmosClientBuilder("AccountEndpoint=https://**.documents.azure.com:443/;AccountKey=**;")
                   .WithConnectionModeDirect()
                   .WithCustomSerializer(new MyCosmosJsonSerializer())
                   .Build();

これで自動的に Cosmos DB へは camelCase になったモデルが保存されるようになります。デフォルトが camelCase でも良いぐらいです。

Stream API の追加

v3 SDK からは自動で JSON をシリアライズ、デシリアライズするのではなく、Stream を受けたり返したりするためのメソッドが用意されています。

とはいえ、個人的には使い道がまずないなという印象です。最初から最後まで UTF-8 な JSON を扱う場合のみエンコード変換やシリアライズのコストがかからないので、パフォーマンスは上がりそうです。

.NET Standard 2.1 になれば Span<T> や System.Text.Json などが使われて改善すると思います。

Change Feed Processor / Estimator

以前は別パッケージだった Change Feed Processor が v3 SDK からは統合されました。割とシンプルなコードで Change Feed の処理を追加できるようになっています。

var cosmosClient = new CosmosClientBuilder("AccountEndpoint=https://**.documents.azure.com:443/;AccountKey=**;")
                   .WithConnectionModeDirect()
                   .Build();

var container = cosmosClient.GetContainer("TestDB", "Users");
var leaseContainer = cosmosClient.GetContainer("TestDB", "leases");

var processor = container.GetChangeFeedProcessorBuilder<User>("processor1", OnChangeFeed)
                         .WithInstanceName("testinstance")
                         .WithLeaseContainer(leaseContainer)
                         .Build();

await processor.StartAsync();

実際に動かしてみましたが、別パーティションでも問題なく変更を取れていました。

f:id:shiba-yan:20190713023653p:plain

大半のケースでは Azure Functions の CosmosDBTrigger を使っていると思うので、そっちの対応待ちです。

おまけ的な紹介になりますが、変更通知がどのくらい溜まっているかを通知する機能も追加されています。スケーリングの指標に使ったりは出来るのではないかと思います。