Recently I run into a challenge. I’ working on a micro-service that receives and processes messages one by one. How to send Service Bus messages, not instantly, but when they pile up? The reason for cause it is expensive when it comes to performance. Let’s send messages after 10 piles up or after every 20 seconds.
It is not an obvious task, because Microsoft’s implementation does not support it. However, simple buffering can be done like this:
public class SimpleBufferMessagesService { private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[key]"; private static readonly List<Message> _messages = new List<Message>(); private static DateTime _lastMessageSent = DateTime.Now; private readonly TopicClient _topicClient; public SimpleBufferMessagesService() { _topicClient = new TopicClient(ServiceBusConnectionString, "accountTransferUpdates"); } public async Task AddMessage(string message) { _messages.Add(new Message(Encoding.UTF8.GetBytes(message))); if (_messages.Count >= 10 || DateTime.Now - _lastMessageSent > TimeSpan.FromSeconds(20)) { await SendMessages(_messages); _messages.Clear(); _lastMessageSent = DateTime.Now; } } private async Task SendMessages(List<Message> messages) { await _topicClient.SendAsync(messages); } }
This solution works quite well. Notice that I used static fields, so they would be preserved between requests. On every request instance of SimpleBufferMessagesService will be created anew.
There are a few problems with it:
- it is not thread-safe. Two instances of SimpleBufferMessagesService can use the same _messages field and mess with it. It is a rather huge risk because sending Service Bus message takes some time
- some messages can wait a long time to be sent. When messages stay in the queue and 20 seconds pass, there has to be another request to send them. This is a threat of losing messages when service will be restarted. We shouldn’t keep messages longer then we need to
Having that in mind we need to think of something, that executes every 20 seconds, in intervals like… like… like Timer!
Timer solution
Timer needs to be registered in Startup class, I did that in the end of Configure method.
public void Configure(IApplicationBuilder app, IHostingEnvironment env) { // many things here var timeoutInMiliseconds = 20000; new Timer(s => { ServiceBusTimerCallback(); }, null, 0, timeoutInMiliseconds); } private static void ServiceBusTimerCallback() { var bufferService = new TimerBufferMessagesService(); bufferService.SendMessages(); }
And class that sends messages can be modified like that:
public class TimerBufferMessagesService { private const string ServiceBusConnectionString = "Endpoint=sb://bialecki.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[key]"; private static readonly ICollection<Message> _messages = new List<Message>(); private readonly TopicClient _topicClient; public TimerBufferMessagesService() { _topicClient = new TopicClient(ServiceBusConnectionString, "accountTransferUpdates"); } public void AddMessage(string message) { lock (((ICollection) _messages).SyncRoot) { _messages.Add(new Message(Encoding.UTF8.GetBytes(message))); } } public void SendMessages() { if (_messages.Count == 0) { return; } List<Message> localMessages; lock (((ICollection)_messages).SyncRoot) { localMessages = new List<Message>(_messages); _messages.Clear(); } Task.Run(async () => { await _topicClient.SendAsync(localMessages); }); } }
This implementation is much better. It will run every 20 seconds and it sends messages if there is any. The SendMessages method will be called by one instance and AddMessage will be called by many instances but is written in a safe way.
It was perfect till the moment I realized it wasn’t working.
The thing is that sooner or later timer is destroyed by a garbage collector. Even when I tried to save the reference to timer or use GC.KeepAlive(timer), it always got cleared.
But it can be done right
According to StackOverflow question: https://stackoverflow.com/questions/3635852/system-threading-timer-not-firing-after-some-time/ we can use ThreadPool.RegisterWaitForSingleObject.
That method can be used instead of timer:
public void Configure(IApplicationBuilder app, IHostingEnvironment env) { // many lines here const int timeoutInMiliseconds = 20000; var allTasksWaitHandle = new AutoResetEvent(true); ThreadPool.RegisterWaitForSingleObject( allTasksWaitHandle, (s, b) => { ServiceBusTimerCallback(); }, null, timeoutInMiliseconds, false); } private static void ServiceBusTimerCallback() { var bufferService = new TimerBufferMessagesService(); bufferService.SendMessages(); }
The result is the same, but it will work constantly.
Full code can be found in my github repository: https://github.com/mikuam/Blog/
If you’re more interested in Service Bus, have a look at my post: https://www.michalbialecki.com/2017/12/21/sending-a-azure-service-bus-message-in-asp-net-core/
Or maybe this one: https://www.michalbialecki.com/2018/04/19/how-to-send-many-requests-in-parallel-in-asp-net-core/
Enjoy!