しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

Azure Container Instances の軽量なオーケストレーションを Durable Functions を使って実現する

Azure には Docker Image を指定するだけで簡単に動かせるサービスがいくつかあり、その中でも Container Instances は非常に起動が早くてシンプルなので利用範囲が広いのですが、オーケストレーターが存在しないので並列処理のように複数立ち上げたい場合には管理が面倒です。

公式ドキュメントでもオーケストレーターについて触れられていますが、AKS と Virtual Nodes 経由で使う方法が書かれていて、ACI はシンプルなのに途端に重量級になってしまいます。

サービスが既にかなり大規模な AKS を導入している場合以外では、Virtual Nodes 経由で ACI を使うメリットはほぼ存在しません。ACI を使うために AKS というのは本末転倒です。

一方で Azure Architecture Center には Durable Functions と ACI を組み合わせて、AKS を使わずに軽量なオーケストレーションを実現するサンプルが公開されています。こちらの方が圧倒的に筋が良いです。

Durable Functions の Activity として ACI の起動と削除を実装しておけば、後は Orchestrator から自由に呼び出してインスタンスをいくつでも作れるという仕組みです。更に Durable Functions のイベントやタイマーを使うと、さらに効率的に動かすことが出来ます。

サンプルは 1 つの ACI を動かすだけになっていたので、今回は更に拡張して複数の ACI を立ち上げることで大規模かつ効率的に並列処理出来るようにしてみました。

方針としては 1 つの ACI を操作する部分は Sub Orchestrator として実装することで、並列実行を容易にしつつ ACI の最大数制限に引っかかりにくくしています。

Resource Manager SDK を使って ACI を操作

まずは Durable Functions が関係ない部分から用意しました。ACI は古い設計の SDK は公開されているので、これを使って起動と削除を C# から簡単に行えます。

ACI の軌道に必要なパラメータは多いので、SDK を直接触るのではなく 1 枚被せて使いやすくしています。

今回のサンプルではリソースグループや Docker Image の情報などは固定にしていますが、必要に応じて Options などで取るようにすれば良いです。

public class ContainerInstanceService
{
    public ContainerInstanceService(ContainerInstanceManagementClient containerInstanceManagementClient)
    {
        _containerInstanceManagementClient = containerInstanceManagementClient;
    }

    private readonly ContainerInstanceManagementClient _containerInstanceManagementClient;

    private const string ResourceGroupName = "rg-aci-worker";
    private const string Location = "westus2";
    private const string ContainerImage = "debian:bullseye";

    public Task CreateAsync(string id)
    {
        return _containerInstanceManagementClient.ContainerGroups.BeginCreateOrUpdateAsync(
            ResourceGroupName,
            $"ci-{id}",
            new ContainerGroup
            {
                Location = Location,
                Containers = new[]
                {
                    new Container
                    {
                        Name = id,
                        Image = ContainerImage,
                        Command = new[] { "echo", "Hello, world" },
                        Resources = new ResourceRequirements
                        {
                            Requests = new ResourceRequests
                            {
                                Cpu = 1,
                                MemoryInGB = 1.5
                            }
                        }
                    }
                },
                OsType = "Linux",
                RestartPolicy = "Never"
            });
    }

    public Task DeleteAsync(string id)
    {
        return _containerInstanceManagementClient.ContainerGroups.BeginDeleteAsync(ResourceGroupName, $"ci-{id}");
    }

    public async Task<string> GetStateAsync(string id)
    {
        var containerGroup = await _containerInstanceManagementClient.ContainerGroups.GetAsync(ResourceGroupName, $"ci-{id}");

        return containerGroup.Containers[0].InstanceView?.CurrentState?.State ?? "Unknown";
    }
}

実際に利用する場合は Docker Image 側でエントリポイントを定義して、コンテナの起動と同時に処理が走るようにするはずですが、用意するのが面倒だったので Command を指定しています。

Dockerfile の指定と同様に若干癖があるので、以下のドキュメントを読んでおくのをお勧めします。

後は Managed Identity でターゲットとなるリソースグループに権限を追加すると、Azure Functions から ACI の起動と削除が簡単に行えるようになります。

