前回は Durable Functions について大雑把に試したりしていましたが、ちょうど今仕事で Durable Functions を適用するのにちょうどよいタスクがあったので、実際に使ってみることにしました。
内容はタイトルの通りです。Durable Functions については前回のエントリを。
あまり実装したくない部類の機能ですが、どうしても必要となるのがデータのエクスポート機能だったりします。一瞬で終わるようなデータ量なら良いのですけど、こういうのに限って大量のデータだったり、複雑なデータ構造を要求されたりします。
今回はデータ量はそこまで多くないはずですが、全て Cosmos DB で作られているので考慮せずにクエリを投げまくると RU が爆発して即死します。今回の機能は要件としては以下のような感じです。
- 時間はかかっても良いからリソースを集中して消費しないように
- エラーが発生しても柔軟にリトライしたい
- ファイルの準備が完了したかどうか確認したい
- 間違ったときのためにキャンセルもしたい
Durable Functions を使うだけで上の 4 つを全て満たす処理を書くことが出来るわけです。
今回のデータエクスポートで必要な処理は以下の通りです。
- エクスポートの対象をリストアップする
- 実際のエクスポートするデータを取得する
- データを整形して Blob Storage に書き出す
流石に本番のコードを出すことは出来ないので、同じようなサンプルを用意したので各アクティビティ単位で見ていくことにします。サンプルなのでわざと Task.Delay を入れてます。
エクスポートする対象を抽出するアクティビティ
対象を抽出するためのアクティビティです。サンプルなので適当に10 件分のデータを返しています。並列処理が必要な場合はいい感じに分割しても良いと思います。
[FunctionName("AggregateTargets")] public static async Task<string[]> AggregateTargets([ActivityTrigger] DurableActivityContext context) { await Task.Delay(TimeSpan.FromSeconds(10)); return Enumerable.Range(0, 10).Select(x => $"target-{x}").ToArray(); }
単純に 1 クエリ引くだけで終わる処理の場合は Durable Functions にする必要はないわけです。複数のテーブルやコレクションから引っ張ってくるからこそ、今回 Durable Functions を使うように実装しました。
実際のデータを取得するアクティビティ
実際のデータを何かしらのストレージから取得するアクティビティです。仕事では全て Cosmos DB なので地味につらかったですが、今回はあくまでもサンプルなので入力値を適当に加工して返すだけです。
ActivityTrigger では DurableActivityContext を指定することも出来るので、覚えておいて損はないです。実行中の InstanceId が必要になった場合には、このコンテキスト経由で取得できます。
[FunctionName("GetDataFromStorage")] public static async Task<string> GetDataFromStorage([ActivityTrigger] DurableActivityContext context) { var input = context.GetInput<string>(); await Task.Delay(TimeSpan.FromSeconds(5)); return $"{input}: sample data"; }
Durable Functions では一度実行されたアクティビティの結果は Table Storage に保存されるようになってますが、当然ながら Table Storage のエンティティサイズの上限という壁があることをお忘れなく。
Table Storage では 1MB 以上のデータは保存できないので、アクティビティで巨大なデータを返すと死にます。その場合はいったんファイルに書き出して、そのパスを返すなどしましょう。
Storage Blob へ書き出すアクティビティ
取得したデータを整形して Blob Storage に保存するアクティビティです。Blob を使うのは地味に面倒ですが、Azure Functions なら Binder を使って楽が出来ます。
[FunctionName("WriteToBlobFile")] public static async Task WriteToBlobFile([ActivityTrigger] DurableActivityContext context, Binder binder) { var outputs = context.GetInput<IList<string>>(); var filePath = "outputs/sample.txt"; using (var destination = await binder.BindAsync<CloudBlobStream>(new BlobAttribute(filePath, FileAccess.Write))) using (var writer = new StreamWriter(destination)) { foreach (var value in outputs) { writer.WriteLine(value); } } }
GetInput を使ってデータを取得して、BindAsync を使って Blob へ書き込んでいきます。拍子抜けするぐらい簡単なコードで実現することが出来ました。
アクティビティの入力も Table Storage に保存されているので、出力と同様にサイズ制限があることをお忘れなく。1MB が上限といっても、他のカラムにもデータが入っていて目減りするので注意。
アクティビティを実行するオーケストレーター
最後は作成したアクティビティを実行していくオーケストレーターを用意します。といっても大したコード量ではないですし、await で分かりやすいので説明は不要な気がします。
[FunctionName("ExportFile")] public static async Task<int> RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context) { var targets = await context.CallActivityAsync<string[]>("AggregateTargets"); var outputs = new List<string>(); foreach (var target in targets) { outputs.Add(await context.CallActivityAsync<string>("GetDataFromStorage", target)); } await context.CallActivityAsync("WriteToBlobFile", outputs); return outputs.Count; }
今回は並列処理をさせたくなかったので foreach の中で await して結果を待つようにしています。これで Durable Functions が順番を保ったままアクティビティを実行してくれます。
動作を確認する
コードを完成したので実際に動かしてテストします。とりあえずサクッとオーケストレーターを起動。
起動に成功すると 202 とオーケストレーターの状態を確認できる URL などが返ってきます。この URL を叩けばオーケストレーターの状態を JSON で取得できます。
実際に叩いてみると Running と返ってきました。ちゃんと動いているようですね。
地味に起動の応答は素早く返して、非同期で実行させながら状態を確認する API を作るのはめんどくさいですが、Durable Functions ならデフォルトで用意されてます。
何回か確認しているうちに runtimeStatus が Completed に変わりました。そして output には 10 が返ってきていますが、これは実際に処理された件数となります。
オーケストレーターの戻り値が output の値になるので、正常終了時に追加の情報を返すことも簡単です。
Storage Explorer で Blob を確認すると、ファイルがちゃんと生成されています。
ダウンロードして開いてみると、実行順を保ったまま書き出されていることが確認できます。
これまで Azure Functions では画像のサイズ変換などを行うサンプルが多かったですが、Durable Functions を使うと処理の状態や結果を返すように出来ますね。処理を中断出来るのもポイントです。
補足 : シングルトンで処理を行う
良くある話として、同じデータに対する処理を同時に実行させたくないことが多いと思います。特に Durable Functions のように非同期でオーケストレーターが走る場合には簡単に並列実行が出来てしまいます。
同じデータへの並列実行を防ぐためにシングルトンでの実行をさせます。Durable Functions のシングルトンは単純でインスタンス ID を入力値から一意に決めてしまえば実装出来ます。
エクスポートする場合には ID や日付などで範囲を指定することがあると思うので、そういったデータからハッシュを計算してインスタンス ID とすれば良いでしょう。
[FunctionName("ExportFile_HttpStart")] public static async Task<HttpResponseMessage> HttpStart([HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage req, [OrchestrationClient] DurableOrchestrationClient starter, TraceWriter log) { // POST されたデータから InstanceId を一意に生成 var instanceId = "..."; var existingInstance = await starter.GetStatusAsync(instanceId); if (existingInstance == null) { // Function input comes from the request content. await starter.StartNewAsync("ExportFile", instanceId, null); log.Info($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } return req.CreateErrorResponse(HttpStatusCode.Conflict, $"An instance with ID '{instanceId}' already exists."); }
これで同じパラメータに対して、複数のオーケストレーターが実行されることは無くなりました。Durable Functions はもっと早くから触っておけばよかったと実感する日々です。