Archiver Appliance 事务处理流程
Posted 中兔西维亚
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Archiver Appliance 事务处理流程相关的知识,希望对你有一定的参考价值。
当 Archiver Appliance 开始运行后发生了什么:
mgmt服务:
config\\DefaultConfigService.java
initialize()
this.mgmtRuntime = new MgmtRuntimeState(this);
startupExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory()
@Override
public Thread newThread(Runnable r)
Thread t = new Thread(r);
t.setName("Startup executor");
return t;
);
MgmtPostStartup mgmtPostStartup = new MgmtPostStartup(this);
ScheduledFuture<?> postStartupFuture = startupExecutor.scheduleAtFixedRate(mgmtPostStartup, 10, 20, TimeUnit.SECONDS);
mgmtPostStartup.setCancellingFuture(postStartupFuture);
创建 mgmt 状态机 —— MgmtRuntimeState;
定期执行 mgmt 后处理 —— MgmtPostStartup。
mgmt\\MgmtPostStartup.java
public MgmtPostStartup(ConfigService configService)
this.configService = configService;
@Override
public void run()
if(this.configService.isStartupComplete())
this.checkIfAllComponentsHaveStartedUp();
if(this.configService.getMgmtRuntimeState().haveChildComponentsStartedUp())
cancellingFuture.cancel(false);
else
configService.postStartup();
checkIfAllComponentsHaveStartedUp()
if(retrievalStartupState == ConfigService.STARTUP_SEQUENCE.STARTUP_COMPLETE)
configService.getMgmtRuntimeState().componentStartedUp(WAR_FILE.RETRIEVAL);
... // etl 和 engine
定期执行的 mgmt 后处理包括:
若 configService 未完成,则执行 configService.postStartup() ;
若 configService 已完成,检查 mgmt、engine、etl、retrieval 四个服务是否都启动成功;
每个服务启动成功时,都会更新 mgmt 状态机中已启动服务列表;
若四个服务都成功启动,则开始存储 PV 值 —— startArchivePVRequests()。
config\\DefaultConfigService.java
/*
* config\\DefaultConfigService.java
*/
postStartup()
Config config = new XmlConfigBuilder().build();
...
hzinstance = Hazelcast.newHazelcastInstance(config); // 建立集群信息
pubSub = hzinstance.getTopic("pubSub");
initializePersistenceLayer(); // 初始化 mysql 数据库中存储的 PV 信息
loadTypeInfosFromPersistence();
loadAliasesFromPersistence();
loadArchiveRequestsFromPersistence();
loadExternalServersFromPersistence();
registerForNewExternalServers(hzinstance.getMap("channelArchiverDataServers"));
eventBus.register(this); // 事务通知
pubSub.addMessageListener(new MessageListener<PubSubEvent>()
@Override
public void onMessage(Message<PubSubEvent> pubSubEventMsg)
PubSubEvent pubSubEvent = pubSubEventMsg.getMessageObject();
if(pubSubEvent.getDestination().equals("ALL")
|| (pubSubEvent.getDestination().startsWith(myIdentity) && pubSubEvent.getDestination().endsWith(DefaultConfigService.this.warFile.toString()))
)
pubSubEvent.markSourceAsCluster();
eventBus.post(pubSubEvent);
);
this.startupState = STARTUP_SEQUENCE.STARTUP_COMPLETE;
将数据库中存储的 PV 信息加载到当前环境;
建立集群信息、事务通知机制:
集群中某个appliance发送消息后,先通过 PubSubEvent 推送到集群,该消息的目的appliance收到并发现是发给自己的某个服务的消息后,再通过 eventBus 推送到本地。
initializePersistenceLayer() throws ConfigException
persistanceLayer = new MySQLPersistence(); // 与 MySQL 交互的接口
mgmt\\MgmtRuntimeState.java
private static int threadNumber = 1;
ScheduledExecutorService archivePVWorkflow = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
@Override
public Thread newThread(Runnable r)
Thread t = new Thread(r);
t.setName("MgmtArchivePVWorkflow" + threadNumber++);
return t;
);
public MgmtRuntimeState(final ConfigService configService)
this.configService = configService;
myIdentity = this.configService.getMyApplianceInfo().getIdentity();
...
configService.getEventBus().register(this);
componentStartedUp(WAR_FILE component)
componentsThatHaveCompletedStartup.add(component);
if(this.haveChildComponentsStartedUp())
this.startArchivePVRequests(); // 若四个服务都成功启动,则开始存储 PV 值
private void startArchivePVRequests()
// 对数据库 ArchivePVRequests 表中的 PV 开始存储流程
for(String pvNameFromPersistence : configService.getArchiveRequestsCurrentlyInWorkflow())
this.startPVWorkflow(pvNameFromPersistence);
startArchivePVWorkflow(initialDelayInSeconds);
// 单个 PV 的存储流程开始
startPVWorkflow()
ArchivePVState pvState = new ArchivePVState(pvName, configService); // 创建单个 PV 的存储状态机
currentPVRequests.put(pvName, pvState);
// 对 currentPVRequests 中最多前 archivePVWorkflowBatchSize 个 PV 定期执行 runWorkFlowForPV.nextStep();
private void startArchivePVWorkflow()
theArchiveWorkflow = archivePVWorkflow.scheduleAtFixedRate(new Runnable()
@Override
public void run()
LinkedList<ArchivePVState> archivePVStates = new LinkedList<ArchivePVState>(currentPVRequests.values());
int totRequests = archivePVStates.size();
int maxRequestsToProcess = Math.min(archivePVWorkflowBatchSize, totRequests);
int pvCount = 0;
while(pvCount < maxRequestsToProcess)
ArchivePVState runWorkFlowForPV = archivePVStates.pop();
runWorkFlowForPV.nextStep(); // 推动Archive PV步骤进程
pvCount++;
, initialDelayInSeconds, archivePVWorkflowTickSeconds, TimeUnit.SECONDS);
在四个服务都成功启动、开始存储 PV 值后,执行:
对数据库 ArchivePVRequests 表中的 PV 开始存储流程;
定期对 currentPVRequests 中最多前 archivePVWorkflowBatchSize 个 PV 的存储状态机执行状态更新操作。
当在web端点击”Archive PV”后发生了什么:
mgmt\\BPLServlet.java:
doPost()
BasicDispatcher.dispatch(req, resp, configService, postActions);
common\\BasicDispatcher.java:
dispatch()
action = actionClass.getConstructor().newInstance(); // ArchivePVAction.class
action.execute(req, resp, configService);
mgmt\\bpl\\ArchivePVAction.java
execute()
archivePV(out, pv, samplingPeriodSpecified, samplingMethod, samplingPeriod, controllingPV, policyName, null, skipCapacityPlanning, configService, fieldsAsPartOfStream);
archivePV()
configService.addToArchiveRequests(actualPVName, userSpecifiedSamplingParams);
configService.getMgmtRuntimeState().startPVWorkflow(pvName);
config\\DefaultConfigService.java
archivePVRequests = hzinstance.getMap("archivePVRequests");
addToArchiveRequests()
archivePVRequests.put(pvName, userSpecifiedSamplingParams);
persistanceLayer.putArchivePVRequest(pvName, userSpecifiedSamplingParams);
mgmt\\MgmtRuntimeState.java
startPVWorkflow()
ArchivePVState pvState = new ArchivePVState(pvName, configService); // 创建单个 PV 的存储状态机
currentPVRequests.put(pvName, pvState);
mgmt\\archivepv\\ArchivePVState.java
private ArchivePVStateMachine currentState = ArchivePVStateMachine.START;
public ArchivePVState(String pvName, ConfigService configService)
this.myIdentity = this.configService.getMyApplianceInfo().getIdentity();
this.fieldsArchivedAsPartOfStream = configService.getFieldsArchivedAsPartOfStream();
public synchronized void nextStep()
switch(currentState)
case START: // [1]
PubSubEvent pubSubEvent = new PubSubEvent("ComputeMetaInfo", myIdentity + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
... // userSpec
pubSubEvent.setEventData(encoder.encode(userSpec).toJSONString());
configService.getEventBus().post(pubSubEvent);
currentState = ArchivePVStateMachine.METAINFO_REQUESTED;
return;
case METAINFO_REQUESTED: // [1]
logger.debug("A request to gather metainfo has been published for the PV " + pvName);
return;
case METAINFO_GATHERING: // [4]
logger.debug("Metainfo has been requested and is being gathered for " + pvName);
return;
case METAINFO_OBTAINED: // [7]
PolicyConfig thePolicy = configService.computePolicyForPV(pvName, metaInfo, userSpec);
... // 设置该 PV 的存储策略
PVTypeInfo typeInfo = new PVTypeInfo(pvName, metaInfo.getArchDBRTypes(), !metaInfo.isVector(), metaInfo.getCount());
... // 设置该 PV 的基本信息:metaInfo、thePolicy、CreationTime、aliasFieldName、applianceInfoForPV 等
ApplianceInfo applianceInfoForPV = null;
... // 设置该 PV 所属的 appliance
configService.registerPVToAppliance(pvName, applianceInfoForPV); // pv2appliancemapping.put(pvName, applianceInfo);
typeInfo.setApplianceIdentity(applianceIdentityAfterCapacityPlanning);
configService.updateTypeInfoForPV(pvName, typeInfo); // typeInfos.put(pvName, typeInfo);
currentState = ArchivePVStateMachine.POLICY_COMPUTED;
return;
case POLICY_COMPUTED: // [8]
PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
if(typeInfo.getApplianceIdentity().equals(applianceIdentityAfterCapacityPlanning))
currentState = ArchivePVStateMachine.TYPEINFO_STABLE;
return;
case TYPEINFO_STABLE: // [9]
PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
// 开始存储 PV
ArchivePVState.startArchivingPV(pvName, configService, configService.getAppliance(typeInfo.getApplianceIdentity()));
registerAliasesIfAny(typeInfo);
currentState = ArchivePVStateMachine.ARCHIVE_REQUEST_SUBMITTED;
return;
case ARCHIVE_REQUEST_SUBMITTED: // [9]
// The part in mgmt is basically over; we are waiting for the engine to indicate that it has commenced archiving
// Until then, we stay in this state.
return;
case ARCHIVING: // [12]
configService.archiveRequestWorkflowCompleted(pvName);
configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
currentState = ArchivePVStateMachine.FINISHED;
return;
case ABORTED:
configService.archiveRequestWorkflowCompleted(pvName);
configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
currentState = ArchivePVStateMachine.FINISHED;
return;
case FINISHED: // [13]
logger.error("Archive state for PV " + this.pvName + " is finished.");
return;
public void metaInfoRequestAcknowledged() // [4]
metaInfoRequestedSubmitted = TimeUtils.now();
this.currentState = ArchivePVStateMachine.METAINFO_GATHERING;
public void metaInfoObtained(MetaInfo metaInfo) // [7]
this.metaInfo = metaInfo;
this.currentState = ArchivePVStateMachine.METAINFO_OBTAINED;
public static void startArchivingPV() throws IOException // [9]
PubSubEvent pubSubEvent = new PubSubEvent("StartArchivingPV", applianceInfoForPV.getIdentity() + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
configService.getEventBus().post(pubSubEvent);
public void confirmedStartedArchivingPV() // [12]
this.currentState = ArchivePVStateMachine.ARCHIVING;
对单个 PV 的存储流程主要分两大部分:
创建 PV 通道,元数据信息获取;
开始执行数据存储。
TYPEINFO_STABLE 和 ARCHIVE_REQUEST_SUBMITTED 这两个状态,是这两大部分的分界点。
若 ”Archive PV” 不成功,或者成功后出现 PV unconnected 等错误情况,应首先定位问题属于这两大部分的哪一部分,再去具体定位问题。
关于 applianceInfoForPV 的设置过程,详见 Archiver Appliance 建立集群时可能出现的问题 。
engine\\pv\\EngineContext.java
@Subscribe public void computeMetaInfo(PubSubEvent pubSubEvent)
if(pubSubEvent.getType().equals("ComputeMetaInfo")) // [2]
// 通过 MetaGet 创建 PV 通道,添加事件监听,获取 PV 元数据信息
ArchiveEngine.getArchiveInfo(pvName, configService, extraFields, userSpec.isUsePVAccess(), new ArchivePVMetaCompletedListener(pvName, configService, myIdentity));
// 对 mgmt 提出的 ComputeMetaInfo 操作进行应答
PubSubEvent confirmationEvent = new PubSubEvent("MetaInfoRequested", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
configService.getEventBus().post(confirmationEvent);
else if(pubSubEvent.getType().equals("StartArchivingPV")) // [10]
String pvName = pubSubEvent.getPvName();
this.startArchivingPV(pvName); // 开始存储 PV
PubSubEvent confirmationEvent = new PubSubEvent("StartedArchivingPV", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
configService.getEventBus().post(confirmationEvent);
else if(pubSubEvent.getType().equals("AbortComputeMetaInfo"))
String pvName =pt-archiver 归档数据
pt-archiver 参数说明
pt-archiver是Percona-Toolkit工具集中的一个组件,是一个主要用于对MySQL表数据进行归档和清除工具。它可以将数据归档到另一张表或者是一个文件中。pt-archiver在清除表数据的过程中并不会影响OLTP事务的查询性能。对于数据的归档,它可以归档到另一台服务器上的另一张表,也可归档到一个文件中,文件可以用LOAD DATA INFILE进行数据装载,这个功能其实就类似是表历史数据的增量删除。
基本说明
pt-archiver [OPTIONS] --source DSN --where WHERE
常用选项(OPTIONS)
--analyze
指定工具完成数据归档后对表执行‘ANALYZE TABLE‘操作。指定方法如‘--analyze=ds‘,s代表源端表,d代表目标端表,也可以单独指定。
--ask-pass
命令行提示密码输入,保护密码安全,前提需安装模块perl-TermReadKey。
--buffer
指定缓冲区数据刷新到选项‘--file‘指定的文件并且在提交时刷新。
只有当事务提交时禁用自动刷新到‘--file‘指定的文件和刷新文件到磁盘,这意味着文件是被操作系统块进行刷新,因此在事务进行提交之前有一些数据隐式刷新到磁盘。默认是每一行操作后进行文件刷新到磁盘。
--bulk-delete
指定单个语句删除chunk的方式来批量删除行,会隐式执行选项‘--commit-each‘。
使用单个DELETE语句删除每个chunk对应的表行,通常的做法是通过主键进行逐行的删除,批量删除在速度上会有很大的提升,但如果有复杂的‘WHERE‘条件就可能会更慢。
--[no]bulk-delete-limit
默认值:yes
指定添加选项‘--bulk-delete‘和‘--limit‘到进行归档的语句中。
--bulk-insert
使用LOAD DATA LOCAL INFILE的方法,通过批量插入chunk的方式来插入行(隐式指定选项‘--bulk-delete‘和‘--commit-each‘)
而不是通过逐行单独插入的方式进行,它比单行执行INSERT语句插入的速度要快。通过隐式创建临时表来存储需要批量插入的行(chunk),而不是直接进行批量插入操作,当临时表中完成每个chunk之后再进行统一数据加载。为了保证数据的安全性,该选项会强制使用选项‘--bulk-delete‘,这样能够有效保证删除是在插入完全成功之后进行的。
--channel
指定当主从复制环境是多源复制时需要进行归档哪个主库的数据,适用于多源复制中多个主库对应一个从库的情形。
--charset,-A
指定连接字符集。
--[no]check-charset
默认值:yes
指定检查确保数据库连接时字符集和表字符集相同。
--[no]check-columns
默认值:yes
指定检查确保选项‘--source‘指定的源端表和‘--dest‘指定的目标表具有相同的字段。
不检查字段在表的排序和字段类型,只检查字段是否在源端表和目标表当中都存在,如果有不相同的字段差异,则工具报错退出。如果需要禁用该检查,则指定‘--no-check-columns‘。
--check-slave-lag
指定主从复制延迟大于选项‘--max-lag‘指定的值之后暂停归档操作。默认情况下,工具会检查所有的从库,但该选项只作用于指定的从库(通过DSN连接方式)。
--check-interval
默认值:1s
如果同时指定了选项‘--check-slave-lag‘,则该选项指定的时间为工具发现主从复制延迟时暂停的时间。每进行操作100行时进行一次检查。
--columns,-c
指定需要归档的表字段,如有多个则用‘,‘(逗号)隔开。
--commit-each
指定按每次获取和归档的行数进行提交,该选项会禁用选项‘--txn-size‘。
在每次获取表数据并进行归档之后,在获取下一次数据和选项‘--sleep‘指定的休眠时间之前,进行事务提交和刷新选项‘--file‘指定的文件,通过选项‘--limit‘控制事务的大小。
--host,-h
指定连接的数据库IP地址。
--port,-P
指定连接的数据库Port端口。
--user,-u
指定连接的数据库用户。
--password,-p
指定连接的数据库用户密码。
--socket,-S
指定使用SOCKET文件连接。
--databases,-d
指定连接的数据库
--source
指定需要进行归档操作的表,该选项是必须指定的选项,使用DSN方式表示。
--dest
指定要归档到的目标端表,使用DSN方式表示。
如果该选项没有指定的话,则默认与选项‘--source‘指定源端表为相同表。
--where
指定通过WHERE条件语句指定需要归档的数据,该选项是必须指定的选项。不需要加上‘WHERE‘关键字,如果确实不需要WHERE条件进行限制,则指定‘--where 1=1‘。
--file
指定表数据需要归档到的文件。使用类似MySQL DATE_FORMAT()格式化命名方式。
文件内容与MySQL中SELECT INTO OUTFILE语句使用相同的格式,文件命名选项如下所示:
‘
%Y:年,4位数(Year, numeric, four digits)
%m:月,2位数(Month, numeric (01..12))
%d:日,2位数(Day of the month, numeric (01..31))
%H:小时(Hour (00..23))
%i:分钟(Minutes, numeric (00..59))
%s:秒(Seconds (00..59))
%D:数据库名(Database name)
%t:表名(Table name)
例如:--file ‘/var/log/archive/%Y-%m-%d-%D.%t‘
‘
--output-format
指定选项‘--file‘文件内容输出的格式。
默认不指定该选项是以制表符进行字段的分隔符,如果指定该选项,则使用‘,‘(逗号)作为字段分隔符,使用‘"‘(双引号)将字段括起。用法示例:‘--output-format=dump‘。
--for-update
指定为每次归档执行的SELECT语句添加FOR UPDATE子句。
--share-lock
指定为每次归档执行的SELECT语句添加LOCK IN SHARE MODE子句。
--header
指定在文件中第一行写入字段名称作为标题。
--ignore
指定为INSERT语句添加IGNORE选项。
--limit
默认值:1
指定每条语句获取表和归档表的行数。
--local
指定不将OPTIMIZE和ANALYZE语句写入binlog。
--max-lag
默认值:1s
指定允许主从复制延迟时长的最大值,单位秒。如果在每次获取行数据之后主从延迟超过指定的值,则归档操作将暂停执行,暂停休眠时间为选项‘--check-interval‘指定的值。待休眠时间结束之后再次检查主从延迟时长,检查方法是通过从库查询的‘Seconds_Behind_Master‘值来确定。如果主从复制延迟一直大于该参数指定值或者从库停止复制,则操作将一直等待直到从库重新启动并且延迟小于该参数指定值。
--no-delete
指定不删除已被归档的表数据。
--optimize
指定工具完成数据归档后对表执行‘OPTIMIZE TABLE‘操作。指定方法如‘--analyze=ds‘,s代表源端表,d代表目标端表,也可以单独指定。
--primary-key-only
指定只归档主键字段,是选项‘--columns=主键‘的简写。
如果工具归档的操作是进行DELETE清除时最有效,因为只需读取主键一个字段而无需读取行所有字段。
--progress
指定每多少行打印进度信息,打印当前时间,已用时间以及多少行进行归档。
--purge
指定执行的清除操作而不是归档操作。允许忽略选项‘--dest‘和‘--file‘进行操作,如果只是清除操作可以结合选项‘--primary-key-only‘会更高效。
--quiet,-q
指定工具静默执行,不输出任何的执行信息。
--replace
指定写入选项‘--dest‘指定目标端表时改写INSERT语句为REPLACE语句。
--retries
默认值:1
指定归档操作遇到死锁或超时的重试次数。当重试次数超过该选项指定的值时,工具将报错退出。
--run-time
指定工具归档操作在退出之前需要运行的时间。允许的时间后缀名为s=秒,m=分,h=小时,d=天,如果没指定,默认为s。
--[no]safe-auto-increment
默认值:yes
指定不使用自增列(AUTO_INCREMENT)最大值对应的行进行归档。
该选项在进行归档清除时会额外添加一条WHERE子句以防止工具删除单列升序字段具有的具有AUTO_INCREMENT属性最大值的数据行,为了在数据库重启之后还能使用到AUTO_INCREMENT对应的值,但这会引起无法归档或清除字段对应最大值的行。
--set-vars
默认:
wait_timeout=10000
innodb_lock_wait_timeout=1
lock_wait_timeout=60
工具归档时指定参数值,如有多个用‘,‘(逗号)分隔。如‘--set-vars=wait_timeout=5000‘。
--skip-foreign-key-checks
指定使用语句SET FOREIGN_KEY_CHECKS = 0禁用外键检查。
--sleep
指定工具在通过SELECT语句获取归档数据需要休眠的时间,默认值是不进行休眠。在休眠之前事务并不会提交,并且选项‘--file‘指定的文件不会被刷新。如果指定选项‘--commit-each‘,则在休眠之前会进行事务提交和文件刷新。
--statistics
指定工具收集并打印操作的时间统计信息。
--txn-size
默认:1
指定每个事务处理的行数。如果是0则禁用事务功能。
--version
显示工具的版本并退出。
--[no]version-check
默认值:yes
检查Percona Toolkit、MySQL和其他程序的最新版本。
--why-quit
指定工具打印当非因完成归档行数退出的原因。
在执行一个自动归档任务时该选项与选项‘--run-time‘一起使用非常方便,这样可以确定归档任务是否在指定的时间内完成。如果同时指定了选项‘--statistics‘,则会打印所有退出的原因。
DSN选项(DSN)
可以使用DSN方式来连接数据库,DSN选项为key=value方式,在等号的两侧不能有空格出现,并且区分大小写,多个选项之前以‘,‘(逗号)隔开,主要选项如下:
a
归档操作是在哪个库下进行的,相当于USE操作。
A
指定默认字符集。
b
当值为true时,禁止SQL_LOG_BIN,相当于SQL_LOG_BIN = 0。
D
指定包含需要归档表的数据库。
h
指定连接的主机。
u
指定连接的用户。
p
指定连接需要的密码。
P
指定连接的端口。
S
指定连接的SOCKET文件。
t
指定需要归档的表。
i
指定需要使用的索引。
选项用法说明
工具至少需指定选项--dest、--file或--purge其中之一;
选项--ignore和--replace是互斥的;
选项--txn-size和--commit-each是互斥的;
选项--share-lock和--for-update是互斥的;
--analyze和--optimize是互斥的;
本次测试是基于employees表以及新建的yoon表
CREATE TABLE `yoon` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`v_int` int(11) DEFAULT NULL,
`v_string` varchar(50) DEFAULT NULL,
`s_string` char(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=80001 DEFAULT CHARSET=utf8;
创建存储过程i_yoon插入测试数据
delimiter $$
CREATE PROCEDURE i_yoon (IN row_num INT)
BEGIN
DECLARE i INT DEFAULT 0 ;
WHILE i < row_num DO
INSERT INTO yoon (v_int, v_string, s_string)
VALUES
(
floor(1 + rand() * 1000000),
substring(
MD5(RAND()),
1,
floor(1 + rand() * 20)
),
substring(MD5(RAND()), 1, 20)
) ;
SET i = i + 1 ;
END
WHILE ; END$$
delimiter ;
call i_yoon(200000);
在目标端创建测试库hank,表结构yoon:
CREATE TABLE `yoon` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`v_int` int(11) DEFAULT NULL,
`v_string` varchar(50) DEFAULT NULL,
`s_string` char(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=80001 DEFAULT CHARSET=utf8;
执行归档,不删除源端数据:--source 源端,--dest:目标端,--no-delete:不删除源端数据
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3307,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where ‘id <= 20000‘ --progress 50 --txn-size=1000 --statistics --no-delete --ask-pass --limit=200 --sleep=1 --dry-run
执行归档,删除源端数据:--source 源端,--dest:目标端
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3307,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where ‘id <= 20000‘ --progress 50 --txn-size=1000 --statistics --ask-pass --limit=200 --sleep=1 --dry-run
表归档到表(批量进行)
批量进行归档涉及的选项是--limit,批量进行插入涉及的选项为--bulk-insert,指定选项--bulk-insert同时也会指定选项--bulk-delete,如果不删除已归档数据,则需要指定选项--no-delete
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3308,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where "id <= 20000" --progress=50 --txn-size=1000 --limit=50 --statistics --no-delete --bulk-insert --ask-pass --dry-run
表归档到文件
表归档到文件将选项--dest换成--file,并且根据需要添加选项--output-format
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=employees,t=employees,A=utf8 --file=‘/tmp/yoon_%Y-%m-%d.sql‘ --charset=utf8 --output-format=‘dump‘ --where "id <= 20000" --progress=50 --txn-size=1000 --limit=50 --statistics --no-delete --ask-pass --dry-run
表清除数据:--txn-size 表示200行提交一次失误,若最后删除的数据低于200行报错,改--txn-size值
如果只是进行表数据清除操作而不做归档操作,则可以忽略选项--dest或--file,通过指定选项--purge,可以先使用选项--dry-run打印查询需要清除数据的执行语句,做好确认之后再执行。
pt-archiver --source h=localhost,P=3306,u=root,p=‘Asd.123@#‘,D=yoon,t=yoon --purge --charset=utf8 --where "id <= 400000" --progress=200 --limit=200 --sleep=1 --txn-size=200 --statistics --dry-run
以上是关于Archiver Appliance 事务处理流程的主要内容,如果未能解决你的问题,请参考以下文章
VMware vCenter Server Appliance 的磁盘空间不足问题处理
ORA-00257 archiver error. 错误的处理方法