Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作
Posted Give.Me.Five
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作相关的知识,希望对你有一定的参考价值。
上篇文章分析了UpdateEngine服务的启动,初始化了四种Action(InstallPlanAction, DownloadAction, FileSystemVerifierAction, PostinstallRunnerAction), 然后将四种Action添加到列表中,最后将Actions列表加入到ActionProcessor的工作任务中去。
ActionProcessor的启动
我们从ActionProcessor如何启动的开始分析。在上篇分析的UpdateAttermpterandroid的ApplyPayload中,在调用了BuildUpdateAction(payload_url)之后,有调用 UpdateBootFlags(), 这个就是启动ActionProcessor的关键。
void UpdateAttempterAndroid::UpdateBootFlags() {
if (updated_boot_flags_) {
LOG(INFO) << "Already updated boot flags. Skipping.";
CompleteUpdateBootFlags(true);
return;
}
// This is purely best effort.
LOG(INFO) << "Marking booted slot as good.";
//设置当前Slot的flag为 bootSucessful, 同时调用 CompleteUpdateBootFlags
if (!boot_control_->MarkBootSuccessfulAsync(
Bind(&UpdateAttempterAndroid::CompleteUpdateBootFlags,
base::Unretained(this)))) {
LOG(ERROR) << "Failed to mark current boot as successful.";
CompleteUpdateBootFlags(false);
}
}
void UpdateAttempterAndroid::CompleteUpdateBootFlags(bool successful) {
updated_boot_flags_ = true;
//调度启动ActionProcessor
ScheduleProcessingStart();
}
void UpdateAttempterAndroid::ScheduleProcessingStart() {
LOG(INFO) << "Scheduling an action processor start.";
//启动工作任务 {processor->StartProcessing()}
brillo::MessageLoop::current()->PostTask(
FROM_HERE,
Bind([](ActionProcessor* processor) { processor->StartProcessing(); },
base::Unretained(processor_.get())));
}
void ActionProcessor::StartProcessing() {
CHECK(!IsRunning());
if (!actions_.empty()) {
//获取第一个action
current_action_ = actions_.front();
LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
actions_.pop_front();
//启动第一个action, 调用PerformAction
current_action_->PerformAction();
}
}
从上面的注释中可以看到,在之前将各种Action 加入到Actions列表中后,然后交Actions列表中的action再遍历加到的ActionProcessor之后,就调用UpdateBootFlags来启动ActionProcessor了, 通过PostTask的方式来启动。启动之后调用第一个Action的performAction() 开始工作干活。
InstallPlanAction的工作
上面ActionProcessor的启动中分析到了StartProcessing()中,会启动actions列中的第一个acion。然后执行它的PerformAction. 之前我们有分析到 在UpdateAttermpterAndroid::BuildUpdateActions 中 依次将install_plan_action, download_action, filesystem_verifier_action, postinstall_runner_action 添加到了ActionProcessor中。这些Action都是从AbstractAction类上继承而来的。上节看到的调用第一个Action的PerformAction(), 第一个Action就是 InstallPlanAction,来看看InstallPlanAction的PerformAction():
void PerformAction() override {
//如果有output管理,设置为当前的install_plan, installPlan 当前没有outputPipe
if (HasOutputPipe()) {
SetOutputObject(install_plan_);
}
//执行进程的ActionComplete
processor_->ActionComplete(this, ErrorCode::kSuccess);
}
void ActionProcessor::ActionComplete(AbstractAction* actionptr,
ErrorCode code) {
CHECK_EQ(actionptr, current_action_);
//如果有委托,执行委托的ActionCompleted,当前Processor的委托项为UpdateAttermpterAndroid
if (delegate_)
delegate_->ActionCompleted(this, actionptr, code);
//做一些任务销毁的善后事情
string old_type = current_action_->Type();
current_action_->ActionCompleted(code);
current_action_->SetProcessor(nullptr);
current_action_ = nullptr;
LOG(INFO) << "ActionProcessor: finished "
<< (actions_.empty() ? "last action " : "") << old_type
<< (suspended_ ? " while suspended" : "")
<< " with code " << utils::ErrorCodeToString(code);
if (!actions_.empty() && code != ErrorCode::kSuccess) {
LOG(INFO) << "ActionProcessor: Aborting processing due to failure.";
actions_.clear();
}
if (suspended_) {
// If an action finished while suspended we don't start the next action (or
// terminate the processing) until the processor is resumed. This condition
// will be flagged by a nullptr current_action_ while suspended_ is true.
suspended_error_code_ = code;
return;
}
//执行下一个任务
StartNextActionOrFinish(code);
}
void ActionProcessor::StartNextActionOrFinish(ErrorCode code) {
if (actions_.empty()) {
//任务列表空了之后,当前Processor的委托项为UpdateAttermpterAndroid,执行其ProcessingDone
if (delegate_) {
delegate_->ProcessingDone(this, code);
}
return;
}
//执行下一个Action行动
current_action_ = actions_.front();
actions_.pop_front();
LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
current_action_->PerformAction();
}
分析完上面的代码,在InstallPlanAction中,没有具体做任何事情,做一个ActionProcessor的ActionComplete,然后就开始下一个Action行为了。当然在ActionProcessor的ActionComplete中,UpdateAttermpterAndroid 作为其委托项,需要执行其ActionComplete,这里面针对DownloadAction和 PostInstallRunnerAction做一些特定的操作和保存其状态。
到这里InstallPlanAction就做完了,下一个Action行为就是DownloadAction行为。
DownloadAction 的工作
从上面就可以看到installPlanAction 执行完了就开始执行DownloadAction, 现在来看看DownloadAction的PerformAction():
void DownloadAction::PerformAction() {
http_fetcher_->set_delegate(this);
//获取input pipe对象,也就通过BondActions绑定的instll_plan.
CHECK(HasInputObject());
install_plan_ = GetInputObject();
install_plan_.Dump();
bytes_received_ = 0;
bytes_received_previous_payloads_ = 0;
bytes_total_ = 0;
for (const auto& payload : install_plan_.payloads)
bytes_total_ += payload.size;
if (install_plan_.is_resume) {
int64_t payload_index = 0;
//如果是之前有下载过一部分的,目前就是继续下载。
if (prefs_->GetInt64(kPrefsUpdateStatePayloadIndex, &payload_index) &&
static_cast<size_t>(payload_index) < install_plan_.payloads.size()) {
// Save the index for the resume payload before downloading any previous
// payload, otherwise it will be overwritten.
resume_payload_index_ = payload_index;
for (int i = 0; i < payload_index; i++)
install_plan_.payloads[i].already_applied = true;
}
}
// TODO(senj): check that install plan has at least one payload.
if (!payload_)
payload_ = &install_plan_.payloads[0];
//先把目标对象Slot设置为不可启动
LOG(INFO) << "Marking new slot as unbootable";
if (!boot_control_->MarkSlotUnbootable(install_plan_.target_slot)) {
LOG(WARNING) << "Unable to mark new slot "
<< BootControlInterface::SlotName(install_plan_.target_slot)
<< ". Proceeding with the update anyway.";
}
//开始下载
StartDownloading();
}
void DownloadAction::StartDownloading() {
download_active_ = true;
http_fetcher_->ClearRanges();
//获取下载的offset, 如果是继续下载,就从上次下载的位置处下载,
//如果是新的下载就重新下载
if (install_plan_.is_resume &&
payload_ == &install_plan_.payloads[resume_payload_index_]) {
// Resuming an update so fetch the update manifest metadata first.
int64_t manifest_metadata_size = 0;
int64_t manifest_signature_size = 0;
prefs_->GetInt64(kPrefsManifestMetadataSize, &manifest_metadata_size);
prefs_->GetInt64(kPrefsManifestSignatureSize, &manifest_signature_size);
http_fetcher_->AddRange(base_offset_,
manifest_metadata_size + manifest_signature_size);
int64_t next_data_offset = 0;
prefs_->GetInt64(kPrefsUpdateStateNextDataOffset, &next_data_offset);
uint64_t resume_offset =
manifest_metadata_size + manifest_signature_size + next_data_offset;
if (!payload_->size) {
http_fetcher_->AddRange(base_offset_ + resume_offset);
} else if (resume_offset < payload_->size) {
http_fetcher_->AddRange(base_offset_ + resume_offset,
payload_->size - resume_offset);
}
} else {
if (payload_->size) {
http_fetcher_->AddRange(base_offset_, payload_->size);
} else {
// If no payload size is passed we assume we read until the end of the
// stream.
http_fetcher_->AddRange(base_offset_);
}
}
//初始化writer_
if (writer_ && writer_ != delta_performer_.get()) {
LOG(INFO) << "Using writer for test.";
} else {
delta_performer_.reset(new DeltaPerformer(prefs_,
boot_control_,
hardware_,
delegate_,
&install_plan_,
payload_,
is_interactive_));
writer_ = delta_performer_.get();
}
if (system_state_ != nullptr) {
const PayloadStateInterface* payload_state = system_state_->payload_state();
string file_id = utils::CalculateP2PFileId(payload_->hash, payload_->size);
if (payload_state->GetUsingP2PForSharing()) {
// If we're sharing the update, store the file_id to convey
// that we should write to the file.
p2p_file_id_ = file_id;
LOG(INFO) << "p2p file id: " << p2p_file_id_;
} else {
// Even if we're not sharing the update, it could be that
// there's a partial file from a previous attempt with the same
// hash. If this is the case, we NEED to clean it up otherwise
// we're essentially timing out other peers downloading from us
// (since we're never going to complete the file).
FilePath path = system_state_->p2p_manager()->FileGetPath(file_id);
if (!path.empty()) {
if (unlink(path.value().c_str()) != 0) {
PLOG(ERROR) << "Error deleting p2p file " << path.value();
} else {
LOG(INFO) << "Deleting partial p2p file " << path.value()
<< " since we're not using p2p to share.";
}
}
}
// 设置http_fetcher的相关参数
if (payload_state->GetUsingP2PForDownloading() &&
payload_state->GetP2PUrl() == install_plan_.download_url) {
LOG(INFO) << "Tweaking HTTP fetcher since we're downloading via p2p";
http_fetcher_->set_low_speed_limit(kDownloadP2PLowSpeedLimitBps,
kDownloadP2PLowSpeedTimeSeconds);
http_fetcher_->set_max_retry_count(kDownloadP2PMaxRetryCount);
http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds);
}
}
//开始下载
http_fetcher_->BeginTransfer(install_plan_.download_url);
}
上面主要是开始下载的工作,具体的http_fetcher_如何下载的,不是我们这篇的重点,我们后面有时间找一篇专门来分析。这个Action走完之后,就开始FilesystemVerifierAction的工作
FilesystemVerifierAction的工作
那直接分析FilesystemVerifierAction的PerformAction():
void FilesystemVerifierAction::PerformAction() {
// Will tell the ActionProcessor we've failed if we return.
ScopedActionCompleter abort_action_completer(processor_, this);
//判断前端输入端
if (!HasInputObject()) {
LOG(ERROR) << "FilesystemVerifierAction missing input object.";
return;
}
install_plan_ = GetInputObject();
if (install_plan_.partitions.empty()) {
LOG(INFO) << "No partitions to verify.";
if (HasOutputPipe())
SetOutputObject(install_plan_);
abort_action_completer.set_code(ErrorCode::kSuccess);
return;
}
//开始分区的hashing
StartPartitionHashing();
abort_action_completer.set_should_complete(false);
}
void FilesystemVerifierAction::StartPartitionHashing() {
//如果partition_index_ 到了最后一个,相当于所有分区都遍历了一次,hash结束
if (partition_index_ == install_plan_.partitions.size()) {
Cleanup(ErrorCode::kSuccess);
return;
}
//获取当前分区信息
InstallPlan::Partition& partition =
install_plan_.partitions[partition_index_];
//判断verify的阶段,刚开始是kVerifyTargetHash
string part_path;
switch (verifier_step_) {
case VerifierStep::kVerifySourceHash:
part_path = partition.source_path;
remaining_size_ = partition.source_size;
break;
case VerifierStep::kVerifyTargetHash:
part_path = partition.target_path;
remaining_size_ = partition.target_size;
break;
}
LOG(INFO) << "Hashing partition " << partition_index_ << " ("
<< partition.name << ") on device " << part_path;
if (part_path.empty())
return Cleanup(ErrorCode::kFilesystemVerifierError);
//打开分区路径,然后准备开始读
brillo::ErrorPtr error;
src_stream_ = brillo::FileStream::Open(
base::FilePath(part_path),
brillo::Stream::AccessMode::READ,
brillo::FileStream::Disposition::OPEN_EXISTING,
&error);
if (!src_stream_) {
LOG(ERROR) << "Unable to open " << part_path << " for reading";
return Cleanup(ErrorCode::kFilesystemVerifierError);
}
//设置读取到buffer的地方和初始化hasher_
buffer_.resize(kReadFileBufferSize);
read_done_ = false;
hasher_.reset(new HashCalculator());
// 开始第一次的读取
ScheduleRead();
}
void FilesystemVerifierAction::ScheduleRead() {
size_t bytes_to_read = std::min(static_cast<int64_t>(buffer_.size()),
remaining_size_);
if (!bytes_to_read) {
OnReadDoneCallback(0);
return;
}
//开始读取数据,每次读bytes_to_read字节,然后保存在buffer_中,读取成功后,就会回调OnReadDoneCallback,读取失败就会调用OnReadErrorCallback,
bool read_async_ok = src_stream_->ReadAsync(
buffer_.data(),
bytes_to_read,
base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
base::Unretained(this)),
base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
base::Unretained(this)),
nullptr);
if (!read_async_ok) {
LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
Cleanup(ErrorCode::kError);
}
}
在上面的startPartitionHashing中可以看到,有分一个验证阶段,第一个是kVerifySourceHash, 第二个是kVerifyTargetHash,默认的是kVerifyTargetHash,也就是从kVerifyTargetHash阶段开始,这个验证完了之后,再来验证kVerifySourceHash。打开了对应的分区路径之后,就开始了读取的过程,读取时又设置了两个回调,读取完成时回调OnReadDoneCallback, 读取出错时回调OnReadErrorCallback。
那我们继续来看OnReadDoneCallback。
void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
//判断是否已经读取完成
if (bytes_read == 0) {
//设置读取完成状态
read_done_ = true;
} else {
<以上是关于Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作的主要内容,如果未能解决你的问题,请参考以下文章
Android P update_engine分析--升级核心DeltaPerformer的分析
Android P update_engine分析--升级核心DeltaPerformer的分析
Android P update_engine分析--升级核心DeltaPerformer的分析
Android P update_engine分析--升级核心DeltaPerformer的分析