ここ最近は Azure Functions と Append Blob を組み合わせて、非常に便利かつスケーラブルな処理をシンプルなコードで書くことが多かったので紹介します。
割と地味な立ち位置の Append Blob ですが、Serverless や Event-driven なコードとの相性が良かったです。
具体的には Azure Functions では Event Hubs / IoT Hub / Cosmos DB Change Feed などを使って、メッセージをストリームで処理することが多いのですが、そのデータの保存先として Append Blob は最適でした。
Append Blob の基本的なこと
あまり知られていない気がするので基本的な部分から。と言ってもリリース時のブログやドキュメントが参考になります。Append Blob の特徴としては書き込みを Block という単位で行い、追記に最適化されているので複数から同時に書き込むことが出来ます。
ただし書き込める Block 数に上限があって、1 Block あたり 4MB までの 50000 Blocks まで書き込めます。
なので以下のように 4MB 以上のデータを書き込もうとするとエラーになります。
従って最大ファイルサイズは以前の Block Blob と同じく約 195GB が上限になります。1 Block のサイズは問題になりにくいと思いますが、50000 Blocks の上限は書き込み側で考慮する必要があります。
単純に 1 行のデータを 1 Block として書いていけば 50000 行が上限になるので、時系列データの場合は適切な単位でのファイル分割が必要なります。1 つの巨大なファイルより複数のファイルの方がスケーリングを考えても有利なので、時間単位とかで分割するのが良いです。
複数からの書き込みと条件
基本的には Multi-writer に対応してますが、古い SDK の一部メソッドや Precondition を指定すれば Single-writer での書き込みを強制することも出来ます。
サンプルとして以下のような Append Blob に対して同時に書き込むコードを用意しました。
class Program { static async Task Main(string[] args) { var connectionString = "DefaultEndpointsProtocol=https;AccountName=XXXXX;AccountKey=XXXXXX;EndpointSuffix=core.windows.net"; var storageAccount = CloudStorageAccount.Parse(connectionString); var blobClient = storageAccount.CreateCloudBlobClient(); var container = blobClient.GetContainerReference("append"); var appendBlob = container.GetAppendBlobReference("sample.txt"); await appendBlob.CreateOrReplaceAsync(); var tasks = Enumerable.Range(0, 100) .Select(x => appendBlob.AppendTextAsync($"{DateTime.Now}\n")) .ToArray(); await Task.WhenAll(tasks); } }
実行すると以下のような例外が投げられて Append Blob への書き込みが失敗します。
古い SDK にある AppendBlock
/ AppendBlockAsync
以外のメソッドは Single-writer 向けに実装されているので、内部的に appendpos という書き込み位置を保持しているので同時に書き込むとエラーになります。
なので AppendBlockAsync
を使うように修正すると、エラーが出ることなく書き込みが完了します。
他の Blob の内容を Append Blob へ追記する
面白い機能として別の Blob から Append Blob への追記が API レベルで提供されています。複数のファイルを結合して 1 つの Blob にする場合には、この API を使うと処理が Blob 側で終わるので非常に効率的です。
古い SDK には API が用意されていないので、新しい SDK を使ったサンプルは以下になります。
class Program { static async Task Main(string[] args) { var connectionString = "DefaultEndpointsProtocol=https;AccountName=XXXXX;AccountKey=XXXXXX;EndpointSuffix=core.windows.net"; var sourceUri = new Uri("https://xxxxx.blob.core.windows.net/content/function.txt?st=2019-11-29T09%3A05%3A43Z&se=2019-11-30T09%3A05%3A43Z&sp=rl&sv=2018-03-28&sr=b&sig=XXXXX"); var appendBlobClient = new AppendBlobClient(connectionString, "append", "destination.txt"); await appendBlobClient.CreateIfNotExistsAsync(); await appendBlobClient.AppendBlockFromUriAsync(sourceUri); } }
API 名からわかるように、1 つの Block として追加するので 4MB の制約は受けます。
4MB 以上のファイルを Append Blob へ追記したい場合には HTTP の Range Header を指定できるので、複数回の API 呼び出しに分ければ良いです。
Azure Functions と組み合わせて使う
本題の Azure Functions との組み合わせですが、最初から Blob Binding が Append Blob に対応しているので、引数の型を CloudAppendBlob
に変えるだけで使えます。
サンプルとして QueueTrigger と組み合わせて、入ってきたデータを Append Blob に書く Function を書いてみましたが、コードは以下のようにシンプルです。
Event-driven なのでデータは細切れになってやってきますが、Append Blob の場合は簡単に追記が出来るので、難しいことを考えずに Azure Storage に保存できます。
public static class Function1 { [FunctionName("Function1")] [StorageAccount("StorageConnection")] public static async Task Run( [QueueTrigger("queue")] string queueItem, [Blob("append/year={datetime:yyyy}/month={datetime:MM}/day={datetime:dd}/hour={datetime:HH}/function.txt", FileAccess.ReadWrite)] CloudAppendBlob appendBlob, ILogger log) { if (!await appendBlob.ExistsAsync()) { await appendBlob.CreateOrReplaceAsync(); } await appendBlob.AppendBlockAsync(new MemoryStream(Encoding.UTF8.GetBytes($"{DateTime.Now} - {queueItem}\n"))); } }
Blob のパスが少し長いですが、日付でディレクトリを切るための指定です。
微妙に隠し仕様っぽいのですが、DateTime のバインド式ではフォーマットも指定できたので簡単でした。
このパスのフォーマットは Hive Streaming Conventions と Docs には書いてましたが、正しい名前は良くわからなかったです。Azure Monitor のログもこんな感じだったと思います。
Queue に適当なデータを入れると Function が走って Append Blob にデータが追記されていきます。
ディレクトリ構造は良い感じに日付単位で分けて Blob が作られているのが確認できます。
作成された Append Blob をダウンロードして中身を確認すると、ちゃんとデータが入っています。
2019/11/29 16:45:16 - kazuakix 2019/11/29 16:45:32 - daruyanagi 2019/11/29 16:45:38 - buchizo
サンプルは QueueTrigger を使いましたが、本命は Cosmos DB の Change Feed との組み合わせです。
Change Feed は Pull 型なので、ある程度まとまったデータを取ってきて 1 Block に書き込めるので効率的ですし、SQL Database に書き込む Change Feed と Storage に書き込む Change Feed を用意してしまえば、それぞれ並行稼働させることも簡単です。
惜しい点として Data Lake Storage Gen2 では Append Blob が使えないことですが、その辺りは Data Factory から Data Lake Storage Gen2 にコピーしてあげれば良いかなと思っています。Data Lake Storage に入れてしまえば後処理の自由度が高いです。