Thursday, January 31, 2013

Spring Batch - read multiple files/process/write

This demo uses the Spring Batch framework to read multiple files, process their records, and write the results to other files or a database.

1) The starting point is a batch launcher, which is used to launch a job


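import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public void launch() {
    // Load the job definition and look up the launcher and the job.
    ApplicationContext context = new ClassPathXmlApplicationContext(
            "spring-batch-job-publiccompany.xml");
    JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
    Job job = (Job) context.getBean("publicCompanyJob");

    try {
        JobExecution result = launcher.run(job, new JobParameters());
        System.out.println("->>---" + result.toString());
    } catch (Exception e) {
        e.printStackTrace();
    }
}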

2) Take a look at the job settings

This is the content of spring-batch-job-publiccompany.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch.xsd">

    <beans:import resource="classpath:spring-batch.xml" />

    <job id="publicCompanyJob" restartable="true">
        <step id="readFile">
            <tasklet transaction-manager="transactionManager"
                start-limit="1" allow-start-if-complete="false">
                <chunk reader="multifileReader" processor="publicCompanyItemProcessor"
                    writer="publicCompanyItemWriter" commit-interval="1">
                </chunk>
            </tasklet>
        </step>
        <!-- <step id="saveData"> </step> -->

        <listeners>
            <listener ref="publicCompanyJobListener" />
        </listeners>
    </job>

    <!-- Reads every file matching the pattern, delegating each one to publicCompanyItemReader -->
    <beans:bean id="multifileReader"
        class="org.springframework.batch.item.file.MultiResourceItemReader"
        scope="step" lazy-init="true">
        <beans:property name="resources" value="file:d:/temp/test/*.txt" />
        <beans:property name="delegate" ref="publicCompanyItemReader" />
    </beans:bean>

    <beans:bean id="publicCompanyItemReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <!-- no "resource" property here: MultiResourceItemReader sets it on the delegate for each file -->
        <beans:property name="linesToSkip" value="1" />
        <beans:property name="lineMapper">
            <beans:bean
                class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <beans:property name="lineTokenizer" ref="lineTokenizer" />
                <beans:property name="fieldSetMapper">
                    <beans:bean
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <beans:property name="prototypeBeanName" value="publicCompanyInfo"></beans:property>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
        p:names="publicCompanyId,publicCompanyName" p:delimiter="," />

</beans:beans>
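
To make the reader settings above more concrete, here is a plain Java illustration of what `linesToSkip="1"` plus a comma tokenizer with the names `publicCompanyId,publicCompanyName` amount to. This is only a sketch of the idea, not Spring Batch code: `DelimitedLineSketch` and `parse` are made-up names, quoted fields are not handled, and trimming each token and rejecting lines with the wrong number of fields are choices of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Plain-Java illustration of the reader settings; this is not Spring Batch code. */
final class DelimitedLineSketch {

    // Field names, mirroring p:names on the lineTokenizer bean.
    static final String[] NAMES = {"publicCompanyId", "publicCompanyName"};

    /** Skips the first linesToSkip lines, then splits every remaining line on the delimiter. */
    static List<Map<String, String>> parse(List<String> lines, int linesToSkip, String delimiter) {
        List<Map<String, String>> records = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            // Quote the delimiter so it is matched literally; -1 keeps trailing empty fields.
            String[] tokens = lines.get(i).split(Pattern.quote(delimiter), -1);
            if (tokens.length != NAMES.length) {
                throw new IllegalArgumentException("unexpected field count in line " + (i + 1));
            }
            Map<String, String> record = new LinkedHashMap<>();
            for (int j = 0; j < NAMES.length; j++) {
                record.put(NAMES[j], tokens[j].trim());
            }
            records.add(record);
        }
        return records;
    }
}
```

For a file whose first line is a header such as `id,name`, calling `parse(lines, 1, ",")` skips that header and returns one name-to-value map per data line.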

3) spring-batch.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <import resource="classpath:spring-context-jobs.xml" />

    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- uses an in-memory job repository; switch to a real database if needed -->
    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    </bean>

    <!-- use ResourcelessTransactionManager with the in-memory repository -->
    <bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
</beans>

4) processor class


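import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class PublicCompanyItemProcessor implements ItemProcessor<PublicCompanyInfo, PublicCompanyInfo> {

    public PublicCompanyInfo process(PublicCompanyInfo info) throws Exception {
        System.out.println("processing info:" + info);
        // Pass the item through unchanged; real processing logic would go here.
        return info;
    }
}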
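
The processor and the `prototypeBeanName` setting both refer to a `PublicCompanyInfo` class that is not shown in this post. A minimal sketch of what it could look like follows, assuming two String properties named after the tokenizer's `p:names` (the field types and the `toString` format are my assumptions). As far as I know, `BeanWrapperFieldSetMapper` fills the bean through its setters and expects the bean behind `prototypeBeanName` to be prototype-scoped, so in the real project the class would also need to be registered as a prototype bean named `publicCompanyInfo`.

```java
/** Hypothetical item class; the property names follow p:names on the lineTokenizer bean. */
class PublicCompanyInfo {

    private String publicCompanyId;
    private String publicCompanyName;

    public String getPublicCompanyId() {
        return publicCompanyId;
    }

    public void setPublicCompanyId(String publicCompanyId) {
        this.publicCompanyId = publicCompanyId;
    }

    public String getPublicCompanyName() {
        return publicCompanyName;
    }

    public void setPublicCompanyName(String publicCompanyName) {
        this.publicCompanyName = publicCompanyName;
    }

    // Gives readable output when the processor prints "processing info:" + info.
    @Override
    public String toString() {
        return "PublicCompanyInfo[id=" + publicCompanyId + ", name=" + publicCompanyName + "]";
    }
}
```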

5) writer class


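import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component
public class PublicCompanyItemWriter implements ItemWriter<PublicCompanyInfo> {

    public void write(List<? extends PublicCompanyInfo> items) throws Exception {
        // write a list of beans to DB or file
    }
}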
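
The writer receives a list because Spring Batch hands items over in chunks, and `commit-interval="1"` in the job settings means a chunk size of one. The plain Java sketch below illustrates the idea of such a chunk loop. It is not Spring Batch's implementation (transactions, skips and restarts are left out), and `ChunkLoopSketch` and `run` are made-up names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java illustration of chunk-oriented processing; not Spring Batch code. */
final class ChunkLoopSketch {

    /**
     * Reads up to commitInterval items, processes them, and passes the results
     * to the writer as one list. Repeats until the reader is exhausted.
     * Returns the number of chunks handed to the writer.
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most commitInterval items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // Process phase: transform every item of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                outputs.add(processor.apply(input));
            }
            // Write phase: the writer sees the whole chunk at once.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }
}
```

With five input items and a commit interval of 2, the writer in this sketch is called three times, with two, two and one item. With a commit interval of 1, as in the job above, it is called once per item.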

With this setup, all the files ending with .txt in the folder d:/temp/test are read and processed. However, it is hard to rename each file once it has been processed before going on to the next one. To achieve that, we need to make the following changes:

1) Change the launcher to list the files in the folder and launch the job once for each file


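// in addition to the imports used by the first launcher
import java.io.File;
import java.io.FilenameFilter;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.JobParameter;

public void launch() {
    ApplicationContext context = new ClassPathXmlApplicationContext(
            "spring-batch-job-publiccompany.xml");
    JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
    Job job = (Job) context.getBean("publicCompanyJob");

    try {
        File dir = new File("d:/temp/test");
        // Only pick up .txt files, so files already renamed to .bak are skipped.
        File[] files = dir.listFiles(new FilenameFilter() {
            public boolean accept(File d, String name) {
                return name.endsWith(".txt");
            }
        });
        if (files == null) {
            // listFiles returns null if the directory does not exist or cannot be read
            System.out.println("cannot list " + dir);
            return;
        }
        for (File f : files) {
            JobParameter jp = new JobParameter(f.getAbsolutePath());
            Map<String, JobParameter> map = new HashMap<String, JobParameter>();
            map.put("inputFile", jp);  // must match the parameter name used in the job settings
            JobExecution result = launcher.run(job, new JobParameters(map));

            System.out.println("->>---" + result.toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}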



2) Change the job settings so that the reader takes its file from the inputFile job parameter

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch.xsd">

    <beans:import resource="classpath:spring-batch.xml" />

    <job id="publicCompanyJob" restartable="true">
        <step id="readFile">
            <tasklet transaction-manager="transactionManager"
                start-limit="1" allow-start-if-complete="false">
                <chunk reader="publicCompanyItemReader" processor="publicCompanyItemProcessor"
                    writer="publicCompanyItemWriter" commit-interval="1">
                </chunk>
            </tasklet>
        </step>
        <!-- <step id="saveData"> </step> -->

        <listeners>
            <listener ref="publicCompanyJobListener" />
        </listeners>
    </job>

    <beans:bean id="publicCompanyItemReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <beans:property name="resource" ref="inputFile"/>
        <beans:property name="linesToSkip" value="1" />
        <beans:property name="lineMapper">
            <beans:bean
                class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <beans:property name="lineTokenizer" ref="lineTokenizer" />
                <beans:property name="fieldSetMapper">
                    <beans:bean
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <beans:property name="prototypeBeanName" value="publicCompanyInfo"></beans:property>
                    </beans:bean>
                </beans:property>
            </beans:bean>
        </beans:property>
    </beans:bean>

    <beans:bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
        p:names="publicCompanyId,publicCompanyName" p:delimiter="," />

    <beans:bean id="inputFile"
        class="org.springframework.core.io.FileSystemResource" scope="step">
        <beans:constructor-arg value="#{jobParameters[inputFile]}" />
    </beans:bean>

</beans:beans>

3) Add a listener that renames the processed file after the job has completed

package abc;

import java.io.File;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.stereotype.Component;

@Component
public class PublicCompanyJobListener extends ServiceBase {

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            // The job ended successfully: rename the processed file so it is not picked up again.
            String fName = jobExecution.getJobInstance().getJobParameters().getString("inputFile");
            File f = new File(fName);
            if (!f.renameTo(new File(fName + ".bak"))) {
                // renameTo only reports failure through its return value
                System.out.println("could not rename " + fName);
            }
            System.out.println("job completed");
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            // The job ended with a failure: leave the file as it is.
            System.out.println("job fail");
        }
    }

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("job start");
    }
}
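
One thing to keep in mind with the rename step: `File.renameTo` only reports failure through its boolean return value and gives no reason. As a possible alternative, here is a small sketch that uses `java.nio.file.Files.move`, which throws an exception describing the problem, for example when the `.bak` file already exists. `ProcessedFileRenamer` and `markProcessed` are made-up names, and this is not the code from my project.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for renaming a processed input file. */
final class ProcessedFileRenamer {

    /**
     * Renames e.g. data.txt to data.txt.bak in the same directory and
     * returns the new path. Throws IOException if the move fails,
     * including when the target file already exists.
     */
    static Path markProcessed(Path file) throws IOException {
        Path target = file.resolveSibling(file.getFileName() + ".bak");
        return Files.move(file, target);
    }
}
```

In the listener, the call could look like `ProcessedFileRenamer.markProcessed(Paths.get(fName))`, with the `IOException` logged or rethrown as needed.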




3 comments:

  1. Buddy,

    Thanks a lot for writing such a wonderful article.

    You just saved my day.

    I am using Quartz and Spring Batch to run a job which periodically reads files from a directory. The source location of the files is on the file system, not in the web application directory structure/classpath. I was struggling with MultiResourceItemReader, as this class is not able to read the new set of files on the next job run. I struggled for 2 days, and finally I came to your page.

    IMO, this is a common problem and Spring should mention this in their documentation.

    Thanks again, and write some more goodies like this.

  3. Can you please share the download link of the whole project?
