しばやん雑記

Azure Serverless とメイドさんが大好きなフリーランスのプログラマーのブログ

Durable Functions を使ってタイマーで起動される処理の開始時間をランダムに遅延させる

Azure Functions で TimerTrigger を使って特定の時間に起動される処理が増えると、それらはほぼ同時に実行されるため外部 API の呼び出し時にスロットリングが発生しやすくなります。特に Azure Functions は同一 Stamp に載っていると Outbound IP が被りやすいので、問題となる可能性があります。

実際に TimerTrigger によって同時刻に起動される Acmebot の証明書更新処理では、Let's Encrypt の API でスロットリングらしきものが発生しています。

常に 0 秒に起動する必要がない処理も多いので、リトライの時のように開始時間にジッターを追加することで、出来るだけ同時に呼び出されないように対策する必要がありますが、Durable Functions だとタイマーを使って簡単かつ効率的に実装できるという話です。

Durable Functions は CreateTimer を呼び出すと、後続の処理の開始を遅延させることが出来ます。挙動としては Task.Delay に近いですが、待機中は CPU 時間が消費されないという特徴があります。

単純に実装しようとすると、以下のコードのように乱数を生成して CreateTimer に指定すれば良さそうですが、Durable Functions の Orchestrator では非決定論的な API は使えないため、乱数の生成など実行の度に結果が変わるものは使えません。

public class Function1
{
    private static readonly Random _random = new Random();

    [FunctionName("Function1")]
    public async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        // Orchestrator では非決定論的な API は利用してはいけない
        var randomNumber = _random.Next(60);

        log.LogWarning($"Jitter {randomNumber}sec");

        // リプレイを含む Orchestrator の実行毎に randomNumber の値が変わってしまう
        await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(randomNumber), CancellationToken.None);

        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }
}

このコードを実行してみると見た目上は正しく動作しますが、ログを見るとリプレイ毎に randomNumber の値が変わってしまっていることが分かります。

f:id:shiba-yan:20210701011428p:plain

最近は Analyzer によってコンパイル時に使用できない API については警告が出るようにもなりましたが、完全ではないので基本は以下のドキュメントを一読してもらったほうが良いかと思います。

非決定論な API は Activity 内では自由に呼び出せますが、Orchestrator では Activity の呼び出しか決定論的な API のみ利用可能なので、それぞれの方法で実現するコードを書いてみました。

Activity で乱数を生成して利用する

素直に非決定論な API である乱数の生成を Activity 内で行うようにしたコードが以下になります。乱数生成用の Activity では Random.Next を呼び出しているだけですが、Durable Functions によって呼び出しと戻り値が管理されているため安全です。

public class Function1
{
    private static readonly Random _random = new Random();

    [FunctionName("Function1_RandomNumberGenerator")]
    public int RandomNumberGenerator([ActivityTrigger] int maxValue)
    {
        return _random.Next(maxValue);
    }

    [FunctionName("Function1")]
    public async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        var randomNumber = await context.CallActivityAsync<int>("Function1_RandomNumberGenerator", 60);

        log.LogWarning($"Jitter {randomNumber}sec");

        await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(randomNumber), CancellationToken.None);

        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }
}

このコードを実行してみると、常に最初に生成された乱数が使われていることが分かります。

f:id:shiba-yan:20210701011149p:plain

ログのタイムスタンプを見てもらえれば分かるように、乱数で生成された秒数だけ後続の処理の実行が遅延されていることが分かります。数秒のずれがあるのは Queue Storage が使われている限り仕方ない部分です。

多少のずれはあっても、これで 0 秒付近に処理が集中するという問題を回避することは出来ました。

GUID のハッシュ値を利用する

Durable Functions には決定論的な API として CurrentUtcDateTime の他に NewGuid が用意されています。この 2 つの API は Orchestrator 内でも安全に呼び出し可能です。

NewGuid は 128bit のユニークな値を生成するので、これを利用します。

C# では GetHashCode は元データが同じであれば変化しないので、これを利用してランダムに遅延させる秒数を生成します。あくまでもハッシュ値であって乱数ではないですが、ハッシュ値は分散することが期待されているのでこういうケースでは利用できます。

public static class Function2
{
    [FunctionName("Function2")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        var randomNumber = (uint)context.NewGuid().GetHashCode() % 60;

        log.LogWarning($"Jitter {randomNumber}sec");

        await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(randomNumber), CancellationToken.None);

        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Function2_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Function2_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Function2_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }
}

このコードを実行してみると Random を使うケースと同様に、リプレイ毎に常に同じ randomNumber が生成されていることが確認できます。

f:id:shiba-yan:20210701011223p:plain

これも後続の処理の実行が遅延されていることも、タイムスタンプから簡単に確認できるはずです。

おまけ:Sub Orchestrator を使ってメインの処理と分ける

ここまで紹介した 2 つのサンプルコードではメインの Orchestrator 内に直接遅延させる処理を書いていましたが、Sub Orchestrator を使うことで以下のように処理を綺麗に分けることが出来ます。

public class Function1
{
    private static readonly Random _random = new Random();

    [FunctionName("Function1_RandomNumberGenerator")]
    public int RandomNumberGenerator([ActivityTrigger] int maxValue)
    {
        return _random.Next(maxValue);
    }

    [FunctionName("Function1_WithJitter")]
    public async Task<List<string>> RunOrchestratorWithJitter(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        var randomNumber = await context.CallActivityAsync<int>("Function1_RandomNumberGenerator", 60);

        log.LogWarning($"Jitter {randomNumber}sec");

        await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(randomNumber), CancellationToken.None);

        return await context.CallSubOrchestratorAsync<List<string>>("Function1", null);
    }

    [FunctionName("Function1")]
    public async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }
}

元々の Orchestrator をネストして呼び出すことで、既存の Orchestrator 実装を変更することなく遅延させることが出来ます。個人的には StartNewAsync の時点で遅延させるオプションが欲しいと思いました。