しばやん雑記

Azure Serverless とメイドさんが大好きなフリーランスのプログラマーのブログ

Durable Functions の Status Query API を独自に実装する

Durable Functions は最強に便利なので適した部分にはガンガン使っていっているのですが、REST API として提供したい場合には Internal な API が漏れ出てきてしまうので、実行結果のレスポンスが微妙にハンドリングしにくいと思うことが増えてきました。

具体的な例を挙げると、テンプレートから Durable Functions Orchestration を作成したままの API を実行した結果が以下になります。この中で一般的に必要となるのは statusQueryGetUri だけです。

f:id:shiba-yan:20200429001208p:plain

それ以外のデータは特に必要なものではなく、中には返す必要がないものも含まれています。

statusQueryGetUri と同じ URL は Location ヘッダーにも含まれているので、202 Accepted を返している場合はヘッダーの方を見るのが REST 的にも正しいでしょう。

そして statusQueryGetUri を実行した時のレスポンスは以下のようになっています。デバッグ用途としては有益な情報が多いですが、公開用の API と考えると不要な情報が多いです。

f:id:shiba-yan:20200429001215p:plain

Orchestrator の出力は output になります。WaitForCompletionOrCreateCheckStatusResponseAsync を使った時のレスポンスとは異なっていますし、もっとシンプルな情報を返すだけにしたいです。

Durable Functions では非同期 HTTP API だけではなく、同期 HTTP API も簡単に作ることが出来ます。単純に Orchestrator の完了まで待機してるだけなので、数秒で完了することが分かっている時のみに使います。

とまあ、前置きはこれくらいにして実際に Status Query API と同じ処理を独自に実装してみることにします。これまでに現状 API の不満点を挙げているので、これらを一気に解決するようにします。

Azure Functions で HTTP API を作る場合、別 Function の絶対 URL を生成するのが割と手間なので、以前に作った HTTP API を作りやすくするパッケージを使ってサクッと実装していきます。

202 Accepted を返すヘルパーは用意されているので、Location ヘッダーにセットしたい Function 名を指定するだけで良い感じのレスポンスを生成してくれます。

テンプレートから作成した Orchestrator / Activity を修正して、独自の Status Query API を使うように書き換えたのが以下の例になります。見た目は長いですが重要なのは Function1_HttpStartFunction1_HttpPoll の二つです。コメントを入れたので特に説明不要だと思います。

public class Function1 : HttpFunctionBase
{
    public Function1(IHttpContextAccessor httpContextAccessor)
        : base(httpContextAccessor)
    {
    }

    [FunctionName("Function1")]
    public async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }

    [FunctionName("Function1_Hello")]
    public string SayHello([ActivityTrigger] string name, ILogger log)
    {
        log.LogInformation($"Saying hello to {name}.");
        return $"Hello {name}!";
    }

    [FunctionName("Function1_HttpStart")]
    public async Task<IActionResult> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "function1/start")] HttpRequest req,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the request content.
        string instanceId = await starter.StartNewAsync("Function1", null);

        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        // 202 Accepted と Location に polling 用の URL をセットして返す
        return AcceptedAtFunction("Function1_HttpPoll", new { instanceId }, null);
    }

    [FunctionName("Function1_HttpPoll")]
    public async Task<IActionResult> HttpPoll(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "function1/poll/{instanceId}")] HttpRequest req,
        string instanceId,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        var status = await starter.GetStatusAsync(instanceId);

        // instanceId が見つからない場合は 404
        if (status == null)
        {
            return NotFound();
        }

        switch (status.RuntimeStatus)
        {
            // 実行中の場合は再度 202 と polling 用 URL をセットして返す
            case OrchestrationRuntimeStatus.Running:
            case OrchestrationRuntimeStatus.Pending:
            case OrchestrationRuntimeStatus.ContinuedAsNew:
                return AcceptedAtFunction("Function1_HttpPoll", new { instanceId }, null);

            // 失敗した場合は 500 とエラーメッセージを返す
            case OrchestrationRuntimeStatus.Failed:
                return StatusCode(StatusCodes.Status500InternalServerError, status.Output);

            // 完了、または停止中の場合は 200 と Orchestrator の出力を返す
            case OrchestrationRuntimeStatus.Completed:
            case OrchestrationRuntimeStatus.Canceled:
            case OrchestrationRuntimeStatus.Terminated:
                return Ok(status.Output);

            // 不明な状態の場合は 500 とメッセージを返す
            default:
                return StatusCode(StatusCodes.Status500InternalServerError, status.Output);
        }
    }
}

202 Accepted の処理は AcceptedAtFunction を呼ぶだけなので非常にシンプルです。気に入ってます。

Function1_HttpPoll の実装は Durable Functions を参考にしています。と言っても RuntimeStatus のハンドリングを既存実装に合わせたかったというだけの事情です。

新しく用意した API を使って Orchestrator を開始すると、以下のように 202 Accepted と Location が返ってきます。Durable Functions の実装では Retry-After も返ってきますが固定値なので省略しました。

f:id:shiba-yan:20200429001257p:plain

Location ヘッダーで返ってきた URL を実行すると、Orchestrator が完了している場合は 200 と出力が返ってきます。内部の状態は返さないので扱いやすく、Swagger / OpenAPI でのモデリングも簡単になります。

実行中の場合には 202 Accepted と Location が再び返ってくるので、クライアントではループなどで 202 以外が返ってくるまで単純に実行し続ければよいです。

f:id:shiba-yan:20200429001307p:plain

そして適当に例外を投げて Orchestrator を失敗させると 500 とエラーメッセージが返ってきます。雑なハンドリングなのでべた書きで返ってきますが、RFC 7807 に従った形で返す方が良いかもしれません。

f:id:shiba-yan:20200429001546p:plain

一般的な REST API のレスポンスと同じ挙動になったので扱いやすくなりました。特に Swagger / OpenAPI で定義を出力する際に Durable Functions の API を定義するのは大変なので、独自の API を用意してクライアントに最適化した方が圧倒的に便利です。

テンプレ的に用意しておくと色々と応用が利くので楽だと思いました。参考にしてみてください。