しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

Cosmos DB .NET SDK v3 を使って快適に LINQ を書くコツ

Cosmos DB .NET SDK v2 の時は LINQ への変換が結構イマイチで、直接 SQL を書くことが多かったですが v3 ではかなり改善されているので、大体のクエリは LINQ だけで書けるようになっていました。

対象が v2 か v3 なのかハッキリしませんが、ドキュメントも用意されています。

.NET SDK v3 の簡単な使い方については、前にチートシートという形で書きました。

今回はもうちょっと LINQ に絞って実際に使いそうなクエリに絞りました。基本は非同期で書いていくべきなので、そういう書き方しかしていません。

テスト用の共通コードは以下のようなものを用意しました。雑なデータモデルです。

public class Entry
{
    [JsonProperty("id")]
    public string Id { get; set; }

    [JsonProperty("title")]
    public string Title { get; set; }

    [JsonProperty("body")]
    public string Body { get; set; }

    [JsonProperty("tags")]
    public IList<string> Tags { get; set; }

    [JsonProperty("authors")]
    public IList<Author> Authors { get; set; }

    [JsonProperty("createdAt")]
    public DateTimeOffset CreatedAt { get; set; }
}

public class Author
{
    [JsonProperty("name")]
    public string Name { get; set; }
}
class Program
{
    static async Task Main(string[] args)
    {
        var connectionString = "<connection string>";

        var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
        {
            ConnectionMode = ConnectionMode.Direct
        });

        var container = cosmosClient.GetContainer("Blog", "Entry");
    }
}

テストデータは以下のようなものを入れておきました。モデルに従ってそれっぽいものを入れています。

このコードとデータをベースに LINQ をいろいろ書いて試していきます。

LINQ Provider が生成したクエリを調べる

まずは LINQ がどのように SQL に変換されるのか調べる方法ですが、単純に ToQueryDefinition を呼び出すと SQL への変換結果が返ってきます。

var query = container.GetItemLinqQueryable<Entry>()
                     .Select(x => new { x.Id, x.Title, x.Body, x.Authors[0].Name })
                     .ToQueryDefinition();

Console.WriteLine($"QueryText: {query.QueryText}");

上のクエリ式は実際には以下のような SQL に変換されます。

SELECT VALUE {"Id": root["id"], "Title": root["title"], "Body": root["body"], "Name": root["authors"][0]["name"]} FROM root

返ってきた QueryDefinition はそのまま GetItemQueryIterator に渡せば実行できます。

Paging (OFFSET / LIMIT) の注意点

今年のアップデートで OFFSET / LIMIT が使えるようになったので、これまでのような Continuation Token を使ったページングではなく、RDB などに近い形でページングが実装できるようになりました。

SQL を直接書く分には問題ないですが、v3 SDK では TakeSkip の書き順によって変換される SQL が大きく変わるので注意が必要でした。

var query = container.GetItemLinqQueryable<Entry>()
                     .Take(10)
                     .Skip(10)
                     .ToQueryDefinition();

Console.WriteLine($"QueryText: {query.QueryText}");

上のコードのように Take を先に書くと SELECT TOP への変換が優先されるらしく、変換された SQL はサブクエリを使ったものになりました。

SELECT VALUE r0 FROM (SELECT TOP 10 VALUE root FROM root ) AS r0 OFFSET 10 LIMIT 2147483647

動きそうな SQL ですが、実行するとサブクエリで TOP は指定できないというエラーになります。サブクエリは ORDER BY が使えなかったり地味に制約が多いです。

今度は先に Skip 書いたコードで試してみます。本当に順番を入れ替えただけです。

var query = container.GetItemLinqQueryable<Entry>()
                     .Skip(10)
                     .Take(10)
                     .ToQueryDefinition();

Console.WriteLine($"QueryText: {query.QueryText}");

今度は期待しているように OFFSET / LIMIT が使われた SQL に変換されました。

SELECT VALUE root FROM root OFFSET 10 LIMIT 10

この動作は SDK の不具合っぽいのでフィードバックしておきました。ひょっとしたら直るかもしれません。

ThenBy / ThenByDescending は複合インデックスが必要

