Routers are Mule components that can direct, resequence, split, and aggregate Mule events.
Routers (called Flow Controls in Anypoint Studio) route messages to various destinations in a Mule flow. Some routers incorporate logic to analyse and possibly transform messages before routing takes place. For example, various flow controls can:
· Split a message into several segments, then route each segment to a different processor
· Combine several messages into a single message before sending it to the next processor in the flow
· Reorder a list of messages before sending it to the next processor
· Evaluate a message to determine which of several possible processors it should be routed to next
· Broadcast the same message to multiple processors
The last bullet point (broadcasting the same message to multiple processors) is about sending the same message to several groups of message processors.
The Scatter-Gather routing message processor does this: it sends a request message to multiple targets concurrently, collects the responses from all routes, and aggregates them into a single message.
The Scatter-Gather router sends a message
for concurrent processing to all configured routes. The thread executing the
flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). A failure in one route does not stop the Scatter-Gather from sending messages to its other configured routes, so it is possible that several routes, or all of them, fail concurrently.
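The behaviour described above can be sketched in plain Java, without any Mule classes. This is only an illustration under stated assumptions: the names ScatterGatherSketch and scatterGather are hypothetical, a route is modelled as a simple function, and a failed or timed-out route contributes null instead of raising an exception as Mule does by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java illustration only: none of these names are Mule classes.
class ScatterGatherSketch {

    // Sends the same message to every route concurrently and returns the
    // responses in route order. A failed or timed-out route contributes null.
    public static List<String> scatterGather(String message,
                                             List<Function<String, String>> routes,
                                             long timeoutMillis) {
        List<String> results = new ArrayList<>();
        if (routes.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Function<String, String> route : routes) {
                tasks.add(() -> route.apply(message));
            }
            // Blocks the calling thread until every route completes or the timeout expires.
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<String> future : futures) {
                try {
                    results.add(future.get());
                } catch (ExecutionException | CancellationException e) {
                    // One failing route does not prevent the others from being collected.
                    results.add(null);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for routes", e);
        } finally {
            pool.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        List<Function<String, String>> routes = new ArrayList<>();
        routes.add(m -> "route-1:" + m);
        routes.add(m -> "route-2:" + m.toUpperCase());
        routes.add(m -> { throw new IllegalStateException("route-3 failed"); });
        // prints [route-1:hello, route-2:HELLO, null]
        System.out.println(scatterGather("hello", routes, 2000));
    }
}
```

The third route fails, yet the first two responses are still gathered, which mirrors the point that one failing route does not stop the others.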
By default, if any route fails, Scatter-Gather performs the following actions:
· Sets the exception payload accordingly for each route.
· Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
Catching the CompositeRoutingException allows you to gather information on all failed routes.
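The idea of mapping each failure to a sequential route ID can be sketched in plain Java. CompositeRouteFailure and CompositeFailureDemo below are hypothetical stand-ins, not the Mule class; the two accessor names are borrowed from CompositeRoutingException only to make the parallel visible. The zero-based index is an assumption of this sketch, and the routes run one after another here to keep it short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for illustration; this is NOT org.mule.routing.CompositeRoutingException.
class CompositeRouteFailure extends RuntimeException {
    private final Map<Integer, Throwable> exceptions;

    public CompositeRouteFailure(Map<Integer, Throwable> exceptions) {
        super(exceptions.size() + " route(s) failed: " + exceptions.keySet());
        this.exceptions =
                Collections.unmodifiableMap(new TreeMap<Integer, Throwable>(exceptions));
    }

    // Route index -> exception raised by that route.
    public Map<Integer, Throwable> getExceptions() {
        return exceptions;
    }

    // Returns null when the route with this index did not fail.
    public Throwable getExceptionForRouteIndex(int index) {
        return exceptions.get(index);
    }
}

class CompositeFailureDemo {

    // Runs every route, remembers which ones failed, and reports all failures at once.
    public static void runAll(List<Runnable> routes) {
        Map<Integer, Throwable> failures = new TreeMap<>();
        for (int i = 0; i < routes.size(); i++) {
            try {
                routes.get(i).run();
            } catch (RuntimeException e) {
                failures.put(i, e); // zero-based index is an assumption of this sketch
            }
        }
        if (!failures.isEmpty()) {
            throw new CompositeRouteFailure(failures);
        }
    }

    public static void main(String[] args) {
        List<Runnable> routes = new ArrayList<>();
        routes.add(() -> System.out.println("route 0 ok"));
        routes.add(() -> { throw new IllegalStateException("database unavailable"); });
        try {
            runAll(routes);
        } catch (CompositeRouteFailure e) {
            for (Map.Entry<Integer, Throwable> entry : e.getExceptions().entrySet()) {
                System.out.println("route " + entry.getKey() + " failed: "
                        + entry.getValue().getMessage());
            }
        }
    }
}
```

Because every failure is kept with its route index, a single catch block can report on all failed routes instead of only the first one.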
CompositeRoutingException
The CompositeRoutingException extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.
This exception exposes two methods which allow you to obtain the IDs of failed routes and the exceptions returned by each route.
· The getExceptions method returns a map where the key is an integer that identifies the index of the failed route, and the value is the exception itself.
· The getExceptionForRouteIndex(int) method returns the exception of the requested route ID.
Customizing Aggregation Strategies
Scatter-Gather allows you to define a custom aggregation strategy which overrides its default aggregation strategy. Among other things, custom aggregation strategies allow you to:
· Discard message responses
· Merge message properties that originated in different routes
· Discard failed messages without throwing an exception
· Select only one from multiple responses
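Two of the options listed above, discarding failed messages without throwing and selecting a single response, can be sketched in plain Java. RouteResult and AggregationSketch are hypothetical names introduced for illustration and are not Mule types; a real strategy would work on Mule events instead of strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for one route's outcome; not a Mule type.
class RouteResult {
    final String payload;   // response payload, or null if the route failed
    final Throwable error;  // failure cause, or null if the route succeeded

    private RouteResult(String payload, Throwable error) {
        this.payload = payload;
        this.error = error;
    }

    public static RouteResult ok(String payload) {
        return new RouteResult(payload, null);
    }

    public static RouteResult failed(Throwable error) {
        return new RouteResult(null, error);
    }

    public boolean isFailed() {
        return error != null;
    }
}

class AggregationSketch {

    // Discards failed messages without throwing: keeps only successful payloads.
    public static List<String> successfulPayloads(List<RouteResult> results) {
        List<String> payloads = new ArrayList<>();
        for (RouteResult result : results) {
            if (!result.isFailed()) {
                payloads.add(result.payload);
            }
        }
        return payloads;
    }

    // Selects only one response: the first successful one, or null if every route failed.
    public static String firstSuccessful(List<RouteResult> results) {
        for (RouteResult result : results) {
            if (!result.isFailed()) {
                return result.payload;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<RouteResult> results = new ArrayList<>();
        results.add(RouteResult.failed(new IllegalStateException("route failed")));
        results.add(RouteResult.ok("movies-json"));
        results.add(RouteResult.ok("file-written"));
        System.out.println(successfulPayloads(results)); // prints [movies-json, file-written]
        System.out.println(firstSuccessful(results));    // prints movies-json
    }
}
```

Returning null when every route failed is a design choice of this sketch; a strategy could equally fall back to the original message or raise an error at that point.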
The flow file is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8082" basePath="scatter" doc:name="HTTP Listener Configuration"/>
    <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" database="mule_support" doc:name="MySQL Configuration"/>
    <flow name="scatter-gather-2Flow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <scatter-gather doc:name="Scatter-Gather">
            <custom-aggregation-strategy class="strategy.MyAggregationStrategy"/>
            <!-- Route 1: query the database and convert the result to JSON -->
            <processor-chain>
                <logger message="route-1" level="INFO" doc:name="Logger"/>
                <db:select config-ref="MySQL_Configuration" doc:name="Database">
                    <db:parameterized-query><![CDATA[select * from movie]]></db:parameterized-query>
                </db:select>
                <json:object-to-json-transformer doc:name="Object to JSON"/>
            </processor-chain>
            <!-- Route 2: publish the message to a VM queue -->
            <processor-chain>
                <logger message="route-2" level="INFO" doc:name="Logger"/>
                <vm:outbound-endpoint exchange-pattern="one-way" path="test" doc:name="VM"/>
            </processor-chain>
            <!-- Route 3: write the message to a file -->
            <processor-chain>
                <logger message="route-3" level="INFO" doc:name="Logger"/>
                <file:outbound-endpoint path="src/main/resources" responseTimeout="10000" doc:name="File"/>
            </processor-chain>
            <!-- Route 4: always throws, which simulates a failed route -->
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[throw new java.lang.Exception();]]></scripting:script>
            </scripting:component>
        </scatter-gather>
        <combine-collections-transformer doc:name="Combine Collections"/>
        <choice-exception-strategy doc:name="Choice Exception Strategy">
            <catch-exception-strategy when="#[exception.causedBy(org.mule.routing.CompositeRoutingException)]" doc:name="Catch Exception Strategy">
                <logger message="composite exception handled" level="INFO" doc:name="Logger"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>
</mule>
The Scatter-Gather in the above example is configured with the aggregation strategy class MyAggregationStrategy.java:
package strategy;

import java.util.List;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        // Keep only the events from routes that completed without an exception.
        List<MuleEvent> successEvents = context.collectEventsWithoutExceptions();
        // Return the first successful event. Note that get(0) throws an
        // IndexOutOfBoundsException if every route failed.
        return successEvents.get(0);
    }
}
In the above example, the aggregation strategy returns only the payload of the first route that completed successfully. Designing an aggregation strategy that fits the use case is the responsibility of the integration developer.