侧输入的高效 ParDo 设置或 start_bundle

Posted

技术标签:

【中文标题】侧输入的高效 ParDo 设置或 start_bundle【英文标题】:Efficient ParDo setup or start_bundle for side input 【发布时间】:2020-03-17 13:03:15 【问题描述】:

列表 A:25M 哈希 列表 B:175K 哈希

我想检查列表 B 中的每个散列是否存在于列表 A 中。为此,我有一个 ParDo 函数,当它不匹配时我会屈服。这是一个重复数据删除过程。

如何有效地设置这个 ParDo,现在我在处理列表 B 时对列表 A 进行侧面输入。但是侧面输入不应该转到 ParDo 的 setup() 或 start_bundle(),所以我存储查找列表 ( A)在工人中只有一次?

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
        else:
            pass

如果您有答案,请附上文档链接,因为我没有找到 Python 版本的任何好的文档。

transformed_records 是来自先前转换的 PCollection

current_data 是来自 BigQuery.read 的 PCollection

new_records = 转换记录 | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))

【问题讨论】:

【参考方案1】:

我相信pvalue.AsDict 是你所需要的,它会为你提供一个字典风格的侧边输入界面。您可以在 Apache Beam Github search 上找到一些示例。

这是我刚刚写的一个简化示例,但请参阅下面的签入示例(虽然有点复杂),以防我犯了错误。

class ComputeHashes(beam.DoFn):
  def process(self, element):
      # use the element as a key to produce a KV, value is not used
      yield (HashFunction(element), true) 

initial_elements = beam.Create("foo")
computed_hashes = initial_elements | beam.ParDo(ComputeHashes())

class FilterIfAlreadyComputedHash(beam.DoFn):
  def process(self, element, hashes):
    # Filter if it already exists in hashes
    if not hashes.get(element):
      yield element

more_elements = beam.Create("foo", "bar") # Read from your pipeline's source
small_words = more_elements | beam.ParDo(FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))

在检查的示例中,来自 beam github 存储库,在 visionml_test.py 中,使用 beam.PValue.AsDict() 将 PCollection 转换为 Dictionary 类型视图。

class VisionMlTestIT(unittest.TestCase):
  def test_text_detection_with_language_hint(self):
    IMAGES_TO_ANNOTATE = [
        'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
    ]
    IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]

    with TestPipeline(is_integration_test=True) as p:
      contexts = p | 'Create context' >> beam.Create(
          dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))

      output = (
          p
          | beam.Create(IMAGES_TO_ANNOTATE)
          | AnnotateImage(
              features=[vision.types.Feature(type='TEXT_DETECTION')],
              context_side_input=beam.pvalue.AsDict(contexts))
          | beam.ParDo(extract))

侧输入被传递到 FlatMap(visionml.py)中,并且在 FlatMap 的函数中,使用 .get() 从字典中检索条目。这也可以传递到 Map 或 ParDo。请参阅:beam python side input documentation(这里他们使用 .AsSingleton 而不是 .AsDict)。您可以在此处找到在流程调用中使用它的示例。

