2022 年 5 月の Azure SDK アップデートを確認していると、Event Hubs SDK に Buffered Producer Client が追加されたという情報を得ました。明らかに面白そうなので、実際に動かして挙動を確認してみました。
必要な Event Hubs SDK はバージョン 5.7.0 になります。このエントリでは全て C# 向けで確認していますが、他の言語にも用意されているのかは未確認です。
実際には Preview として少し前から使えるようになっていたみたいですが、今回のバージョン 5.7.0 のリリースで正式版になったようです。
この Buffered Producer Client が追加された理由などは、GitHub で公開されている設計ドキュメントに記載されています。API 名が若干異なる部分はありますが、基本はこのドキュメントの通りです。
要約すると Event Hubs SDK には 1 つのイベントを送信する機能と、バッチとして複数イベントをまとめて送信する機能が別々で用意されていますが、バッチをアプリケーション開発者が上手く管理して使うのは難しいので、バッファリングを追加することで SDK レベルでバッチで送信可能にするという実装です。
当然ながら 100 イベントを 1 つずつ送信するよりも、1 度に送信した方が効率が良いので、バッファリングによる遅延が許容できるシナリオではかなり有用だと思います。設計ドキュメント曰く Event Hubs 以外のメッセージング SDK には、全てでバッファリングが用意されているようです。
リファレンスはまだ正式リリース版に更新されていませんが、Preview バージョンが用意されています。
まずは基本的なコードを書いて、どのような挙動になっているのかを確認しておきます。通常の Event Hubs Producer Client とはイベント送信に利用するメソッドが異なります。
イベント送信に使うメソッド名から分かるように、このメソッドを呼び出しても即時送信されずバッファに追加されるだけです。実際のイベント送信はバックグラウンドかつ非同期で実行されるので、エラーハンドリングのために SendEventBatchFailedAsync
イベントの登録が必須になっています。
この EnqueueEventAsync
メソッドは送信を行わず基本的に失敗しないので、Fire and Forget と同じ使い方が出来ます。Event Hubs にテレメトリを送信する場合に完了まで待ってしまうと、アプリケーションの処理自体が遅くなるので本末転倒ですが、Buffered Producer Client を使うと簡単に回避できます。
using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; var connectionString = "EVENT_HUB_CONNECTION_STRING"; var producer = new EventHubBufferedProducerClient(connectionString); producer.SendEventBatchSucceededAsync += arg => { Console.WriteLine($"{DateTime.Now}: PartitionId = {arg.PartitionId}, Batch Size = {arg.EventBatch.Count}"); return Task.CompletedTask; }; producer.SendEventBatchFailedAsync += arg => { Console.WriteLine($"{DateTime.Now}: PartitionId = {arg.PartitionId}, Batch Size = {arg.EventBatch.Count}"); return Task.CompletedTask; }; Console.WriteLine("Started"); try { for (var i = 0; i < 100; i++) { var eventData = new EventData($"test-{i}"); await producer.EnqueueEventAsync(eventData); await Task.Delay(TimeSpan.FromMilliseconds(50)); } } finally { await producer.CloseAsync(); } Console.WriteLine("Completed");
バッファリングされている関係上、アプリケーションが終了する前には CloseAsync
や DisposeAsync
メソッドを呼び出す必要があります。DI を使っていると問題ないですが、インスタンスを自前管理する場合には呼び出しておかないとイベントが失われる可能性があります。
デフォルト設定では 1 秒間隔でバッファリングされたイベントを送信するようになっているので、このコードを実行すると以下のように 1 秒間隔で送信完了のイベントが呼び出されます。
そして受け取る側を Azure Functions の EventHubTrigger を使って適当に実装してみたところ、こちらも 1 秒間隔でイベントが受信されていることが確認出来ました。
送信時と受信時のバッチサイズは一致しないので数は異なっています。Event Hubs にはパーティションという概念もあるので、バッチサイズに依存した処理を書くのは NG です。
内部実装として System.Threading.Channels
が使われているので、信頼性とパフォーマンス面での不安はありません。パーティション単位で Channel<T>
をキャッシュする実装になっているので、上手く使っている印象を持っています。個人的にも勉強になりました。
先ほどのサンプルではデフォルト設定のまま使いましたが、Buffered Producer Client にはパフォーマンスとスループットに関係するオプションがいくつか用意されています。
変更する可能性が高いのは MaximumWaitTime
と MaximumEventBufferLengthPerPartition
だと思います。
中でも MaximumWaitTime
は名前の通りバッファリングされたイベントを送信する間隔です。これがデフォルトでは 1 秒になっているので、アプリケーションの特性に合わせて変更可能です。
例えば以下のように Buffered Producer Client 作成時に MaximumWaitTime
を 5 秒に変更してみます。
var producer = new EventHubBufferedProducerClient(connectionString, new EventHubBufferedProducerClientOptions { MaximumWaitTime = TimeSpan.FromSeconds(5) });
実行してみるとイベントの送信が 5 秒間隔に変更されたことが確認出来ます。
もう一つの MaximumEventBufferLengthPerPartition
はパーティション単位でのバッファリングする最大イベント数となります。デフォルトでは 1500 になっているので十分だと思いますが、メモリ使用量に関わってくる値なのでイベントのサイズによっては調整した方が良いケースもありそうです。
ドキュメントによるとイベント数の上限に達した場合は、Enqueue 系のメソッドがバッファに空きが出来るまでブロッキングされるとありますが、自分が検証した範囲ではそのような挙動になりませんでした。内部では Channel<T>
を使っているのでブロッキングするはずなのですが、これが不具合なのかはよくわかっていません。ただし追加していってもイベントが欠落することはありませんでした。
最後にバッファリングとバッチの違いについて説明されているドキュメントを共有しておきます。
要約するとメソッド呼び出し時に失敗したかどうかが分かるのがバッチ、分からないのがバッファリングという感じです。用途に合わせて上手く使い分けていきましょう。