Archiver Appliance 事务处理流程

Posted 中兔西维亚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Archiver Appliance 事务处理流程相关的知识,希望对你有一定的参考价值。

当 Archiver Appliance 开始运行后发生了什么:

mgmt服务:

config\\DefaultConfigService.java

initialize()   
	this.mgmtRuntime = new MgmtRuntimeState(this);
	
	
	startupExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() 
		@Override
		public Thread newThread(Runnable r) 
			Thread t = new Thread(r);
			t.setName("Startup executor");
			return t;
		
	);
	
	
	MgmtPostStartup mgmtPostStartup = new MgmtPostStartup(this);
	ScheduledFuture<?> postStartupFuture = startupExecutor.scheduleAtFixedRate(mgmtPostStartup, 10, 20, TimeUnit.SECONDS);
	mgmtPostStartup.setCancellingFuture(postStartupFuture);

创建 mgmt 状态机 —— MgmtRuntimeState;
定期执行 mgmt 后处理 —— MgmtPostStartup。

mgmt\\MgmtPostStartup.java

public MgmtPostStartup(ConfigService configService) 
	this.configService = configService;


	
@Override
public void run() 
	if(this.configService.isStartupComplete()) 
		this.checkIfAllComponentsHaveStartedUp();
		if(this.configService.getMgmtRuntimeState().haveChildComponentsStartedUp())  
			cancellingFuture.cancel(false);
		
	 else 
		configService.postStartup();
	



checkIfAllComponentsHaveStartedUp()  
	if(retrievalStartupState == ConfigService.STARTUP_SEQUENCE.STARTUP_COMPLETE)  
		configService.getMgmtRuntimeState().componentStartedUp(WAR_FILE.RETRIEVAL);
	
	... // etl 和 engine

定期执行的 mgmt 后处理包括:
若 configService 未完成,则执行 configService.postStartup() ;
若 configService 已完成,检查 mgmt、engine、etl、retrieval 四个服务是否都启动成功;
每个服务启动成功时,都会更新 mgmt 状态机中已启动服务列表;
若四个服务都成功启动,则开始存储 PV 值 —— startArchivePVRequests()。

config\\DefaultConfigService.java

/*
 * config\\DefaultConfigService.java
 */


postStartup() 
	Config config = new XmlConfigBuilder().build();
	...
	hzinstance = Hazelcast.newHazelcastInstance(config);  // 建立集群信息

	pubSub = hzinstance.getTopic("pubSub");

	initializePersistenceLayer();						// 初始化 mysql 数据库中存储的 PV 信息
	loadTypeInfosFromPersistence();		
	loadAliasesFromPersistence();
	loadArchiveRequestsFromPersistence();			
	loadExternalServersFromPersistence();			
	registerForNewExternalServers(hzinstance.getMap("channelArchiverDataServers"));
			
	eventBus.register(this);  							 // 事务通知
	pubSub.addMessageListener(new MessageListener<PubSubEvent>() 
			@Override
			public void onMessage(Message<PubSubEvent> pubSubEventMsg) 
				PubSubEvent pubSubEvent = pubSubEventMsg.getMessageObject();
				if(pubSubEvent.getDestination().equals("ALL") 
					|| (pubSubEvent.getDestination().startsWith(myIdentity) && pubSubEvent.getDestination().endsWith(DefaultConfigService.this.warFile.toString()))
					) 
						pubSubEvent.markSourceAsCluster();
						eventBus.post(pubSubEvent);	
								
	);
	
	this.startupState = STARTUP_SEQUENCE.STARTUP_COMPLETE;

将数据库中存储的 PV 信息加载到当前环境;
建立集群信息、事务通知机制:
集群中某个appliance发送消息后,先通过 PubSubEvent 推送到集群,该消息的目的appliance收到并发现是发给自己的某个服务的消息后,再通过 eventBus 推送到本地。

initializePersistenceLayer() throws ConfigException 
	persistanceLayer = new MySQLPersistence();			// 与 MySQL 交互的接口

mgmt\\MgmtRuntimeState.java

