I have read and accepted that the built-in NServiceBus Timeout Manager is not suitable for production, largely due to its heavy disk I/O. Udi explains the complexities of dealing with time-sensitive problems here. In summary, it is difficult to provide a solution that fits everyone's use case, because you have to balance time resolution against fail-over duration.
However, in my case I do not need sub-second or even sub-minute resolution. Nor am I completely convinced the NServiceBus Timeout Manager would provide either of those anyway: the Fallacies of Distributed Computing mean you cannot guarantee message delivery times or latency. My use case can probably accept a resolution of up to 5 minutes, although I don’t expect it to be anywhere near that bad, as we can run Quartz in a fail-over cluster that fails over in around 60 seconds using VMware HA.
We already use Quartz as a scheduling service. If you haven’t checked it out, do so; I really rate it and have been using it for a few years now. So I decided to see if I could implement a timeout manager with it.
Timeouts are really just messages sent by a saga to another service, with the expectation that they will be sent back once the specified expiry has elapsed. The NServiceBus infrastructure handles them a little differently from other messages, but we’ll ignore that for now.
I simply created a new class library with an endpoint configured like so:
This is mostly standard configuration, except that I’m using Autofac; you can use any container you like. I told NServiceBus I wanted to do some work at startup by implementing the IWantToRunAtStartup interface. This is where I start and stop the Quartz scheduler. The code to start and stop the scheduler is trivial:
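To show the shape of that start/stop pairing, here is a minimal Python sketch. `BackgroundScheduler` is a hypothetical stand-in for the Quartz scheduler (it only idles on a background thread), and `SchedulerStartup` mimics the run/stop pair of the startup interface; neither is the real Quartz or NServiceBus API.

```python
import threading
from typing import Optional


class BackgroundScheduler:
    """Hypothetical stand-in for a Quartz-style scheduler with a start/shutdown lifecycle."""

    def __init__(self) -> None:
        self._stop_requested = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        # Calling start() twice is a no-op rather than spawning a second loop.
        if self._thread is None:
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()

    def _loop(self) -> None:
        # A real scheduler would fire due triggers here; this sketch just idles
        # until shutdown is requested, checking every 50 ms.
        while not self._stop_requested.wait(0.05):
            pass

    def shutdown(self, wait_for_jobs: bool = True) -> None:
        self._stop_requested.set()
        if wait_for_jobs and self._thread is not None:
            self._thread.join()

    @property
    def running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()


class SchedulerStartup:
    """Hypothetical equivalent of the startup hook: run() is called when the
    endpoint starts and stop() when it shuts down."""

    def __init__(self, scheduler: BackgroundScheduler) -> None:
        self._scheduler = scheduler

    def run(self) -> None:
        self._scheduler.start()

    def stop(self) -> None:
        # Wait for in-flight jobs so a timeout is not dropped half-sent.
        self._scheduler.shutdown(wait_for_jobs=True)


scheduler = BackgroundScheduler()
startup = SchedulerStartup(scheduler)
startup.run()
startup.stop()
```

Waiting for running jobs on shutdown is a design choice in this sketch; it trades a slightly slower endpoint stop for not losing a timeout that was mid-send.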
You can configure Quartz to use an in-memory store or the database of your choice. You can also hook up different behaviour for dev, testing and production by implementing your own NServiceBus profiles if you like. For now I will use an in-memory store, and the config looks like this:
```xml
<add key="quartz.scheduler.instanceName" value="Scheduler"/>
<add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz"/>
<add key="quartz.threadPool.threadCount" value="10"/>
<add key="quartz.threadPool.threadPriority" value="2"/>
<add key="quartz.jobStore.misfireThreshold" value="60000"/>
<add key="quartz.jobStore.type" value="Quartz.Simpl.RAMJobStore, Quartz"/>
```
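The misfireThreshold value is in milliseconds: with 60000, a trigger can be up to 60 seconds late (for example while the scheduler is failing over) before Quartz treats it as misfired and applies the trigger's misfire instruction. The Python sketch below reads the setting from two of the entries above and applies that rule. `load_settings`, `misfire_threshold` and `is_misfired` are hypothetical helpers, and the `<settings>` root element exists only so the fragment parses.

```python
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from typing import Dict

# Two of the entries from the config, wrapped in a hypothetical root element
# so that they parse as a standalone XML document.
CONFIG = """<settings>
  <add key="quartz.jobStore.misfireThreshold" value="60000"/>
  <add key="quartz.jobStore.type" value="Quartz.Simpl.RAMJobStore, Quartz"/>
</settings>"""


def load_settings(xml_text: str) -> Dict[str, str]:
    """Turn <add key="..." value="..."/> entries into a dictionary."""
    root = ET.fromstring(xml_text)
    return {el.get("key"): el.get("value") for el in root.findall("add")}


def misfire_threshold(settings: Dict[str, str]) -> timedelta:
    # The configured value is in milliseconds.
    return timedelta(milliseconds=int(settings["quartz.jobStore.misfireThreshold"]))


def is_misfired(scheduled: datetime, now: datetime, threshold: timedelta) -> bool:
    """A trigger counts as misfired once it is late by more than the threshold."""
    return now - scheduled > threshold


threshold = misfire_threshold(load_settings(CONFIG))
due = datetime(2011, 1, 1, 12, 0, 0)
print(is_misfired(due, due + timedelta(seconds=45), threshold))  # False
print(is_misfired(due, due + timedelta(seconds=90), threshold))  # True
```

With a fail-over of around 60 seconds, it may be worth checking the threshold against how late triggers can realistically be after a fail-over, rather than relying on the default.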
For details on how to configure Quartz, see its documentation. One other class worth mentioning is the AutofacJobFactory, which you need if you want your container to resolve the different Quartz jobs. It is again trivial and looks like this:
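The point of a job factory is that the scheduler asks it, rather than calling a bare constructor, for each job instance, so the container can inject the job's dependencies (such as the bus). Here is a minimal Python sketch of that idea. It assumes a hypothetical `Container` with `register`/`resolve` in place of Autofac and a `new_job` method in place of the real Quartz.NET factory interface.

```python
from typing import Callable, Dict, List


class Container:
    """Hypothetical minimal IoC container that maps a type to a factory function."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], object]] = {}

    def register(self, service: type, factory: Callable[[], object]) -> None:
        self._factories[service] = factory

    def resolve(self, service: type) -> object:
        return self._factories[service]()


class ContainerJobFactory:
    """Plays the role of the AutofacJobFactory: when a trigger fires, the scheduler
    asks the factory for a job instance and the factory delegates to the container."""

    def __init__(self, container: Container) -> None:
        self._container = container

    def new_job(self, job_type: type) -> object:
        return self._container.resolve(job_type)


class FakeBus:
    def __init__(self) -> None:
        self.sent: List[object] = []


class TimeoutJob:
    # Needs a bus, so a scheduler that simply called TimeoutJob() would fail.
    def __init__(self, bus: FakeBus) -> None:
        self.bus = bus


bus = FakeBus()
container = Container()
container.register(TimeoutJob, lambda: TimeoutJob(bus))
job = ContainerJobFactory(container).new_job(TimeoutJob)
```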
BTW: I have removed exception handling and logging code for the sake of brevity in most of these snippets.
Now for the TimeoutMessageHandler, which is much like any other NServiceBus message handler:
Ok, so what’s going on here? First I check the ClearTimeout property on the message to decide whether I should be clearing the timeout. If it is true, I unschedule the job. Otherwise I check whether the message has expired yet; if it hasn’t, I create a new Quartz job and a trigger using the expiry date and schedule it. I store all the message state in the JobDataMap, which Quartz will persist for us. You can store custom (Serializable) objects there, but beware of versioning issues; I use simple BCL types here to avoid them. If the message has already expired when we first receive it, we immediately send it on to the return address.
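Here is that decision logic as a Python sketch. It assumes several things that are not in the original: `FakeScheduler` and `FakeBus` stand in for Quartz and the bus, the return address and current time are passed in explicitly, jobs are grouped by saga id so that clearing removes every pending timeout for that saga, and the data map keys are hypothetical constants.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

# Hypothetical JobDataMap keys, kept as constants so writer and reader agree.
KEY_SAGA_ID = "SagaId"
KEY_STATE = "State"
KEY_EXPIRES = "Expires"
KEY_RETURN_ADDRESS = "ReturnAddress"


@dataclass
class TimeoutMessage:
    """Hypothetical mirror of the fields the handler reads."""
    saga_id: str
    expires: datetime
    state: str = ""
    clear_timeout: bool = False


class FakeScheduler:
    """Stand-in for Quartz: jobs are stored by name and grouped by saga id."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Tuple[str, datetime, Dict[str, str]]] = {}

    def schedule(self, name: str, group: str, fire_at: datetime, data: Dict[str, str]) -> None:
        self.jobs[name] = (group, fire_at, data)

    def unschedule_group(self, group: str) -> None:
        for name in [n for n, (g, _, _) in self.jobs.items() if g == group]:
            del self.jobs[name]


class FakeBus:
    def __init__(self) -> None:
        self.sent: List[Tuple[str, TimeoutMessage]] = []

    def send(self, address: str, message: TimeoutMessage) -> None:
        self.sent.append((address, message))


class TimeoutMessageHandler:
    def __init__(self, scheduler: FakeScheduler, bus: FakeBus) -> None:
        self.scheduler = scheduler
        self.bus = bus

    def handle(self, message: TimeoutMessage, return_address: str, now: datetime) -> None:
        if message.clear_timeout:
            # Clearing removes every pending timeout for the saga.
            self.scheduler.unschedule_group(message.saga_id)
        elif message.expires <= now:
            # Already expired on arrival: send it straight back.
            self.bus.send(return_address, message)
        else:
            # Only simple string values go into the data map, to sidestep
            # serialisation and versioning issues with custom types.
            data = {
                KEY_SAGA_ID: message.saga_id,
                KEY_STATE: message.state,
                KEY_EXPIRES: message.expires.isoformat(),
                KEY_RETURN_ADDRESS: return_address,
            }
            job_name = f"{message.saga_id}:{data[KEY_EXPIRES]}"
            self.scheduler.schedule(job_name, message.saga_id, message.expires, data)
```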
One line here is annoying, though: the call to bus.DoNotContinueDispatchingCurrentMessageToHandlers(). It is needed because of some strange behaviour where the same message is received over and over. I posted a question on the forums about this, and although it seems to be a known bug that has been fixed, the fix doesn’t seem to work for me. I will update this post when I get any further information about it.
UPDATE: This behaviour happens because the NSB SagaMessageHandler is loaded, handles the TimeoutMessage in the same pipeline and puts it back in the queue when it is done. Specifying the handler ordering with the SagaMessageHandler last (via ISpecifyMessageHandlerOrdering, as I did in the EndpointConfig above), combined with the call to bus.DoNotContinueDispatchingCurrentMessageToHandlers(), prevents this from happening, as I observed. A better approach, as Udi pointed out (and provided you do not need sagas in this endpoint), is to prevent the SagaMessageHandler from being loaded in the first place. You can do this by making sure the scanned types do not include it. I needed fine-grained control over what I was loading, so the code below worked for me. BTW: you can do a similar thing if you want to stop NSB loading its NHibernateMessageModule and other built-in infrastructure components. Remember that an empty .With() call will scan all assemblies in the executable folder.
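A toy Python model of the two fixes, under the assumption (taken from the description above) that the saga handler puts the message back on the queue. `ToyBus`, `make_timeout_handler`, `saga_message_handler` and `scan` are hypothetical and only model dispatch order; they are not NServiceBus's implementation.

```python
from typing import Callable, Dict, List, Set

Handler = Callable[["ToyBus", str], None]


class ToyBus:
    """Dispatches one message to an ordered list of handlers."""

    def __init__(self, handlers: List[Handler]) -> None:
        self.handlers = handlers
        self.queue: List[str] = []
        self._stop = False

    def do_not_continue_dispatching_current_message_to_handlers(self) -> None:
        self._stop = True

    def dispatch(self, message: str) -> None:
        self._stop = False
        for handler in self.handlers:
            if self._stop:
                break
            handler(self, message)


def make_timeout_handler(stop_dispatching: bool) -> Handler:
    def handle(bus: ToyBus, message: str) -> None:
        # Scheduling the Quartz job would happen here.
        if stop_dispatching:
            bus.do_not_continue_dispatching_current_message_to_handlers()
    return handle


def saga_message_handler(bus: ToyBus, message: str) -> None:
    # Models the reported problem: the message is put back on the queue.
    bus.queue.append(message)


def scan(found: Dict[str, Handler], excluded: Set[str]) -> List[Handler]:
    # Keep every handler that was found except the excluded ones, in insertion order.
    return [handler for name, handler in found.items() if name not in excluded]


found = {
    "TimeoutMessageHandler": make_timeout_handler(stop_dispatching=False),
    "SagaMessageHandler": saga_message_handler,
}
bus = ToyBus(scan(found, excluded={"SagaMessageHandler"}))
bus.dispatch("timeout")
print(bus.queue)  # []
```

In this model, putting the saga handler last and stopping dispatch also keeps the queue empty, while letting dispatch continue to the saga handler reproduces the loop. Excluding the handler from the scan removes the need for the stop call altogether.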
That's pretty much it for the handler. I could say more about Quartz here, but that's probably best left for another post; in the meantime you can read up on Quartz on its website.
Lastly we need to implement the job which we just scheduled:
We just implement the IJob interface and provide an implementation of the Execute method. We retrieve the state from the JobDataMap, create a new TimeoutMessage and send it to the return address, which we also asked Quartz to save for us. And that's it.
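A Python sketch of that Execute step, assuming hypothetical key constants and an ISO-8601 string for the expiry. `JobExecutionContext`, `TimeoutJob` and `FakeBus` are illustrative stand-ins rather than the Quartz.NET or NServiceBus types.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Tuple

# Hypothetical JobDataMap keys, shared as constants with whatever wrote the data.
KEY_SAGA_ID = "SagaId"
KEY_STATE = "State"
KEY_EXPIRES = "Expires"
KEY_RETURN_ADDRESS = "ReturnAddress"


@dataclass
class TimeoutMessage:
    saga_id: str
    expires: datetime
    state: str


@dataclass
class JobExecutionContext:
    """Stand-in for what the scheduler passes to a job when its trigger fires."""
    job_data_map: Dict[str, str]


class FakeBus:
    def __init__(self) -> None:
        self.sent: List[Tuple[str, TimeoutMessage]] = []

    def send(self, address: str, message: TimeoutMessage) -> None:
        self.sent.append((address, message))


class TimeoutJob:
    """Python analogue of a job whose execute() sends the timeout back."""

    def __init__(self, bus: FakeBus) -> None:
        self.bus = bus

    def execute(self, context: JobExecutionContext) -> None:
        data = context.job_data_map
        # Rebuild the message from the simple values stored when it was scheduled.
        message = TimeoutMessage(
            saga_id=data[KEY_SAGA_ID],
            expires=datetime.fromisoformat(data[KEY_EXPIRES]),
            state=data[KEY_STATE],
        )
        self.bus.send(data[KEY_RETURN_ADDRESS], message)


bus = FakeBus()
TimeoutJob(bus).execute(JobExecutionContext({
    KEY_SAGA_ID: "saga-1",
    KEY_STATE: "s",
    KEY_EXPIRES: "2011-01-01T12:05:00",
    KEY_RETURN_ADDRESS: "sagas",
}))
```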
NOTE: I have used constants for the JobDataMap keys and shared them between classes to make sure we save and retrieve the right values; I have omitted some of this code in the handler for brevity.
UPDATE: The code (with a few modifications to generalise it) is now in the NServiceBus-Contrib project on GitHub, so you can get it there.