v3.3.0 から ThenBy / ThenByDescending に対応するようになりましたが、Cosmos DB 側の制約として複数のプロパティに対しての ORDER BY 実行には複合インデックスが必要です。

インデックスを作成せずに実行しようとすると、もちろんエラーになります。

複合インデックスを上手く使うとパフォーマンスを改善できるらしいので、じっくり試したいところです。

どのようなケースに適用できるのかはまだよくわかっていないですが、RUs を減らせるのは魅力的です。

Subquery + EXISTS を活用する

既に LINQ Provider がサブクエリを使う SQL を吐き出していますが、個人的にはサブクエリと EXISTS によって LINQ でいうところの Any への対応が簡単になった点が良かったです。

Cosmos DB は 1 ドキュメントにオブジェクトの配列など JSON で表現できるものは入れることが出来ますが、以前は配列に対してのクエリが JOIN が必要だったりで非常に面倒でした。

単純なプリミティブ型の配列なら Contains を使うと良い感じに変換してくれてました。

var query = container.GetItemLinqQueryable<Entry>()
                     .Where(x => x.Tags.Contains("日記"))
                     .ToQueryDefinition();

Console.WriteLine($"QueryText: {query.QueryText}");

出力された SQL は以下のように ARRAY_CONTAINS を使うものになります。これは分かりやすいです。

SELECT VALUE root FROM root WHERE ARRAY_CONTAINS(root["tags"], "日記")

問題はオブジェクトの配列があって、その中のプロパティに対して条件をかけたい場合です。今回のデータモデルの場合は AuthorsName でフィルタを実行したい場合です。

これまでは JOIN するか ARRAY_CONTAINS の条件にオブジェクトを指定するというように、直感的ではない書き方をする必要がありました。当然 LINQ では表現出来なかったのですが、サブクエリによって Any を使ってシンプルに書けるようになりました。

var query = container.GetItemLinqQueryable<Entry>()
                     .Where(x => x.Authors.Any(xs => xs.Name == "shibayan"))
                     .ToQueryDefinition();

Console.WriteLine($"QueryText: {query.QueryText}");

少し出力された SQL は分かりにくいですが、サブクエリと EXISTS を使ったものに変換されています。

SELECT VALUE root FROM root JOIN (SELECT VALUE EXISTS(SELECT VALUE xs0 FROM root JOIN xs0 IN root["authors"] WHERE (xs0["name"] = "shibayan") ) ) AS v0 WHERE v0

RDB に対して Any を使った時と同じようなクエリになりました。サブクエリが使えるようになったので、ドキュメントに含まれるコレクションを条件に使いやすくなったと思います。

拡張メソッドを追加して楽にする

Entity Framework Core には ToListAsyncFirstOrDefaultAsync などの Task に対応した拡張メソッドが用意されていますが、v3 SDK の場合は ToFeedIterator を呼び出してループで結果を取得する必要があるので、拡張メソッドを用意しておくとカジュアルに使えます。

とりあえず必要だったメソッドだけ用意しましたが、基本は ToFeedIterator を隠蔽しているだけです。

public static class CosmosAsyncLinqExtensions
{
    public static async Task<TSource> FirstOrDefaultAsync<TSource>(this IQueryable<TSource> source)
    {
        var iterator = source.ToFeedIterator();

        var result = await iterator.ReadNextAsync();

        return result.FirstOrDefault();
    }

    public static async Task<TSource> FirstOrDefaultAsync<TSource>(this IQueryable<TSource> source, Expression<Func<TSource, bool>> predicate)
    {
        var iterator = source.Where(predicate)
                             .ToFeedIterator();

        var result = await iterator.ReadNextAsync();

        return result.FirstOrDefault();
    }

    public static async Task<IList<TSource>> ToListAsync<TSource>(this IQueryable<TSource> source)
    {
        var result = new List<TSource>();

        var iterator = source.ToFeedIterator();

        do
        {
            result.AddRange(await iterator.ReadNextAsync());

        } while (iterator.HasMoreResults);

        return result;
    }
}

使い方は特に説明不要だと思いますが、クエリ式の最後に await 付きで呼び出すだけです。