private static int threadNumber = 1;
ScheduledExecutorService archivePVWorkflow = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() 
	@Override
	public Thread newThread(Runnable r) 
		Thread t = new Thread(r);
		t.setName("MgmtArchivePVWorkflow" + threadNumber++);
		return t;
	
);


public MgmtRuntimeState(final ConfigService configService) 
	this.configService = configService;
	myIdentity = this.configService.getMyApplianceInfo().getIdentity();
	...
	configService.getEventBus().register(this);



componentStartedUp(WAR_FILE component)  
	componentsThatHaveCompletedStartup.add(component);
	if(this.haveChildComponentsStartedUp())  
		this.startArchivePVRequests();					// 若四个服务都成功启动,则开始存储 PV 值
	



private void startArchivePVRequests()  
	// 对数据库 ArchivePVRequests 表中的 PV 开始存储流程
	for(String pvNameFromPersistence : configService.getArchiveRequestsCurrentlyInWorkflow()) 
			this.startPVWorkflow(pvNameFromPersistence);
	
		
	startArchivePVWorkflow(initialDelayInSeconds);



// 单个 PV 的存储流程开始
startPVWorkflow()   
	ArchivePVState pvState = new ArchivePVState(pvName, configService);		// 创建单个 PV 的存储状态机
	currentPVRequests.put(pvName, pvState);



// 对 currentPVRequests 中最多前 archivePVWorkflowBatchSize 个 PV 定期执行 runWorkFlowForPV.nextStep();
private void startArchivePVWorkflow() 
	theArchiveWorkflow = archivePVWorkflow.scheduleAtFixedRate(new Runnable() 		
		@Override
		public void run() 
			LinkedList<ArchivePVState> archivePVStates = new LinkedList<ArchivePVState>(currentPVRequests.values());
			int totRequests = archivePVStates.size();
			int maxRequestsToProcess = Math.min(archivePVWorkflowBatchSize, totRequests);
			int pvCount = 0;
			while(pvCount < maxRequestsToProcess)  
				ArchivePVState runWorkFlowForPV = archivePVStates.pop();
				runWorkFlowForPV.nextStep();   // 推动Archive PV步骤进程
				pvCount++;
								
	, initialDelayInSeconds, archivePVWorkflowTickSeconds, TimeUnit.SECONDS);

在四个服务都成功启动、开始存储 PV 值后,执行:
对数据库 ArchivePVRequests 表中的 PV 开始存储流程;
定期对 currentPVRequests 中最多前 archivePVWorkflowBatchSize 个 PV 的存储状态机执行状态更新操作。

当在web端点击”Archive PV”后发生了什么:

mgmt\\BPLServlet.java:

doPost()   
	BasicDispatcher.dispatch(req, resp, configService, postActions);

common\\BasicDispatcher.java:

dispatch()    
	action = actionClass.getConstructor().newInstance();   // ArchivePVAction.class
	action.execute(req, resp, configService);

mgmt\\bpl\\ArchivePVAction.java

execute()    
	archivePV(out, pv, samplingPeriodSpecified, samplingMethod, samplingPeriod, controllingPV, policyName, null, skipCapacityPlanning, configService, fieldsAsPartOfStream);


archivePV()   
	configService.addToArchiveRequests(actualPVName, userSpecifiedSamplingParams);
	configService.getMgmtRuntimeState().startPVWorkflow(pvName);

config\\DefaultConfigService.java

archivePVRequests = hzinstance.getMap("archivePVRequests");

addToArchiveRequests()   
	archivePVRequests.put(pvName, userSpecifiedSamplingParams);
	persistanceLayer.putArchivePVRequest(pvName, userSpecifiedSamplingParams);

mgmt\\MgmtRuntimeState.java

startPVWorkflow()   
	ArchivePVState pvState = new ArchivePVState(pvName, configService);		// 创建单个 PV 的存储状态机
	currentPVRequests.put(pvName, pvState);

mgmt\\archivepv\\ArchivePVState.java

private ArchivePVStateMachine currentState = ArchivePVStateMachine.START;

