しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

Cosmos DB では Repository パターンを使うのが楽だった話

Cosmos DB の API は地味にくせがあるというか、コレクション毎の違いは UriFactory のパラメータ以外あまりなくて、大体は同じような処理を書く感じだったので Repository を用意して共通化しました。

勿論これが正解とか言うわけではなく、あくまでも一例として捉えて貰えばと思って紹介します。実際のところ Cosmos DB に限らない話ではあるんですが、今回は Cosmos DB を使ったというだけです。

今回のケースについて

実際に仕事で Cosmos DB を使ってたわけですが、コレクション数が 10 ぐらいの中規模ぐらい?なアプリケーションでした。Cosmos DB は非正規化されたデータが入るので、SQL の時みたいにコレクションの数が規模に直結しないのが特徴ですね。

公式ドキュメントが揃っているので、この辺りを読んで事前に知識を入れておきました。

Cosmos DB はコレクション間での JOIN とか出来ないですし、LINQ で書いたり SQL で書いたり、UDF やストアドを使ったりといろいろありそうなので、あまりガッツリとラップしない方が良いなと考えました。

Document を用意

Cosmos DB のドキュメントには必ず id が存在するのと、Optimistic Concurrency 用に Etag も欲しかったので、全てのドキュメントが継承する AbstractDocument クラスを用意しました。

相変わらず名前付けが適当ですね。作成・更新日時が欲しかったので日付型を 2 つ用意しておきました。

public abstract class AbstractDocument
{
    [JsonProperty("id")]
    public string Id { get; set; }

    [JsonProperty("createdOn")]
    public DateTime CreatedOn { get; set; }

    [JsonProperty("updatedOn")]
    public DateTime UpdatedOn { get; set; }

    [JsonProperty("_etag")]
    public string Etag { get; set; }
}

Cosmos DB に保存するクラスは AbstractDocument を継承するようにします。Entity Framework でも同じような作りにしていた気がするので、個人的にはこういう設計にするのが好きなのかも知れません。

実際に MemberDocument というクラスを用意しました。名前の通り会員の情報を持つクラスです。

public class MemberDocument : AbstractDocument
{
    [JsonProperty("email")]
    public string Email { get; set; }

    [JsonProperty("lastName")]
    public string LastName { get; set; }

    [JsonProperty("firstName")]
    public string FirstName { get; set; }

    [JsonProperty("prefectureId")]
    public int PrefectureId { get; set; }
}

実装とはあまり関係ないですが、全てのプロパティの定義に JsonProperty 付けるかどうかは好みの問題という気もします。とりあえず今回は camelCase に合わせました。

Repository を作成

Entity Framework の時に割と使った気がする Repository ですが、Cosmos DB の場合は用意した方が良いなと今回実際に書いていて思いました。面倒だったのでインターフェースまでは最初から用意しなかったですが、ASP.NET Core の DI と相性は良い感じがしました。

これも Document と同様に AbstractRepository クラスを用意しました。基本的な CRUD 操作と DocumentDB API の操作に必要な情報を提供します。Upsert の方が良いかも知れません。

public abstract class AbstractRepository<TDocument> where TDocument : AbstractDocument
{
    protected AbstractRepository(DocumentClient documentClient, string databaseId, string collectionId)
    {
        DatabaseId = databaseId;
        CollectionId = collectionId;

        Client = documentClient;
    }

    protected string DatabaseId { get; }
    protected string CollectionId { get; }

    protected DocumentClient Client { get; }

    public async Task CreateAsync(TDocument document)
    {
        document.CreatedOn = DateTime.UtcNow;

        var response = await Client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), document);

        document.Id = response.Resource.Id;
    }

    public async Task<TDocument> GetAsync(string id, string partitionKey)
    {
        try
        {
            var response = await Client.ReadDocumentAsync<TDocument>(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id), new RequestOptions
            {
                PartitionKey = new PartitionKey(partitionKey)
            });

            return response.Document;
        }
        catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            return default(TDocument);
        }
    }

    public async Task UpdateAsync(TDocument document)
    {
        try
        {
            document.UpdatedOn = DateTime.UtcNow;

            var condition = new AccessCondition
            {
                Condition = document.Etag,
                Type = AccessConditionType.IfMatch
            };

            await Client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, document.Id), document, new RequestOptions { AccessCondition = condition });
        }
        catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
        {
            // optimistic concurrency に失敗
            throw;
        }
    }

    public async Task DeleteAsync(TDocument document, string partitionKey)
    {
        try
        {
            var condition = new AccessCondition
            {
                Condition = document.Etag,
                Type = AccessConditionType.IfMatch
            };

            await Client.DeleteDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, document.Id), new RequestOptions
            {
                AccessCondition = condition,
                PartitionKey = new PartitionKey(partitionKey)
            });
        }
        catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
        {
            // optimistic concurrency に失敗
            throw;
        }
    }
}

