본문 바로가기
Spring

Srping Boot Quartz DB Cluster,Log 구현

by 하르싼 2023. 1. 18.
반응형

개요

  • 스프링스케줄러+shedLock 로 이중화 기능 구현하였으나 스케줄에 대한 설정을 위해 Quartz 로 변경
  • 구현 목록
    • DB Cluster 를 통한 이중화
    • CronTrigger 생성,수정,일시정지,재기동
    • 스케줄 변경 정보, 스케줄 수행 이력
    • Job 실패시 기존 등록된 수행시간에 맞춰 지정된 횟수만큼 수행 (횟수 초과시 중지)

목차

공식 홈페이지 : http://www.quartz-scheduler.org/

0.Quartz란?

1.개발환경

2.용어정리

3.프로젝트 구조

4.소스 설명

Quartz란?

Job Scheduling 라이브러리 이며 자바로 개발되어 모든 자바 프로그램에서 사용 가능하고
간단한 interval형식이나 Cron 표현식 스케줄링 지원

  • 장점
    • DB 기반의 클러스터 기능 제공
    • 시스템 Fail-over / Random 방식의 로드 분산처리 지원
    • In-memory Job scheduler 제공
    • 여러 기본 플러그인 제공
      • ShutdownHookPlugin – JVM 종료 이벤트 캐치
      • LoggingJobHistoryPlugin – Job 실행 로그 남기기
  • 단점
    • Random 방식 클러스터링 기능이라 완벽한 로드 분산 안됨
    • 스케줄링 실행에 대한 히스토리 보관에 대한 개발 필요
  • Quartz 흐름

출처 : https://www.javarticles.com/2016/03/quartz-scheduler-model.html#prettyPhoto

개발환경

용어정리

  • Job
  • JobDetail
    • Job 을 실행시키기 위한 정보를 담고 있는 객체
    • Job 의 Name,Group JobDataMap 속성 등을 지정
    • Trigger 가 Job 을 수행할 때 JobDetail 기반으로 스케줄링
  • Trigger
    • Job 을 실행시킬 스케줄링 조건 (반복 횟수, 시작시간) 등을 담고 있고 Scheduler 는 이 정보를 기반으로 Job 을 수행
    • Trigger 와 Job 연관관계
      • 1 = 1 : 하나의 Trigger 는 하나의 Job 을 반드시 지정
      • N = 1 : 여러 Trigger 가 하나의 Job 을 지정하여 다양한 시간에 스케줄
    • Trigger 종류
      • SimpleTrigger
      • CronTrigger
  • JobStroe
    • Job, Trigger에 대한 정보를 저장하는 방식 설정(Memory,DB)
  • Listener
    • JobListener : Job 실행 전후로 이벤트를 받을 수 있음
    • TriggerListener : Trigger 실행 전후로 이벤트를 받을 수 있음

프로젝트구조

프로젝트내_주요_소스설명

QuartzConfig

@Configuration
public class QuartzConfig {

    //...생략
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(ApplicationContext applicationContext) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();

        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        schedulerFactoryBean.setJobFactory(jobFactory);

        schedulerFactoryBean.setApplicationContext(applicationContext);

        Properties properties = new Properties();
        properties.putAll(quartzProperties.getProperties());

        schedulerFactoryBean.setGlobalTriggerListeners(triggersListener);
        schedulerFactoryBean.setGlobalJobListeners(jobsListener);
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setDataSource(dataSource);
        schedulerFactoryBean.setQuartzProperties(properties);
        schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(true);


        return schedulerFactoryBean;
    }
}

AutowiringSpringBeanJobFactory

public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        beanFactory = applicationContext.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        return job;
    }

}
  • Quartz Job 에서 Spring bean 을 참조하기 위해 설정추가

quartz.properteis

#Quartz
spring.quartz.scheduler-name=QuartzScheduler
spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO
spring.quartz.properties.org.quartz.threadPool.threadCount=20
spring.quartz.properties.org.quartz.threadPool.threadNamePrefix=QuartzScheduler
spring.quartz.properties.org.quartz.jobStore.tablePrefix=QRTZ_
spring.quartz.properties.org.quartz.jobStore.isClustered=true
spring.quartz.properties.org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
#spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreCMT
spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
spring.quartz.properties.org.quartz.jobStore.useProperties=true
spring.quartz.properties.org.quartz.jobStore.misfireThreshold=60000

JobListener

@Component
@RequiredArgsConstructor
public class JobsListener implements JobListener {

