しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

Azure Container Apps で Event Hubs を利用した場合にスケールアウトし続ける問題を直す

タイトルの通りなのですが、Azure Container Apps で Event Hubs からデータを受信する処理を Dapr と KEDA を使って実装した場合に、何故か常に最大までスケールアウトし続けてしまう問題に遭遇しました。

調べたところ Issue が上がっていて、同様の問題に遭遇している人は意外にいるようでした。

Azure Functions では Event Hubs からメッセージを読み取る場合には EventHubTrigger を使えば、バインディングとスケーリングの両方を Azure Functions のランタイムとスケールコントローラーが連携して上手く扱ってくれていましたが、Container Apps の場合はバインディングは Dapr で、スケーリングは KEDA というようにそれぞれで設定する必要があります。

Dapr の設定は Container Apps Environment に対して Dapr Component として追加する形になります。Event Hub と Storage Account を作成してリファレンスの通りに設定を追加すれば、アプリケーションにメッセージが HTTP や gRPC で送信されます。

Entra ID 認証に対応しているので項目数はぱっと見多いですが、実際に必要なものは意外に少ないです。

Dapr Component の設定が終われば、次は KEDA のスケールルールを追加して Event Hub の未処理メッセージ数によって指定したレプリカ数の範囲でスケーリングが行われるようにします。

こちらも項目が色々ありますが、例によって Entra ID 認証の場合に必要な項目も多いので最小限の設定で済ませます。今回の肝となるのが checkpointStrategy の値になりますが、Event Hub からのメッセージ読み取りには Dapr Pub/Sub を使うので dapr を設定しました。

ここまでの設定で Event Hub のメッセージを Dapr で読み取って、KEDA でスケーリングが行えるようになったはずです。後は Container App 側で Dapr を有効化して、Dapr SDK を組み込んだアプリケーションをデプロイすれば動作が確認出来ます。

ちなみに Dapr 経由で Event Hub のメッセージを受信するアプリケーションは、ASP.NET Core の Minimal API で書くと以下のようになります。Dapr の Pub/Sub バインディングを利用するので Topic 属性を使って Dapr Component 名と Event Hub 名を指定しています。

処理速度が速すぎるとスケールアウトしない可能性があるので、あえて遅延を入れて Event Hub 側に未処理メッセージが貯まりやすいようにしています。

using System.Text.Json;

using Dapr;

var builder = WebApplication.CreateBuilder(args);

var app = builder.Build();

app.UseCloudEvents();
app.MapSubscribeHandler();

app.MapPost("/orders", [Topic("eventhubs-pubsub", "evh-scale-test")] async (JsonElement order) => {
    Console.WriteLine("Subscriber received : " + order);
    await Task.Delay(1000);
    return Results.Ok(order);
});

app.Run();

地味にはまったのが Event Hub に送信するメッセージのフォーマットは CloudEvents じゃないと Dapr 側でエラーになることでした。CloudEvents 形式には詳しくないので、Dapr の Pub / Sub ドキュメントにあったサンプル JSON をそのまま使って対応しました。

最近の Event Hubs は SDK を使って送信用アプリを作ることなく、Azure Portal からメッセージを最大 100 まで送信できるので、今回のようなケースでは非常に便利です。あえて贅沢を言えば事前に用意された Dataset に CloudEvents が欲しかったところです。

Azure Portal から Event Hub に対して CloudEvents 形式のメッセージを送信すると、即座に KEDA が新しくレプリカを起動し始めるのですが、何故か少ないメッセージを送った場合であっても、以下のように最大のレプリカ数にまでスケールアウトが行われます。

更に以下のグラフは Azure Monitor から確認出来る Event Hub のメッセージ処理数と Container App のレプリカ数の比較ですが、Event Hub では 1 件もメッセージが処理されていないのに対して、Container Apps のレプリカ数はスケールインすることなく常に最大を保っていることが確認出来ます。

現状の設定ではスケールアウトし続けてしまいますが、Event Hub のメッセージ TTL を過ぎて受信するメッセージが無くなったタイミングでスケールインが行われることは確認したので、KEDA が正しく Event Hub の未処理メッセージ数を認識できていない可能性が高いと考えました。

Event Hubs の仕組みとして設定された TTL まではメッセージは全て保持されているので、チェックポイントを Azure Storage に保存することでクライアントがどのメッセージまで読み取ったのかを保持しています。

Dapr で設定した Azure Storage のコンテナーを確認すると、以下のようなフォーマットで Event Hub のパーティション数のファイルが作成されているはずです。これがチェックポイントとなるので、KEDA はこのファイルを読まないと未処理メッセージ数を正しく判断できなくなります。*1

しかし、KEDA の設定では同じ Azure Storage とコンテナーを設定していて、Dapr を使って Event Hub からメッセージを読み込んでいるので checkpointStrategy の値に問題があるとは思いませんでした。

仕方ないので KEDA のソースコードを読んで、checkpointStrategy の値によってどのような実装になっているのかを確認しました。チェックポイントに関連する実装は以下のファイルにまとまっています。

実装を確認したところ checkpointStrategy の値によって Blob 上のパス形式が大きく異なっていることが分かりました。それぞれの違いは以下にまとめたので参考にしてください。

  • azureFunction
    • /azure-webjobs-eventhub/<EventHubNamespace>/<EventHubName>/<ConsumerGroup>/<PartitionId>
  • blobMetadata
    • /<ContainerName>/<EventHubNamespace>/<EventHubName>/<ConsumerGroup>/checkpoint/<PartitionId>
  • goSdk
    • /<ContainerName>/<PartitionId>
  • dapr
    • /<ContainerName>/dapr-<EventHubName>-<ConsumerGroup>-<PartitionId>
  • default
    • /<ContainerName>/<EventHubName>/<PartitionId>

ここまでで分かったことは、現状の Container Apps で使われている Dapr は blobMetadata の形式でチェックポイントを保存しているということです。そうなると dapr 設定の存在意義が一気にわからなくなるのですが、Azure SDK for Go 側の実装が変わったなどの理由がありそうな予感でした。

最終的には checkpointStrategy の値を blobMetadata に設定すると無事にスケールインが行われるようになりました。最終的な Dapr と KEDA の設定について参考までに共有しておきます。

再度この状態で Azure Monitor で確認出来る Event Hub のメッセージ処理数と Container App のレプリカ数でグラフを描いてみると、メッセージが送信された場合にはスケールアウトされますが、暫くすると 0 にまでスケールインされていることが分かります。

正直なところ checkpointStrategydapr は混乱させるだけの予感なので、削除も検討した方が良いのではと思っています。時間のある時にでも Issue で聞いてみたいところです。

Azure Container Apps では Dapr と KEDA の両方で同じような設定をしないといけないのが結構苦痛だったのと、手動では非常に間違いやすい部分だと感じたので利用する場合には IaC を必須にするか、Azure Functions の Container Apps ホスティングを利用した方が安全だと思いました。

ドキュメントにもあるように、Azure Functions の Container Apps ホスティングではトリガーによって自動でスケーリングの設定を行ってくれるようなので、今回のようにはまる部分は大幅に減るはずです。

*1:つまり TTL が切れるまでは大量にメッセージが残ったという状態になる