public ArchivePVState(String pvName, ConfigService configService) 
	this.myIdentity = this.configService.getMyApplianceInfo().getIdentity();
	this.fieldsArchivedAsPartOfStream = configService.getFieldsArchivedAsPartOfStream();


public synchronized void nextStep() 
	switch(currentState) 
		case START: 												// [1]
			PubSubEvent pubSubEvent = new PubSubEvent("ComputeMetaInfo", myIdentity + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
			...  // userSpec
			pubSubEvent.setEventData(encoder.encode(userSpec).toJSONString());
			configService.getEventBus().post(pubSubEvent);
			currentState = ArchivePVStateMachine.METAINFO_REQUESTED;
			return;	
		
		case METAINFO_REQUESTED: 									// [1]
			logger.debug("A request to gather metainfo has been published for the PV " + pvName);
			return;
		
		case METAINFO_GATHERING: 									// [4]
			logger.debug("Metainfo has been requested and is being gathered for " + pvName);
			return;
		
		case METAINFO_OBTAINED:   									// [7]
			PolicyConfig thePolicy = configService.computePolicyForPV(pvName, metaInfo, userSpec);
			...			// 设置该 PV 的存储策略
			PVTypeInfo typeInfo = new PVTypeInfo(pvName, metaInfo.getArchDBRTypes(), !metaInfo.isVector(), metaInfo.getCount());
			...			// 设置该 PV 的基本信息:metaInfo、thePolicy、CreationTime、aliasFieldName、applianceInfoForPV 等
			ApplianceInfo applianceInfoForPV = null;
			...			// 设置该 PV 所属的 appliance
			configService.registerPVToAppliance(pvName, applianceInfoForPV);		// pv2appliancemapping.put(pvName, applianceInfo);
			typeInfo.setApplianceIdentity(applianceIdentityAfterCapacityPlanning);
			configService.updateTypeInfoForPV(pvName, typeInfo);			// typeInfos.put(pvName, typeInfo);
			currentState = ArchivePVStateMachine.POLICY_COMPUTED;
			return;
		
		case POLICY_COMPUTED:  									// [8]
			PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
			if(typeInfo.getApplianceIdentity().equals(applianceIdentityAfterCapacityPlanning)) 
				currentState = ArchivePVStateMachine.TYPEINFO_STABLE;
			
			return;
		
		case TYPEINFO_STABLE:  									// [9]
			PVTypeInfo typeInfo = configService.getTypeInfoForPV(pvName);
			// 开始存储 PV
			ArchivePVState.startArchivingPV(pvName, configService, configService.getAppliance(typeInfo.getApplianceIdentity()));
			registerAliasesIfAny(typeInfo);
			currentState = ArchivePVStateMachine.ARCHIVE_REQUEST_SUBMITTED;
			return;
		
		case ARCHIVE_REQUEST_SUBMITTED: 							// [9]
			// The part in mgmt is basically over; we are waiting for the engine to indicate that it has commenced archiving
			// Until then, we stay in this state.
			return;
		case ARCHIVING: 											// [12]	
			configService.archiveRequestWorkflowCompleted(pvName);
			configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
			currentState = ArchivePVStateMachine.FINISHED;
			return;
		
		case ABORTED: 
			configService.archiveRequestWorkflowCompleted(pvName);
			configService.getMgmtRuntimeState().finishedPVWorkflow(pvName);
			currentState = ArchivePVStateMachine.FINISHED;
			return;
		
		case FINISHED: 											// [13]	
			logger.error("Archive state for PV " + this.pvName + " is finished.");
			return;
		
	

public void metaInfoRequestAcknowledged()  							// [4]
	metaInfoRequestedSubmitted = TimeUtils.now();
	this.currentState = ArchivePVStateMachine.METAINFO_GATHERING;


public void metaInfoObtained(MetaInfo metaInfo)   						// [7]
	this.metaInfo = metaInfo;
	this.currentState = ArchivePVStateMachine.METAINFO_OBTAINED;


public static void startArchivingPV() throws IOException 				// [9]
	PubSubEvent pubSubEvent = new PubSubEvent("StartArchivingPV", applianceInfoForPV.getIdentity()  + "_" + ConfigService.WAR_FILE.ENGINE, pvName);
	configService.getEventBus().post(pubSubEvent);


public void confirmedStartedArchivingPV() 								// [12]	
	this.currentState = ArchivePVStateMachine.ARCHIVING;

对单个 PV 的存储流程主要分两大部分:
创建 PV 通道,元数据信息获取;
开始执行数据存储。
TYPEINFO_STABLE 和 ARCHIVE_REQUEST_SUBMITTED 这两个状态,是这两大部分的分界点。

若 ”Archive PV” 不成功,或者成功后出现 PV unconnected 等错误情况,应首先定位问题属于这两大部分的哪一部分,再去具体定位问题。

关于 applianceInfoForPV 的设置过程,详见 Archiver Appliance 建立集群时可能出现的问题

engine\\pv\\EngineContext.java

@Subscribe public void computeMetaInfo(PubSubEvent pubSubEvent) 
	if(pubSubEvent.getType().equals("ComputeMetaInfo")) 				// [2]
		// 通过 MetaGet 创建 PV 通道,添加事件监听,获取 PV 元数据信息
		ArchiveEngine.getArchiveInfo(pvName, configService, extraFields, userSpec.isUsePVAccess(), new ArchivePVMetaCompletedListener(pvName, configService, myIdentity));
		// 对 mgmt 提出的 ComputeMetaInfo 操作进行应答	
		PubSubEvent confirmationEvent = new PubSubEvent("MetaInfoRequested", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
		configService.getEventBus().post(confirmationEvent);
	 else if(pubSubEvent.getType().equals("StartArchivingPV")) 		// [10]
		String pvName = pubSubEvent.getPvName();
		this.startArchivingPV(pvName);				// 开始存储 PV
											
		PubSubEvent confirmationEvent = new PubSubEvent("StartedArchivingPV", pubSubEvent.getSource() + "_" + ConfigService.WAR_FILE.MGMT, pvName);
		configService.getEventBus().post(confirmationEvent);
		
	 else if(pubSubEvent.getType().equals("AbortComputeMetaInfo")) 
		String pvName =

pt-archiver 归档数据

pt-archiver 参数说明
pt-archiver是Percona-Toolkit工具集中的一个组件,是一个主要用于对MySQL表数据进行归档和清除工具。它可以将数据归档到另一张表或者是一个文件中。pt-archiver在清除表数据的过程中并不会影响OLTP事务的查询性能。对于数据的归档,它可以归档到另一台服务器上的另一张表,也可归档到一个文件中,文件可以用LOAD DATA INFILE进行数据装载,这个功能其实就类似是表历史数据的增量删除。

基本说明
pt-archiver [OPTIONS] --source DSN --where WHERE

常用选项(OPTIONS)
--analyze
指定工具完成数据归档后对表执行‘ANALYZE TABLE‘操作。指定方法如‘--analyze=ds‘,s代表源端表,d代表目标端表,也可以单独指定。

--ask-pass
命令行提示密码输入,保护密码安全,前提需安装模块perl-TermReadKey。

--buffer
指定缓冲区数据刷新到选项‘--file‘指定的文件并且在提交时刷新。
只有当事务提交时禁用自动刷新到‘--file‘指定的文件和刷新文件到磁盘,这意味着文件是被操作系统块进行刷新,因此在事务进行提交之前有一些数据隐式刷新到磁盘。默认是每一行操作后进行文件刷新到磁盘。

--bulk-delete
指定单个语句删除chunk的方式来批量删除行,会隐式执行选项‘--commit-each‘。
使用单个DELETE语句删除每个chunk对应的表行,通常的做法是通过主键进行逐行的删除,批量删除在速度上会有很大的提升,但如果有复杂的‘WHERE‘条件就可能会更慢。

--[no]bulk-delete-limit
默认值:yes
指定添加选项‘--bulk-delete‘和‘--limit‘到进行归档的语句中。

--bulk-insert
使用LOAD DATA LOCAL INFILE的方法,通过批量插入chunk的方式来插入行(隐式指定选项‘--bulk-delete‘和‘--commit-each‘)
而不是通过逐行单独插入的方式进行,它比单行执行INSERT语句插入的速度要快。通过隐式创建临时表来存储需要批量插入的行(chunk),而不是直接进行批量插入操作,当临时表中完成每个chunk之后再进行统一数据加载。为了保证数据的安全性,该选项会强制使用选项‘--bulk-delete‘,这样能够有效保证删除是在插入完全成功之后进行的。

--channel
指定当主从复制环境是多源复制时需要进行归档哪个主库的数据,适用于多源复制中多个主库对应一个从库的情形。

--charset,-A
指定连接字符集。

--[no]check-charset
默认值:yes
指定检查确保数据库连接时字符集和表字符集相同。

--[no]check-columns
默认值:yes
指定检查确保选项‘--source‘指定的源端表和‘--dest‘指定的目标表具有相同的字段。
不检查字段在表的排序和字段类型,只检查字段是否在源端表和目标表当中都存在,如果有不相同的字段差异,则工具报错退出。如果需要禁用该检查,则指定‘--no-check-columns‘。

--check-slave-lag
指定主从复制延迟大于选项‘--max-lag‘指定的值之后暂停归档操作。默认情况下,工具会检查所有的从库,但该选项只作用于指定的从库(通过DSN连接方式)。

--check-interval
默认值:1s
如果同时指定了选项‘--check-slave-lag‘,则该选项指定的时间为工具发现主从复制延迟时暂停的时间。每进行操作100行时进行一次检查。

--columns,-c
指定需要归档的表字段,如有多个则用‘,‘(逗号)隔开。

--commit-each
指定按每次获取和归档的行数进行提交,该选项会禁用选项‘--txn-size‘。
在每次获取表数据并进行归档之后,在获取下一次数据和选项‘--sleep‘指定的休眠时间之前,进行事务提交和刷新选项‘--file‘指定的文件,通过选项‘--limit‘控制事务的大小。

--host,-h
指定连接的数据库IP地址。

--port,-P
指定连接的数据库Port端口。

--user,-u
指定连接的数据库用户。

--password,-p
指定连接的数据库用户密码。

--socket,-S
指定使用SOCKET文件连接。

--databases,-d
指定连接的数据库

--source
指定需要进行归档操作的表,该选项是必须指定的选项,使用DSN方式表示。

--dest
指定要归档到的目标端表,使用DSN方式表示。
如果该选项没有指定的话,则默认与选项‘--source‘指定源端表为相同表。

--where
指定通过WHERE条件语句指定需要归档的数据,该选项是必须指定的选项。不需要加上‘WHERE‘关键字,如果确实不需要WHERE条件进行限制,则指定‘--where 1=1‘。

--file
指定表数据需要归档到的文件。使用类似MySQL DATE_FORMAT()格式化命名方式。
文件内容与MySQL中SELECT INTO OUTFILE语句使用相同的格式,文件命名选项如下所示:

%Y:年,4位数(Year, numeric, four digits)
%m:月,2位数(Month, numeric (01..12))
%d:日,2位数(Day of the month, numeric (01..31))
%H:小时(Hour (00..23))
%i:分钟(Minutes, numeric (00..59))
%s:秒(Seconds (00..59))
%D:数据库名(Database name)
%t:表名(Table name)

例如:--file ‘/var/log/archive/%Y-%m-%d-%D.%t‘

--output-format
指定选项‘--file‘文件内容输出的格式。
默认不指定该选项是以制表符进行字段的分隔符,如果指定该选项,则使用‘,‘(逗号)作为字段分隔符,使用‘"‘(双引号)将字段括起。用法示例:‘--output-format=dump‘。

--for-update
指定为每次归档执行的SELECT语句添加FOR UPDATE子句。

--share-lock
指定为每次归档执行的SELECT语句添加LOCK IN SHARE MODE子句。

--header
指定在文件中第一行写入字段名称作为标题。

--ignore
指定为INSERT语句添加IGNORE选项。

--limit
默认值:1
指定每条语句获取表和归档表的行数。

--local
指定不将OPTIMIZE和ANALYZE语句写入binlog。

--max-lag
默认值:1s
指定允许主从复制延迟时长的最大值,单位秒。如果在每次获取行数据之后主从延迟超过指定的值,则归档操作将暂停执行,暂停休眠时间为选项‘--check-interval‘指定的值。待休眠时间结束之后再次检查主从延迟时长,检查方法是通过从库查询的‘Seconds_Behind_Master‘值来确定。如果主从复制延迟一直大于该参数指定值或者从库停止复制,则操作将一直等待直到从库重新启动并且延迟小于该参数指定值。

--no-delete
指定不删除已被归档的表数据。

--optimize
指定工具完成数据归档后对表执行‘OPTIMIZE TABLE‘操作。指定方法如‘--analyze=ds‘,s代表源端表,d代表目标端表,也可以单独指定。

--primary-key-only
指定只归档主键字段,是选项‘--columns=主键‘的简写。
如果工具归档的操作是进行DELETE清除时最有效,因为只需读取主键一个字段而无需读取行所有字段。

--progress
指定每多少行打印进度信息,打印当前时间,已用时间以及多少行进行归档。

--purge
指定执行的清除操作而不是归档操作。允许忽略选项‘--dest‘和‘--file‘进行操作,如果只是清除操作可以结合选项‘--primary-key-only‘会更高效。

--quiet,-q
指定工具静默执行,不输出任何的执行信息。

--replace
指定写入选项‘--dest‘指定目标端表时改写INSERT语句为REPLACE语句。

--retries
默认值:1
指定归档操作遇到死锁或超时的重试次数。当重试次数超过该选项指定的值时,工具将报错退出。

--run-time
指定工具归档操作在退出之前需要运行的时间。允许的时间后缀名为s=秒,m=分,h=小时,d=天,如果没指定,默认为s。

--[no]safe-auto-increment
默认值:yes
指定不使用自增列(AUTO_INCREMENT)最大值对应的行进行归档。
该选项在进行归档清除时会额外添加一条WHERE子句以防止工具删除单列升序字段具有的具有AUTO_INCREMENT属性最大值的数据行,为了在数据库重启之后还能使用到AUTO_INCREMENT对应的值,但这会引起无法归档或清除字段对应最大值的行。

--set-vars
默认:
wait_timeout=10000
innodb_lock_wait_timeout=1
lock_wait_timeout=60
工具归档时指定参数值,如有多个用‘,‘(逗号)分隔。如‘--set-vars=wait_timeout=5000‘。

--skip-foreign-key-checks
指定使用语句SET FOREIGN_KEY_CHECKS = 0禁用外键检查。

--sleep
指定工具在通过SELECT语句获取归档数据需要休眠的时间,默认值是不进行休眠。在休眠之前事务并不会提交,并且选项‘--file‘指定的文件不会被刷新。如果指定选项‘--commit-each‘,则在休眠之前会进行事务提交和文件刷新。

--statistics
指定工具收集并打印操作的时间统计信息。

--txn-size
默认:1
指定每个事务处理的行数。如果是0则禁用事务功能。

--version
显示工具的版本并退出。

--[no]version-check
默认值:yes
检查Percona Toolkit、MySQL和其他程序的最新版本。

--why-quit
指定工具打印当非因完成归档行数退出的原因。
在执行一个自动归档任务时该选项与选项‘--run-time‘一起使用非常方便,这样可以确定归档任务是否在指定的时间内完成。如果同时指定了选项‘--statistics‘,则会打印所有退出的原因。


DSN选项(DSN)
可以使用DSN方式来连接数据库,DSN选项为key=value方式,在等号的两侧不能有空格出现,并且区分大小写,多个选项之前以‘,‘(逗号)隔开,主要选项如下:

a
归档操作是在哪个库下进行的,相当于USE操作。
A
指定默认字符集。
b
当值为true时,禁止SQL_LOG_BIN,相当于SQL_LOG_BIN = 0。
D
指定包含需要归档表的数据库。
h
指定连接的主机。
u
指定连接的用户。
p
指定连接需要的密码。
P
指定连接的端口。
S
指定连接的SOCKET文件。
t
指定需要归档的表。
i
指定需要使用的索引。


选项用法说明
工具至少需指定选项--dest、--file或--purge其中之一;
选项--ignore和--replace是互斥的;
选项--txn-size和--commit-each是互斥的;
选项--share-lock和--for-update是互斥的;
--analyze和--optimize是互斥的;

本次测试是基于employees表以及新建的yoon表
CREATE TABLE `yoon` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`v_int` int(11) DEFAULT NULL,
`v_string` varchar(50) DEFAULT NULL,
`s_string` char(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=80001 DEFAULT CHARSET=utf8;

创建存储过程i_yoon插入测试数据
delimiter $$
CREATE PROCEDURE i_yoon (IN row_num INT)
BEGIN

DECLARE i INT DEFAULT 0 ;
WHILE i < row_num DO
INSERT INTO yoon (v_int, v_string, s_string)
VALUES
(
floor(1 + rand() * 1000000),
substring(
MD5(RAND()),
1,
floor(1 + rand() * 20)
),
substring(MD5(RAND()), 1, 20)
) ;
SET i = i + 1 ;
END
WHILE ; END$$

delimiter ;

call i_yoon(200000);

在目标端创建测试库hank,表结构yoon:
CREATE TABLE `yoon` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`v_int` int(11) DEFAULT NULL,
`v_string` varchar(50) DEFAULT NULL,
`s_string` char(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=80001 DEFAULT CHARSET=utf8;

执行归档,不删除源端数据:--source 源端,--dest:目标端,--no-delete:不删除源端数据 
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3307,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where ‘id <= 20000‘ --progress 50 --txn-size=1000 --statistics --no-delete --ask-pass --limit=200 --sleep=1 --dry-run

执行归档,删除源端数据:--source 源端,--dest:目标端
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3307,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where ‘id <= 20000‘ --progress 50 --txn-size=1000 --statistics --ask-pass --limit=200 --sleep=1 --dry-run

表归档到表(批量进行)
批量进行归档涉及的选项是--limit,批量进行插入涉及的选项为--bulk-insert,指定选项--bulk-insert同时也会指定选项--bulk-delete,如果不删除已归档数据,则需要指定选项--no-delete
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=yoon,t=yoon,A=utf8 --dest h=127.0.0.1,P=3308,u=admin,p=‘admin‘,D=hank,t=yoon,A=utf8 --charset=utf8 --where "id <= 20000" --progress=50 --txn-size=1000 --limit=50 --statistics --no-delete --bulk-insert --ask-pass --dry-run

表归档到文件
表归档到文件将选项--dest换成--file,并且根据需要添加选项--output-format
pt-archiver --source h=127.0.0.1,P=3306,u=admin,p=‘admin‘,D=employees,t=employees,A=utf8 --file=‘/tmp/yoon_%Y-%m-%d.sql‘ --charset=utf8 --output-format=‘dump‘ --where "id <= 20000" --progress=50 --txn-size=1000 --limit=50 --statistics --no-delete --ask-pass --dry-run

表清除数据:--txn-size 表示200行提交一次失误,若最后删除的数据低于200行报错,改--txn-size值
如果只是进行表数据清除操作而不做归档操作,则可以忽略选项--dest或--file,通过指定选项--purge,可以先使用选项--dry-run打印查询需要清除数据的执行语句,做好确认之后再执行。
pt-archiver --source h=localhost,P=3306,u=root,p=‘Asd.123@#‘,D=yoon,t=yoon --purge --charset=utf8 --where "id <= 400000" --progress=200 --limit=200 --sleep=1 --txn-size=200 --statistics --dry-run

 

以上是关于Archiver Appliance 事务处理流程的主要内容,如果未能解决你的问题,请参考以下文章

pt-archiver 归档数据

VMware vCenter Server Appliance 的磁盘空间不足问题处理

ORA-00257 archiver error. 错误的处理方法

ORA-00257:archiver error. 错误的处理方法

Spring事务处理流程和原理

基本的ado.net本地事务处理流程