Wednesday, 22 December 2010

Building an NServiceBus Timeout Manager using Quartz

I have read and accepted that the built-in NServiceBus Timeout Manager is not suitable for production, largely due to its heavy disk I/O. Udi has explained the complexities of dealing with time-sensitive problems. In short, it is difficult to provide one solution that fits everyone's use case, because time resolution has to be balanced against fail-over duration.

However, in my case I do not need sub-second or even sub-minute resolution. Nor am I convinced the NServiceBus Timeout Manager would provide either of those anyway, because the inherent Fallacies of Distributed Computing mean you cannot guarantee message delivery times or latency. My use case can probably tolerate a resolution of up to 5 minutes, although I don't expect it to be anywhere near that bad: we can run Quartz in a fail-over cluster and fail over in around 60 seconds using VMware HA.

We already use Quartz as a scheduling service. If you haven't checked it out, do so; I really rate it and have been using it for a few years now. So I decided to see whether I could implement a timeout manager with it.

Timeouts are really just messages sent by a saga to another service, with the expectation that the saga will receive them back once the specified expiry has elapsed. There are some specific differences in how the NServiceBus infrastructure handles them, but we'll ignore those for now.

I simply created a new class library with an endpoint configured like so:

public class EndpointConfig : IConfigureThisEndpoint, AsA_Server,
    IWantCustomInitialization, IWantToRunAtStartup, 
    ISpecifyMessageHandlerOrdering 
{
    void IWantCustomInitialization.Init()
    {            
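        var builder = new ContainerBuilder();
        // Register IScheduler and the Quartz jobs with the builder here;
        // SchedulerInitializer and AutofacJobFactory resolve them from this container.
        IoC.Initialize(builder.Build());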
 
        NServiceBus.Configure.With()
            .Autofac2Builder(IoC.Container)
            .XmlSerializer()
            .UnicastBus();
    }
 
    public void Run()
    {
        if (SchedulerInitializer.IsAllowedToStart())
            SchedulerInitializer.Start();
    }
 
    public void Stop()
    {
        SchedulerInitializer.Shutdown();
    }
 
    void ISpecifyMessageHandlerOrdering.SpecifyOrder(Order order)
    {
        order.Specify(First<TimeoutMessageHandler>.Then<SagaMessageHandler>());
    }
}

This is mostly standard configuration, except that I'm using Autofac, but you can use any container of your choice. I told NServiceBus I wanted to do some work at startup by implementing the IWantToRunAtStartup interface; this is where I start and stop the Quartz scheduler. The code to start and stop the scheduler is trivial:

public static class SchedulerInitializer
{
   private static IScheduler Scheduler
   {
       get { return IoC.Resolve<IScheduler>(); }
   }
 
   public static void Start()
   {
       if (!IsAllowedToStart())
       {
           return;
       }
 
       //set up factory
       Scheduler.JobFactory = new AutofacJobFactory();
       Scheduler.Start();
   }
 
   public static void Shutdown()
   {
       Scheduler.Shutdown();
   }
 
   public static bool IsAllowedToStart()
   {
       var start = ConfigurationManager.AppSettings["StartScheduler"];
 
       if (string.IsNullOrEmpty(start))
           throw new ConfigException("StartScheduler has no value");
       bool startValue;
       if (!bool.TryParse(start, out startValue))
           throw new ConfigException("StartScheduler must be 'true' or 'false' but is '" + start + "'");
 
       return startValue;
   }
}

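Separating the validation in IsAllowedToStart from ConfigurationManager makes it easier to unit test. The following is a minimal sketch under two assumptions: StartFlag is a hypothetical helper, and the ConfigException defined here is a stand-in, because the post's own exception type isn't shown.

```csharp
using System;

// Hypothetical stand-in for the ConfigException used by SchedulerInitializer.
public class ConfigException : Exception
{
    public ConfigException(string message) : base(message) { }
}

public static class StartFlag
{
    // Same rules as IsAllowedToStart: the value must be present and must
    // parse as a bool, otherwise configuration is treated as broken.
    public static bool Parse(string raw)
    {
        if (string.IsNullOrEmpty(raw))
            throw new ConfigException("StartScheduler has no value");

        bool value;
        if (!bool.TryParse(raw, out value))
            throw new ConfigException("StartScheduler must be 'true' or 'false' but is '" + raw + "'");

        return value;
    }
}
```

IsAllowedToStart could then simply return StartFlag.Parse(ConfigurationManager.AppSettings["StartScheduler"]). Note that bool.TryParse is case-insensitive, so "True" and "FALSE" are accepted too.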

You can configure Quartz to use an in-memory store or the database of your choice. If you like, you can hook up different behaviour by implementing your own NServiceBus profiles for development, testing and production. For now I will use an in-memory store, and the config looks like this:

<quartz>
    <add key="quartz.scheduler.instanceName" value="Scheduler"/>
    <add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz"/>
    <add key="quartz.threadPool.threadCount" value="10"/>
    <add key="quartz.threadPool.threadPriority" value="2"/>
    <add key="quartz.jobStore.misfireThreshold" value="60000"/>
    <add key="quartz.jobStore.type" value="Quartz.Simpl.RAMJobStore, Quartz"/>
</quartz>


For details on how to configure Quartz, see the documentation. One other class worth mentioning is the AutofacJobFactory, which is required if you want your container to resolve the Quartz jobs and their dependencies. It is again trivial and looks like so:

public class AutofacJobFactory : IJobFactory 
{
    public IJob NewJob(TriggerFiredBundle bundle)
    {
        JobDetail jobDetail = bundle.JobDetail;
        Type jobType = jobDetail.JobType;

        // Let the container build the job so its constructor dependencies (e.g. IBus) are injected.
        return (IJob)IoC.Resolve(jobType);
    }
}

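You may prefer not to tie the factory to Autofac. The same idea can be expressed with a dictionary of delegates: map each job type to something that can build it, so that jobs can take constructor dependencies. This is a sketch only. The IJob below is a simplified, hypothetical interface, not Quartz's own (whose Execute takes a JobExecutionContext). DelegateJobFactory and GreetingJob are illustrative names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified job contract used only for this sketch.
public interface IJob
{
    void Execute();
}

// Example job with a constructor dependency: the reason a job factory is needed at all.
public class GreetingJob : IJob
{
    private readonly string greeting;
    public GreetingJob(string greeting) { this.greeting = greeting; }
    public string Greeting { get { return greeting; } }
    public void Execute() { Console.WriteLine(greeting); }
}

// Maps a job type to a factory delegate, much as AutofacJobFactory hands
// the job type to the container via IoC.Resolve(jobType).
public class DelegateJobFactory
{
    private readonly Dictionary<Type, Func<IJob>> factories = new Dictionary<Type, Func<IJob>>();

    public void Register<T>(Func<T> factory) where T : IJob
    {
        factories[typeof(T)] = () => factory();
    }

    public IJob NewJob(Type jobType)
    {
        Func<IJob> factory;
        if (!factories.TryGetValue(jobType, out factory))
            throw new InvalidOperationException("No factory registered for " + jobType.FullName);
        return factory();
    }
}
```

A container does the same lookup for you, which is why AutofacJobFactory can stay a one-liner.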

BTW: I have removed exception handling and logging code for the sake of brevity in most of these snippets.

Now for the TimeoutMessageHandler, which is much like any other NServiceBus message handler:

public class TimeoutMessageHandler : IHandleMessages<TimeoutMessage>
{
    private readonly IBus bus;
    private readonly IScheduler scheduler;
    private string groupName = "TimeOut";
 
    public TimeoutMessageHandler(IBus bus, IScheduler scheduler)
    {
        this.bus = bus;
        this.scheduler = scheduler;
    }
 
    public void Handle(TimeoutMessage message)
    {
        if (message.ClearTimeout)
        {
            scheduler.UnscheduleJob(GetTriggerName(message), groupName);
        }
        else if (message.HasNotExpired())
        {
            // Store the message data under the same keys the job reads them back with.
            var jobDetail = new JobDetail(GetJobName(message), groupName, typeof(TimeoutMessageDispatcherJob));
            jobDetail.JobDataMap.Add(TimeoutMessageDispatcherJob.ReturnAddressKey, bus.CurrentMessageContext.ReturnAddress);
            jobDetail.JobDataMap.Add(TimeoutMessageDispatcherJob.ClearTimeoutKey, message.ClearTimeout);
            jobDetail.JobDataMap.Add(TimeoutMessageDispatcherJob.ExpiresKey, message.Expires);
            jobDetail.JobDataMap.Add(TimeoutMessageDispatcherJob.SagaIdkey, message.SagaId);
            jobDetail.JobDataMap.Add(TimeoutMessageDispatcherJob.StateKey, message.State);

            // Fire once, at the message's expiry time.
            Trigger trigger = new SimpleTrigger(GetTriggerName(message), groupName, message.Expires);

            scheduler.ScheduleJob(jobDetail, trigger);

            // Stop the SagaMessageHandler from also handling (and re-queuing) this message; see the UPDATE below.
            bus.DoNotContinueDispatchingCurrentMessageToHandlers();
        }
        else
        {
            bus.Send(bus.CurrentMessageContext.ReturnAddress, new IMessage[] { message });
        }
    }
 
    private static string GetTriggerName(TimeoutMessage message)
    {
        return "TimeoutJob-Trigger: " + message.SagaId;
    }
 
    private static string GetJobName(TimeoutMessage message)
    {
        return "TimeoutJob-JobDetail: " + message.SagaId;
    }
}

OK, so what's going on here? First, I check the message's ClearTimeout flag to decide whether I should be clearing the timeout. If it is true, I unschedule the job. Otherwise I check whether the message has expired yet. If it hasn't, I create a new Quartz job and a trigger using the expiry date, and schedule them. I store all the message state in the JobDataMap, which Quartz will persist for us when a persistent job store is used (the in-memory RAMJobStore does not survive a restart). You can store custom (Serializable) objects, but beware of versioning issues; I use simple BCL types here to avoid them. If the message has already expired when we first receive it, we immediately send it on to the return address.

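To make the three branches concrete without a running bus or scheduler, here is an in-memory sketch. It is not the post's implementation. TimeoutRequest is a hypothetical stand-in for TimeoutMessage, carrying only the members used above. The sendBack delegate stands in for bus.Send to the return address, and Tick plays the role of the Quartz trigger firing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in carrying only the TimeoutMessage members used in this post.
public class TimeoutRequest
{
    public Guid SagaId;
    public DateTime Expires;
    public bool ClearTimeout;
    public object State;
    public bool HasNotExpired(DateTime now) { return now < Expires; }
}

// Mirrors the handler's branches: clear, schedule, or send back immediately.
public class InMemoryTimeoutManager
{
    private readonly Dictionary<Guid, TimeoutRequest> pending = new Dictionary<Guid, TimeoutRequest>();
    private readonly Action<TimeoutRequest> sendBack; // stands in for bus.Send(returnAddress, ...)

    public InMemoryTimeoutManager(Action<TimeoutRequest> sendBack) { this.sendBack = sendBack; }

    public int PendingCount { get { return pending.Count; } }

    public void Handle(TimeoutRequest message, DateTime now)
    {
        if (message.ClearTimeout)
            pending.Remove(message.SagaId);        // like scheduler.UnscheduleJob
        else if (message.HasNotExpired(now))
            pending[message.SagaId] = message;     // like scheduler.ScheduleJob
        else
            sendBack(message);                     // already expired: return it straight away
    }

    // Plays the role of the trigger firing: dispatch everything due by 'now'.
    public void Tick(DateTime now)
    {
        var due = new List<Guid>();
        foreach (var entry in pending)
            if (entry.Value.Expires <= now) due.Add(entry.Key);

        foreach (var id in due)
        {
            var message = pending[id];
            pending.Remove(id);
            sendBack(message);
        }
    }
}
```

Taking now as a parameter is a choice made here to keep the sketch deterministic. Like the handler's job and trigger names, entries are keyed by SagaId. As a result, this sketch keeps at most one pending timeout per saga, and a second request for the same saga replaces the first.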

There is one annoying line here, though: the call to bus.DoNotContinueDispatchingCurrentMessageToHandlers(). It is needed because of some strange behaviour where the same message is received over and over. I posted a question about this on the forums. Although it seems to be a known bug that has been fixed, the fix doesn't seem to work for me. I will update this post when I have more information.

UPDATE: This behaviour happens because the NServiceBus SagaMessageHandler is also loaded. It handles the TimeoutMessage in the same pipeline and puts it back in the queue when it is done. Two things together prevent this, as I observed: specifying the handler ordering with SagaMessageHandler last, as in the EndpointConfig above (ISpecifyMessageHandlerOrdering), and calling bus.DoNotContinueDispatchingCurrentMessageToHandlers().

A better approach, as Udi pointed out (and provided you don't need sagas in this endpoint), is to stop SagaMessageHandler from being loaded in the first place, by making sure the scanned types do not include it. I needed fine-grained control over what was loaded, so the code below worked for me. BTW: you can do something similar if you want to stop NServiceBus loading its NHibernateMessageModule and other built-in infrastructure components. Remember that an empty .With() call will scan all assemblies in the executable folder.

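// toScan holds the types this endpoint should load (how it is built is not shown here).
// Removing SagaMessageHandler stops it picking up, and re-queuing, the TimeoutMessage.
toScan.Remove(typeof(SagaMessageHandler));

NServiceBus.Configure.With(toScan)
    .Autofac2Builder(IoC.Container)
    .XmlSerializer()
    .UnicastBus();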

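The post doesn't show how toScan is built. One hypothetical way to get that fine-grained control is to gather types from an explicit set of assemblies and subtract the ones you don't want. ScanList.Build below is an illustrative helper, not part of NServiceBus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public static class ScanList
{
    // Gathers every type from the given assemblies, then drops the types
    // the endpoint should not load (e.g. a handler it does not need).
    public static List<Type> Build(IEnumerable<Assembly> assemblies, params Type[] excluded)
    {
        var skip = new HashSet<Type>(excluded);
        return assemblies
            .SelectMany(assembly => assembly.GetTypes())
            .Where(type => !skip.Contains(type))
            .ToList();
    }
}
```

Passing the result to Configure.With(toScan), as above, would load everything except the excluded types. Be aware that Assembly.GetTypes() throws ReflectionTypeLoadException if some types in an assembly cannot be loaded.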

That's pretty much it for the handler. I could say more about Quartz here, but that's probably best left for another post; in the meantime you can read up on Quartz on its website.

Lastly, we need to implement the job we just scheduled:

public class TimeoutMessageDispatcherJob : IJob
{
    public const string ReturnAddressKey = "ReturnAddressKey";
    public const string ExpiresKey = "Expires";
    public const string ClearTimeoutKey = "ClearTimeout";
    public const string StateKey = "State";
    public const string SagaIdkey = "SagaId";
 
    private readonly IBus bus;
 
    public TimeoutMessageDispatcherJob(IBus bus)
    {
        this.bus = bus;
    }
 
    public void Execute(JobExecutionContext context)
    {
        var state = context.JobDetail.JobDataMap[StateKey];
        var clearTimeout = context.JobDetail.JobDataMap.GetBoolean(ClearTimeoutKey);
        var expires = context.JobDetail.JobDataMap.GetDateTime(ExpiresKey);
        var sagaId = Guid.Parse(context.JobDetail.JobDataMap[SagaIdkey].ToString());
        var returnAddress = context.JobDetail.JobDataMap.GetString(ReturnAddressKey);
 
        bus.Send(returnAddress, new[] { new TimeoutMessage()
                                            {
                                                Expires = expires,
                                                ClearTimeout = clearTimeout,
                                                SagaId = sagaId,
                                                State = state
                                            } });
    }
}

We just implement the IJob interface and provide an implementation of the Execute method. We retrieve the state from the JobDataMap, create a new TimeoutMessage and send it to the return address, which we also asked Quartz to save for us. And that's it.

NOTE: I have used constants for the keys and shared them between the classes to make sure we save and retrieve the right values. The job's constants are the source of truth: the handler should write its JobDataMap entries using those same constants.

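The handler writes the JobDataMap entries and the job reads them back, so both sides must use identical key strings. With a mismatch, such as "ExpiresKey" versus "Expires", the job will not find the value it expects. The following is a minimal sketch of that round trip. It assumes a Dictionary<string, object> standing in for Quartz's JobDataMap and uses hypothetical TimeoutKeys/TimeoutData names. It sticks to simple BCL types, as the post recommends.

```csharp
using System;
using System.Collections.Generic;

// One shared set of key names, used by both the writer and the reader.
public static class TimeoutKeys
{
    public const string ReturnAddress = "ReturnAddressKey";
    public const string Expires = "Expires";
    public const string SagaId = "SagaId";
}

// Dictionary<string, object> stands in for the JobDataMap in this sketch.
public static class TimeoutData
{
    // Handler side: store the values under the shared keys.
    public static Dictionary<string, object> Write(string returnAddress, DateTime expires, Guid sagaId)
    {
        return new Dictionary<string, object>
        {
            { TimeoutKeys.ReturnAddress, returnAddress },
            { TimeoutKeys.Expires, expires },
            { TimeoutKeys.SagaId, sagaId }
        };
    }

    // Job side: read them back with the same keys.
    public static void Read(Dictionary<string, object> map,
        out string returnAddress, out DateTime expires, out Guid sagaId)
    {
        returnAddress = (string)map[TimeoutKeys.ReturnAddress];
        expires = (DateTime)map[TimeoutKeys.Expires];
        sagaId = (Guid)map[TimeoutKeys.SagaId];
    }
}
```

Because both methods refer to TimeoutKeys, renaming a key changes both sides at once instead of silently breaking the job.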

UPDATE: The code (with a few generalisation modifications) is now in the NServiceBus-Contrib project on GitHub, where you can get it.

2 comments:

David Craft said...

Excellent Post.. really enjoyed reading about the timeout manager. I've implemented my own.. for me though I was happy with the disk writes as like you I only required it to be accurate to about 1 minute.

However if that changes with the next saga, I'll deffo give this a go. The learning never ends.. next to read up about Quartz. Would be interested if you do a post about Quartz.

Dave @craftyfella

stuart cullinan said...

Thanks David. Quartz is huge in the Java world, where it originated, but for some reason it is largely overlooked in .NET circles. If I get some time in the new year I may post something. Thanks for reading.