Spring Boot系列之Quartz实现多任务动态新增/删除调度
  TEZNKK3IfmPf 2023年11月12日 15 0

项目老旧,并没有使用xxl-job,或者其他开源调度管理系统,也没有使用公司内部自研的(或者二次开发)的调度系统,而是使用Java开源里最老牌的调度工具-Quartz。

现在有个需求就是,实现任务的动态(即无需应用的重启)注册(新增)删除以及调整调度Cron表达式时间。

调研

基于Spring Boot 2.1.0.RELEASE版本:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

另外,项目使用Apollo作为配置中心,有一个开关控制任务的启停。

服务器部署采用简单的分布式部署,有两个IP在跑Job,故需要引入Redis,作为分布式锁,控制一个Job只有一个节点在跑。

引入Quartz注解@DisallowConcurrentExecution防止并行执行Job。

任务是记录在数据表里面,主键是Id,任务执行时间Cron表达式有对应的数据表字段。

通过实现Spring提供的InitializingBean接口,并重写方法​​afterPropertiesSet()​​,并在该方法内部写应用启动逻辑:获取Quartz的SchedulerFactoryBean关联的调度器Scheduler,检查开关,扫描Job数据表全部有效任务,遍历数据表记录,将每一个任务添加到调度器Scheduler中,任务唯一键是数据表的主键,任务执行时间则取自数据表cronExep,

源码:

package com.XXX.task;

import com.alibaba.fastjson.JSONObject;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.XXX.model.VarMonitor;
import com.XXX.service.varMonitor.impl.VarMonitorServiceImpl;
import com.XXX.utils.DateUtil;
import com.XXX.utils.RedisUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

@Service
@DisallowConcurrentExecution
public class JobService implements InitializingBean {

private static final Logger LOGGER = LoggerFactory.getLogger(JobService.class);

private static final String PREFIX = "varMonitor_";

private static final String JOB_MAP_KEY = "varMonitor";

@Resource
public JedisCluster jedisCluster;

@Resource
private SchedulerFactoryBean schedulerFactoryBean;

@Resource
private TaskMapper taskMapper;

@Resource
private VarMonitorServiceImpl varMonitorService;

@Resource
private RedisUtil redisUtil;

@ApolloConfig
private transient Config config;

private void configScheduler() {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
try {
scheduler.clear();
} catch (SchedulerException e) {
LOGGER.error("configScheduler clear error:" + e.toString());
}

if (config == null) {
config = ConfigService.getAppConfig();
}
boolean sendWitch = config.getBooleanProperty("switch", true);
if (!sendWitch) {
LOGGER.info(String.format("configScheduler sendWitch=%s", false));
return;
}

// 应用启动时读取DB已有的定时任务,并执行
List<VarMonitor> monitorList = taskMapper.queryAllActive();
if (CollectionUtils.isEmpty(monitorList)) {
LOGGER.info("configScheduler: empty monitorList");
return;
}
for (VarMonitor job : monitorList) {
try {
if (StringUtils.isNotEmpty(job.getCronExp())) {
JobDetail jobDetail = JobBuilder.newJob(VarAlertJob.class)
.withIdentity(PREFIX + job.getId().toString()).build();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(PREFIX + job.getId().toString())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExp())).build();
jobDetail.getJobDataMap().put(JOB_MAP_KEY, job);
scheduler.scheduleJob(jobDetail, trigger);
}
} catch (Exception e) {
LOGGER.error("configScheduler error:" + e.toString());
}
}
}

private Class<? extends Job> getJobExecutor(VarMonitor job) {
return VarAlertJob.class;
}

/**
* 新增任务
*/
public void addJob(VarMonitor job) {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
try {
JobDetail jobDetail = JobBuilder.newJob(getJobExecutor(job))
.withIdentity(job.getId().toString(), Scheduler.DEFAULT_GROUP).build();
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(job.getId().toString())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExp())).build();
jobDetail.getJobDataMap().put(JOB_MAP_KEY, job);
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
LOGGER.error("addJob with error:" + e);
}
}

/**
* 删除任务
*/
public void removeJob(Integer id) {
try {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = new TriggerKey(id.toString(), Scheduler.DEFAULT_GROUP);
// 停止、移除触发器
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(new JobKey(PREFIX + id.toString(), Scheduler.DEFAULT_GROUP));
} catch (Exception e) {
LOGGER.error("removeJob with error:" + e);
}
}

@Override
public void afterPropertiesSet() {
configScheduler();
}

/**
* process VarMonitor one by one
*/
private void processItem(VarMonitor item) {
String key = String.format("%s_%s_%s", "processItem", DateUtil.getDateString(new Date(), DateUtil.DATE_COMPACT_FORMAT), item.getId());
try {
boolean result = redisUtil.tryGetDistributedLock(key, 10);
if (result) {
LOGGER.info(String.format("id=%d and varName=%s and monitorName=%s running at %s", item.getId(), item.getVarName(),
item.getMonitorName(), DateUtil.getDateString(new Date(), DateUtil.DATE_TIME_FORMAT)));
if (item.getMonitorId() <= 2) {
varMonitorService.dealWithTimes(item);
}
if (item.getMonitorId() == 3) {
varMonitorService.dealWithCalcValue(item);
}
}
} catch (Exception e) {
LOGGER.error("processItem:" + e);
} finally {
redisUtil.releaseDistributedLock(key);
}
}

private class VarAlertJob implements Job {

private VarMonitor monitor;

@Override
public void execute(JobExecutionContext context) {
// 关键:利用context从JobDataMap取出所有需要定时调度的task
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
if (monitor == null) {
monitor = JSONObject.parseObject(JSONObject.toJSONString(dataMap.get(JOB_MAP_KEY)), VarMonitor.class);
}
processItem(monitor);
}
}

}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月17日   46   0   0 JSpspring
TEZNKK3IfmPf