class AnnotateImage(PTransform):
  """A ``PTransform`` for annotating images using the GCP Vision API.
  ref: https://cloud.google.com/vision/docs/
  Batches elements together using ``util.BatchElements`` PTransform and sends
  each batch of elements to the GCP Vision API.
  Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
  or binary_type base64-encoded image data.
  Accepts an `AsDict` side input that maps each image to an image context.
  """

  MAX_BATCH_SIZE = 5
  MIN_BATCH_SIZE = 1

  def __init__(
      self,
      features,
      retry=None,
      timeout=120,
      max_batch_size=None,
      min_batch_size=None,
      client_options=None,
      context_side_input=None,
      metadata=None):
    """
    Args:
      features: (List[``vision.types.Feature.enums.Feature``]) Required.
        The Vision API features to detect
      retry: (google.api_core.retry.Retry) Optional.
        A retry object used to retry requests.
        If None is specified (default), requests will not be retried.
      timeout: (float) Optional.
        The time in seconds to wait for the response from the Vision API.
        Default is 120.
      max_batch_size: (int) Optional.
        Maximum number of images to batch in the same request to the Vision API.
        Default is 5 (which is also the Vision API max).
        This parameter is primarily intended for testing.
      min_batch_size: (int) Optional.
        Minimum number of images to batch in the same request to the Vision API.
        Default is None. This parameter is primarily intended for testing.
      client_options:
        (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
        Client options used to set user options on the client.
        API Endpoint should be set through client_options.
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _ImageAnnotateFn as the image context mapping containing additional
        image context and/or feature-specific parameters.
        Example usage::
          image_contexts =
            [(''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),
            (''gs://cloud-samples-data/vision/ocr/sign.jpg'', Union[dict,
            ``vision.types.ImageContext()``]),]
          context_side_input =
            (
              p
              | "Image contexts" >> beam.Create(image_contexts)
            )
          visionml.AnnotateImage(features,
            context_side_input=beam.pvalue.AsDict(context_side_input)))
      metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
        Additional metadata that is provided to the method.
    """
    super(AnnotateImage, self).__init__()
    self.features = features
    self.retry = retry
    self.timeout = timeout
    self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
    if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
      raise ValueError(
          'Max batch_size exceeded. '
          'Batch size needs to be smaller than '.format(
              AnnotateImage.MAX_BATCH_SIZE))
    self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
    self.client_options = client_options
    self.context_side_input = context_side_input
    self.metadata = metadata

  def expand(self, pvalue):
    return (
        pvalue
        | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
        | util.BatchElements(
            min_batch_size=self.min_batch_size,
            max_batch_size=self.max_batch_size)
        | ParDo(
            _ImageAnnotateFn(
                features=self.features,
                retry=self.retry,
                timeout=self.timeout,
                client_options=self.client_options,
                metadata=self.metadata)))

  @typehints.with_input_types(
      Union[text_type, binary_type], Optional[vision.types.ImageContext])
  @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
  def _create_image_annotation_pairs(self, element, context_side_input):
    if context_side_input:  # If we have a side input image context, use that
      image_context = context_side_input.get(element)
    else:
      image_context = None

    if isinstance(element, text_type):
      image = vision.types.Image(
          source=vision.types.ImageSource(image_uri=element))
    else:  # Typehint checks only allows text_type or binary_type
      image = vision.types.Image(content=element)

    request = vision.types.AnnotateImageRequest(
        image=image, features=self.features, image_context=image_context)
    yield request

Note, in Java you use it as .asMap()。

【讨论】:

感谢您的详细回答,我特别想知道如何将侧面输入存储在工作人员中,所以它会流入 pardo 内的 setup() 或 start_bundle() 还是仅在进程中着陆() 这是否意味着整个列表 B 将与列表 A 的每个元素一起发送?【参考方案2】:

抱歉,我最初误解了这个问题。实际上,我认为在 start_bundle 中不可能有侧面输入。它只能在 process_bundle 中访问。但是您可以改为在第一次调用 process bundle 时执行该工作并获得类似的结果。

class DoFnMethods(beam.DoFn):
  def __init__(self):
    self.first_element_processed = False
    self.once_retrieved_side_input_data = None

  def called_once(self, side_input):
    if self.first_element_processed:
      return
    self.once_retrieved_side_input_data = side_input.get(...)
    self.first_element_processed = True

  def process(self, element, side_input):
    self.called_once(side_input)
    ...

注意:您确实需要注意这样一个事实,即启动捆绑包和完成捆绑包将在所有窗口中为捆绑包调用一次,并且提供给处理的侧面输入对于每个计算的窗口都是不同的。因此,如果您使用的是 windows,您可能需要对 self.first_element_processed 和 self.once_retrieved_side_input_data 变量使用 dict(按窗口键控),这样您就可以为每个窗口调用一次_onc。

【讨论】:

以上是关于侧输入的高效 ParDo 设置或 start_bundle的主要内容,如果未能解决你的问题,请参考以下文章

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

输出类型中 beam.ParDo 和 beam.Map 的区别?

在 ParDo 中访问 sideinput

车联网路侧设施设置指南

腾讯云TDSQL MySQL版 - 平台侧安全设计

[实践篇]13.5 QNX侧如何操作进程?