Data in each record must be enhanced with information provided by a 3rd party web service, and then messages with a fixed number of records (exactly 3, a transport channel limitation) must be delivered to the customer.
Processing is successful only if all messages with all records are delivered to the customer - partial delivery is not allowed (that is why the transaction spans both record processing and chunk delivery).
call-LongRunningTask invokes an HTTP call to the 3rd party web service for additional data. It always takes 1 second to complete. For the test input of 10 elements, processing takes 10 seconds and 4 messages are delivered to the finalMessage vm endpoint.
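For reference, a single-threaded flow matching this description could look roughly like the sketch below. This is only an assumption about the original structure - the flow name, the input path and the batchSize-based chunking are illustrative; only call-LongRunningTask and finalMessage come from the description above.

```xml
<!-- assumed shape of the original single-threaded flow -->
<flow name="process-records" processingStrategy="synchronous">
    <!-- the whole processing runs within one VM transaction -->
    <vm:inbound-endpoint path="input" exchange-pattern="one-way">
        <vm:transaction action="ALWAYS_BEGIN"/>
    </vm:inbound-endpoint>
    <!-- outer loop: chunks of exactly 3 records (transport channel limitation) -->
    <foreach batchSize="3">
        <!-- inner loop: enhance every record via the 3rd party web service (1 s per call) -->
        <foreach>
            <flow-ref name="call-LongRunningTask"/>
        </foreach>
        <!-- deliver the 3-record chunk within the same transaction -->
        <vm:outbound-endpoint path="finalMessage" exchange-pattern="one-way">
            <vm:transaction action="ALWAYS_JOIN"/>
        </vm:outbound-endpoint>
    </foreach>
</flow>
```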
Problems….
In production usage there will be millions of records.
Current performance will not satisfy expectations. In order to improve processing time, the solution must be switched from single-threaded to multi-threaded processing. Thankfully, Mule ESB allows executing a portion of a single-threaded process with multiple threads using the request-reply scope.
So let's refactor the inner foreach loop of the processing flow to execute the call-LongRunningTask step in a separate thread for each record, then aggregate the results and go back to single-threaded processing to send the output messages - similarly to the scatter-gather pattern. The results of 20 minutes of work are below:
The main flow with a request-reply scope, inside which the request vm outbound endpoint sends records for multi-threaded processing and the reply vm inbound endpoint receives the processing results.
A flow that receives the records sent to the request endpoint, splits them and passes them to the aggr vm endpoint for further processing.
A flow that receives a single record from the aggr endpoint, invokes call-LongRunningTask, waits for all records in the collection-aggregator stage and finally sends the aggregated results back to the reply endpoint.
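Taken together, the three flows above could be sketched in XML roughly as follows. This is the first, not yet working attempt; the request, reply, aggr and finalMessage paths come from the description, while the flow names and the rest of the structure are assumptions.

```xml
<!-- main flow: outer loop over 3-record chunks, with the inner per-record
     loop replaced by a request-reply scope (first attempt) -->
<flow name="main-flow">
    <vm:inbound-endpoint path="input" exchange-pattern="one-way">
        <vm:transaction action="ALWAYS_BEGIN"/>
    </vm:inbound-endpoint>
    <foreach batchSize="3">
        <request-reply>
            <vm:outbound-endpoint path="request" exchange-pattern="one-way"/>
            <vm:inbound-endpoint path="reply" exchange-pattern="one-way"/>
        </request-reply>
        <vm:outbound-endpoint path="finalMessage" exchange-pattern="one-way">
            <vm:transaction action="ALWAYS_JOIN"/>
        </vm:outbound-endpoint>
    </foreach>
    <logger message="All tasks finished" level="INFO"/>
</flow>

<!-- splits the chunk received on the request endpoint into single records -->
<flow name="split-records">
    <vm:inbound-endpoint path="request" exchange-pattern="one-way"/>
    <collection-splitter/>
    <vm:outbound-endpoint path="aggr" exchange-pattern="one-way"/>
</flow>

<!-- enhances each record in its own thread, aggregates and sends the results back -->
<flow name="enhance-and-aggregate">
    <vm:inbound-endpoint path="aggr" exchange-pattern="one-way"/>
    <flow-ref name="call-LongRunningTask"/>
    <collection-aggregator/>
    <vm:outbound-endpoint path="reply" exchange-pattern="one-way"/>
</flow>
```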
Let’s run the test and see how it works. Well, it doesn’t work at all.
Issue #1: VM endpoint inside request-reply cannot be in a transactional scope
It turned out to be a rather obvious rule. It is even highlighted in the official Mule ESB documentation for the Request-Reply scope - a warning sign and a yellow highlighted note draw attention to it. However, the documentation says nothing about how to fix it. It turned out to be quite easy: it is enough to disable transactions for the request-reply VM endpoints. This doesn't affect the process logic at all - the HTTP calls inside call-LongRunningTask are not transactional, and the finalMessage VM outbound endpoint is still within the transaction. The explicit definition of the proper vm:transaction action inside the request-reply scope is below:
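A minimal sketch of that configuration, reusing the endpoint paths assumed above (NONE keeps both endpoints outside the surrounding transaction):

```xml
<request-reply>
    <!-- keep both VM endpoints out of the flow's transaction -->
    <vm:outbound-endpoint path="request" exchange-pattern="one-way">
        <vm:transaction action="NONE"/>
    </vm:outbound-endpoint>
    <vm:inbound-endpoint path="reply" exchange-pattern="one-way">
        <vm:transaction action="NONE"/>
    </vm:inbound-endpoint>
</request-reply>
```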
After the changes the flow finally started to work. But the logs don't look right:
All tasks finished before any Task processed? It means that the aggregation step was executed without waiting for the web service responses.
Issue #2: Default processing strategies don't work
Without defined processing strategies, each stage in the flow is executed without waiting for the results of the previous stage. For simplicity, let's start with processing both flows synchronously by setting the processing strategy to synchronous:
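With the flow names assumed in the earlier sketch, the change boils down to the processingStrategy attribute on both flows behind the request-reply scope (flow bodies unchanged):

```xml
<!-- both flows behind the request-reply scope now process each stage synchronously -->
<flow name="split-records" processingStrategy="synchronous">
    <!-- flow body unchanged -->
</flow>
<flow name="enhance-and-aggregate" processingStrategy="synchronous">
    <!-- flow body unchanged -->
</flow>
```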
The change didn't help - the process still fails, but this time with an exception:
After long and not so easy debugging, a new issue was identified:
Issue #3: If aggregation is needed, avoid nested loops
Mule uses the internal properties MULE_CORRELATION_SEQUENCE and MULE_CORRELATION_GROUP_SIZE to trace split collection elements and enable their aggregation. It turned out that those properties are overwritten by the foreach and splitter message processors. The solution would be either to store the original values of MULE_CORRELATION_SEQUENCE and MULE_CORRELATION_GROUP_SIZE in the outer loop and restore them after the nested loop finishes, or to separate those two iterations. It is much easier to separate the two loops.
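One possible shape of the separated loops, again using the names assumed earlier: the request-reply enhances the whole record collection first, and only afterwards a plain foreach builds the 3-record messages, so no foreach wraps the splitter/aggregator pair any more.

```xml
<flow name="main-flow">
    <vm:inbound-endpoint path="input" exchange-pattern="one-way">
        <vm:transaction action="ALWAYS_BEGIN"/>
    </vm:inbound-endpoint>
    <!-- loop 1: enhance all records in parallel; nothing overwrites
         MULE_CORRELATION_SEQUENCE / MULE_CORRELATION_GROUP_SIZE set by the splitter -->
    <request-reply>
        <vm:outbound-endpoint path="request" exchange-pattern="one-way">
            <vm:transaction action="NONE"/>
        </vm:outbound-endpoint>
        <vm:inbound-endpoint path="reply" exchange-pattern="one-way">
            <vm:transaction action="NONE"/>
        </vm:inbound-endpoint>
    </request-reply>
    <!-- loop 2: back in single-threaded processing, deliver the enhanced records
         in chunks of exactly 3 within the original transaction -->
    <foreach batchSize="3">
        <vm:outbound-endpoint path="finalMessage" exchange-pattern="one-way">
            <vm:transaction action="ALWAYS_JOIN"/>
        </vm:outbound-endpoint>
    </foreach>
    <logger message="All tasks finished" level="INFO"/>
</flow>
```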
After that change the solution finally works as expected!
The All tasks finished message appears at the end of the processing, after all records have been enhanced and all final messages have been sent.
In terms of processing time the improvement is significant - the flow now ends within 5 seconds for 10 records and just 39 seconds for 1000 records. A nice improvement compared to the original 1000 seconds.
There are still hidden bottlenecks in the processing and there is a huge gap in error handling, but that will be the subject of the next article.
Source code for the flows and sample MUnit tests is available on GitHub.