var entry = await container.GetItemLinqQueryable<Entry>()
                           .OrderByDescending(x => x.CreatedAt)
                           .FirstOrDefaultAsync();

var result = await container.GetItemLinqQueryable<Entry>()
                            .Where(x => x.Tags.Contains("日記"))
                            .ToListAsync();

foreach (var item in result)
{
    Console.WriteLine(JsonConvert.SerializeObject(item, Formatting.Indented));
}

コードを書いていると ToFeedIterator を使う同じパターンが頻発するので、何かしら共通化がしたくなるはずです。実際に自分は頻発したので拡張メソッドを書きました。

Async Streams に対応させる

拡張メソッドを書いたので、ついでに C# 8.0 で追加された Async Streams にも対応させてみます。メソッド名は微妙な感じですが、以下のような拡張メソッドを用意するだけです。

public static async IAsyncEnumerable<TSource> AsAsyncEnumerable<TSource>(this IQueryable<TSource> source)
{
    var iterator = source.ToFeedIterator();

    do
    {
        var result = await iterator.ReadNextAsync();

        foreach (var item in result)
        {
            yield return item;
        }

    } while (iterator.HasMoreResults);
}

IAsyncEnumerable<T> を返すメソッドを用意するだけなので簡単です。

使い方は await foreach に渡すだけなので、これも特に説明は不要な簡単さです。

var source = container.GetItemLinqQueryable<Entry>()
                      .Where(x => x.Tags.Contains("日記"))
                      .AsAsyncEnumerable();

await foreach (var item in source)
{
    Console.WriteLine(JsonConvert.SerializeObject(item, Formatting.Indented));
}

これで大きなデータセットに対しても、効率的にページングを行いつつ処理できるようになります。

デフォルトでは 1000 件ずつ取ってくるようになっていますが、QueryRequestOptionsMaxItemCount を設定することで 1 回のリクエストで取得する件数をすることも出来ます。

どうしても SQL を書く必要がある場合

ほとんどのケースで LINQ を使ってクエリを表現できるはずですが、どうしても SQL を書く必要が出てきた場合には QueryDefinition を使ってパラメータ化クエリを組み立てて実行します。

クエリを組み立てて実行するにはまた ToFeedIterator を使う必要があるので、Dapper 風にラップした拡張メソッドを用意しておくと楽になると思います。

public static class CosmosQueryExtensions
{
    private static QueryDefinition CreateQuery(string sql, object param)
    {
        var query = new QueryDefinition(sql);

        if (param != null)
        {
            foreach (var propertyInfo in param.GetType().GetProperties())
            {
                query.WithParameter("@" + propertyInfo.Name, propertyInfo.GetValue(param));
            }
        }

        return query;
    }

    public static async Task<IEnumerable<T>> QueryAsync<T>(this Container container, string sql, object param = null)
    {
        var query = CreateQuery(sql, param);

        var iterator = container.GetItemQueryIterator<T>(query);

        var result = new List<T>();

        do
        {
            result.AddRange(await iterator.ReadNextAsync());

        } while (iterator.HasMoreResults);

        return result;
    }

    public static async Task<T> QueryFirstOrDefaultAsync<T>(this Container container, string sql, object param = null)
    {
        var query = CreateQuery(sql, param);

        var iterator = container.GetItemQueryIterator<T>(query, requestOptions: new QueryRequestOptions { MaxItemCount = 1 });

        return (await iterator.ReadNextAsync()).FirstOrDefault();
    }
}

インターフェースはほぼ Dapper と同じなので、使ったことがある人は違和感ないと思います。

パラメータ化クエリの書き方も Cosmos DB は @ Prefix なのでほぼ同じです。

var result = await container.QueryAsync<Entry>("SELECT * FROM c WHERE ARRAY_CONTAINS(c.tags, @tag)", new { tag = "日記" });

あまり使う機会が無いかもしれませんが、パラメータ化クエリは v2 より扱いやすくなっているので SQL を書く場合にも楽になっています。

長々と書いてきましたが、やはり自分的には Subquery + EXISTS が一番便利かなと思っています。RDB とは JOIN 周りの挙動が違いますが、LINQ を使えば SDK がその辺りを吸収してくれるのも良いです。