基本的にほとんどの Azure リソースには Azure Monitor が組み込まれていて、そのリソースのログを Log Analytics や Blob に保存することが出来るようになっています。一般的には Log Analytics に保存してクエリ出来るようにすることが多いと思いますが、今回は Event Hubs へ送信して加工してみるという話です。
この Event Hubs への送信機能はサードパーティーのモニタリングサービス向けに使うことが多いようですが、当然ながら Azure Functions などを使って Event Hubs を読み取って使うことが可能です。この機能を使えばリアルタイムで Blob Storage のストレージ利用量や転送量をモニタリング出来るのではないかと考えたのが今回の切っ掛けです。
Azure Monitor から送信されるデータは JSON になっているので簡単に扱えますが、リソースによって送信される JSON スキーマが異なるので、汎用的な作りにするのではなく型を定義して使うのが良いです。
今回使う予定の Azure Storage Blob から送信されるログのデータ構造は以下のドキュメントに記載されていますが、実際に送信されたデータをキャプチャして確認した方が安全だと思います。
ドキュメントから JSON に対応するクラスを作るのが面倒だったので、今回は実際に Event Hubs に流れてきた JSON を使って C# のクラスに変換しました。ネストしているのでクラスが複数生成されますが、データ型含めてまあまあの出来でした。
本来であればプロパティの命名を Pascal Case にしたいところですが、変換するのが面倒だったのでそのままにしています。GitHub Copilot Chat を使うとある程度は変換してくれますが、変換後のプロパティ名周りの検証が面倒だったので止めておきました。
public class RootObject { public Record[] records { get; set; } } public class Record { public DateTime time { get; set; } public string resourceId { get; set; } public string category { get; set; } public string operationName { get; set; } public string operationVersion { get; set; } public string schemaVersion { get; set; } public int statusCode { get; set; } public string statusText { get; set; } public double durationMs { get; set; } public string callerIpAddress { get; set; } public string correlationId { get; set; } public Identity identity { get; set; } public string location { get; set; } public Properties properties { get; set; } public Uri uri { get; set; } public string protocol { get; set; } public string resourceType { get; set; } } public class Identity { public string type { get; set; } } public class Properties { public string accountName { get; set; } public string userAgentHeader { get; set; } public string serviceType { get; set; } public string objectKey { get; set; } public string conditionsUsed { get; set; } public string metricResponseType { get; set; } public double serverLatencyMs { get; set; } public long requestHeaderSize { get; set; } public long responseHeaderSize { get; set; } public string tlsVersion { get; set; } public string sourceAccessTier { get; set; } public string referrerHeader { get; set; } public long requestBodySize { get; set; } public long responseBodySize { get; set; } public long contentLengthHeader { get; set; } }
まずは定義したクラスと Azure Functions の EventHubTrigger を使って、実際にどのようなデータが Blob Storage から送信されてくるかを確認しました。Function の実装は以下のようにシンプルにログに書き出すだけなので難しくありません。ログには OperationName やサイズを書きだすようにしています。
public class Function1(ILogger<Function1> logger) { [Function(nameof(Function1))] public async Task Run([EventHubTrigger("evh-stream-01", Connection = "EventHubConnection")] EventData[] events) { foreach (var @event in events) { var data = @event.EventBody.ToObjectFromJson<RootObject>(); foreach (var record in data.records) { logger.LogInformation("Operation Name: {operationName}, Request Size: {requestSize}, Response Size: {responseSize}, Url: {url}", record.operationName, record.properties.requestBodySize, record.properties.responseBodySize, record.uri); } } } }
Event Hubs から受信したデータは EventBody プロパティに BinaryData クラスで入っているので、簡単に JSON としてデシリアライズ可能なので非常に便利です。
このコードを Azure にデプロイして Blob に対する操作を行った後に Application Insights を確認すると、以下のように複数の処理が入ってくることが確認出来ます。operationName は実体は呼び出された API に相当するので、この API で処理を区別できます。

要するに GetBlob が 20x 系で成功した場合は Azure からクライアントへのダウンロードが行われていて、PutBlob が同じく 20x 系で成功した場合はクライアントから Azure へのアップロードが行われたということです。この情報が分かってしまえば、イベントを順番通り処理すればダウンロード・アップロードされたサイズが簡単に把握できそうです。
この時に考える必要があるのが Event Hubs のパーティションです。もし Event Hubs に複数のパーティションがある場合には、それを受信する Azure Functions もパーティション分だけスケールアウトされる可能性があるため、送信されたイベントが順番通りに処理されない可能性もあるということです。
意外に知られていないかも知れない部分なので紹介しておきましたが、本来であれば Event Hubs の送信時にパーティションキーを指定すればハッシュでいい感じに同一パーティションにまとまってくれますが、Azure Monitor からの送信時にはパーティションキーは設定されていないようでした。
今回のようにスケールアウトされた場合に困る処理の場合は、以下のように Event Hubs を作成する時にパーティションの数を 1 にすることで回避するようにします。
実際に本番用の Event Hub を作成していきますが、今回は Blob のストレージ利用量と転送量の両方をモニタリングしたいので複数 Event Hub を作ることにします。まずは以下のように転送量用の Event Hub を作成していきますが、前述したようにパーティション数は 1 にしておきます。

作成した Event Hub は該当 Storage Account の Diagnostic Settings から Storage Read ログカテゴリを送信するように設定します。今回のようにログカテゴリ毎に Event Hub を分けて、更に Consumer Group を組み合わせると効率的に複数の処理を実装できるはずです。

