Mule ESB - Request-Reply error handling
In the previous post I presented a working solution for a synchronous batch process with a multi-threaded processing stage implemented with the request-reply scope. By ‘working’ I mean that it successfully processed the input. But how will the process behave if something goes wrong during processing? Let’s test it.
RequestReplyErrorHandlingTest is a sample MUnit test that mocks an exception thrown by the web service invoked in call-LongRunningTask:
@Test
public void shouldInovokeLongRunningProcessConcurrently() throws Exception {
    // Make the mocked "Long running activity" scripting component throw an exception.
    whenMessageProcessor("component").ofNamespace("scripting")
        .withAttributes(attribute("name").ofNamespace("doc").withValue("Long running activity"))
        .thenThrow(new RuntimeException());
    // Run the flow with a single-record input.
    runFlow("request-reply-flow", testEvent(SAMPLE_INPUT_1_RECORD.split(",")));
}
And the results are horrible - the exception was thrown, the process never finished and execution got stuck. The test must be stopped manually. What exactly happened? The diagram below provides a good explanation.
- Without a reply message the Request-Reply scope cannot finish and waits indefinitely, blocking processing and resources.
- The Aggregate Results timeout is set to 10s, but it never triggers.
- Once request-reply-long-task-aggr stops, even for a single chunk, the Aggregate Results stage can never complete and the final message is never sent to the reply vm outbound endpoint.
Let’s deal with the issues in ‘easier first’ order.
Issue #1 - Separate timeout must be set up for request-reply scope
The Request-Reply scope definition has a timeout attribute. After setting the timeout to 11s (it should be longer than the internal Aggregate timeout of 10s), the Request-Reply stage finishes with an error once the timeout elapses.
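To illustrate why the timeout matters, here is a minimal plain-Java analogy. It is not Mule code: the class `ReplyWaitSketch` and the method `awaitReply` are hypothetical names, and a `CompletableFuture` stands in for the reply endpoint. A wait without a limit blocks forever when no reply arrives, while a bounded wait returns control to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java analogy, not Mule code: the "requester" blocks on a reply that may never come.
final class ReplyWaitSketch {

    // Waits for the reply for at most timeoutMillis; returns the marker "TIMEOUT" if none arrives.
    static String awaitReply(CompletableFuture<String> reply, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates the failing test: nobody ever completes the reply.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        // Calling neverCompleted.get() without arguments would block forever here.
        System.out.println(awaitReply(neverCompleted, 200));
    }
}
```

In the flow itself the same role is played by the timeout attribute of the Request-Reply scope.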
Issue #2 - Aggregate Results timeout starts counting once first message in the group is aggregated
In the above test that never happens, because every message fails with an exception and so never reaches the aggregate step. Still, we can rely on the Request-Reply timeout - it fires no matter what.
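The behaviour described here can be modelled with a small sketch. This is a simplified model under stated assumptions, not the Mule aggregator: `AggregationGroupSketch` and its methods are hypothetical names, and time is passed in explicitly so the example stays deterministic. The point is only that the clock starts with the first aggregated part, so a group that receives nothing never times out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an aggregation group; not the Mule implementation.
final class AggregationGroupSketch {
    private final long timeoutMillis;
    private final List<String> parts = new ArrayList<>();
    private long firstArrivalMillis = -1; // -1 means no part has arrived, so the clock is not running

    AggregationGroupSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a part; the first part to arrive starts the timeout clock.
    void add(String part, long nowMillis) {
        if (parts.isEmpty()) {
            firstArrivalMillis = nowMillis;
        }
        parts.add(part);
    }

    // A group with no parts can never time out, however much time passes.
    boolean isTimedOut(long nowMillis) {
        return firstArrivalMillis >= 0 && nowMillis - firstArrivalMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        AggregationGroupSketch empty = new AggregationGroupSketch(10_000);
        System.out.println(empty.isTimedOut(1_000_000)); // no part ever arrived

        AggregationGroupSketch started = new AggregationGroupSketch(10_000);
        started.add("chunk-1", 0);
        System.out.println(started.isTimedOut(10_000)); // 10s after the first part
    }
}
```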
Issue #3 - If even a single message in the group fails, the Aggregate Results stage always ends in a timeout
My first idea was to catch the exception from the remote web service call and pass the message on to the aggregate step. On second thought I realized that once processing has failed for a single chunk, there is no need to wait for the results of the other chunks. It is better to stop the whole process, so it can be retried or handled in a different way. To accomplish that I added a catch exception strategy, StopRequestReplyExceptioStrategy, that sets the exception message as the payload and passes the message directly to the reply vm inbound endpoint.
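The fail-fast idea can be sketched in plain Java. Again this is an analogy, not the Mule exception strategy: `FailFastSketch` and `processChunk` are hypothetical names, and a `CompletableFuture` stands in for the reply endpoint. When a chunk fails, the error message is sent straight to the reply channel, so the waiting side is released immediately instead of waiting for a timeout.

```java
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy of the fail-fast shortcut; not the Mule exception strategy.
final class FailFastSketch {

    // Runs one chunk; on failure, sends the exception message straight to the reply channel.
    static void processChunk(Runnable task, CompletableFuture<String> replyChannel) {
        try {
            task.run();
        } catch (RuntimeException e) {
            // The exception message becomes the reply payload, bypassing aggregation.
            replyChannel.complete("FAILED: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        processChunk(() -> { throw new RuntimeException("web service error"); }, reply);
        // The reply is already available, so the requester does not have to wait.
        System.out.println(reply.getNow("no reply yet"));
    }
}
```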
In theory that should allow processing to finish earlier. However, the first attempt was not successful - the reply vm inbound endpoint of the request-reply scope rejected the message with an ObjectAlreadyExistsException, followed by a timeout.
It didn’t make any sense. How could a given correlation id be rejected, only to be reported as not delivered 10 seconds later? The explanation is hidden in the code of the class org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.
The method getAsyncReplyCorrelationId appends the correlation sequence to the correlation id. The property MULE_CORRELATION_SEQUENCE is set on the flow because the Split stage is used earlier to divide the payload into chunks. The exception strategy shortcut bypasses the Aggregate step, so that additional property is still there and is automatically added to the correlation id. But it can be overwritten in the exception strategy.
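The effect of the leftover sequence can be modelled in a few lines of Java. This is a hypothetical simplification, not the Mule source: `CorrelationKeySketch`, `replyKey` and the exact key format are assumptions made for illustration. It only shows that a reply key built from the correlation id plus a leftover sequence no longer matches the key the requester waits on, and that blanking the sequence restores the match.

```java
// Hypothetical simplification for illustration; NOT the Mule source code.
final class CorrelationKeySketch {

    // Assumed key format: the correlation id followed by the sequence value, if there is one.
    static String replyKey(String correlationId, String correlationSequence) {
        if (correlationSequence == null || correlationSequence.isEmpty()) {
            return correlationId;
        }
        return correlationId + correlationSequence;
    }

    public static void main(String[] args) {
        // Key the requester waits on (assumption: no sequence is set before the split).
        String awaited = replyKey("abc", null);
        // Key computed for a failed chunk that still carries the splitter's sequence.
        String fromFailedChunk = replyKey("abc", "1");
        // Key computed after the sequence has been overwritten with an empty value.
        String afterOverride = replyKey("abc", "");

        System.out.println("leftover sequence matches: " + awaited.equals(fromFailedChunk));
        System.out.println("blank sequence matches: " + awaited.equals(afterOverride));
    }
}
```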
It is a hack rather than a clean solution. A perfect solution would remove the inbound property MULE_CORRELATION_SEQUENCE; instead, the inbound property is overridden by an outbound property with an empty value. This hack causes a warning.
Nevertheless it works, and the request-reply solution now has proper error handling, as proved by the additional error scenario tests available on GitHub together with the final version of the flows.