Using Asynchronous Gateways in Spring Integration and Spring’s Async Annotation

By Lynn Walton – Sr. Software Engineer – ADP Cobalt SEO Team

Introduction

Recently the SEO team configured a new job using Spring Integration. The job includes several phases where steps need to be done for each item in a collection. While I could have used the same single-integration approach we have used before, there were disadvantages with the previous approach that I was hoping to eliminate by using new techniques.

First I’ll describe two features used in the new approach.

Asynchronous Gateways

Spring Integration’s @Gateway annotation lets you mark a method on a plain Java interface which, when wired up with <int:gateway> configuration, sends the argument passed to that method as the message payload on the configured channel. It’s convenient for “kicking off” an integration flow with a given set of data. With configuration like the following, Spring Integration creates a GatewayProxyFactoryBean, which generates a proxy that implements your interface.

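<int:gateway id="myJobGateway"
    service-interface="com.cobalt.services.seo.integration.support.MyJobGateway"
    default-request-channel="startJob"
    error-channel="jobErrorChannel" />

import java.util.List;
import org.springframework.integration.annotation.Gateway;

public interface MyJobGateway {
    // The list passed in becomes the payload of the message sent to "startJob"
    @Gateway
    void startJob(List<Account> accounts);
}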
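To make the idea of a generated proxy more concrete, here is a minimal sketch using only the JDK. It is not how Spring Integration is implemented: the real proxy wraps the argument in a Message and sends it to a MessageChannel, while this sketch uses a BlockingQueue as a stand-in channel. The names GatewayProxySketch, JobGateway and createGateway are hypothetical and exist only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Conceptual sketch only (assumption): a BlockingQueue stands in for the request channel.
final class GatewayProxySketch {

    // Hypothetical stand-in for the article's MyJobGateway; accounts are plain Strings here.
    interface JobGateway {
        void startJob(List<String> accounts);
    }

    // Builds a proxy that forwards the single method argument to the "channel".
    static JobGateway createGateway(final BlockingQueue<Object> requestChannel) {
        InvocationHandler handler = (proxy, method, args) -> {
            requestChannel.add(args[0]); // the argument becomes the message payload
            return null;                 // void method, no reply channel
        };
        return (JobGateway) Proxy.newProxyInstance(
                JobGateway.class.getClassLoader(),
                new Class<?>[] {JobGateway.class},
                handler);
    }

    public static void main(String[] args) {
        BlockingQueue<Object> startJobChannel = new LinkedBlockingQueue<Object>();
        JobGateway gateway = createGateway(startJobChannel);
        gateway.startJob(Arrays.asList("acct-1", "acct-2"));
        System.out.println("Payload on channel: " + startJobChannel.peek());
    }
}
```

The caller only ever sees the interface; everything about where the payload goes lives in configuration, which is the convenience the gateway provides.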

In the single-integration approach we typically have our interface return void and do not configure a default-reply-channel. By not having a reply channel, and by having the integration use an ExecutorChannel early in the flow to break the single-thread context between sender and receiver, we end up with behavior similar to making an asynchronous call to the gateway. (A channel becomes an ExecutorChannel when you add <int:dispatcher task-executor="…"/> to it.) In the new approach we use a true asynchronous gateway.

Since Spring Integration 2.0 there has been support for making the gateway asynchronous. You automatically get an asynchronous gateway just by declaring the gateway method on your interface to return a Future<T>.

public interface MyAsyncJobGateway {
    // Returning a Future is what makes this gateway asynchronous
    @Gateway
    Future<MyResultClass> startJob(List<Account> accounts);
}

Spring’s @Async annotation

Since Spring 3.0, annotating a method with @Async causes Spring to wrap your service in a proxy (provided annotation-driven asynchronous execution is enabled, for example with <task:annotation-driven/>). When your method is called, the caller gets an immediate return, while the actual execution occurs in a task submitted to a Spring TaskExecutor.

@Service
public class MyServiceImpl implements MyService {
    @Async
    public void myMethod() {
        // do work
    }
}

For a method where you wish to return something so callers have the option of choosing to wait for a result, you return your result by passing it to the constructor of the AsyncResult<T> class.

@Service
public class MyServiceImpl implements MyService {
    @Async
    public Future<MyResultClass> myMethod() {
        // do work
        return new AsyncResult<MyResultClass>(instanceOfMyResultClass);
    }
}

In our normal use case, our service is called by a REST endpoint (which is called from cron). We know the job will be long-running, so the REST endpoint does not wait for the Future. But having the service defined to return a Future allows us to wait for it in our integration tests. These tests are configured to work with a small dataset, so waiting is feasible, and making assertions on the returned object simplifies the integration tests.
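As a rough illustration of the waiting side, the sketch below uses a plain ExecutorService in place of Spring's TaskExecutor and @Async proxy, so it runs without any Spring configuration. The class FutureWaitSketch and its methods are hypothetical; the point is only that Future.get with a timeout returns as soon as the work completes, so a generous limit costs nothing when the job is fast.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch (assumption): a plain ExecutorService stands in for Spring's TaskExecutor.
final class FutureWaitSketch {

    // Hypothetical job: simulates a little work and reports how many items it handled.
    static Future<Integer> startJob(ExecutorService executor, final int itemCount) {
        return executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(50); // simulated work
                return itemCount;
            }
        });
    }

    // What an integration test might do: wait up to a generous limit, then inspect the result.
    static int runAndWait(int itemCount, long maxWaitSeconds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = startJob(executor, itemCount);
            // Returns as soon as the job finishes; the limit is only an upper bound.
            return future.get(maxWaitSeconds, TimeUnit.SECONDS);
        } catch (Exception exc) {
            throw new IllegalStateException("Job did not complete", exc);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Items processed: " + runAndWait(3, 30));
    }
}
```

A caller that does not care about the outcome, like the REST endpoint described above, would simply ignore the returned Future.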

A High-level Overview of the Two Approaches

Note: the integration flow pseudo-code shown below attempts to provide clarity by leaving out all components other than those which show the flow of the work.

Previous Approach

(Note: processing for each item in a collection is handled by splitters and aggregators.)

1 integration flow:

GetDataForAllSitesInAllAccounts: a synchronous gateway (with null return and no reply-channel) which sends a list of accounts into the flow:

splitter
  each account
    get list of sites
    splitter
      each site
        convert to list of urls to call
        splitter
          each url
            make call and store
        aggregator
    aggregator
aggregator

1 service method:

  • obtain the list of accounts
  • call the gateway
  • returns null almost immediately after the gateway call, because the gateway has a null return and no reply channel, and the integration has an ExecutorChannel early in the flow

New Approach

2 integration flows:

GetSitesForAccount: an asynchronous gateway which sends a single account into the flow:

get list of sites

GetSiteData: an asynchronous gateway which sends each site into a second flow:

convert to list of urls to call
splitter
  each url
    make call and store
aggregator

1 service method marked with Spring’s @Async:

  • obtain the list of accounts
  • logic to control flow and concurrency for calling both gateways (see “Logic in the Service Method” below)
  • returns a Future so callers have the option of waiting for the result (great for testing)

Comparisons of Advantages and Disadvantages (PRO/CON)

Each group below pairs a point about the Previous Approach with the matching points about the New Approach (multiple Spring Integration asynchronous gateways and an @Async service method).

Previous Approach, CON: The integration flow is large and complex enough to make understanding more difficult. (Note: the pseudo-code above doesn’t show the real difference in complexity.)
New Approach, PRO: The separate integrations are much simpler to understand.
New Approach, CON: The looping that would be done by splitters/aggregators is now done with custom logic in the service method.

Previous Approach, CON: Getting a summary report is critical but quite difficult with this approach, as you must code separate integration components that can handle 1) failures that might have occurred at different stages and 2) summarization of both error and non-error results. The error handling code is complicated because the available payload and header information is different in all stages. Additionally, for the summary (in our logs) to be easily understood, you need to be able to make a good estimate of the time each aggregator needs to wait for the typical case to be finished. (This estimated time is set as the MessageGroupStoreReaper’s timeout value.) Estimating gets more difficult when there are nested splitters and aggregators, as each estimated time has to take into account the timeouts for the preceding aggregators. If you set these too short, or if unusual circumstances make the job take longer than what you’ve estimated, the summary logging will be difficult to interpret, as it relies on counts of items released to the aggregator, which might have happened more than once. Finally, the “best estimate” for production is not easy to guess in advance when pre-production environments differ significantly from production.
New Approach, PRO: Summary reporting is easier and more accurate because you’re catching any errors in the sections of code where you know the stage in which the error has occurred. Because of this there is no need to complicate the integration to store metadata in headers for retrieval from an error handling component. Also, there is only one aggregator that needs a time estimate.

Previous Approach, CON: Integration tests need to use Thread.sleep() with a long enough value to give the integration time to run before verifying results. This time varies in different computer environments with different loads, so to prevent build failures you’re forced to choose a pessimistically high value. The tests then take longer than they might have needed.
New Approach, PRO: Integration tests can wait with Future.get(estimatedTime, TimeUnit.SECONDS). You can set estimatedTime on the high side to avoid build failures without suffering the penalty of having to wait longer than necessary.

Previous Approach, CON: Integration tests have to perform before-and-after state querying to make meaningful assertions, since the gateway returns null to simulate asynchronous behavior.
New Approach, PRO: Integration tests can easily make meaningful assertions on the object obtained from the returned Future rather than querying before-and-after state.

Previous Approach, PRO: Doing all of the work in one integration allows easy configuration of the number of concurrently executing tasks in each phase, by setting <int:dispatcher task-executor="myExecutor"/> and configuring the desired pool-size on the executor.
New Approach, CON: We have to write the logic in Java to control a “quasi” level of concurrency for a particular stage. I say “quasi” because the nature of it is different from the way it would actually run in the case of a dispatcher with a task executor on a channel. With the coded logic we allow a degree of concurrency, but only in batches of a set size, where the next batch is delayed until each task in the current batch has finished. So there are pauses in the throughput that wouldn’t happen with the normal task-executor on a channel, which can always start on the next task when it has an available thread in the pool.
New Approach, PRO: Despite the CON, coding the concurrency logic can also be seen as an advantage, because we can dynamically pass the desired batch size to our service method. This allows the level of “quasi-concurrency” to be changed without deploying a new build and makes experimentation for arriving at the best value easier.
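The throughput difference between batch-style “quasi-concurrency” and a task executor on a channel can be seen with a small deterministic simulation. This is a sketch under stated assumptions: task durations are made-up integers in arbitrary time units, batches are simple fixed-size groups (not the exact flush condition used in the service method), and the class and method names are hypothetical.

```java
import java.util.PriorityQueue;

// Simulation sketch (assumption): durations are abstract time units, not real measurements.
final class ConcurrencySketch {

    // Batched: start batchSize tasks together, wait for the slowest, then start the next batch.
    static int batchedTime(int[] durations, int batchSize) {
        int total = 0;
        for (int i = 0; i < durations.length; i += batchSize) {
            int slowest = 0;
            for (int j = i; j < Math.min(i + batchSize, durations.length); j++) {
                slowest = Math.max(slowest, durations[j]);
            }
            total += slowest;
        }
        return total;
    }

    // Pooled: each task starts as soon as any of the poolSize threads becomes free.
    static int pooledTime(int[] durations, int poolSize) {
        PriorityQueue<Integer> threadFreeAt = new PriorityQueue<Integer>();
        for (int i = 0; i < poolSize; i++) {
            threadFreeAt.add(0);
        }
        int finishTime = 0;
        for (int duration : durations) {
            int start = threadFreeAt.poll(); // earliest available thread
            int end = start + duration;
            threadFreeAt.add(end);
            finishTime = Math.max(finishTime, end);
        }
        return finishTime;
    }

    public static void main(String[] args) {
        int[] durations = {5, 1, 1, 1};
        System.out.println("batched: " + batchedTime(durations, 2)); // prints "batched: 6"
        System.out.println("pooled: " + pooledTime(durations, 2));   // prints "pooled: 5"
    }
}
```

With one slow task in the first batch, the batched run idles a thread until that task finishes, while the pooled run keeps the second thread busy with the short tasks.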

Logic in the Service Method

The relevant portions of the service method are listed below to show the extra amount of logic we implement to get the other benefits described above.  We do our own looping in place of the nested splitters/aggregators. We code the logic for performing some work concurrently in batches, but in exchange for this we get the flexibility of being able to dynamically change the batch size.  Finally, the code below for creating and updating a JobStats (summary) object is not really extra.  In our previous approach, it would still need to be coded in a separate summarization component used by the integration.

@Override
@Async
public Future<JobStats> startJobForAccounts(final int numConcurrentSites, final String... acctLogins) {
    try {
        final List<Account> accounts = accountsUtil.createAccountsListForAcctEmail(acctLogins);
        final JobStats stats = new JobStats();
        // [1] LOOP REPLACING FIRST SPLITTER/AGGREGATOR
        for (Account account : accounts) {
            startJobForSingleAccount(numConcurrentSites, account, stats);
        }
        LOGGER.info(stats.createStatsLogStr());
        return new AsyncResult<JobStats>(stats);
    } catch (Exception exc) {
        LOGGER.error("Unexpected error: " + exc.getMessage(), exc);
        final JobStats stats = new JobStats();
        // set properties on JobStats appropriate for indicating the error
        return new AsyncResult<JobStats>(stats);
    }
}

private void startJobForSingleAccount(final int numConcurrentSites, final Account account, final JobStats stats) {
    Assert.isTrue(numConcurrentSites > 0, NUM_CONCURRENT_SITES_MUST_BE_GT_ZERO_MSG);
    final List<Site> sites = getSitesForAccount(account, stats);

    // Futures for the batch currently in flight, keyed by site for error reporting
    final Map<String, Future<SiteDataSummary>> futuresBatchMap =
            new LinkedHashMap<String, Future<SiteDataSummary>>();
    final Iterator<Site> siteIter = sites.iterator();
    int idx = 0;

    // [2] LOOP REPLACING SECOND SPLITTER/AGGREGATOR
    while (siteIter.hasNext()) {
        final Site site = siteIter.next();
        idx++;
        // [3] 1st Gateway call
        futuresBatchMap.put(site.toString(), getSiteDataGateway.startGetSiteData(site));

        /*
         * sites.size() - idx < numConcurrentSites makes sure that any final partial batch gets processed.
         * It also means some of the last entries are processed in smaller batches or even one at a time.
         */
        if (idx % numConcurrentSites == 0 || sites.size() - idx < numConcurrentSites) {
            blockToProcessBatch(futuresBatchMap, stats);
        }
    }
}

private List<Site> getSitesForAccount(final Account account, final JobStats stats) {
    // [4] 2nd Gateway call
    final Future<List<Site>> future = getSitesForAccountGateway.getSitesForAccount(account);
    try {
        final List<Site> sites = future.get(secondsToWaitForSitesListFuture, TimeUnit.SECONDS);
        stats.getAccountsSucceeded().add(account.getLogin());
        return sites;
    } catch (Exception exc) {
        stats.getAccountsFailed().add(account.getLogin());
        LOGGER.error("Failed to process Account {}", account.getLogin(), exc);
        return new ArrayList<Site>();
    }
}

private void blockToProcessBatch(final Map<String, Future<SiteDataSummary>> futuresBatchMap, final JobStats stats) {
    // Wait for every Future in the batch before the caller starts the next batch
    for (Map.Entry<String, Future<SiteDataSummary>> futureEntry : futuresBatchMap.entrySet()) {
        try {
            final SiteDataSummary summary = futureEntry.getValue()
                    .get(secondsToWaitForSiteDataFuture, TimeUnit.SECONDS);
            summary.setSuccessful(true);
            stats.incrementSitesSucceeded();
        } catch (Exception exc) {
            stats.getSitesFailed().add(futureEntry.getKey());
            LOGGER.error("Failed to process {}", futureEntry.getKey(), exc);
        }
    }
    futuresBatchMap.clear();
}
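To see how the batch-flush condition behaves, the following standalone sketch replays the same test (idx % numConcurrentSites == 0 || sites.size() - idx < numConcurrentSites) over a plain counter and records the size of each batch that would be handed to blockToProcessBatch. The class BatchFlushSketch and the method batchSizes are hypothetical helpers written for this illustration; no gateways or Futures are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (assumption): only the flush condition is reproduced, not the gateway calls.
final class BatchFlushSketch {

    // Returns the size of each batch that the flush condition would produce.
    static List<Integer> batchSizes(int siteCount, int numConcurrentSites) {
        List<Integer> sizes = new ArrayList<Integer>();
        int pending = 0; // entries currently waiting in the batch map
        for (int idx = 1; idx <= siteCount; idx++) {
            pending++;
            if (idx % numConcurrentSites == 0 || siteCount - idx < numConcurrentSites) {
                sizes.add(pending); // the batch is processed and the map is cleared
                pending = 0;
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(7, 3)); // prints [3, 2, 1, 1]
    }
}
```

For 7 sites and a batch size of 3, the first batch is full and the remaining four sites are processed in batches of 2, 1 and 1, which matches the remark in the code comment that the last entries may run in smaller batches or one at a time.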

Conclusion

With a complex framework such as Spring Integration, there are often many ways to implement a desired task.  We generally lean toward solutions that require less of our own business logic code and many times this approach serves us well. But it is worthwhile to think about alternative approaches. Sometimes writing a little more code to increase control and improve testability can be better than taking advantage of the features within a complex framework.

I’m glad I tried the approach as I learned a lot and believe the benefits listed above outweigh the only disadvantage – that of having slightly more service code to implement.

Reference Links:
http://docs.spring.io/spring/docs/3.0.x/reference/scheduling.html
http://docs.spring.io/spring-integration/docs/2.0.0.RC1/reference/html/gateway.html
