Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作

Posted Give.Me.Five

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作相关的知识,希望对你有一定的参考价值。

上篇文章分析了UpdateEngine服务的启动,初始化了四种Action(InstallPlanAction, DownloadAction, FileSystemVerifierAction, PostinstallRunnerAction), 然后将四种Action添加到列表中,最后将Actions列表加入到ActionProcessor的工作任务中去。

ActionProcessor的启动

我们从ActionProcessor如何启动的开始分析。在上篇分析的UpdateAttermpterandroid的ApplyPayload中,在调用了BuildUpdateAction(payload_url)之后,有调用 UpdateBootFlags(), 这个就是启动ActionProcessor的关键。

void UpdateAttempterAndroid::UpdateBootFlags() {
  if (updated_boot_flags_) {
    LOG(INFO) << "Already updated boot flags. Skipping.";
    CompleteUpdateBootFlags(true);
    return;
  }
  // This is purely best effort.
  LOG(INFO) << "Marking booted slot as good.";
  //设置当前Slot的flag为 bootSucessful, 同时调用 CompleteUpdateBootFlags
  if (!boot_control_->MarkBootSuccessfulAsync(
          Bind(&UpdateAttempterAndroid::CompleteUpdateBootFlags,
               base::Unretained(this)))) {
    LOG(ERROR) << "Failed to mark current boot as successful.";
    CompleteUpdateBootFlags(false);
  }
}
void UpdateAttempterAndroid::CompleteUpdateBootFlags(bool successful) {
  updated_boot_flags_ = true;
  //调度启动ActionProcessor
  ScheduleProcessingStart();
}
void UpdateAttempterAndroid::ScheduleProcessingStart() {
  LOG(INFO) << "Scheduling an action processor start.";
  //启动工作任务 {processor->StartProcessing()}
  brillo::MessageLoop::current()->PostTask(
      FROM_HERE,
      Bind([](ActionProcessor* processor) { processor->StartProcessing(); },
           base::Unretained(processor_.get())));
}
void ActionProcessor::StartProcessing() {
  CHECK(!IsRunning());
  if (!actions_.empty()) {
  	//获取第一个action
    current_action_ = actions_.front();
    LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
    actions_.pop_front();
    //启动第一个action, 调用PerformAction
    current_action_->PerformAction();
  }
}

从上面的注释中可以看到,在之前将各种Action 加入到Actions列表中后,然后交Actions列表中的action再遍历加到的ActionProcessor之后,就调用UpdateBootFlags来启动ActionProcessor了, 通过PostTask的方式来启动。启动之后调用第一个Action的performAction() 开始工作干活。

InstallPlanAction的工作

上面ActionProcessor的启动中分析到了StartProcessing()中,会启动actions列中的第一个acion。然后执行它的PerformAction. 之前我们有分析到 在UpdateAttermpterAndroid::BuildUpdateActions 中 依次将install_plan_action, download_action, filesystem_verifier_action, postinstall_runner_action 添加到了ActionProcessor中。这些Action都是从AbstractAction类上继承而来的。上节看到的调用第一个Action的PerformAction(), 第一个Action就是 InstallPlanAction,来看看InstallPlanAction的PerformAction():

  void PerformAction() override {
  //如果有output管理,设置为当前的install_plan, installPlan 当前没有outputPipe
    if (HasOutputPipe()) {
      SetOutputObject(install_plan_);
    }
    //执行进程的ActionComplete
    processor_->ActionComplete(this, ErrorCode::kSuccess);
  }
  void ActionProcessor::ActionComplete(AbstractAction* actionptr,
                                     ErrorCode code) {
  CHECK_EQ(actionptr, current_action_);
  //如果有委托,执行委托的ActionCompleted,当前Processor的委托项为UpdateAttermpterAndroid
  if (delegate_)
    delegate_->ActionCompleted(this, actionptr, code);
  //做一些任务销毁的善后事情
  string old_type = current_action_->Type();
  current_action_->ActionCompleted(code);
  current_action_->SetProcessor(nullptr);
  current_action_ = nullptr;
  LOG(INFO) << "ActionProcessor: finished "
            << (actions_.empty() ? "last action " : "") << old_type
            << (suspended_ ? " while suspended" : "")
            << " with code " << utils::ErrorCodeToString(code);
  if (!actions_.empty() && code != ErrorCode::kSuccess) {
    LOG(INFO) << "ActionProcessor: Aborting processing due to failure.";
    actions_.clear();
  }
  if (suspended_) {
    // If an action finished while suspended we don't start the next action (or
    // terminate the processing) until the processor is resumed. This condition
    // will be flagged by a nullptr current_action_ while suspended_ is true.
    suspended_error_code_ = code;
    return;
  }
  //执行下一个任务
  StartNextActionOrFinish(code);
}