  private final QuartzBatchLogService quartzBatchLogService;

  @Override
  public String getName() {
    return "globalJob";
  }

  /**
   * Job 수행 전
   * @param context
   */
  @Override
  public void jobToBeExecuted(JobExecutionContext context) {
    JobKey jobKey = context.getJobDetail().getKey();
    log.info("jobToBeExecuted :: jobKey : {}", jobKey);
  }

  /**
   * Job 중단된 상태
   * @param context
   */
  @Override
  public void jobExecutionVetoed(JobExecutionContext context) {
    JobKey jobKey = context.getJobDetail().getKey();
    log.info("jobExecutionVetoed :: jobKey : {}", jobKey);
  }

  /**
   * Job 수행 완료 후
   * retry N : Job Exception 발생 시 해당 Trigger,Job Pause
   * retry Y : Job Exception 발생 시 등록된 기존 Cron Expression 시간에 맞춰 재시도 / 총 3번 실패 시 해당 Trigger 중지
   * @param context
   * @param jobException
   */
  @Override
  public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {

    final int maxCnt = 3;

    int failCnt = context.getTrigger().getJobDataMap().getIntValue("failCnt");
    String stop = (String) context.getTrigger().getJobDataMap().get("stop");
    String retry = (String) context.getTrigger().getJobDataMap().get("retry");
    String schedName = "";
    JobKey jobKey = context.getJobDetail().getKey();
    JobDataMap jobDataMap = context.getTrigger().getJobDataMap();

    log.info("jobWasExecuted :: jobKey : {}", jobKey);
    try {
      schedName = context.getScheduler().getSchedulerName();
      if(jobException != null){
        log.debug("Exception : {}",jobException.getMessage());

        if("N".equals(retry)){
          //context.getScheduler().pauseJob(jobKey);
          jobException.setUnscheduleAllTriggers(true);
        }else {
          ++failCnt;

          if (maxCnt == failCnt) {
            jobDataMap.put("stop", "Y");
          } else {
            jobDataMap.put("failCnt", String.valueOf(failCnt));
          }
          if (("N").equals(stop)) {

            CronTrigger cronTrigger = (CronTrigger) context.getTrigger();
            Trigger newTrigger = TriggerBuilder
                    .newTrigger()
                    .startAt(new Date(System.currentTimeMillis() + 60000)) //reschedule 진행시 즉시 수행되는것을 방지하기 위해
                    .withIdentity(context.getTrigger().getKey())
                    //.withSchedule(CronScheduleBuilder.cronSchedule("*****"))
                    .withSchedule(cronTrigger.getScheduleBuilder())
                    .usingJobData(jobDataMap) // 실패 횟수,정지여부를 Trigger JobDataMap에 추가
                    .build();

            context.getScheduler().rescheduleJob(context.getTrigger().getKey(), newTrigger);
            //정해진 시간이 아닌 즉시 실행을 위해서는 RefireImmediately 사용 필요
            //jobException.setRefireImmediately(true);

          } else {
            //해당 Trigger 중지
            jobException.setUnscheduleAllTriggers(true);
            //context.getScheduler().pauseJob(jobKey);
          }

        }
        //실패 관련 로직(알림,Email)
        log.info("notified :: context : {}", context);


      }
    } catch (SchedulerException e) {
      e.printStackTrace();
    }
    QuartzBatchLogDto quartzBatchLogDto = QuartzBatchLogDto
            .builder()
            .schedName(schedName)
            .jobName(jobKey.getName())
            .jobGroup(jobKey.getGroup())
            .triggerName(context.getTrigger().getKey().getName())
            .triggerGroup(context.getTrigger().getKey().getGroup())
            .startTime(context.getFireTime())
            .endTime(new Date(context.getFireTime().getTime() + context.getJobRunTime()))
            .result(jobException != null ?"N":"Y")
            .exceptionMessage(jobException != null ? jobException.getMessage():"")
            .build();

    quartzBatchLogService.save(quartzBatchLogDto);
  }
}
  • jobWasExecuted : Job Exception 발생 시 Cron에 맞춰 재수행을 위해 기능 추가
    • Trigger 등록시 JobDataMap 에 재수행(retry),정지(stop),실패횟수(failcnt) 추가
    • Job에서 Exception 발생시 retry 값의 따라 즉시정지 / 재수행 결정
    • jobException.setRefireImmediately(true) 로 진행시 Cron에 맞춘 수행이 아닌 즉시수행

TriggerListener

