既に何回も書いていますが Durable Functions は非常に便利で、並列処理や時間のかかる処理をスケーラブルかつ高い信頼性を保ったまま実行できます。
並列処理の場合は全体として処理時間は短くなる傾向にあるのでまだ良いのですが、時間のかかる処理を書いた場合にはどこまで処理が完了したのかというモニタリングが結構難しいです。
ドキュメントにもある非同期 HTTP API パターンで完了するのをポーリングで待機することは簡単に出来ますが、当然ながらそこには処理の進捗などは含まれていないです。
実際に最近書いた処理が入力によっては処理に時間がかかるもので、進捗がわからないと利用者側が不安になりそうだったので、いくつかの実装方法を検討して試しました。
Application Insights に送信されたログを KQL で調べることもできますが、それを組み込むのは結構面倒なのでシンプルな方法で攻めました。
Custom Status を利用するパターン
最初はだれでも考え付く Custom Status を使った方法です。ドキュメントにも Custom Status を使ってオーケストレーターの状態を返す方法が載っているので、割と手堅い方法ではあります。
以下のような単純なオーケストレーターの場合は、適度なタイミングで Custom Status に設定するだけなので実装も簡単です。とりあえず雑にパーセンテージを返すようにしました。
public class Function1 { [FunctionName("Function1")] public async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context) { var inputs = context.GetInput<string[]>(); var outputs = new List<string>(); for (var i = 0; i < inputs.Length; i++) { // Replace "hello" with the name of your Durable Activity Function. outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", inputs[i])); context.SetCustomStatus(new { progress = (i + 1) * 100 / inputs.Length }); } // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] return outputs; } }
Custom Status はサイズ制限以内でシリアライズ可能なデータなら設定できるので、もっと複雑な情報を返すこともできます。とはいえ乱用しないように注意したいところです。
実際に上のオーケストレーターを動かしてステータスチェック API を叩くと値が返ってきます。
実行時間の長い処理の場合は、このステータスチェック API を叩き続けることになるので、そこに進捗の情報が含まれているというのは扱いやすいです。
Durable Entities を利用するパターン
シンプルなオーケストレーターの場合は単純でしたが、サブオーケストレーターを使っている場合には Custom Status を使う方法は利用できません。
理由としてはオーケストレーター間で Custom Status は分離されているので、サブオーケストレーター内で Custom Status を設定しても親のオーケストレーターのステータスチェック API には含まれないからです。
オーケストレーター間でデータを安全に保持する方法として Durable Entities を使うことにしました。
Durable Entities では受信したメッセージは 1 つずつ順番に実行されます。なので複数のメッセージが同時に送信されても競合することなく、状態のアップデートが可能というメリットがあります。
サブオーケストレーターを使う場合は並列実行させることが多いはずなので、Durable Entities が持つこの特徴はかなり都合が良いものです。今回は以下のような Entity を用意しました。
[JsonObject(MemberSerialization.OptIn)] public class ProgressEntity { [JsonProperty("currentItem")] public int CurrentItem { get; set; } [JsonProperty("totalItem")] public int TotalItem { get; set; } public int Progress => CurrentItem * 100 / TotalItem; public void Reset(int totalItem) { CurrentItem = 0; TotalItem = totalItem; } public void Increment() => CurrentItem += 1; [FunctionName(nameof(ProgressEntity))] public static Task Run([EntityTrigger] IDurableEntityContext context) => context.DispatchAsync<ProgressEntity>(); }
もっと複雑な状態を持たせることも出来ますが、考え方は変わらないのでシンプルに全体の件数と処理済みの件数を保持して、パーセンテージだけを返す Entity になります。
それぞれのオーケストレーターでは Entity に対してメッセージを投げるだけです。最初に全体の処理件数をセットしたいので、親オーケストレーターでは最初に Reset
を投げています。このタイミングで新しい Entity が作成されます。
public class Function2 { [FunctionName("Function2")] public async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context) { var inputs = context.GetInput<string[]>(); var outputs = new List<string>(); await context.CallEntityAsync(new EntityId(nameof(ProgressEntity), context.InstanceId), "Reset", inputs.Length); foreach (var input in inputs) { // Replace "hello" with the name of your Durable Activity Function. outputs.Add(await context.CallSubOrchestratorAsync<string>("Function2_Sub", input)); } // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] return outputs; } [FunctionName("Function2_Sub")] public async Task<string> RunSubOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context) { var input = context.GetInput<string>(); var output = await context.CallActivityAsync<string>("Function2_Hello", input); context.SignalEntity(new EntityId(nameof(ProgressEntity), context.ParentInstanceId), "Increment"); return output; } }
サブオーケストレーターでは処理が完了したタイミングで SignalEntity
でメッセージを非同期で投げています。EntityId
として親の InstanceId
を使っているので、ParentInstanceId
を見るようにします。
Entity の状態は API でも確認できるので、直接読み取ってみます。API は以下のドキュメントにあります。
実行すると Entity の状態がちゃんと返ってきました。シリアライズ対象のデータのみ返ってきます。
実際には Custom Status を使った時のように API を用意して、欲しいデータだけ返すという方法が現実的です。Entity の状態は InstanceId
があれば取れるので、作るのは簡単です。
[FunctionName("Function2_HttpPolling")] public async Task<IActionResult> HttpPolling( [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequest req, [DurableClient] IDurableClient starter) { var instanceId = req.Query["instanceId"]; var state = await starter.ReadEntityStateAsync<ProgressEntity>(new EntityId(nameof(ProgressEntity), instanceId)); return new OkObjectResult(new { state.EntityState.Progress }); }
上の API を実行すると、今度はパーセンテージだけが返ってくるようになります。
本来なら Durable Functions のステータスチェック API をオーバーライドするのが正しい方向ですが、カスタマイズではなく置き換えになってしまうので少し面倒です。
仕事では Durable Entities を使って実装しましたが、結局は標準の実装を捨てて独自に API を用意しました。
SignalR Service を利用するパターン
これまでは非同期 HTTP API のパターン上で実装してきましたが、ポーリングではなくプッシュで情報が欲しいときもあるはずです。多少毛色が異なりますが、SignalR Service を使った方法も試しました。
SignalR Service の出力バインディングを使えば、アクティビティ関数からも簡単にクライアントに対して情報をプッシュできるので、SPA と組み合わせている場合には割と有用な気がします。
public class Function3 { [FunctionName("Function3")] public async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context) { var inputs = context.GetInput<string[]>(); var outputs = new List<string>(); foreach (var input in inputs) { // Replace "hello" with the name of your Durable Activity Function. outputs.Add(await context.CallActivityAsync<string>("Function3_Hello", input)); } // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] return outputs; } [FunctionName("Function3_Hello")] public async Task<string> SayHello( [ActivityTrigger] string name, [SignalR(HubName = "progress")] IAsyncCollector<SignalRMessage> signalRMessages, ILogger log) { await Task.Delay(TimeSpan.FromSeconds(5)); log.LogInformation($"Saying hello to {name}."); await signalRMessages.AddAsync(new SignalRMessage { Target = "updateStatus", Arguments = new object[] { name } }); return $"Hello {name}!"; } }
適当に Vue.js で画面を作って、SignalR Service から送信されたデータを表示するようにしました。処理が完了したタイミングで、ほぼリアルタイムに更新されるのは便利です。
この方法の課題としては、処理全体の結果を把握するためには工夫が必要という点です。
別途これまで通りポーリングするか、オーケストレーターでエラーハンドリングをきっちり実装して通知させないと、画面側に処理が失敗したことを伝えられないです。上手く使い分けましょう。