ACI の管理を行う Activity を実装

Durable Functions では副作用のある処理は Activity として実装する必要があるので、先ほど作成したメソッドは以下のような Activity を用意して呼び出すようにします。

[FunctionName(nameof(CreateContainerInstance))]
public Task CreateContainerInstance([ActivityTrigger] IDurableActivityContext context, ILogger log)
{
    var id = context.GetInput<string>();

    return _containerInstanceService.CreateAsync(id);
}

[FunctionName(nameof(GetContainerInstanceStatus))]
public Task<string> GetContainerInstanceStatus([ActivityTrigger] IDurableActivityContext context, ILogger log)
{
    var id = context.GetInput<string>();

    return _containerInstanceService.GetStateAsync(id);
}

[FunctionName(nameof(DeleteContainerInstance))]
public Task DeleteContainerInstance([ActivityTrigger] IDurableActivityContext context, ILogger log)
{
    var id = context.GetInput<string>();

    return _containerInstanceService.DeleteAsync(id);
}

単純にメソッドを呼び出しているだけの中身のない Activity です。パラメータとして Orchestrator で生成されたユニークな ID を渡すことで、ACI の名前やイベント名として利用しています。

処理の起点となる Orchestrator を実装

実際に ACI の起動と削除を行う Sub Orchestrator を用意する前に、起点となる Orchestrator から用意しておきます。サンプルでもよく見る Fan-out / Fan-in の実装になっています。

[FunctionName(nameof(RunOrchestrator))]
public async Task RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var tasks = new List<Task>();

    // 5 並列で ACI を起動する
    for (var i = 0; i < 5; i++)
    {
        //tasks.Add(context.CallSubOrchestratorAsync(nameof(RunSubOrchestrator_Polling), i));
        //tasks.Add(context.CallSubOrchestratorAsync(nameof(RunSubOrchestrator_Event), i));
    }

    // 全ての ACI が終了するまで待機
    await Task.WhenAll(tasks);
}

サンプルなので ACI 完了後の処理は書いていませんが、並列処理をした結果を統合して 1 つにするといった処理が簡単に書けるのがメリットです。

並列実行数が少なければ特に問題はないですが、以下のドキュメントにある通り ACI には 5 分 / 1 時間当たりの最大作成数が設定されているので、何も考えずに 1000 個作成しようとするとエラーになります。

理想としては Semaphore 的なものを使って、同時に実行される ACI の数を効率的に制限したいところですが、Durable Functions では実現するのが難しいので現実的には並列数を調整するか、クオータの上限変更をリクエストするのが手っ取り早いです。

ACI の起動と削除を行う Sub Orchestrator を実装

残りは ACI の起動と削除を行うだけですが、処理が完了したことをどのように検知するかでポーリングとイベントの 2 種類が考えられるので、今回はそれぞれ用意してみました。

完了したことをポーリングで検知する

ポーリングで完了したことを検知するには、ACI のステータスを取得して Terminated になっていることを判定すれば良いので、Durable Functions の Timer を使って 10 秒ごとにチェックする処理を書きました。

[FunctionName(nameof(RunSubOrchestrator_Polling))]
public async Task<string> RunSubOrchestrator_Polling(
    [OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger logger)
{
    var id = context.NewGuid().ToString();

    await context.CallActivityAsync(nameof(CreateContainerInstance), id);

    while (true)
    {
        var instanceState = await context.CallActivityAsync<string>(nameof(GetContainerInstanceState), id);

        if (instanceState == "Terminated")
        {
            break;
        }

        await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(10), CancellationToken.None);
    }

    await context.CallActivityAsync(nameof(DeleteContainerInstance), id);

    return id;
}

この Orchestrator 内で ACI の起動と削除が完結しているので分かりやすいです。パッと見は無限ループにしか見えませんが CreateTimer を使っているので効率的にポーリングが実行されます。

実行すると Sub Orchestrator 単位で ACI が 1 つずつ起動されるので合計 5 つ起動されます。Sub Orchestrator に分離することで ACI の起動と処理完了まで Orchestrator 単位で待機されるので、特定の 1 つが極端に起動に時間がかかったとしても他の処理には影響は与えません。

