しばやん雑記

Azure とメイドさんが大好きなフリーランスのプログラマーのブログ

新しい Azure Service Bus for .NET を使う際はコネクションの管理に注意

現時点で Service Bus for .NET クライアントは以下の 2 つが存在します。片方は昔からある .NET Framework 専用のクライアント、もう片方は .NET Standard 2.0 に対応した新しいクライアントです。

新しいクライアントは AMQP がデフォルトで使われているので、プロトコルも異なります。

クライアント間で多少の API 互換性はありますが、ファクトリメソッドを使うのと、普通にインスタンスを作成するという感じで使い方は異なっています。

// 古いバージョンの TopicClient を作成する
var topicClient = TopicClient.CreateFromConnectionString("CONNECTION_STRING", "TOPIC_NAME");

// 新しいバージョンの TopicClient を作成する
var topicClient = new TopicClient("CONNECTION_STRING", "TOPIC_NAME");

基本的に TopicClient や QueueClient はシングルトンで扱うべきですが、古いバージョンの方ではコネクションプールを内部で持っているので問題は出ません。

.NET Core 移行の時などで新しいクライアントに上のように置き換えると、コネクションの管理周りの挙動が変わっているので問題となります。具体的に以下のような単純なコードで確認してみます。

var topicClient = TopicClient.CreateFromConnectionString("CONNECTION_STRING", "TOPIC_NAME");

await topicClient.SendAsync(new BrokeredMessage(Encoding.UTF8.GetBytes("hello, world")));

上のコードを複数回実行しても、特に TCP 接続は増えずに 1 つだけとなります。

f:id:shiba-yan:20190331171718p:plain

期待したとおりの挙動となっていますね。内部でコネクションをプールしているので問題ありません。

一方で新しいクライアントを使って以下のコードを実行すると結果が異なります。

var topicClient = new TopicClient("CONNECTION_STRING", "TOPIC_NAME");

await topicClient.SendAsync(new Message(Encoding.UTF8.GetBytes("hello, world")));

作成された TopicClient 分だけコネクションが作成されて、更に解放されていないことが確認できます。自動的に切断されたりはしないので、リソースの許す限り増えていくことになります。

f:id:shiba-yan:20190331171649p:plain

明示的に CloseAsync を呼び出したり、using を使えば回避できますが毎回コネクションを作成するのは非常に高コストなので、使いまわしていく必要があります。

ASP.NET Core アプリケーションの場合は DI を使えば解決します。普通に Singleton にすれば TopicClient は 1 つしか作成されないので問題は発生しません。

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<ITopicClient>(_ => new TopicClient("CONNECTION_STRING", "TOPIC_NAME"));
    }
}
public class HomeController : Controller
{
    public HomeController(ITopicClient topicClient)
    {
        _topicClient = topicClient;
    }

    private readonly ITopicClient _topicClient;

    public async Task<IActionResult> Index()
    {
        await _topicClient.SendAsync(new Message(Encoding.UTF8.GetBytes("hello, world")));

        return View();
    }
}

実行してコネクション数を確認しても、増えることなく 1 つだけが使われていることが確認できます。アプリケーションが終了するタイミングで DI によって Dispose が呼び出されるので更に安心です。

f:id:shiba-yan:20190331172159p:plain

上のように Singleton で保持しておいても内部の ServiceBusConnection が再接続機能を持っているので、何らかの要因でコネクションが切断されたとしても自動的に再接続されます。

DI が使えない Azure Functions などでは static でインスタンスを保持するか、以下のように適当なファクトリクラスを作成してコネクションの管理をすればよいと思います。

public static class ServiceBusConnectionFactory
{
    private static readonly ConcurrentDictionary<int, Lazy<ServiceBusConnection>> _serviceBusConnections =
        new ConcurrentDictionary<int, Lazy<ServiceBusConnection>>();

    public static ServiceBusConnection CreateFromConnectionString(string connectionString)
    {
        return _serviceBusConnections.GetOrAdd(connectionString.GetHashCode(), _ =>
            new Lazy<ServiceBusConnection>(() => new ServiceBusConnection(connectionString))).Value;
    }
}

新しいクライアントは Service Bus との接続を表す ServiceBusConnection が追加されていて、TopicClientQueueClient は内部的に作成して使っていますが、コンストラクタで既存のインスタンスを渡すことで使用するコネクションを共有することも出来ます。

例えば複数の Topic や Queue を使う時に同じ接続を再利用することが出来ます。ちなみに古いクライアントではデフォルトでそのような実装になっています。

class Program
{
    static async Task Main(string[] args)
    {
        // 内部でコネクションを共有しているので 1 つだけ作られる
        var topicClient1 = TopicClient.CreateFromConnectionString("CONNECTION_STRING", "TOPIC_NAME1");
        var topicClient2 = TopicClient.CreateFromConnectionString("CONNECTION_STRING", "TOPIC_NAME2");

        await Task.WhenAll(
            topicClient1.SendAsync(new BrokeredMessage(Encoding.UTF8.GetBytes("hello, world"))),
            topicClient2.SendAsync(new BrokeredMessage(Encoding.UTF8.GetBytes("hello, world")))
        );
    }
}

古いクライアントでは接続先単位でコネクションがプールされるので、上のようなコードでも実際のコネクションは 1 つしか作成されません。

新しいクライアントでは先に ServiceBusConnection を 1 つだけ作成しておいて、その後 TopicClient のコンストラクタで渡すことで実現します。

class Program
{
    static async Task Main(string[] args)
    {
        // 共有用のコネクションを先に作成する
        var serviceBusConnection = new ServiceBusConnection("CONNECTION_STRING");

        var topicClient1 = new TopicClient(serviceBusConnection, "TOPIC_NAME1", RetryPolicy.Default);
        var topicClient2 = new TopicClient(serviceBusConnection, "TOPIC_NAME2", RetryPolicy.Default);

        await Task.WhenAll(
            topicClient1.SendAsync(new Message(Encoding.UTF8.GetBytes("hello, world"))),
            topicClient2.SendAsync(new Message(Encoding.UTF8.GetBytes("hello, world")))
        );
    }
}

ServiceBusConnection は 1 つしか作成されないので、実際のコネクションも 1 つだけとなります。

これで Service Bus のコネクション管理は問題ないでしょう。バージョンアップなどで API が変わった時には、その挙動もちゃんと確認しないと痛い目を見るという話でした。