転送量を計算するには GetBlob が成功していて、更に responseBodySize を集計していけばよいことになります。集計先にはこういう用途に適している Cosmos DB を使って、更にパーティションキーと ID の設計でファイル単位と日別の集計も同時に行えるようにします。
今回のコードでは少し手を抜いているので、処理された時間でパーティションキーを生成しているので正確性に欠けます。本来ならイベント発生日時でグルーピングが必要になります。
public class BlobEgress(CosmosClient cosmosClient) { [Function(nameof(BlobEgress))] public async Task Run([EventHubTrigger("evh-stream-egress", Connection = "EventHubConnection")] EventData[] events) { var container = cosmosClient.GetContainer("Metrics", "Egress"); var allRecords = events.SelectMany(x => x.EventBody.ToObjectFromJson<RootObject>().records).ToArray(); var records = allRecords.Where(x => x is { statusCode: 200, operationName: "GetBlob" }) .GroupBy(x => x.uri.AbsolutePath) .Select(x => new { Path = x.Key, Size = x.Sum(xs => xs.properties.responseBodySize) }); foreach (var groupedRecords in records) { var id = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes(groupedRecords.Path))); var containerName = $"{groupedRecords.Path.Split('/', StringSplitOptions.RemoveEmptyEntries)[0]}_{DateTime.Now:yyyyMMdd}"; BlobMetric blobMetric; try { var response = await container.ReadItemAsync<BlobMetric>(id, new PartitionKey(containerName)); blobMetric = response.Resource; } catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { blobMetric = new BlobMetric { Id = id, ContainerName = containerName }; } blobMetric.Path = groupedRecords.Path; blobMetric.Size += groupedRecords.Size; await container.UpsertItemAsync(blobMetric, new PartitionKey(containerName)); } } }
このコードでコンテナーと日別でのファイル単位での転送量を集計し続けることが可能です。ストリームで処理されるため Log Analytics とは異なりバッチ処理が必要なく、低コストでモニタリング可能になりました。
実際に Blob Storage へのアクセスを何回か行ってみた後に Cosmos DB を確認すると、ファイルと日別で項目が作成されていて size プロパティに転送されたサイズが集計されていることが分かります。

後は読み取る側がパーティションキーを指定して一覧で取得すれば、簡単に各ファイル単位での転送量を計算することが出来ます。この Cosmos DB はマテリアライズドビューと同じなので、要件に合わせて一覧取得しやすいパーティションキーを設計すれば良いですね。
次は同じ考え方でストレージ利用量を計算してみます。まずは Azure Monitor から Storage Write ログカテゴリを専用の Event Hub に送信する設定を追加するところから始まります。

Event Hub の設定が完了してしまえば Azure Functions での処理を用意するだけですが、こちらについては実装はほぼ同じで、読み取る Event Hub が異なっているのと処理する operationName が GetBlob から PutBlob に変わっているぐらいしか違いがありません。
こちらの場合はパーティションキーに日付を含めないことで、日別でのストレージ利用量として集計することはしていません。実際に使用量が必要になるケースとしては、コンテナー内での合計であることが多いと思うので、それも ID とパーティションキーの設計で解決できます。
public class BlobCapacity(CosmosClient cosmosClient) { [Function(nameof(BlobCapacity))] public async Task Run([EventHubTrigger("evh-stream-capacity", Connection = "EventHubConnection")] EventData[] events) { var container = cosmosClient.GetContainer("Metrics", "Capacity"); var allRecords = events.SelectMany(x => x.EventBody.ToObjectFromJson<RootObject>().records).ToArray(); var records = allRecords.Where(x => x is { statusCode: 201, operationName: "PutBlob" }) .GroupBy(x => x.uri.AbsolutePath) .Select(x => new { Path = x.Key, Size = x.Sum(xs => xs.properties.requestBodySize) }); foreach (var groupedRecords in records) { var id = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes(groupedRecords.Path))); var containerName = groupedRecords.Path.Split('/', StringSplitOptions.RemoveEmptyEntries)[0]; BlobMetric blobMetric; try { var response = await container.ReadItemAsync<BlobMetric>(id, new PartitionKey(containerName)); blobMetric = response.Resource; } catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound) { blobMetric = new BlobMetric { Id = id, ContainerName = containerName }; } blobMetric.Path = groupedRecords.Path; blobMetric.Size += groupedRecords.Size; await container.UpsertItemAsync(blobMetric, new PartitionKey(containerName)); } } }
注意点としては PutBlob は同一ファイルに対して複数回実行される可能性があるため、合計値を保持するようにしています。これも複数ブロックや上書きの場合をあまり考慮できていないですが、削除イベントを追加することで対応できると考えています。
非常にシンプルなコードですがデプロイした後に複数ファイルのアップロードを実行してみると、それぞれのファイル別で Cosmos DB に項目が作成されて size プロパティにアップロードされたファイルサイズが入っていることが確認出来ました。

これまであまり使ってこなかった Azure Monitor のリソースログを Event Hubs に送信することで、これまで取得が難しかった情報を簡単に用意できるようになったのは使いどころがあると思います。特に Blob の利用量については Inventory を使う必要があったのでコストと時間がかかっていましたが、リアルタイムでマテリアライズドビューを用意してしまえば良いですね。




































