Purpose of this Post:
- Solve the following problem: improve performance and processing time for an application that processes a large quantity of files in bulk.
- Show an implementation of Spring Batch using the multi-threaded-step approach.
- Describe how the multi-threaded-step approach behaves.
Resources
- Spring Batch project
- Spring Batch's multi-threaded-step approach.
- My github multi-threaded-step example.
Implementation
Click on my multi-threaded-step example for the github repository of my sample. For simplicity, the multi-threaded step consists of a Reader, a Processor, and a Writer. The project contains a single job with a single step.
This implementation uses Java configuration rather than XML configuration. The complete configuration can be found in Application.java. To implement a multi-threaded step, follow these steps in your Java configuration file.
- Create a Task Executor bean. In this case I used SimpleAsyncTaskExecutor.
```java
@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(maxThreads);
    return taskExecutor;
}
```
- Add the Task Executor to the Step bean by calling taskExecutor(). Also add a throttle limit, shown below.
```java
@Bean
public Step step() {
    return stepBuilderFactory.get("step").<Attempt, Attempt>chunk(chunkSize)
            .reader(processAttemptReader())
            .processor(processAttemptProcessor())
            .writer(processAttemptWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionListener())
            .listener(chunkListener())
            .throttleLimit(maxThreads)
            .build();
}
```
- The maxThreads variable is set in the application.properties file and is injected into the configuration class with the @Value annotation as follows:
```java
@Value("${max-threads}")
private int maxThreads;
```
- When implementing a multi-threaded solution, it is important to use the synchronized keyword on shared resources or shared methods. In the example provided here, I synchronized the read() method to prevent thread collisions on the queue. Here is the excerpt from the AttemptReader:
```java
public synchronized Attempt read() throws Exception {
    Attempt attempt = null;
    logger.info("Attempt Queue size {}.", attemptQueue.size());
    if (attemptQueue.size() > 0) {
        attempt = attemptQueue.remove();
    }
    return attempt;
}
```
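As an aside, another way to make the reader thread-safe is a lock-free queue from java.util.concurrent instead of a synchronized method. The sketch below is my own illustration, not code from the repository, and the Attempt class here is a minimal stand-in for the project's domain class:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentAttemptReader {

    // Stand-in for the Attempt domain class in the sample project.
    public static class Attempt {
        final String fileName;
        public Attempt(String fileName) { this.fileName = fileName; }
    }

    private final Queue<Attempt> attemptQueue = new ConcurrentLinkedQueue<>();

    public ConcurrentAttemptReader(Iterable<Attempt> attempts) {
        attempts.forEach(attemptQueue::add);
    }

    // poll() removes and returns the head atomically, so no synchronized
    // keyword is needed; it returns null when the queue is empty, which is
    // how a reader signals "no more input" to Spring Batch.
    public Attempt read() {
        return attemptQueue.poll();
    }
}
```

Either approach prevents two threads from removing the same item; the synchronized version is simpler to read, while the concurrent queue avoids contention on a single monitor.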
The following is focused on the Spring Batch steps of this implementation, for those who are newer to Spring Batch and want a little more explanation.
- The reader and writer are required to create the step, while the processor and listeners are optional. Since the reader, processor, and writer are typical Spring Batch code, I will not go into implementation details in this post, but you can reference the code here: AttemptReader, AttemptProcessor, and AttemptWriter.
- Add the Job bean using JobBuilderFactory and add the step to the flow:
```java
@Bean
public Job processAttemptJob() {
    return jobBuilderFactory.get("process-attempt-job")
            .incrementer(new RunIdIncrementer())
            .listener(jobExecutionListener())
            .flow(step()).end().build();
}
```
- OPTIONAL: The StepExecutionNotificationListener was implemented in this project to add logging for analysis purposes. Simply implement the class, add the listener in the step bean (above), and add the following bean to the configuration file:
```java
@Bean
public StepExecutionNotificationListener stepExecutionListener() {
    return new StepExecutionNotificationListener();
}
```
- OPTIONAL: The ChunkExecutionListener is again used for logging and analysis purposes. To use it, implement the class, add the listener in the step bean (above), and add the following bean to the configuration file:
```java
@Bean
public ChunkExecutionListener chunkListener() {
    return new ChunkExecutionListener();
}
```
- OPTIONAL: The JobCompletionNotificationListener is used for logging and analysis purposes in this project. Implement the class provided, add the listener in the job bean (above), and add the following bean to the configuration file:
```java
@Bean
public JobCompletionNotificationListener jobExecutionListener() {
    return new JobCompletionNotificationListener();
}
```
Analysis
Chunk Oriented Processing Review
Before looking at how Spring Batch behaves with a multi-threaded step, you should understand Chunk Oriented Processing. As a quick review, the following picture shows how a single step processes a single chunk of size 2.
For instance, in a single-threaded application that uses chunk oriented processing, an item is read by the ItemReader, processed by the ItemProcessor, and then queued. Once all of the items in a given chunk have been read and processed, the ItemWriter performs a single write for the list of items in that chunk.
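The read/process/write cycle described above can be sketched in plain Java. This is a simplified model of what the framework does, not Spring Batch's actual implementation; the String items and the upper-casing "processor" are placeholders:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoop {

    // Simplified model of chunk oriented processing: read and process items
    // one at a time, buffer them, then write the whole chunk in one call.
    public static List<List<String>> run(Iterator<String> reader, int chunkSize) {
        List<List<String>> writes = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            String item = reader.next();               // ItemReader.read()
            String processed = item.toUpperCase();     // ItemProcessor.process()
            chunk.add(processed);
            if (chunk.size() == chunkSize) {
                writes.add(new ArrayList<>(chunk));    // ItemWriter.write(chunk)
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writes.add(chunk);                         // final, partial chunk
        }
        return writes;
    }
}
```

In a multi-threaded step, each thread runs this same loop concurrently against the shared reader, which is why the read() method must be thread-safe.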
Multi-Threaded-Step Analysis
Now how does this change for the multi-threaded step? The multi-threaded step spawns multiple concurrent chunks, up to the max-threads setting.
Example 1:
- items (files) = 150
- chunk-size = 5
- max-threads = 10
Since 10 threads are spawned, 10 chunks are kicked off simultaneously, meaning 10 files will be processed simultaneously.
For this example of 150 items, 30 chunks are created with 5 items each. I included the log file for this run in my github repository multi-threaded-step.files150-chunk5-threads10.log. You will notice the "Write attempt" log entries show the list of files (items) in a single chunk on a single thread.
Example 2:
- items (files) = 150
- chunk-size = 7
- max-threads = 10
Again, with 10 threads spawned, 10 chunks are kicked off simultaneously, meaning 10 files will be processed simultaneously.
For this example of 150 items, 22 chunks are created: 21 chunks with 7 items each, and 1 chunk with 3 items. The chunk with 3 items is the last chunk created. Again, I included the log for this run in my github repository multi-threaded-step.files150-chunk7-threads10.log. You will notice the "Write attempt" text in the log showing the list of files (items) in a single chunk on a single thread.
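The chunk counts in both examples are simply the ceiling of the item count divided by the chunk size. A quick check (the chunkCount method is my own helper, not part of the sample project):

```java
public class ChunkMath {

    // Number of chunks needed for a given item count and chunk size
    // (integer ceiling division).
    public static int chunkCount(int items, int chunkSize) {
        return (items + chunkSize - 1) / chunkSize;
    }
}
```

For 150 items, chunkCount(150, 5) gives the 30 chunks of Example 1, and chunkCount(150, 7) gives the 22 chunks of Example 2, with the final partial chunk holding 150 - 21 * 7 = 3 items.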
Important note: the last 2 chunks start at approximately the same time. Although the thread/chunk with 7 items started first, the last thread/chunk with 3 items finishes first, because it requires less processing time.