前から Durable Functions が面白そうだと思いつつ、Visual Studio 2017 15.5 が必要とかで中々試していなかったのですが、少し時間があったので一通り調べて動かしてみました。
当然ながら全ては .NET Core の v2 ランタイム向けで試してあります。まずはドキュメントを。
後は牛尾さんの Qiita エントリを読んでおくと基本的な知識が入るはずです。
Visual Studio 向け拡張もアップデートが進んで、Durable Functions が簡単に追加できるようになっているので Qiita に書かれた当時と比べて、さらにイケてる機能となっています。
元々 App Service も Azure Functions も Cloud Design Pattern の集大成という感じなのですが、Durable Functions は更にいろんなパターンが内部で使われていて面白いです。とりあえず話を進めます。
Durable Functions の作成とデプロイ
Visual Studio で Azure Functions プロジェクトを新規作成し、新規追加から Azure Function を選ぶとダイアログが表示されます。そこで Durable Functions Orchestration を選べば必要なものが追加されます。
ローカルでの実行はストレージが必要になるっぽいので、Azure Storage Emulator か Azure Storage の接続文字列を設定すればよいと思います。私は久し振りに Storage Emulator を起動しました。
Azure へのデプロイはこれまで通りですが、.NET Core (v2) を選んでいるのでベータ版のランタイムを有効化する必要があります。Visual Studio が教えてくれるので簡単です。
デプロイはこれで終わりです。Azure 環境に置いておいた方がログとか見やすいのでデプロイしておきました。次は Durable Functions を構成する要素を大雑把につまみ食いして説明します。
ちなみに Durable Functions で実装されているクラスはこれだけしかないので、簡単に理解できます。
分かりやすいようにサンプルコードベースで説明することにします。
オーケストレーターは起点
OrchestrationTrigger が付けられているものが Durable Functions ではオーケストレーターとして扱われます。
渡される DurableOrchestrationContext には後述するアクティビティを実行するメソッドや、更にオーケストレーターを実行するもの、外部からのイベントを待機したり、入力値を取得するものがあります。
[FunctionName("Function1")] public static async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] DurableOrchestrationContext context) { var outputs = new List<string>(); // Replace "hello" with the name of your Durable Activity Function. outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo")); outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle")); outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London")); // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] return outputs; }
オーケストレーターはべき等である必要があるので、内部でネットワーク・ファイル I/O などは厳禁です。行える処理は DurableOrchestrationContext を介したものだけです。タスクベースのインターフェースなので、特に説明もなく使えると思います。
CallActivityAsync を呼び出すと、Queue Storage を経由して実際のアクティビティが実行されます。実行が Queue を経由するという部分は非常に重要なポイントとなってきます。
アクティビティが実際の処理を行う
オーケストレーターではネットワーク・ファイル I/O などの、べき等ではない処理は行えませんがアクティビティでは書くことが出来ます。SQL DB や Cosmos DB へのアクセスも問題なく行えます。
アクティビティはスケーリングされて実行される可能性があるので、基本的にはステートレスで書いた方がパフォーマンスが良いです。状態が必要になるとも思いにくいですし。
[FunctionName("Function1_Hello")] public static string SayHello([ActivityTrigger] string name, TraceWriter log) { log.Info($"Saying hello to {name}."); return $"Hello {name}!"; }
本来ならネットワークアクセスや CPU を食う処理などを適切な粒度で行わせた方が良いのですが、あくまでもサンプルコードなので気にしない方向で。
タスクベースで抽象化されているので、アクティビティがどのように実行されているか考える必要がないですが、スレッドプールではなく別インスタンス上で実行される可能性もあることを頭に入れて、オーバーヘッドのことも考慮しつつ設計する必要があるでしょう。
非常に細かくアクティビティを作成することも出来ますが、同一プロセス内の別スレッドで実行するのと比べて、数桁のオーバーヘッドが発生する可能性も十分あります。そもそも Queue Storage で処理されるので数秒は遅延が発生します。
オーケストレーターを起動する
ドキュメント的にはオーケストレーターを直接実行できるみたいですが、何故か上手くいかなかったので HttpTrigger と StartNewAsync を使って実行するようにします。
StartNewAsync を実行後に返ってくるインスタンス ID を使ってオーケストレーターを識別するので、レスポンスとして返す必要があります。
[FunctionName("Function1_HttpStart")] public static async Task<HttpResponseMessage> HttpStart( [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req, [OrchestrationClient]DurableOrchestrationClient starter, TraceWriter log) { // Function input comes from the request content. string instanceId = await starter.StartNewAsync("Function1", null); log.Info($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); }
DurableOrchestrationClient には名前の通りオーケストレーターの開始、状態取得、終了を行うメソッドがあります。後述する RaiseEventAsync といったイベントを発火させるメソッドもあります。
Durable Functions は同期的に動く部分が存在しないので、インスタンス ID を使ってオーケストレーターの状態を取得することは重要です。
外部イベントを受け取る
面白いと思ったのが外部イベントの仕組みです。オーケストレーター内で WaitForExternalEvent を待機させると、Webhook か RaiseEventAsync でイベントが発火されるまでは待ち続けます。
公式のサンプルでは SMS を使った 2FA を実装していましたが、外部に DB などを必要することなく実装出来ていて興味深いサンプルとなっています。
使い方もタスクベースなので説明はほぼ不要ですね。イベント名だけ決めれば良いです。
var eventValue = await context.WaitForExternalEvent<string>("SampleEvent");
イベントを発火させるには HttpTrigger などで DurableOrchestrationClient の RaiseEventAsync を呼び出すのが簡単です。当然ながらインスタンス ID が必要なので保持しておく必要があります。
一緒に渡すパラメータは普通にクラスでも良いです。
await client.RaiseEventAsync(instanceId, "SampleEvent", "testvalue");
もう一つは Durable Functions が標準で提供している Webhook を使う方法です。
Webhook なので上手く使えば、GitHub にマージされたタイミングでリリース確認のメールを送信し、更にメール内のリンクを踏めばリリース処理が継続といったことも出来そうです。
POST http://{host}/runtime/webhooks/DurableTaskExtension/instances/{instanceId}/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=***
この辺りを活用すれば Logic Apps とほぼ同じことが実現できます。
スケーリングを検証
使っていて気になるのが大体スケーリング周りですが、Durable Functions は内部的に Queue Storage が使われているので、ステートレスなアクティビティを書いている限りは幾らでもスケールアウト出来そうです。
ドキュメントには仕組みが分かりやすく絵入りで書いてあったので必見ですね。
実際にどんな感じでスケールするのか確認したかったので、以下のようなオーケストレーターを書いてステートレスなアクティビティを大量に実行するようにしました。サンプルを少し変えただけです。
元々は CallActivityAsync の時点で await していたのを、最後に Task.WhenAll するように変えただけです。
public static async Task<IList<string>> Run([OrchestrationTrigger] DurableOrchestrationContext context) { var outputs = new List<Task<string>>(); for (int i = 0; i < 100; i++) { outputs.Add(context.CallActivityAsync<string>("SayHello", i.ToString())); } return await Task.WhenAll(outputs); } [FunctionName("SayHello")] public static string SayHello([ActivityTrigger] string name, TraceWriter log) { return $"Hello {name}! from {Environment.MachineName}"; }
Durable Functions のランタイムによって Azure Storage にいくつかキューが作成されます。
ドキュメントによるとステートレスなアクティビティの場合は workitems が使われるらしいので、スケーリングの上限は秒間 2000 メッセージといったところでしょうか。
オーケストレーターを実行すると workitems に大量のメッセージが追加されます。このキューの長さを元にして Azure Functions のランタイムがオートスケールを行ってくれます。
Azure Functions の Consumption プランでは何インスタンスが動作しているか確認しにくいですが、ARM Explorer を使えば現在のインスタンス ID が取得できるので大体は把握が可能です。
実際に確認するとインスタンス ID が 2 つ取れたので、2 インスタンスが割り当てられています。
今回試して分かったこととして、Azure Functions のオートスケールは思っていたよりも速いということでした。素早くインスタンスが増えて、処理が終わればあっという間にいなくなっていて驚きました。
最後に出力結果を確認しておきます。マシン名を追加したので一目でスケールしたことが分かります。
ARM Explorer 上では 2 インスタンスしか確認できませんでしたが、マシン名を見るとユニークなものが 4 つほどあるので、実際には 4 インスタンスで実行されたということになります。
普通に Azure Functions を直接使うより、Durable Functions を被せておいた方が有利な場面が多そうに感じました。こっちの方が洗練されたサーバーレスという感じがします。