@Component
public class TriggersListener implements TriggerListener {
  //...생략
  @Override //Trigger 실행시, 리스너중 가장 먼저 실행됨
  public void triggerFired(Trigger trigger, JobExecutionContext context) {}
  @Override //Trigger 중단 여부를 확인하는 메소드
  public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {return false;}
  @Override
  public void triggerMisfired(Trigger trigger) {}
  @Override //Trigger 수행 완료 후 실행
  public void triggerComplete(Trigger trigger, JobExecutionContext context,
                              Trigger.CompletedExecutionInstruction triggerInstructionCode) {}

}

QuartzUtils

public class QuartzUtils {

    private QuartzUtils() {
    }

    /**
     * Job 생성
     * @param jobRequest - Quartz Job 정보
     * @param jobClass - Job 생성할 Class
     * @param context - ApplicationContext
     * @return JobDetail
     */
    public static JobDetail createJob(JobRequest jobRequest, Class<? extends Job> jobClass, ApplicationContext context) {
        JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
        factoryBean.setJobClass(jobClass);
        factoryBean.setDurability(false);
        factoryBean.setApplicationContext(context);
        factoryBean.setName(jobRequest.getJobName());
        factoryBean.setGroup(jobRequest.getJobGroup());
        factoryBean.setDescription(jobRequest.getDesc());
        if (jobRequest.getJobDataMap() != null) {
            factoryBean.setJobDataMap(jobRequest.getJobDataMap());
        }

        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    /**
     * Trigger 생성(Cron,Simple)
     * @param jobRequest - Quartz Job 정보
     * @return Trigger
     */
    public static Trigger createTrigger(JobRequest jobRequest) {
        String cronExpression = jobRequest.getCronExpression();
        if (!isValidExpression(cronExpression)) {
            throw new IllegalArgumentException("Provided expression " + cronExpression + " is not a valid cron expression");
        } else {
            return createCronTrigger(jobRequest);
        }
    }

    /**
     * CronTrigger 생성
     * @param jobRequest - Quartz Job 정보
     * @return Trigger
     */
    private static Trigger createCronTrigger(JobRequest jobRequest) {
        CronTriggerFactoryBean factoryBean = new CronTriggerFactoryBean();
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("failCnt","0");
        jobDataMap.put("stop","N");
        jobDataMap.put("retry",jobRequest.getRetry());
        factoryBean.setName(jobRequest.getJobName().concat("Trigger"));
        factoryBean.setGroup(jobRequest.getJobGroup());
        factoryBean.setCronExpression(jobRequest.getCronExpression());
        factoryBean.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
        factoryBean.setJobDataMap(jobDataMap);
        try {
            factoryBean.afterPropertiesSet();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return factoryBean.getObject();
    }

    /**
     * SimpleTrigger 생성
     * @param jobRequest - Quartz Job 정보
     * @return Trigger
     */
    private static Trigger createSimpleTrigger(JobRequest jobRequest) {
        SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
        factoryBean.setName(jobRequest.getJobName());
        factoryBean.setGroup(jobRequest.getJobGroup());
        factoryBean.setStartTime(Date.from(jobRequest.getStartDateAt().atZone(ZoneId.systemDefault()).toInstant()));
        factoryBean.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
        factoryBean.setRepeatInterval(jobRequest.getRepeatIntervalInSeconds() * 1000); //ms 단위임
        factoryBean.setRepeatCount(jobRequest.getRepeatCount());

        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }
}

QuartzService

  • 등록,수정,삭제,일시정지,재시작

TestCronJob1~4

  • TestCronJob1 : 단순 Job Test용
  • TestCronJob2 : @DisallowConcurrentExecution 을 통해 동기화 보장 테스트용 스케줄은 10초단위로 돌면서 쓰레드슬립을 15초로 설정
    • @DisallowConcurrentExecution 옵션 사용

-   @DisallowConcurrentExecution 옵션 주석처리

  • TestCronJob3 : interrupt 구현
  • TestCronJob4 : Exception 발생 시 Jobslitener jobWasExecuted 메소드에서 재시도를 위한 Test용
반응형

'Spring' 카테고리의 다른 글

JPA Batch JDBC Batch  (0) 2023.03.03
JPA+QueryDsl 게시판 CRUD 구현(1)  (0) 2023.02.15
Spring Boot zip END header not found  (0) 2022.11.02
Spring Boot Connection Pool 설정  (0) 2022.11.01
Spring boot + JPA (1)  (0) 2022.09.26

댓글