void ActionProcessor::StartNextActionOrFinish(ErrorCode code) {
  if (actions_.empty()) {
  //任务列表空了之后,当前Processor的委托项为UpdateAttermpterAndroid,执行其ProcessingDone
    if (delegate_) {
      delegate_->ProcessingDone(this, code);
    }
    return;
  }
  //执行下一个Action行动
  current_action_ = actions_.front();
  actions_.pop_front();
  LOG(INFO) << "ActionProcessor: starting " << current_action_->Type();
  current_action_->PerformAction();
}

分析完上面的代码,在InstallPlanAction中,没有具体做任何事情,做一个ActionProcessor的ActionComplete,然后就开始下一个Action行为了。当然在ActionProcessor的ActionComplete中,UpdateAttermpterAndroid 作为其委托项,需要执行其ActionComplete,这里面针对DownloadAction和 PostInstallRunnerAction做一些特定的操作和保存其状态。

到这里InstallPlanAction就做完了,下一个Action行为就是DownloadAction行为。

DownloadAction 的工作

从上面就可以看到installPlanAction 执行完了就开始执行DownloadAction, 现在来看看DownloadAction的PerformAction():


void DownloadAction::PerformAction() {
  http_fetcher_->set_delegate(this);

  //获取input pipe对象,也就通过BondActions绑定的instll_plan.
  CHECK(HasInputObject());
  install_plan_ = GetInputObject();
  install_plan_.Dump();

  bytes_received_ = 0;
  bytes_received_previous_payloads_ = 0;
  bytes_total_ = 0;
  for (const auto& payload : install_plan_.payloads)
    bytes_total_ += payload.size;

  if (install_plan_.is_resume) {
    int64_t payload_index = 0;
    //如果是之前有下载过一部分的,目前就是继续下载。
    if (prefs_->GetInt64(kPrefsUpdateStatePayloadIndex, &payload_index) &&
        static_cast<size_t>(payload_index) < install_plan_.payloads.size()) {
      // Save the index for the resume payload before downloading any previous
      // payload, otherwise it will be overwritten.
      resume_payload_index_ = payload_index;
      for (int i = 0; i < payload_index; i++)
        install_plan_.payloads[i].already_applied = true;
    }
  }
  // TODO(senj): check that install plan has at least one payload.
  if (!payload_)
    payload_ = &install_plan_.payloads[0];

//先把目标对象Slot设置为不可启动
  LOG(INFO) << "Marking new slot as unbootable";
  if (!boot_control_->MarkSlotUnbootable(install_plan_.target_slot)) {
    LOG(WARNING) << "Unable to mark new slot "
                 << BootControlInterface::SlotName(install_plan_.target_slot)
                 << ". Proceeding with the update anyway.";
  }
 //开始下载
  StartDownloading();
}

