Monthly Archives: May 2017

Implementing deferral mechanism in ServiceBus

Deferral is a way to leave a message in a queue or subscription when you cannot process it at the moment. In PeekLock receive mode you read a message, but it stays in the queue, locked for other receivers. When processing is done, you call Complete and the message is removed from the queue; when something goes wrong, you can call Abandon and the message becomes available in the queue for the next read. An important thing to remember is that the lock is held only for a limited time, the LockDuration. If the message is not processed and completed within that time, it becomes available in the queue again and you will get a MessageLockLostException when you try to do anything with it. Whenever a message loses its lock and stays in the queue, either because it was abandoned or because the lock expired, its DeliveryCount is incremented. After reaching the limit (MaxDeliveryCount, 10 by default), the message is moved to the dead-letter queue.

[Figure: message life-cycle]
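The Complete/Abandon cycle described above can be sketched as follows. This is a minimal sketch, not a definitive implementation: it assumes the WindowsAzure.ServiceBus NuGet package, a message body that was sent as a string, and placeholder values for the connection string and queue name. The body of GetQueueClient is an assumption about what that helper might look like.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // NuGet package: WindowsAzure.ServiceBus

public static class PeekLockExample
{
    // Hypothetical placeholders: substitute your own connection string and queue name.
    private const string ConnectionString = "<your Service Bus connection string>";
    private const string QueueName = "<your queue name>";

    // Assumed shape of a GetQueueClient helper; not taken from the original post.
    public static QueueClient GetQueueClient(ReceiveMode mode)
    {
        return QueueClient.CreateFromConnectionString(ConnectionString, QueueName, mode);
    }

    public static async Task ReceiveOneAsync()
    {
        var client = GetQueueClient(ReceiveMode.PeekLock);

        // Returns null when no message arrives within the wait time.
        BrokeredMessage message = await client.ReceiveAsync(TimeSpan.FromSeconds(10));
        if (message == null)
        {
            return;
        }

        try
        {
            // Assumes the sender put a string in the message body.
            Console.WriteLine("Delivery " + message.DeliveryCount + ": " + message.GetBody<string>());

            // Success: the message is removed from the queue.
            await message.CompleteAsync();
        }
        catch (MessageLockLostException e)
        {
            // The lock expired, so the message is already visible to receivers again.
            Console.WriteLine(e);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);

            // Failure: release the lock so the message can be read again.
            await message.AbandonAsync();
        }
    }
}
```

Each Abandon or expired lock counts as one delivery attempt, so a message that keeps failing in ReceiveOneAsync eventually ends up in the dead-letter queue.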

It is a great life-cycle: a message that we simply cannot process ends up in the dead-letter queue. Thanks to the retry policy, all transient errors that occur while connecting to Service Bus are handled internally, so you do not have to worry about them. Problems start when you want to handle connection problems that are not related to Service Bus. The solution that Microsoft proposes is to defer a message that you cannot process: leave it in the queue, but hide it from receivers. A deferred message can be retrieved only by asking for it with its sequence number. The whole loop can look like this:

private static async Task StartListenLoopWithDeferral()
{
    var client = GetQueueClient(ReceiveMode.PeekLock);
    var deferredMessages = new List<KeyValuePair<long, DateTimeOffset>>();

    while (true)
    {
        var messages = Enumerable.Empty<BrokeredMessage>();

        try
        {
            messages = await client.ReceiveBatchAsync(50, TimeSpan.FromSeconds(10));
            messages = messages ?? Enumerable.Empty<BrokeredMessage>();
            if (!messages.Any())
            {
                continue;
            }

            foreach (var message in messages)
            {
                Console.WriteLine("Received a message: " + message.GetBody<string>());
            }
            await client.CompleteBatchAsync(messages.Select(m => m.LockToken));

            // handling deferred messages whose retry time has passed
            var messagesToProcessAgain = deferredMessages.Where(d => d.Value < DateTimeOffset.Now).Take(10).ToList();
            foreach (var messageToProcess in messagesToProcessAgain)
            {
                BrokeredMessage message = null;
                try
                {
                    deferredMessages.Remove(messageToProcess);
                    message = await client.ReceiveAsync(messageToProcess.Key);

                    if (message != null)
                    {
                        // processing
                        Console.WriteLine("Received a message: " + message.GetBody<string>());

                        await client.CompleteAsync(message.LockToken);
                    }
                }
                catch (MessageNotFoundException) { }
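                catch (Exception e)
                {
                    Console.WriteLine(e);
                    // message is still null here if ReceiveAsync itself threw,
                    // so reuse the sequence number we asked for
                    deferredMessages.Add(new KeyValuePair<long, DateTimeOffset>(
                        messageToProcess.Key,
                        DateTimeOffset.Now + TimeSpan.FromMinutes(2)));
                }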
            }
        }
        catch (MessageLockLostException e)
        {
            Console.WriteLine(e);

            foreach (var message in messages)
            {
                await message.AbandonAsync();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);

            // defer messages
            foreach (var message in messages)
            {
                deferredMessages.Add(new KeyValuePair<long, DateTimeOffset>(
                    message.SequenceNumber,
                    DateTimeOffset.Now + TimeSpan.FromMinutes(2)));
                await message.DeferAsync();
            }
        }
    }
}

The code consists of an endless loop that receives messages with ReceiveBatchAsync and processes them. If something goes wrong, the messages are deferred and their sequence numbers are added to the deferredMessages list with a due time 2 minutes ahead. After completing a batch successfully, the program checks whether any deferred messages are due and should be processed again. If processing fails again, the message is added to the deferredMessages list once more. There is also a check for MessageLockLostException, which might occur when a message has already gone to the dead-letter queue and we should no longer ask for it. The sequence numbers are kept only in memory, and that is an obvious potential issue: when the program is restarted, the list is wiped and the program no longer knows which deferred messages to ask for.
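One way to soften the in-memory problem is to write the list of sequence numbers to durable storage whenever it changes. The sketch below is only an illustration under stated assumptions: DeferredMessageStore is a hypothetical helper (not part of the Service Bus SDK), it uses a local text file, and it assumes a single receiver process owns that file.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical helper, not part of the Service Bus SDK.
public static class DeferredMessageStore
{
    // Writes one "sequenceNumber;dueTime" line per deferred message.
    public static void Save(string path, IEnumerable<KeyValuePair<long, DateTimeOffset>> deferred)
    {
        var lines = deferred.Select(d =>
            d.Key.ToString(CultureInfo.InvariantCulture) + ";" +
            d.Value.ToString("o", CultureInfo.InvariantCulture)); // "o" = round-trip format
        File.WriteAllLines(path, lines);
    }

    // Reads the list back; returns an empty list when the file does not exist yet.
    public static List<KeyValuePair<long, DateTimeOffset>> Load(string path)
    {
        var result = new List<KeyValuePair<long, DateTimeOffset>>();
        if (!File.Exists(path))
        {
            return result;
        }

        foreach (var line in File.ReadAllLines(path))
        {
            var parts = line.Split(';');
            if (parts.Length != 2)
            {
                continue; // skip malformed lines
            }

            result.Add(new KeyValuePair<long, DateTimeOffset>(
                long.Parse(parts[0], CultureInfo.InvariantCulture),
                DateTimeOffset.ParseExact(parts[1], "o", CultureInfo.InvariantCulture)));
        }

        return result;
    }
}
```

The loop could call Load once at startup to fill deferredMessages and Save after every Add or Remove. A file is the simplest option; with several receivers a shared store would be needed instead.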

Deferral pros and cons

The deferral mechanism can be useful because it is based on the original Azure infrastructure: handling messages this way keeps the message's DeliveryCount property incrementing and eventually moves the message to the dead-letter queue. It is also a way to set a single message aside and make it available again after a custom time.

However, the algorithm itself is not easy and has to handle many edge cases, which makes it error-prone. It also breaks the order of messages, and in some cases keeping FIFO is essential. I wouldn’t recommend it in every scenario. Maybe a better approach would be to add a simple wait when something goes wrong and try again after a while.
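The "simple wait" alternative could look roughly like this. It is a minimal sketch: SimpleRetry is a hypothetical helper, and the number of attempts and the delay are values you would have to choose yourself.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs an action, waiting between failed attempts.
public static class SimpleRetry
{
    // Returns the number of the attempt that succeeded.
    // The exception from the last attempt is rethrown to the caller.
    public static async Task<int> RunAsync(Func<Task> action, int maxAttempts, TimeSpan delay)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch (Exception e) when (attempt < maxAttempts)
            {
                // Not the last attempt yet: log, wait, and try again.
                Console.WriteLine("Attempt " + attempt + " failed: " + e.Message);
                await Task.Delay(delay);
            }
        }
    }
}
```

The processing of a single message would be passed in as the action. Keep in mind that the message stays locked while the helper waits, so the total waiting time has to fit within LockDuration, or the lock has to be renewed in the meantime (for example with RenewLockAsync on the message).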