DolphinScheduler实例表备份清理
Posted DATA数据猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DolphinScheduler实例表备份清理相关的知识,希望对你有一定的参考价值。
😋DolphinScheduler实例表备份、清理
👊一、前言
DolphinScheduler 已在项目中使用了将近一年,工作流实例和任务实例都积累了百万级的数据量。每次在页面上查看工作流实例或任务实例,都要等待后台查询数据库,使用体验较差。因此打算以某一日期为界,先备份、再清理该日期之前的实例数据。
👊二、查看实例表
🙇‍♀️ 2.1 工作流实例
表结构
-- DolphinScheduler workflow (process) instance table.
-- Notes for archiving / cleanup:
--   * start_time is indexed (start_time_index), so pick archive/cleanup candidates
--     with a half-open range on start_time, e.g. start_time < '2022-01-01'.
--   * t_ds_task_instance references this table's id with ON DELETE CASCADE, so
--     deleting rows here also deletes their task instances: back up both tables first.
CREATE TABLE `t_ds_process_instance` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` varchar(255) DEFAULT NULL COMMENT '流程实例名称',
  `process_definition_id` int(11) DEFAULT NULL COMMENT '流程定义ID',
  `state` tinyint(4) DEFAULT NULL COMMENT '流程实例状态:0提交成功,1运行,2准备暂停,3暂停,4准备停止,5停止,6失败,7成功,8需要容错,9终止,10等待线程,11等待依赖项完成',
  `recovery` tinyint(4) DEFAULT NULL COMMENT '流程实例故障转移标志:0:正常,1:故障转移实例',
  `start_time` datetime DEFAULT NULL COMMENT '流程实例开始时间',
  `end_time` datetime DEFAULT NULL COMMENT '流程实例结束时间',
  -- run_times is a counter (sample value 1), not a duration
  `run_times` int(11) DEFAULT NULL COMMENT '流程实例运行次数',
  `host` varchar(135) DEFAULT NULL COMMENT '主机',
  `command_type` tinyint(4) DEFAULT NULL COMMENT '命令类型',
  `command_param` text COMMENT 'json命令参数',
  `task_depend_type` tinyint(4) DEFAULT NULL COMMENT '任务依赖类型:0仅当前节点,1当前节点及其前置节点,2当前节点及其后置节点',
  `max_try_times` tinyint(4) DEFAULT '0' COMMENT '最大尝试次数',
  `failure_strategy` tinyint(4) DEFAULT '0' COMMENT '失败策略:0节点失败时结束流程,1节点失败后继续运行其他节点',
  `warning_type` tinyint(4) DEFAULT '0' COMMENT '告警类型:0不告警,1流程成功时告警,2流程失败时告警,3成功或失败都告警',
  `warning_group_id` int(11) DEFAULT NULL COMMENT '告警组ID',
  `schedule_time` datetime DEFAULT NULL COMMENT '调度时间',
  `command_start_time` datetime DEFAULT NULL COMMENT '命令开始时间',
  `global_params` text COMMENT '全局参数',
  `process_instance_json` longtext COMMENT '流程实例json(复制自流程定义的json)',
  `flag` tinyint(4) DEFAULT '1' COMMENT '标志',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `is_sub_process` int(11) DEFAULT '0' COMMENT '是否为子流程实例:0否,1是',
  `executor_id` int(11) NOT NULL COMMENT '执行者ID',
  `locations` text COMMENT '节点位置信息',
  `connects` text COMMENT '节点连接信息',
  `history_cmd` text COMMENT '流程实例操作的历史命令',
  `dependence_schedule_times` text COMMENT '依赖调度的触发时间',
  `process_instance_priority` int(11) DEFAULT NULL COMMENT '流程实例优先级:0最高,1高,2中等,3低,4最低',
  -- holds the worker group name (sample value 'default'), not a numeric id
  `worker_group` varchar(64) DEFAULT NULL COMMENT '工作组名称',
  `timeout` int(11) DEFAULT '0' COMMENT '超时时间',
  `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT '租户id',
  PRIMARY KEY (`id`),
  KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
  KEY `start_time_index` (`start_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
表数据示例
mysql> select * from t_ds_process_instance where start_time = '2022-09-02 10:45:03' \G
*************************** 1. row ***************************
id: 839795
name: 流程实例-0-1662086702806
process_definition_id: 20
state: 7
recovery: 0
start_time: 2022-09-02 10:45:03
end_time: 2022-09-02 10:45:17
run_times: 1
host: 10.23.165.209:5678
command_type: 6
command_param: NULL
task_depend_type: 2
max_try_times: 0
failure_strategy: 1
warning_type: 0
warning_group_id: 0
schedule_time: 2022-09-02 10:45:00
command_start_time: 2022-09-02 10:45:02
global_params: NULL
process_instance_json: {"globalParams":[],"tasks":[{"conditionResult":{"successNode":[""],"failedNode":[""]},"description":"流程实例","runFlag":"NORMAL","type":"SQL","params":{"type":"POSTGRESQL","datasource":1,"sql":"select dws.p_dws_table()","udfs":"","sqlType":"0","sendEmail":false,"displayRows":10,"limit":"1","title":"","receivers":"123456789@qq.com","receiversCc":"","showType":"TABLE","localParams":[],"connParams":"","preStatements":[],"postStatements":[]},"timeout":{"strategy":"FAILED","interval":30,"enable":true},"maxRetryTimes":"0","taskInstancePriority":"MEDIUM","name":"dws.p_dws_table","dependence":{},"retryInterval":"1","preTasks":[],"id":"tasks-80778","workerGroup":"default"}],"tenantId":1,"timeout":0}
flag: 1
update_time: 2022-09-02 10:45:16
is_sub_process: 0
executor_id: 5
locations: {"tasks-80778":{"name":"dws.p_dws_table","targetarr":"","nodenumber":"0","x":212,"y":98}}
connects: []
history_cmd: SCHEDULER
dependence_schedule_times: NULL
process_instance_priority: 2
worker_group: default
timeout: 0
tenant_id: 1
🙇‍♀️ 2.2 任务实例
表结构
-- DolphinScheduler task instance table; each row belongs to a workflow instance
-- through process_instance_id.
-- Notes for archiving / cleanup:
--   * foreign_key_instance_id is ON DELETE CASCADE: deleting a t_ds_process_instance
--     row also deletes its task instances, so back this table up before cleaning the parent.
--   * No index covers submit_time / start_time, so date filters on those columns scan
--     the table; select old rows through process_instance_id (indexed) instead.
--   * Deleting rows does not remove the log files referenced by log_path.
CREATE TABLE `t_ds_task_instance` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` varchar(255) DEFAULT NULL COMMENT '任务名称',
  `task_type` varchar(64) DEFAULT NULL COMMENT '任务类型',
  `process_definition_id` int(11) DEFAULT NULL COMMENT '流程定义ID',
  `process_instance_id` int(11) DEFAULT NULL COMMENT '流程实例ID',
  `task_json` longtext COMMENT '任务内容Json',
  `state` tinyint(4) DEFAULT NULL COMMENT '状态:0提交成功,1运行,2准备暂停,3暂停,4准备停止,5停止,6失败,7成功,8需要容错,9终止,10等待线程,11等待依赖项完成',
  `submit_time` datetime DEFAULT NULL COMMENT '任务提交时间',
  `start_time` datetime DEFAULT NULL COMMENT '任务开始时间',
  `end_time` datetime DEFAULT NULL COMMENT '任务结束时间',
  `host` varchar(135) DEFAULT NULL COMMENT '任务运行主机',
  `execute_path` varchar(200) DEFAULT NULL COMMENT '主机中的任务执行路径',
  `log_path` varchar(200) DEFAULT NULL COMMENT '任务日志路径',
  `alert_flag` tinyint(4) DEFAULT NULL COMMENT '告警标志',
  -- retry_times / max_retry_times are counters, not durations
  `retry_times` int(4) DEFAULT '0' COMMENT '任务重试次数',
  `pid` int(4) DEFAULT NULL COMMENT '任务进程的PID',
  `app_link` text COMMENT 'yarn app id',
  `flag` tinyint(4) DEFAULT '1' COMMENT '0不可用,1可用',
  `retry_interval` int(4) DEFAULT NULL COMMENT '任务失败时的重试间隔',
  `max_retry_times` int(2) DEFAULT NULL COMMENT '最大重试次数',
  `task_instance_priority` int(11) DEFAULT NULL COMMENT '任务实例优先级:0最高,1高,2中等,3低,4最低',
  -- holds the worker group name (sample value 'default'), not a numeric id
  `worker_group` varchar(64) DEFAULT NULL COMMENT '工作组名称',
  `executor_id` int(11) DEFAULT NULL COMMENT '执行者ID',
  PRIMARY KEY (`id`),
  KEY `process_instance_id` (`process_instance_id`) USING BTREE,
  KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,
  CONSTRAINT `foreign_key_instance_id` FOREIGN KEY (`process_instance_id`) REFERENCES `t_ds_process_instance` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
表数据示例
mysql> select * from t_ds_task_instance where submit_time = '2022-09-02 10:34:13' \G
*************************** 1. row ***************************
id: 4089352
name: p_dws_wms_table()
task_type: SQL
process_definition_id: 12
process_instance_id: 839778
task_json: {"conditionResult":"{\"successNode\":[\"\"],\"failedNode\":[\"\"]}","conditionsTask":false,"depList":["dws.p_dws_wms_table"],"dependence":"{}","forbidden":false,"id":"tasks-49159","maxRetryTimes":0,"name":"p_dws_wms_table()","params":"{\"postStatements\":[],\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\",\"type\":\"POSTGRESQL\",\"title\":\"\",\"sql\":\"select dws.p_dws_wms_table()\",\"preStatements\":[],\"sqlType\":\"0\",\"sendEmail\":false,\"receivers\":\"\",\"datasource\":1,\"displayRows\":10,\"limit\":10000,\"showType\":\"TABLE\",\"localParams\":[]}","preTasks":"[\"dws.p_dws_wms_table\"]","retryInterval":1,"runFlag":"NORMAL","taskInstancePriority":"MEDIUM","taskTimeoutParameter":{"enable":true,"interval":30,"strategy":"FAILED"},"timeout":"{\"enable\":true,\"interval\":30,\"strategy\":\"FAILED\"}","type":"SQL","workerGroup":"default"}
state: 7
submit_time: 2022-09-02 10:34:13
start_time: 2022-09-02 10:34:13
end_time: 2022-09-02 10:34:15
host: 10.23.165.209:1234
execute_path: NULL
log_path: /opt/soft/dolphinscheduler/logs/12/839778/4089352.log
alert_flag: 0
retry_times: 0
pid: 0
app_link: NULL
flag: 1
retry_interval: 1
max_retry_times: 0
task_instance_priority: 2
worker_group: default
executor_id: 3
👊三、查看实例列表的查询逻辑
🙇‍♀️ 3.1 工作流实例(queryProcessInstanceListPaging)
<!--
  Paged query behind the workflow-instance list: top-level (is_sub_process = 0)
  instances of one project, newest start_time first.
  Every value is bound as a #{...} prepared-statement parameter; MyBatis does not bind
  a bare "#name", and no ${...} substitution is used, so search text cannot inject SQL.
-->
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
    select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
           instance.process_definition_code, instance.name, instance.state, instance.schedule_time,
           instance.start_time, instance.end_time, instance.run_times, instance.recovery, instance.host,
           instance.dry_run, instance.next_process_instance_id, instance.restart_time
    from t_ds_process_instance instance
    join t_ds_process_definition define on instance.process_definition_code = define.code
    where instance.is_sub_process = 0
      and define.project_code = #{projectCode}
    <if test="processDefinitionCode != 0">
      and instance.process_definition_code = #{processDefinitionCode}
    </if>
    <if test="searchVal != null and searchVal != ''">
      and instance.name like concat('%', #{searchVal}, '%')
    </if>
    <!-- Apply each bound of the (startTime, endTime] window only when it is supplied;
         a missing endTime must not become "start_time <= NULL", which matches no rows. -->
    <if test="startTime != null">
      and instance.start_time > #{startTime}
    </if>
    <if test="endTime != null">
      and instance.start_time <![CDATA[ <= ]]> #{endTime}
    </if>
    <if test="states != null and states.length > 0">
      and instance.state in
      <foreach collection="states" index="index" item="i" open="(" separator="," close=")">
        #{i}
      </foreach>
    </if>
    <if test="host != null and host != ''">
      and instance.host like concat('%', #{host}, '%')
    </if>
    <if test="executorId != 0">
      and instance.executor_id = #{executorId}
    </if>
    <!-- id is a unique tie-breaker so rows sharing start/end times page deterministically -->
    order by instance.start_time desc, instance.end_time desc, instance.id desc
</select>
🙇‍♀️ 3.2 任务实例(queryTaskInstanceListPaging)
<!--
  Paged query behind the task-instance list: task instances of one project together
  with the owning workflow instance's name.
  Every value is bound as a #{...} prepared-statement parameter (MyBatis does not bind a bare "#name").
  The WHERE filter on define.project_code discards rows whose left-joined definition-log row
  is missing, so the first LEFT JOIN effectively behaves as an inner join.
  NOTE(review): this excerpt stops after the searchVal filter; the rest of the statement
  (any remaining filters, ORDER BY and the closing tag) is not shown here.
-->
<select id="queryTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
    select
    <include refid="baseSqlV2">
        <property name="alias" value="instance"/>
    </include>
    ,
    process.name as process_instance_name
    from t_ds_task_instance instance
    left join t_ds_task_definition_log define
        on define.code = instance.task_code and define.version = instance.task_definition_version
    left join t_ds_process_instance process on process.id = instance.process_instance_id
    where define.project_code = #{projectCode}
    <!-- Apply each bound of the (startTime, endTime] window only when it is supplied. -->
    <if test="startTime != null">
        and instance.start_time > #{startTime}
    </if>
    <if test="endTime != null">
        and instance.start_time <![CDATA[ <= ]]> #{endTime}
    </if>
    <if test="processInstanceId != 0">
        and instance.process_instance_id = #{processInstanceId}
    </if>
    <if test="searchVal != null and searchVal != ''">
        and instance.name like concat('%', #{searchVal}, '%')
    </if>
PG数据库急着上线前,快速上线后,计划时间备份清理历史数据
DolphinScheduler无故删除HDFS上的Hive库表目录
Dolphinscheduler调度生成工作流实例未生成任务实例解决历程 one result (or null) to be returned by selectOne(),but found: 2