ポーリング中のログとして、分かりやすくするために ACI のステータスを出力しました。ポーリング毎にステータスが変化して、最終的に Terminated になり処理が完了していることが分かります。

時間のかかる処理の場合はポーリングでも気にならないと思いますが、短時間で終わる処理の場合はポーリング間の待ち時間が大きく影響するので、そういった処理の場合はイベントを使うのが良いです。

完了したことをイベントで検知する

Durable Functions にはイベントが発火されるまで待機する機能が用意されているので、これを使ってコンテナ側から処理が完了したタイミングでイベントを発火させることで効率化を図ります。

イベントの待機は WaitForExternalEvent メソッドにイベント名を渡すだけで終わるので、Sub Orchestrator の実装は以下のように非常にシンプルです。

ただしコンテナ側にイベントを発火させるための URI を渡す必要があるので、コンテナを起動する際に CreateHttpManagementPayload を呼び出してインスタンス単位の URI を生成しています。

[FunctionName(nameof(RunSubOrchestrator_Event))]
public async Task<string> RunSubOrchestrator_Event(
    [OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger logger)
{
    var id = context.NewGuid().ToString();

    await context.CallActivityAsync(nameof(CreateContainerInstance), id);

    await context.WaitForExternalEvent(id);

    await context.CallActivityAsync(nameof(DeleteContainerInstance), id);

    return id;
}

[FunctionName(nameof(CreateContainerInstance))]
public Task CreateContainerInstance(
    [ActivityTrigger] IDurableActivityContext context,
    [DurableClient] IDurableOrchestrationClient client,
    ILogger log)
{
    var id = context.GetInput<string>();

    var payload = client.CreateHttpManagementPayload(context.InstanceId);

    var sendEventPostUri = payload.SendEventPostUri.Replace("{eventName}", id);

    return _containerInstanceService.CreateAsync(id, sendEventPostUri);
}

生成されたエンドポイント URI は呼び出し用のキーが付いているので安全に利用できます。

後は ACI の起動時に環境変数経由でエンドポイント URI を渡してあげて、Docker Image 側で処理が完了した時に HTTP POST を投げるようにするだけです。今回はサンプルなので curl を使って呼び出しています。

public class ContainerInstanceService
{
    private const string ContainerImage = "curlimages/curl:7.83.1";

    public Task CreateAsync(string id, string sendEventPostUri)
    {
        return _containerInstanceManagementClient.ContainerGroups.BeginCreateOrUpdateAsync(
            ResourceGroupName,
            $"ci-{id}",
            new ContainerGroup
            {
                Location = Location,
                Containers = new[]
                {
                    new Container
                    {
                        Name = id,
                        Image = ContainerImage,
                        Command = new[] { "/bin/sh", "-c", "curl -X POST -H 'Content-Type: application/json' -d '{}' $SENDEVENTPOSTURI" },
                        EnvironmentVariables = new[]
                        {
                            new EnvironmentVariable("SENDEVENTPOSTURI", sendEventPostUri)
                        },
                        Resources = new ResourceRequirements
                        {
                            Requests = new ResourceRequests
                            {
                                Cpu = 1,
                                MemoryInGB = 1.5
                            }
                        }
                    }
                },
                OsType = "Linux",
                RestartPolicy = "Never"
            });
    }
}

このコードを実行するとポーリングを使った時よりも全体的な処理時間は短縮されました。今回は処理内容が 1 秒以下で終わるものだったので、特に効果がありました。

Application Insights から Sub Orchestrator の実行ログを確認すると、以下のようにイベントの待機と発火されたことが分かるようになっています。

ARM REST API の呼び出し制限に引っかかることが無いので、可能な限りイベントを使った完了通知を行うべきですが、ポーリングは Durable Functions 向けの特別な処理が必要ないというメリットも大きいです。

ACI の GPU インスタンスを使う場合に一番力を発揮してくれそうなのですが、現状 ACI の GPU インスタンスを選ぶと起動に 10 分近くかかってしまうので若干利用しにくいです。