何の変哲もない、基本的な実装のみ提供しています。特に説明も要らない気はしますが、Create 時に生成された Id を呼び出し元に反映するようにしました。この辺りは Entity Framework 的ですね。

更新処理はデフォルトで Optimistic Concurrency を有効にしています。同時実行制御を無効にする必要って正直あまりないかなと考えたので、こういう実装にしました。本来は専用の例外を用意するべきでしょうね。

そして先ほど作成した MemberDocument に対する Repository を作成してみます。

public class MemberRepository : AbstractRepository<MemberDocument>
{
    public MemberRepository(DocumentClient documentClient, string databaseId)
        : base(documentClient, databaseId, "Member")
    {
    }

    public async Task<MemberDocument> GetByEmailAsync(string email)
    {
        var feedOptions = new FeedOptions
        {
            MaxItemCount = 1,
            EnableCrossPartitionQuery = true
        };

        var documentQuery = Client.CreateDocumentQuery<MemberDocument>(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), feedOptions)
                                    .Where(x => x.Email == email)
                                    .AsDocumentQuery();

        return (await documentQuery.ExecuteNextAsync<MemberDocument>()).FirstOrDefault();
    }
}

コンストラクタでは DI で DocumentClient が注入されることを期待していて、databaseId は IOption とかで渡せば良いと思います*1。会員情報というのはメールアドレスで引きたいことが多いと思うので、専用のメソッドを追加しています。

パーティションとかはサンプルに付き検討してないので、とりあえず EnableCrossPartitionQuery = true でお茶を濁しました。この場合はパーティション固定でも良いかも知れません。

使い方など

ASP.NET Core アプリケーションで使う場合には DI を使って簡単に対応できます。ConfigureServices で DocumentClient と作成した Repository を Singleton として追加します。

本当なら接続先やプライマリキーは IConfiguration から取るようにしましょう。

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton(provider =>
        {
            var connectionPolicy = new ConnectionPolicy
            {
                ConnectionMode = ConnectionMode.Direct,
                ConnectionProtocol = Protocol.Tcp
            };

            return new DocumentClient(new Uri("https://xxxx.documents.azure.com:443/"), "PRIMARYKEY", connectionPolicy);
        });

    services.AddSingleton<MemberRepository>();

    services.AddMvc();
}

必要なクラスを DI に登録したので、後はコントローラのコンストラクタで Repository を受け取るだけです。

テストなので大したコードではないですが、Repository を使うことでコレクション単位でのアクセスは扱いやすくなりました。裏側が SQL Database に変わっても違いはあまりないでしょう。

public class DemoController : Controller
{
    public DemoController(MemberRepository memberRepository)
    {
        _memberRepository = memberRepository;
    }

    private readonly MemberRepository _memberRepository;

    public async Task<IActionResult> Index()
    {
        var kazuakix = await _memberRepository.GetByEmailAsync("me@kazuakix.jp");

        kazuakix.LastName = "syachiku";

        await _memberRepository.UpdateAsync(kazuakix);

        return View();
    }
}

実際に仕事で Cosmos DB を使った時には Repository の上に Service 層を追加する形で実装しました。最近の仕事の中では上手く実装できた方かなと自分では思ってますが、次はもっと上手く作れそうです。

とまあ、長々と書いてきましたが、こういうのは既にみんな作ってますよね。私の Cosmos DB の初心者っぷりを発揮したところで、今回は終わりにしたいと思います。

*1:CollectionId と同様に定数でも良いのではないかとも思う