Spring Batch - 기본 개념 및 사례

1. Spring Batch 의 설계도
중요 한 domain 몇 개
  • Job 퀘 스 트
  • Step 퀘 스 트 에 포 함 된 절차
  • ItemReader 단일 단계 의 입력 (input)
  • ItemProccesor input 의 처리
  • ItemWriter 단일 단계 에서 의 출력 (output)
  • ItemReader, ItemProccesor, ItemWriter 이것 은 자바 8 리 funtional 프로 그래 밍 과 유사 합 니 다.
  • public interface Supplier
  • public interface Function
  • public interface Consumer

  • 2. Job
    Job 이 실 행 된 후에 하나의 JobInstance 가 생 겼 습 니 다. 예 를 들 어 클래스 와 인 스 턴 스 의 관계 와 같 습 니 다. 1 개의 JobInstance 는 여러 개의 JobExcution 을 가 질 수 있 습 니 다.
    2.1 JobParameter
    JobParameters 는 job 가 실행 하 는 계산 매개 변수 입 니 다.
    예 를 들 어 cmd 를 통 해 endOFDay 작업 을 실행 합 니 다. 들 어 오 는 인 자 는 schedule. date (date) = 2007 / 05 / 05 입 니 다. 이것 은 JobParameter 를 구성 합 니 다.
    java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2018/05/05

    위의 명령 설명
    CommandLine JobRunner 는 spring batch 가 제공 하 는 main 방법 을 가 진 클래스 입 니 다. 수신 매개 변 수 는 다음 과 같 습 니 다.
  • io. spring. EndOfDayJobConfiguration 은 Job 의 Spring @ configuration 류 로 기본 적 인 Job 의 구성 step
  • 가 포함 되 어 있 습 니 다.
  • endOfDay 는 Job 의 정의 이 고 Spring 의 @ Bean
  • schedule. date (date) = 2007 / 05 / 05 는 매개 변수
  • 2.2 JobInstance 와 JobExcution 의 관계
    다음 명령 실행 n 회 모두 JobInstance 1 개 만 생 성
    java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2018/05/05

    그 러 니까 JobInstance = Job + identifying JobParameters같은 Job 인 스 턴 스 가 다 르 게 실행 되 는 JobExcution, JobExcution 은 시작 시간, 종료 시간, 상태 등의 필드 를 기록 합 니 다. JobExcution 이 어떤 필드 를 포함 하 는 지 구체 적 으로 보 세 요.
    JobInstance 에 인자 가 있 으 면 한 번 만 실행 할 수 있 습 니 다. 여러 번 실행 하면 보고 합 니 다.
    JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={readCountPerTime=10}.  If you want to run this job again, change the parameters.

    그 이 유 는 Simple JobRepository 가 JobExecution 을 만 들 때 판단 하기 때 문 입 니 다.
  • jobInstance 에 인자 가 있 는 지 여부
  • jobInstance 가 실행 중인 jobExecution 에 상태 가 있 는 지 COMPLETED 가 위의 2 가지 조건 을 만족 시 키 면 이상 을 던 져 작업 이 완료 되 었 음 을 표시 합 니 다. 다시 실행 하려 면 매개 변 수 를 변경 해 야 합 니 다
  •     public class SimpleJobRepository implements JobRepository {
        public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
        			throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        		Assert.notNull(jobName, "Job name must not be null.");
        		Assert.notNull(jobParameters, "JobParameters must not be null.");
        		 * Find all jobs matching the runtime information.
        		 * If this method is transactional, and the isolation level is
        		 * REPEATABLE_READ or better, another launcher trying to start the same
        		 * job in another thread or process will block until this transaction
        		 * has finished.
        //  job jobParameters  jobInstance
        		JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
        		ExecutionContext executionContext;
        		// existing job instance found
        		if (jobInstance != null) {
        //  jobInstance      JobExecution
        			List executions = jobExecutionDao.findJobExecutions(jobInstance);
        			//        execution
        			for (JobExecution execution : executions) {
        				if (execution.isRunning() || execution.isStopping()) {
        					throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
        							+ jobInstance);
        				BatchStatus status = execution.getStatus();
        				if (status == BatchStatus.UNKNOWN) {
        					throw new JobRestartException("Cannot restart job from UNKNOWN status. "
        							+ "The last execution ended with a failure that could not be rolled back, "
        							+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
        				//     execution   ,            
        				if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
        					throw new JobInstanceAlreadyCompleteException(
        							"A job instance already exists and is complete for parameters=" + jobParameters
        							+ ".  If you want to run this job again, change the parameters.");
        			executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
        		else {
        			// no job found, create one
        			jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
        			executionContext = new ExecutionContext();
        		JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
        		jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
        		// Save the JobExecution so that it picks up an ID (useful for clients
        		// monitoring asynchronous executions):
        		return jobExecution;

    이것 은 어려움 을 가 져 올 수 있 습 니 다. 만약 에 batch 가 하나의 매개 변수 만 있다 면 java CommandLineJobRunner MyJobConfiguration path=/c/abc.txt 하나의 경 로 를 받 고 경로 가 변 하지 않 으 면 중복 운행 을 해 야 합 니 다. 어떻게 해 야 합 니까?batch 는 하나의 인터페이스 류 JobParametersIncrementer 를 제공 합 니 다. spring - batch 는 우리 에 게 실현 류 RunIdIncrementer 를 제공 합 니 다.
    public class RunIdIncrementer  implements JobParametersIncrementer{
        public RunIdIncrementer()    {
            key = RUN_ID_KEY;
        public void setKey(String key)    {
            this.key = key;
        //     run.id   ,   1 batch   1 
        public JobParameters getNext(JobParameters parameters)    {
            JobParameters params = parameters != null ? parameters : new JobParameters();
            long id = params.getLong(key, 0L).longValue() + 1L;
            return (new JobParametersBuilder(params)).addLong(key, Long.valueOf(id)).toJobParameters();
        private static String RUN_ID_KEY = "run.id";
        private String key;

    쉽게 말 하면 실행 할 때 인위적인 가입 증가 id 입 니 다. 매개 변수 와 같은 제한 을 돌아 서 어떻게 사용 합 니까?job 를 정의 하 는 곳 에 만 가입 하 세 요.
    	public Job testJob(@Qualifier("testStep") Step step) {
    		return jobBuilderFactory.get("testJob")
    				.incrementer(new RunIdIncrementer())

    우리 의 batch 는 일반적으로 명령 행 의 셸 을 통 해 실 행 됩 니 다. spring 이 우리 에 게 제공 하 는 CommandLineJobRunner 을 본 적 이 있 습 니 다. 다음 과 같 습 니 다.
                jobParameters = (new JobParametersBuilder(jobParameters, jobExplorer)).getNextJobParameters(job).toJobParameters();

    즉, -next 을 더 하면 명령 이 java CommandLineJobRunner MyJobConfiguration path=/c/abc.txt -next 로 바 뀌 는 것 이다.
    2. spring batch 의 내장 시 계 를 통 해 위의 관 계 를 직접 볼 수 있 습 니 다.
    2017-01-01 00:00:00
    2017-01-01 00:00:00
    2017-01-02 00:00:00
    2017-01-01 21:00
    2017-01-01 21:30
    2017-01-02 21:00
    2017-01-02 21:30
    2017-01-02 21:31
    2017-01-02 22:29
    spring batch 내장 표 의 관계 도 는 다음 과 같다.
    3. Step
    step 가 실 행 될 때마다 stepExecution 이 발생 합 니 다. step 는 job 와 달리 stepinstance 가 없습니다.stepExecution 에 대응 하 는 표 는
  • BATCH_STEP_EXECUTION 시작 시간, 종료 시간, 상태 등 필드 기록
  • BATCH_STEP_EXECUTION_CONTEXT executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition()); 를 통 해 표 에 기록 을 저장 할 수 있다
  • 4.Sample
    파일 에서 데 이 터 를 읽 고 Person 대상 으로 봉인 하고 인쇄 합 니 다.

    public class Person {
        private int id;
        private String name;
        private int age;
        //getter and setter

    Batch 의 @ Configuration
    Job 에 Step 설정
  • step 의 reader 는 파일 Person. txt 에서 데 이 터 를 읽 고 각 줄 은 Person 의 정 보 를 대표 하여 Person 대상 으로 봉 인 됩 니 다
  • step 의 Processor 가 Person 을 인쇄 합 니 다
  • @Configuration
    public class SimpleBatchConfiguration {
        JobBuilderFactory jobBuilders;
        private StepBuilderFactory steps;
    //  1 Job,job   1 step
        public Job simpleJob(Step step){
            return jobBuilders.get("simpleJob").start(step).build();
    //step   reader writer
        protected Step step(ItemReader reader,
                             ItemWriter writer) {
            return steps.get("step1")
                    . chunk(10)
        //reader   sprinb batch   FlatFileItemReader,  1      1 Person   
        protected ItemReader reader(){
            FlatFileItemReader reader=new FlatFileItemReader();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File("E:\\person.txt"));
            } catch (FileNotFoundException e) {
            reader.setResource( new InputStreamResource(fis));
                String[] str = line.split(",");
                Person p = new Person();
                return p;
            return reader;
    //reader         ,   ItemWriterAdapter,                  ,          Person    PersonProcessor
        protected ItemWriter writer(){
            ItemWriterAdapter adapter = new ItemWriterAdapter();
            adapter.setTargetObject(new PersonProcessor());
            return adapter;

    public class PersonProcessor {
        public void print(Person p){

    main 의 시작 클래스
    public class SimpleDemo {
        public static void main(String[] args) {
            AnnotationConfigApplicationContext ctx =  new AnnotationConfigApplicationContext();
            JobLauncher launcher = (JobLauncher)ctx.getBean("jobLauncher");
            JobParameters parameters = new JobParameters();
            try {
            } catch (Exception e) {

    인쇄 된 결과
    Person{id=1, name='Rechard', age=20}
    Person{id=2, name='James', age=30}
    Person{id=3, name='Cury', age=28}
    Person{id=4, name='Durant', age=26}

    5. 구덩이
    정의 ItemReader 는 다음 과 같 습 니 다.
    	public ItemReader itemReader(@Value("#{jobParameters[a]}") String affiliate) {	
    		String sql ="select * from PENDINGUSER WHERE AFFILIATEID=?";	
    	    return  new JdbcCursorItemReaderBuilder().name("dataSourceReader").dataSource(dataSource)
    	                .preparedStatementSetter(ps->ps.setString(1, affiliate.toUpperCase()))
    	                .rowMapper((rs, index)->{
    	                    return rs.getInt("SSMREQUESTKY")+"";

    다음 과 같은 잘못 을 계속 보고 하 다.
    org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.

    해결 방법, 반환 값 을 인터페이스 에서 실제 클래스 로 바 꿉 니 다.
    public ItemReader JdbcCursorItemReader(@Value("#{jobParameters[a]}") String affiliate) {