void DownloadAction::StartDownloading() {
  download_active_ = true;
  http_fetcher_->ClearRanges();
  //获取下载的offset, 如果是继续下载,就从上次下载的位置处下载,
  //如果是新的下载就重新下载
  if (install_plan_.is_resume &&
      payload_ == &install_plan_.payloads[resume_payload_index_]) {
    // Resuming an update so fetch the update manifest metadata first.
    int64_t manifest_metadata_size = 0;
    int64_t manifest_signature_size = 0;
    prefs_->GetInt64(kPrefsManifestMetadataSize, &manifest_metadata_size);
    prefs_->GetInt64(kPrefsManifestSignatureSize, &manifest_signature_size);
    http_fetcher_->AddRange(base_offset_,
                            manifest_metadata_size + manifest_signature_size);
    int64_t next_data_offset = 0;
    prefs_->GetInt64(kPrefsUpdateStateNextDataOffset, &next_data_offset);
    uint64_t resume_offset =
        manifest_metadata_size + manifest_signature_size + next_data_offset;
    if (!payload_->size) {
      http_fetcher_->AddRange(base_offset_ + resume_offset);
    } else if (resume_offset < payload_->size) {
      http_fetcher_->AddRange(base_offset_ + resume_offset,
                              payload_->size - resume_offset);
    }
  } else {
    if (payload_->size) {
      http_fetcher_->AddRange(base_offset_, payload_->size);
    } else {
      // If no payload size is passed we assume we read until the end of the
      // stream.
      http_fetcher_->AddRange(base_offset_);
    }
  }
  //初始化writer_
  if (writer_ && writer_ != delta_performer_.get()) {
    LOG(INFO) << "Using writer for test.";
  } else {
    delta_performer_.reset(new DeltaPerformer(prefs_,
                                              boot_control_,
                                              hardware_,
                                              delegate_,
                                              &install_plan_,
                                              payload_,
                                              is_interactive_));
    writer_ = delta_performer_.get();
  }
  if (system_state_ != nullptr) {
    const PayloadStateInterface* payload_state = system_state_->payload_state();
    string file_id = utils::CalculateP2PFileId(payload_->hash, payload_->size);
    if (payload_state->GetUsingP2PForSharing()) {
      // If we're sharing the update, store the file_id to convey
      // that we should write to the file.
      p2p_file_id_ = file_id;
      LOG(INFO) << "p2p file id: " << p2p_file_id_;
    } else {
      // Even if we're not sharing the update, it could be that
      // there's a partial file from a previous attempt with the same
      // hash. If this is the case, we NEED to clean it up otherwise
      // we're essentially timing out other peers downloading from us
      // (since we're never going to complete the file).
      FilePath path = system_state_->p2p_manager()->FileGetPath(file_id);
      if (!path.empty()) {
        if (unlink(path.value().c_str()) != 0) {
          PLOG(ERROR) << "Error deleting p2p file " << path.value();
        } else {
          LOG(INFO) << "Deleting partial p2p file " << path.value()
                    << " since we're not using p2p to share.";
        }
      }
    }

    // 设置http_fetcher的相关参数
    if (payload_state->GetUsingP2PForDownloading() &&
        payload_state->GetP2PUrl() == install_plan_.download_url) {
      LOG(INFO) << "Tweaking HTTP fetcher since we're downloading via p2p";
      http_fetcher_->set_low_speed_limit(kDownloadP2PLowSpeedLimitBps,
                                         kDownloadP2PLowSpeedTimeSeconds);
      http_fetcher_->set_max_retry_count(kDownloadP2PMaxRetryCount);
      http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds);
    }
  }
  //开始下载
  http_fetcher_->BeginTransfer(install_plan_.download_url);
}

上面主要是开始下载的工作,具体的http_fetcher_如何下载的,不是我们这篇的重点,我们后面有时间找一篇专门来分析。这个Action走完之后,就开始FilesystemVerifierAction的工作

FilesystemVerifierAction的工作

那直接分析FilesystemVerifierAction的PerformAction():


void FilesystemVerifierAction::PerformAction() {
  // Will tell the ActionProcessor we've failed if we return.
  ScopedActionCompleter abort_action_completer(processor_, this);
 //判断前端输入端
  if (!HasInputObject()) {
    LOG(ERROR) << "FilesystemVerifierAction missing input object.";
    return;
  }
  install_plan_ = GetInputObject();

  if (install_plan_.partitions.empty()) {
    LOG(INFO) << "No partitions to verify.";
    if (HasOutputPipe())
      SetOutputObject(install_plan_);
    abort_action_completer.set_code(ErrorCode::kSuccess);
    return;
  }
  //开始分区的hashing
  StartPartitionHashing();
  abort_action_completer.set_should_complete(false);
}

