Saturday 30 June 2018

Scatter Gather In Mule


Routers are Mule components that can direct, resequence, split, and aggregate Mule events.

Routers (Flow Controls in Anypoint Studio) route messages to various destinations in a Mule flow. Some routers incorporate logic to analyse and possibly transform messages before routing takes place. For example, various flow controls can:

·       Split a message into several segments, then route each segment to a different processor

·       Combine several messages into a single message before sending it to the next processor in the flow

·       Reorder a list of messages before sending it to the next processor

·       Evaluate a message to determine which of several possible processors it should be routed to next

·       Broadcast the same message to multiple processors
The last bullet point (broadcasting the same message to multiple processors) is what Scatter-Gather handles: it sends the same message to several groups of message processors.
The routing message processor Scatter-Gather sends a request message to multiple targets concurrently. It collects the responses from all routes, and aggregates them into a single message.
The Scatter-Gather router sends a message for concurrent processing to all configured routes. The thread executing the flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). A failure in one route does not stop the Scatter-Gather from sending messages to its other configured routes, so several routes, or even all of them, may fail concurrently.
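The behaviour described above can be sketched in plain Java. This is only an illustration of the pattern, not Mule's implementation: the class ScatterGatherSketch, its Outcome type, and the timeoutMillis parameter are all hypothetical names, and routes are modelled as simple functions on a String message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Plain-Java illustration of scatter-gather; none of these names are Mule APIs.
final class ScatterGatherSketch {

    /** Results and failures of one run, both keyed by the route's index in the list. */
    static final class Outcome {
        final Map<Integer, String> results = new LinkedHashMap<>();
        final Map<Integer, Throwable> failures = new LinkedHashMap<>();
    }

    static Outcome scatterGather(String message, List<Function<String, String>> routes, long timeoutMillis) {
        Outcome outcome = new Outcome();
        if (routes.isEmpty()) {
            return outcome;
        }
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            // Scatter: hand the same message to every route, one task per route.
            List<Future<String>> futures = new ArrayList<>();
            for (Function<String, String> route : routes) {
                Callable<String> task = () -> route.apply(message);
                futures.add(pool.submit(task));
            }
            // Gather: the calling thread waits for each route in turn.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    outcome.results.put(i, futures.get(i).get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (ExecutionException e) {
                    // A failed route is recorded; the other routes are still collected.
                    outcome.failures.put(i, e.getCause());
                } catch (TimeoutException e) {
                    futures.get(i).cancel(true);
                    outcome.failures.put(i, e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    outcome.failures.put(i, e);
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<Function<String, String>> routes = new ArrayList<>();
        routes.add(m -> "route-1 saw " + m);
        routes.add(m -> { throw new IllegalStateException("route-2 failed"); });
        routes.add(m -> "route-3 saw " + m);

        Outcome outcome = scatterGather("hello", routes, 5000);
        System.out.println(outcome.results);           // {0=route-1 saw hello, 2=route-3 saw hello}
        System.out.println(outcome.failures.keySet()); // [1]
    }
}
```

Note that the failure in the second route does not prevent the first and third routes from returning their results, which is the property the paragraph above describes.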
By default, if any route fails, Scatter-Gather performs the following actions:
·       Sets the exception payload accordingly for each route.
·       Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
Catching the CompositeRoutingException allows you to gather information on all failed routes. 

CompositeRoutingException

The CompositeRoutingException extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.
This exception exposes two methods which allow you to obtain the IDs of failed routes and the exceptions returned by each route.
·       The getExceptions method returns a map where the key is an integer that identifies the index of the failed route, and the value is the exception itself.
·       The getExceptionForRouteIndex(int) method returns the exception of the requested route ID.
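To make the two accessors concrete, here is a small plain-Java sketch. CompositeFailureSketch is a hypothetical stand-in that mirrors only the two methods described above; the real class is org.mule.routing.CompositeRoutingException, and the describe helper and the sample error messages are invented for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in, not the Mule class: it only mimics the two accessors.
final class CompositeFailureSketch extends Exception {
    private final Map<Integer, Throwable> exceptions;

    CompositeFailureSketch(Map<Integer, Throwable> exceptions) {
        super(exceptions.size() + " route(s) failed");
        // Sorted copy so that failures are reported in route order.
        this.exceptions = Collections.unmodifiableMap(new TreeMap<>(exceptions));
    }

    /** Map of failed route index to the exception raised by that route. */
    Map<Integer, Throwable> getExceptions() {
        return exceptions;
    }

    /** Exception for one route, or null if that route did not fail. */
    Throwable getExceptionForRouteIndex(int index) {
        return exceptions.get(index);
    }

    /** Builds one report line per failed route. */
    static String describe(CompositeFailureSketch e) {
        StringBuilder report = new StringBuilder();
        for (Map.Entry<Integer, Throwable> entry : e.getExceptions().entrySet()) {
            report.append("route ").append(entry.getKey())
                  .append(": ").append(entry.getValue().getMessage()).append('\n');
        }
        return report.toString();
    }

    public static void main(String[] args) {
        Map<Integer, Throwable> failed = new TreeMap<>();
        failed.put(3, new Exception("script threw"));
        failed.put(1, new IllegalStateException("queue unavailable"));
        try {
            throw new CompositeFailureSketch(failed);
        } catch (CompositeFailureSketch e) {
            System.out.print(describe(e));
        }
    }
}
```

In a real flow the same loop would run over the map returned by the Mule exception, for example inside a Java component invoked from the exception strategy.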

Customizing Aggregation Strategies

Scatter-Gather allows you to define a custom aggregation strategy which overrides its default aggregation strategy. Among other things, custom gathering strategies allow you to:
·       Discard message responses
·       Merge message properties that originated in different routes
·       Discard failed messages without throwing an exception
·       Select only one from multiple responses
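Two of the options listed above, discarding failed responses and selecting a single response, can be sketched in plain Java as follows. This is a simplified model under stated assumptions: RouteResult, Strategy, IGNORE_FAILURES and FIRST_SUCCESS are hypothetical names, and payloads are plain strings rather than Mule events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of pluggable aggregation; these types are not Mule APIs.
final class AggregationSketch {

    /** Outcome of one route: a payload on success, an error on failure. */
    static final class RouteResult {
        final String payload;
        final Throwable error;

        private RouteResult(String payload, Throwable error) {
            this.payload = payload;
            this.error = error;
        }

        static RouteResult ok(String payload) {
            return new RouteResult(payload, null);
        }

        static RouteResult failed(Throwable error) {
            return new RouteResult(null, error);
        }

        boolean succeeded() {
            return error == null;
        }
    }

    /** Turns the per-route outcomes into the single response of the router. */
    interface Strategy {
        String aggregate(List<RouteResult> results);
    }

    /** Drops failed routes silently and joins the remaining payloads with commas. */
    static final Strategy IGNORE_FAILURES = results -> {
        List<String> payloads = new ArrayList<>();
        for (RouteResult result : results) {
            if (result.succeeded()) {
                payloads.add(result.payload);
            }
        }
        return String.join(",", payloads);
    };

    /** Returns the payload of the first successful route, or null if every route failed. */
    static final Strategy FIRST_SUCCESS = results -> {
        for (RouteResult result : results) {
            if (result.succeeded()) {
                return result.payload;
            }
        }
        return null;
    };

    public static void main(String[] args) {
        List<RouteResult> results = Arrays.asList(
                RouteResult.failed(new RuntimeException("db down")),
                RouteResult.ok("from-vm"),
                RouteResult.ok("from-file"));
        System.out.println(IGNORE_FAILURES.aggregate(results)); // from-vm,from-file
        System.out.println(FIRST_SUCCESS.aggregate(results));   // from-vm
    }
}
```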
The flow file below configures a Scatter-Gather with four routes and a custom aggregation strategy:


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
 xmlns:spring="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8082" basePath="scatter" doc:name="HTTP Listener Configuration"/>
    <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" database="mule_support" doc:name="MySQL Configuration"/>
  
    <flow name="scatter-gather-2Flow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <scatter-gather doc:name="Scatter-Gather">
            <custom-aggregation-strategy class="strategy.MyAggregationStrategy"/>
            <processor-chain>
                <logger message="route-1" level="INFO" doc:name="Logger"/>
                <db:select config-ref="MySQL_Configuration" doc:name="Database">
                    <db:parameterized-query><![CDATA[select * from movie]]></db:parameterized-query>
                </db:select>
                <json:object-to-json-transformer doc:name="Object to JSON"/>
            </processor-chain>
            <processor-chain>
                <logger message="route-2" level="INFO" doc:name="Logger"/>
                <vm:outbound-endpoint exchange-pattern="one-way" path="test" doc:name="VM"/>
            </processor-chain>
            <processor-chain>
                <logger message="route-3" level="INFO" doc:name="Logger"/>
                <file:outbound-endpoint path="src/main/resources" responseTimeout="10000" doc:name="File"/>
            </processor-chain>
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[throw new java.lang.Exception();]]></scripting:script>
            </scripting:component>
        </scatter-gather>
        <combine-collections-transformer doc:name="Combine Collections"/>
        <choice-exception-strategy doc:name="Choice Exception Strategy">
            <catch-exception-strategy when="#[exception.causedBy(org.mule.routing.CompositeRoutingException)]" doc:name="Catch Exception Strategy">
                <logger message="composite exception handled" level="INFO" doc:name="Logger"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>
</mule>
The Scatter-Gather in the example above is configured with the custom aggregation strategy class strategy.MyAggregationStrategy (MyAggregationStrategy.java):
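package strategy;

import java.util.List;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;
import org.mule.routing.CompositeRoutingException;

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        // Keep only the events from routes that completed without an exception.
        List<MuleEvent> successEvents = context.collectEventsWithoutExceptions();
        if (successEvents.isEmpty()) {
            // Every route failed: report all failures instead of failing on get(0).
            throw new CompositeRoutingException(context.getOriginalEvent(), context.collectRouteExceptions());
        }
        // Return the event of the first successful route and discard the rest.
        return successEvents.get(0);
    }
}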
In the example above, the aggregation strategy returns only the payload of the first route that completed successfully and discards the others. Designing an aggregation strategy that fits the use case is the integration developer's responsibility.

