Saturday 2 June 2018

Splitter and Aggregator

The Splitter Flow Control splits a message into separate fragments, then sends these fragments one at a time to the next message processor in the flow. Segments are identified based on an expression parameter, usually written in Mule Expression Language (MEL), but other formats can be employed also. You can then use a Collection Aggregator Flow Control to reassemble the parts of the original message. You can also include a Resequencer Flow Control to put the parts back into the original sequence in case they are shuffled out of order.

Let’s take a scenario to understand the problem, there is process which calculates payroll of all employees. To design this problem in the following steps
  •       Initial process queries the database and fetch the employee records
  •        There is a payroll process defined, it takes the employee record as input and calculates the salary for an employee.
  •        Employee collection is retrieved from the Step-1, Iterate the employee collection and pass an employee to Step-2 to calculate the salary for an employee
  •       If salary calculatio n for an employee takes 1 sec. salary calculation of 100 employee takes around 100 seconds.
  •     To increase the performance to above design we engage the splitter-aggregator component instead of iterating the employee collection get from Step-1

Splitter splits the employee collection in to parts of employee records, each part consists of employee record, processing of salary calculation of each employee handled by a separate thread, salary calculation of all employees happens concurrently by each thread. So now whole payroll calculation happens in I sec.


In the above example we got 5 records from the database, if we are not using splitter to split the resultset, we need to iterate the collection and process each record sequentially. In this scenario usage of Splitter improves the performance of application.


Aggregating the Payload


When the splitter splits a message, it adds three new outbound variables into each of the output fragments. These three variables are later used by the Aggregator to reassemble the message:
  • MULE_CORRELATION_GROUP_SIZE: number of fragments into which the original message was split.
  • MULE_CORRELATION_SEQUENCE: position of a fragment within the group.
  • MULE_CORRELATION_ID: single ID for entire group (all output fragments of the same original message share the same value).
Create table in mysql Database

CREATE TABLE `employee` (
  `id` varchar(20) NOT NULL DEFAULT '',
  `EMP_NAME` varchar(45) DEFAULT NULL,
  `EMP_DESIG` varchar(45) DEFAULT NULL,
  `SALARY` int(11) DEFAULT NULL,
  `LOCATION` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Insert 2 sample records using below Insert statement:
INSERT INTO `mule_support`.`employee`
(`id`,
`EMP_NAME`,
`EMP_DESIG`,
`SALARY`,
`LOCATION`)
VALUES
(<{id: }>,
<{EMP_NAME: }>,
<{EMP_DESIG: }>,
<{SALARY: }>,

<{LOCATION: }>);


<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
    <http:listener-config name="HTTP_Listener_Configuration" host="${http.host}" port="${http.port}" doc:name="HTTP Listener Configuration"/>
    <db:mysql-config name="MySQL_Configuration" host="${db.host}" port="${db.port}" user="${db.username}" database="${db.schema}" doc:name="MySQL Configuration" password="${db.password}"/>
    <flow name="splitter_demoFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <db:select config-ref="MySQL_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[select * from  employee]]></db:parameterized-query>
        </db:select>
        <collection-splitter doc:name="Collection Splitter">
            <expression-message-info-mapping messageIdExpression="#[message.id]" correlationIdExpression="#[message.correlationId]"/>
        </collection-splitter>
        <logger message="Salary of employee is #[payload.SALARY]" level="INFO" doc:name="Logger"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="after splitter------------------" level="INFO" doc:name="Logger"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
    </flow>
</mule>

Complete source code of application can download from
https://drive.google.com/open?id=19YKTSCyvsxI23imE-FCjORYVENZuSDSR

Testing application with POSTMan


               




No comments:

Post a Comment

How to Design Mule API to process Attachments

This blog Explains , how to design Mule API to process attachments. Quite often we get requirement to design API's that process attachme...