Saturday, 30 June 2018

Scatter Gather In Mule


Routers are Mule components that can direct, resequence, split, and aggregate Mule events.

Routers (Flow Controls in Anypoint Studio) route messages to various destinations in a Mule flow. Some routers incorporate logic to analyse and possibly transform messages before routing takes place. For example, various flow controls can:

·       Split a message into several segments, then route each segment to a different processor

·       Combine several messages into a single message before sending it to the next processor in the flow

·       Reorder a list of messages before sending it to the next processor

·       Evaluate a message to determine which of several possible processors it should be routed to next

·       Broadcast the same message to multiple processors
The last bullet point (broadcasting the same message to multiple processors) is the job of the Scatter-Gather router, which sends the same message to several groups of message processors.
Scatter-Gather sends the request message to all configured routes concurrently, collects the responses from all routes, and aggregates them into a single message. The thread executing the flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). A failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many routes, or all of them, fail concurrently.
By default, if any route fails, Scatter-Gather performs the following actions:
·       Sets the exception payload accordingly for each route.
·       Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
Catching the CompositeRoutingException allows you to gather information on all failed routes. 

CompositeRoutingException

The CompositeRoutingException extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.
This exception exposes two methods which allow you to obtain the IDs of failed routes and the exceptions returned by each route.
·       The getExceptions method returns a map where the key is an integer that identifies the index of the failed route, and the value is the exception itself.
·       The getExceptionForRouteIndex(int) method returns the exception of the requested route ID.
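The behaviour described above (run all routes concurrently, wait for all of them, keep failures keyed by route) can be sketched in plain Java. This is only an illustration of the idea and not Mule's implementation: the class ScatterGatherSketch, the Gathered holder, and the zero-based route index are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ScatterGatherSketch {

    /** Hypothetical holder: successful results plus failures keyed by route index. */
    public static final class Gathered {
        public final List<String> results = new ArrayList<>();
        public final Map<Integer, Throwable> failures = new TreeMap<>();
    }

    /** Runs every route concurrently and waits until all of them have finished. */
    public static Gathered scatterGather(List<Callable<String>> routes) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, routes.size()));
        try {
            // invokeAll blocks until every route has completed (normally or with an exception)
            List<Future<String>> futures = pool.invokeAll(routes);
            Gathered gathered = new Gathered();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    gathered.results.add(futures.get(i).get());
                } catch (ExecutionException e) {
                    // A failed route does not stop the others; remember it by its index
                    gathered.failures.put(i, e.getCause());
                }
            }
            return gathered;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> routes = new ArrayList<>();
        routes.add(() -> "route-0 ok");
        routes.add(() -> { throw new IllegalStateException("route-1 failed"); });
        routes.add(() -> "route-2 ok");

        Gathered g = scatterGather(routes);
        System.out.println(g.results);           // prints [route-0 ok, route-2 ok]
        System.out.println(g.failures.keySet()); // prints [1]
    }
}
```

The failures map plays roughly the role that getExceptions() plays on CompositeRoutingException: one entry per failed route, so a handler can inspect each failure separately.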

Customizing Aggregation Strategies

Scatter-Gather allows you to define a custom aggregation strategy which overrides its default aggregation strategy. Among other things, custom gathering strategies allow you to:
·       Discard message responses
·       Merge message properties that originated in different routes
·       Discard failed messages without throwing an exception
·       Select only one from multiple responses
The flow configuration file is shown below:


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
 xmlns:spring="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8082" basePath="scatter" doc:name="HTTP Listener Configuration"/>
    <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" database="mule_support" doc:name="MySQL Configuration"/>
  
    <flow name="scatter-gather-2Flow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <scatter-gather doc:name="Scatter-Gather">
            <custom-aggregation-strategy class="strategy.MyAggregationStrategy"/>
            <processor-chain>
                <logger message="route-1" level="INFO" doc:name="Logger"/>
                <db:select config-ref="MySQL_Configuration" doc:name="Database">
                    <db:parameterized-query><![CDATA[select * from movie]]></db:parameterized-query>
                </db:select>
                <json:object-to-json-transformer doc:name="Object to JSON"/>
            </processor-chain>
            <processor-chain>
                <logger message="route-2" level="INFO" doc:name="Logger"/>
                <vm:outbound-endpoint exchange-pattern="one-way" path="test" doc:name="VM"/>
            </processor-chain>
            <processor-chain>
                <logger message="route-3" level="INFO" doc:name="Logger"/>
                <file:outbound-endpoint path="src/main/resources" responseTimeout="10000" doc:name="File"/>
            </processor-chain>
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[throw new java.lang.Exception();]]></scripting:script>
            </scripting:component>
        </scatter-gather>
        <combine-collections-transformer doc:name="Combine Collections"/>
        <choice-exception-strategy doc:name="Choice Exception Strategy">
            <catch-exception-strategy when="#[exception.causedBy(org.mule.routing.CompositeRoutingException)]" doc:name="Catch Exception Strategy">
                <logger message="composite exception handled" level="INFO" doc:name="Logger"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>
</mule>
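The Scatter-Gather in the above example is configured with the aggregation strategy class MyAggregationStrategy.java:
package strategy;

import java.util.List;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        // Keep only the events from routes that completed without an exception.
        List<MuleEvent> successEvents = context.collectEventsWithoutExceptions();
        // If every route failed, fall back to the original event instead of failing on get(0).
        if (successEvents.isEmpty()) {
            return context.getOriginalEvent();
        }
        // Return the event produced by the first successful route.
        return successEvents.get(0);
    }
}
In the above example, the aggregation strategy returns only the payload of the first route that completed successfully. Designing an aggregation strategy that fits the use case is the responsibility of the integration developer.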

Sunday, 17 June 2018


Database Connectivity in Mule
The Database connector establishes communication between your Mule app and a relational database.

You can perform predefined queries, dynamically constructed queries, and template queries that are self-sufficient and customizable. 
You can perform multiple SQL requests in a single bulk update and make Data Definition Language (DDL) requests that alter the data structure rather than the data itself.
The database connector supports the following operations:


·       Select
·       Insert
·       Update 
·       Delete
·       Stored Procedure
·       Bulk Execute
·       DDL operations such as CREATE, ALTER, etc.
How to Configure Database Connectivity in Mule:
Drag and drop the Database connector from the component palette onto the canvas.

I chose MySQL as the database.

There are 3 ways to configure database connectivity in Mule:
a)       By providing the hostname, port, username, password and DB schema details in the UI
b)      By providing the details through a Spring bean
c)       By providing the URL


b) Providing the details through a Spring bean:
Create a Java class under the source folder src/main/java.
The class should implement javax.sql.DataSource and override the getConnection() methods.
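import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;

public class DBConfigDetails implements DataSource {

    // Only the two getConnection() methods are customized.
    @Override
    public Connection getConnection() throws SQLException {
        // Default credentials: user "root" with an empty password.
        return getConnection("root", "");
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost:3306/mule_support", username, password);
    }

    // The remaining DataSource methods are left as simple defaults.
    @Override public PrintWriter getLogWriter() throws SQLException { return null; }
    @Override public void setLogWriter(PrintWriter out) throws SQLException { }
    @Override public void setLoginTimeout(int seconds) throws SQLException { }
    @Override public int getLoginTimeout() throws SQLException { return 0; }
    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException { return null; }
    @Override public <T> T unwrap(Class<T> iface) throws SQLException { return null; }
    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return false; }
}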

Configure the DB Connector as below




c) By providing the URL:
jdbc:mysql://localhost:3306/mule_support?password=root&user=root
Querying the Database:
From Mule, we can query the database in 3 ways:
1)      Parameterized queries
2)      Dynamic Queries
3)      Template Based Queries

Parameterized Query
Parameterized (Recommended) - Mule replaces all Mule Expression Language (MEL) expressions inside a query with "?" to create a prepared statement, then evaluates the MEL expressions using the current event so as to obtain the value for each parameter.

Here, MEL expressions can stand in only for parameter values, not for other parts of the SQL statement.
Dynamic - Mule replaces all MEL expressions in the query with the result of the expression evaluation, then sends the result to the database.
This gives you ultimate flexibility over the SQL statement. For example, all of the following are valid dynamic query statements:
·       select * from #[tablename] where id = 1;
·       insert into #[message.payload.restOfInsertStatement];
·       #[flowVars['deleteStatement']]
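The difference between the two modes can be illustrated with a small plain-Java sketch. It is not how the Database connector is implemented: the class QueryModesSketch and the simplified #[name] lookup (a plain map instead of real MEL evaluation) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueryModesSketch {

    // Matches simple #[name] placeholders; real MEL expressions are far richer than this.
    private static final Pattern EXPR = Pattern.compile("#\\[([^\\]]+)\\]");

    /** Parameterized mode: each expression becomes "?" and its value is collected separately. */
    public static String toPrepared(String query, Map<String, Object> vars, List<Object> paramsOut) {
        Matcher m = EXPR.matcher(query);
        StringBuffer sql = new StringBuffer();
        while (m.find()) {
            paramsOut.add(vars.get(m.group(1)));
            m.appendReplacement(sql, "?");
        }
        m.appendTail(sql);
        return sql.toString();
    }

    /** Dynamic mode: each expression is replaced by its value as raw SQL text. */
    public static String toDynamic(String query, Map<String, Object> vars) {
        Matcher m = EXPR.matcher(query);
        StringBuffer sql = new StringBuffer();
        while (m.find()) {
            String value = String.valueOf(vars.get(m.group(1)));
            m.appendReplacement(sql, Matcher.quoteReplacement(value));
        }
        m.appendTail(sql);
        return sql.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> vars = new HashMap<>();
        vars.put("id", 7);
        vars.put("tablename", "employee");

        List<Object> params = new ArrayList<>();
        System.out.println(toPrepared("select * from employee where id = #[id]", vars, params));
        // prints: select * from employee where id = ?
        System.out.println(params);
        // prints: [7]
        System.out.println(toDynamic("select * from #[tablename] where id = 1", vars));
        // prints: select * from employee where id = 1
    }
}
```

In the parameterized case the value never becomes part of the SQL text, which is one reason that mode is the recommended one; in the dynamic case any part of the statement, including the table name, can come from an expression.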

From Template - Enables you to define a query statement once, in a global element in your application (global Template Query Reference element), then reuse the query multiple times within the same application, dynamically varying specific values as needed.
Relative to parameterized and dynamic queries, template queries offer the advantage of letting you reuse your query statements.





Saturday, 2 June 2018

Splitter and Aggregator

The Splitter Flow Control splits a message into separate fragments, then sends these fragments one at a time to the next message processor in the flow. Segments are identified based on an expression parameter, usually written in Mule Expression Language (MEL), although other formats can also be used. You can then use a Collection Aggregator Flow Control to reassemble the parts of the original message. You can also include a Resequencer Flow Control to put the parts back into the original sequence in case they are shuffled out of order.

Let’s take a scenario to understand the problem: a process calculates the payroll of all employees. A straightforward design works as follows:
1)      An initial process queries the database and fetches the employee records.
2)      A payroll process takes one employee record as input and calculates the salary for that employee.
3)      The employee collection retrieved in Step 1 is iterated, and each employee is passed to Step 2 to calculate the salary.
4)      If the salary calculation for one employee takes 1 second, the calculation for 100 employees takes around 100 seconds.
To improve the performance of this design, we use the splitter-aggregator components instead of iterating over the employee collection obtained in Step 1.

The Splitter splits the employee collection into individual employee records. When each record is handled by a separate thread, the salary calculations for all employees run concurrently, so the whole payroll calculation takes about 1 second instead of around 100.
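As a rough plain-Java analogue of handing each employee record to its own thread, the sketch below calculates all salaries on a thread pool and collects them in input order. The class PayrollSketch and the salary rule (base salary plus a 10% allowance) are hypothetical and exist only for this illustration; they are not part of the Mule application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PayrollSketch {

    /** Hypothetical salary rule: base salary plus a 10% allowance. */
    public static int calculateSalary(int baseSalary) {
        return baseSalary + baseSalary / 10;
    }

    /** Submits one task per employee and returns the salaries in input order. */
    public static List<Integer> calculateAll(List<Integer> baseSalaries, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (Integer base : baseSalaries) {
                Callable<Integer> task = () -> calculateSalary(base);
                pending.add(pool.submit(task)); // tasks run concurrently on the pool
            }
            List<Integer> salaries = new ArrayList<>();
            for (Future<Integer> result : pending) {
                salaries.add(result.get()); // waits for each calculation to finish
            }
            return salaries;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(calculateAll(Arrays.asList(1000, 2000, 3000), 3));
        // prints [1100, 2200, 3300]
    }
}
```

With as many threads as records, the total time is close to the time of the slowest single calculation; with fewer threads, the records are processed in batches.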


In the above example we got 5 records from the database. Without a splitter to split the result set, we would need to iterate over the collection and process each record sequentially. In this scenario, using the Splitter improves the performance of the application.


Aggregating the Payload


When the splitter splits a message, it adds three new outbound variables into each of the output fragments. These three variables are later used by the Aggregator to reassemble the message:
  • MULE_CORRELATION_GROUP_SIZE: number of fragments into which the original message was split.
  • MULE_CORRELATION_SEQUENCE: position of a fragment within the group.
  • MULE_CORRELATION_ID: single ID for entire group (all output fragments of the same original message share the same value).
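How these three values let an aggregator rebuild the original message can be sketched in plain Java. This is an illustration only, not Mule's implementation: the Fragment class, the 1-based sequence numbering, and the single-group assumption are choices made for this example, and the sort step stands in for what a Resequencer would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

public class SplitAggregateSketch {

    /** One fragment plus the three correlation values (hypothetical holder class). */
    public static final class Fragment {
        public final String correlationId; // same for all fragments of one message
        public final int groupSize;        // number of fragments in the group
        public final int sequence;         // position of this fragment in the group
        public final String payload;

        Fragment(String correlationId, int groupSize, int sequence, String payload) {
            this.correlationId = correlationId;
            this.groupSize = groupSize;
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    /** Splits a collection and stamps every fragment with the correlation values. */
    public static List<Fragment> split(List<String> payloads) {
        String correlationId = UUID.randomUUID().toString();
        List<Fragment> fragments = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            fragments.add(new Fragment(correlationId, payloads.size(), i + 1, payloads.get(i)));
        }
        return fragments;
    }

    /**
     * Reassembles one group (all fragments are assumed to share a correlation ID).
     * Returns null while fewer fragments than the group size have arrived.
     */
    public static List<String> aggregate(List<Fragment> arrived) {
        if (arrived.isEmpty() || arrived.size() < arrived.get(0).groupSize) {
            return null;
        }
        List<Fragment> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingInt((Fragment f) -> f.sequence));
        List<String> payloads = new ArrayList<>();
        for (Fragment f : sorted) {
            payloads.add(f.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Fragment> fragments = split(Arrays.asList("emp-1", "emp-2", "emp-3"));
        Collections.reverse(fragments); // simulate fragments arriving out of order
        System.out.println(aggregate(fragments)); // prints [emp-1, emp-2, emp-3]
    }
}
```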
Create the employee table in the MySQL database:

CREATE TABLE `employee` (
  `id` varchar(20) NOT NULL DEFAULT '',
  `EMP_NAME` varchar(45) DEFAULT NULL,
  `EMP_DESIG` varchar(45) DEFAULT NULL,
  `SALARY` int(11) DEFAULT NULL,
  `LOCATION` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Insert 2 sample records using the INSERT statement below, replacing each <{...}> placeholder with an actual value:
INSERT INTO `mule_support`.`employee`
(`id`,
`EMP_NAME`,
`EMP_DESIG`,
`SALARY`,
`LOCATION`)
VALUES
(<{id: }>,
<{EMP_NAME: }>,
<{EMP_DESIG: }>,
<{SALARY: }>,
<{LOCATION: }>);


<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="${http.host}" port="${http.port}" doc:name="HTTP Listener Configuration"/>
    <db:mysql-config name="MySQL_Configuration" host="${db.host}" port="${db.port}" user="${db.username}" database="${db.schema}" doc:name="MySQL Configuration" password="${db.password}"/>
    <flow name="splitter_demoFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <db:select config-ref="MySQL_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[select * from  employee]]></db:parameterized-query>
        </db:select>
        <collection-splitter doc:name="Collection Splitter">
            <expression-message-info-mapping messageIdExpression="#[message.id]" correlationIdExpression="#[message.correlationId]"/>
        </collection-splitter>
        <logger message="Salary of employee is #[payload.SALARY]" level="INFO" doc:name="Logger"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="after splitter------------------" level="INFO" doc:name="Logger"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
    </flow>
</mule>

The complete source code of the application can be downloaded from:
https://drive.google.com/open?id=19YKTSCyvsxI23imE-FCjORYVENZuSDSR

Testing the application with Postman


               