void FilesystemVerifierAction::StartPartitionHashing() {
  //如果partition_index_ 到了最后一个,相当于所有分区都遍历了一次,hash结束
  if (partition_index_ == install_plan_.partitions.size()) {
    Cleanup(ErrorCode::kSuccess);
    return;
  }
  //获取当前分区信息
  InstallPlan::Partition& partition =
      install_plan_.partitions[partition_index_];
 //判断verify的阶段,刚开始是kVerifyTargetHash
  string part_path;
  switch (verifier_step_) {
    case VerifierStep::kVerifySourceHash:
      part_path = partition.source_path;
      remaining_size_ = partition.source_size;
      break;
    case VerifierStep::kVerifyTargetHash:
      part_path = partition.target_path;
      remaining_size_ = partition.target_size;
      break;
  }
  LOG(INFO) << "Hashing partition " << partition_index_ << " ("
            << partition.name << ") on device " << part_path;
  if (part_path.empty())
    return Cleanup(ErrorCode::kFilesystemVerifierError);
  //打开分区路径,然后准备开始读
  brillo::ErrorPtr error;
  src_stream_ = brillo::FileStream::Open(
      base::FilePath(part_path),
      brillo::Stream::AccessMode::READ,
      brillo::FileStream::Disposition::OPEN_EXISTING,
      &error);

  if (!src_stream_) {
    LOG(ERROR) << "Unable to open " << part_path << " for reading";
    return Cleanup(ErrorCode::kFilesystemVerifierError);
  }
 //设置读取到buffer的地方和初始化hasher_
  buffer_.resize(kReadFileBufferSize);
  read_done_ = false;
  hasher_.reset(new HashCalculator());

  // 开始第一次的读取
  ScheduleRead();
}

void FilesystemVerifierAction::ScheduleRead() {
  size_t bytes_to_read = std::min(static_cast<int64_t>(buffer_.size()),
                                  remaining_size_);
  if (!bytes_to_read) {
    OnReadDoneCallback(0);
    return;
  }
//开始读取数据,每次读bytes_to_read字节,然后保存在buffer_中,读取成功后,就会回调OnReadDoneCallback,读取失败就会调用OnReadErrorCallback,
  bool read_async_ok = src_stream_->ReadAsync(
    buffer_.data(),
    bytes_to_read,
    base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
               base::Unretained(this)),
    base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
               base::Unretained(this)),
    nullptr);

  if (!read_async_ok) {
    LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
    Cleanup(ErrorCode::kError);
  }
}

在上面的startPartitionHashing中可以看到,有分一个验证阶段,第一个是kVerifySourceHash, 第二个是kVerifyTargetHash,默认的是kVerifyTargetHash,也就是从kVerifyTargetHash阶段开始,这个验证完了之后,再来验证kVerifySourceHash。打开了对应的分区路径之后,就开始了读取的过程,读取时又设置了两个回调,读取完成时回调OnReadDoneCallback, 读取出错时回调OnReadErrorCallback。
那我们继续来看OnReadDoneCallback。


void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
//判断是否已经读取完成
  if (bytes_read == 0) {
  	//设置读取完成状态
    read_done_ = true;
  } else {
    <

以上是关于Android P update_engine分析 --update_attempter_android 升级与ActionProcessor工作的主要内容,如果未能解决你的问题,请参考以下文章

Android P update_engine分析--升级核心DeltaPerformer的分析

Android P update_engine分析--升级核心DeltaPerformer的分析

Android P update_engine分析--升级核心DeltaPerformer的分析

Android P update_engine分析--升级核心DeltaPerformer的分析

Android P update_engine分析-- PostinstallRunnerAction的工作

Android P update_engine分析-- PostinstallRunnerAction的工作