Efficient ParDo setup or start_bundle for side input
Posted: 2020-03-17 13:03:15

List A: 25M hashes
List B: 175K hashes
I want to check whether each hash in list B exists in list A. For that I have a ParDo function, and I yield the element when there is no match. This is a deduplication process.
How do I set up this ParDo efficiently? Right now I pass list A as a side input while processing list B. But shouldn't the side input go into the ParDo's setup() or start_bundle(), so that the lookup list (A) is stored on the worker only once?
class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
If you have an answer, please include a link to the documentation, because I haven't found any good documentation for the Python version.
transformed_records is a PCollection from a previous transform; current_data is a PCollection from BigQuery.read.
new_records = transformed_records | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))
Answer 1:

I believe pvalue.AsDict is what you need; it gives you a dictionary-style interface to the side input. You can find some examples via an Apache Beam GitHub search.
Here is a simplified example I just wrote, but see the checked-in example below (a bit more complex) in case I have made a mistake.
class ComputeHashes(beam.DoFn):
    def process(self, element):
        # Use the element as a key to produce a KV pair; the value is not used.
        yield (HashFunction(element), True)

initial_elements = p | beam.Create(['foo'])  # given an existing pipeline p
computed_hashes = initial_elements | beam.ParDo(ComputeHashes())
class FilterIfAlreadyComputedHash(beam.DoFn):
    def process(self, element, hashes):
        # Filter the element out if its hash already exists in the side-input dict.
        if not hashes.get(HashFunction(element)):
            yield element

more_elements = p | beam.Create(['foo', 'bar'])  # Read from your pipeline's source
small_words = more_elements | beam.ParDo(FilterIfAlreadyComputedHash(), beam.pvalue.AsDict(computed_hashes))
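To see the whole pattern run end to end, here is a minimal self-contained sketch. The names are illustrative, and hashlib.md5 stands in for whatever hash function is actually used (Python's built-in hash() is salted per process, so it is not safe across workers):

import hashlib

import apache_beam as beam

def stable_hash(s):
    # Deterministic across workers, unlike Python's built-in salted hash().
    return hashlib.md5(s.encode('utf-8')).hexdigest()

class ComputeHashes(beam.DoFn):
    def process(self, element):
        # Emit (hash, True) so the collection can be viewed as a dict.
        yield (stable_hash(element), True)

class FilterIfAlreadyComputedHash(beam.DoFn):
    def process(self, element, hashes):
        # 'hashes' is the AsDict view; membership is an O(1) dict lookup.
        if stable_hash(element) not in hashes:
            yield element

with beam.Pipeline() as p:
    existing = p | 'Existing' >> beam.Create(['foo'])
    computed_hashes = existing | 'Hash' >> beam.ParDo(ComputeHashes())
    incoming = p | 'Incoming' >> beam.Create(['foo', 'bar'])
    new_only = (
        incoming
        | 'FilterSeen' >> beam.ParDo(
            FilterIfAlreadyComputedHash(),
            beam.pvalue.AsDict(computed_hashes))
        | 'Print' >> beam.Map(print))  # prints only: bar

The advantage of AsDict over the question's AsList is that each membership test becomes an O(1) dict lookup instead of a linear scan over the 25M-entry list.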
In the checked-in example from the Beam GitHub repository, in visionml_test.py, beam.pvalue.AsDict() is used to turn a PCollection into a dictionary-type view.
class VisionMlTestIT(unittest.TestCase):
    def test_text_detection_with_language_hint(self):
        IMAGES_TO_ANNOTATE = [
            'gs://apache-beam-samples/advanced_analytics/vision/sign.jpg'
        ]
        IMAGE_CONTEXT = [vision.types.ImageContext(language_hints=['en'])]
        with TestPipeline(is_integration_test=True) as p:
            contexts = p | 'Create context' >> beam.Create(
                dict(zip(IMAGES_TO_ANNOTATE, IMAGE_CONTEXT)))
            output = (
                p
                | beam.Create(IMAGES_TO_ANNOTATE)
                | AnnotateImage(
                    features=[vision.types.Feature(type='TEXT_DETECTION')],
                    context_side_input=beam.pvalue.AsDict(contexts))
                | beam.ParDo(extract))
The side input is passed into a FlatMap (in visionml.py), and inside the FlatMap's function an entry is retrieved from the dictionary with .get(). It could also be passed into a Map or ParDo. See the Beam Python side input documentation (there they use .AsSingleton instead of .AsDict); you can find an example of using it inside the process call there.
class AnnotateImage(PTransform):
    """A ``PTransform`` for annotating images using the GCP Vision API.
    ref: https://cloud.google.com/vision/docs/

    Batches elements together using ``util.BatchElements`` PTransform and sends
    each batch of elements to the GCP Vision API.
    Element is a Union[text_type, binary_type] of either an URI (e.g. a GCS URI)
    or binary_type base64-encoded image data.
    Accepts an `AsDict` side input that maps each image to an image context.
    """
    MAX_BATCH_SIZE = 5
    MIN_BATCH_SIZE = 1

    def __init__(
            self,
            features,
            retry=None,
            timeout=120,
            max_batch_size=None,
            min_batch_size=None,
            client_options=None,
            context_side_input=None,
            metadata=None):
        """
        Args:
          features: (List[``vision.types.Feature.enums.Feature``]) Required.
            The Vision API features to detect.
          retry: (google.api_core.retry.Retry) Optional.
            A retry object used to retry requests.
            If None is specified (default), requests will not be retried.
          timeout: (float) Optional.
            The time in seconds to wait for the response from the Vision API.
            Default is 120.
          max_batch_size: (int) Optional.
            Maximum number of images to batch in the same request to the
            Vision API. Default is 5 (which is also the Vision API max).
            This parameter is primarily intended for testing.
          min_batch_size: (int) Optional.
            Minimum number of images to batch in the same request to the
            Vision API. Default is None.
            This parameter is primarily intended for testing.
          client_options:
            (Union[dict, google.api_core.client_options.ClientOptions]) Optional.
            Client options used to set user options on the client.
            API Endpoint should be set through client_options.
          context_side_input: (beam.pvalue.AsDict) Optional.
            An ``AsDict`` of a PCollection to be passed to the
            _ImageAnnotateFn as the image context mapping containing additional
            image context and/or feature-specific parameters.
            Example usage::

              image_contexts =
                [('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
                  ``vision.types.ImageContext()``]),
                 ('gs://cloud-samples-data/vision/ocr/sign.jpg', Union[dict,
                  ``vision.types.ImageContext()``]),]

              context_side_input =
                (
                  p
                  | "Image contexts" >> beam.Create(image_contexts)
                )

              visionml.AnnotateImage(features,
                context_side_input=beam.pvalue.AsDict(context_side_input)))
          metadata: (Optional[Sequence[Tuple[str, str]]]): Optional.
            Additional metadata that is provided to the method.
        """
        super(AnnotateImage, self).__init__()
        self.features = features
        self.retry = retry
        self.timeout = timeout
        self.max_batch_size = max_batch_size or AnnotateImage.MAX_BATCH_SIZE
        if self.max_batch_size > AnnotateImage.MAX_BATCH_SIZE:
            raise ValueError(
                'Max batch_size exceeded. '
                'Batch size needs to be smaller than {}.'.format(
                    AnnotateImage.MAX_BATCH_SIZE))
        self.min_batch_size = min_batch_size or AnnotateImage.MIN_BATCH_SIZE
        self.client_options = client_options
        self.context_side_input = context_side_input
        self.metadata = metadata

    def expand(self, pvalue):
        return (
            pvalue
            | FlatMap(self._create_image_annotation_pairs, self.context_side_input)
            | util.BatchElements(
                min_batch_size=self.min_batch_size,
                max_batch_size=self.max_batch_size)
            | ParDo(
                _ImageAnnotateFn(
                    features=self.features,
                    retry=self.retry,
                    timeout=self.timeout,
                    client_options=self.client_options,
                    metadata=self.metadata)))

    @typehints.with_input_types(
        Union[text_type, binary_type], Optional[vision.types.ImageContext])
    @typehints.with_output_types(List[vision.types.AnnotateImageRequest])
    def _create_image_annotation_pairs(self, element, context_side_input):
        if context_side_input:  # If we have a side input image context, use that
            image_context = context_side_input.get(element)
        else:
            image_context = None
        if isinstance(element, text_type):
            image = vision.types.Image(
                source=vision.types.ImageSource(image_uri=element))
        else:  # Typehint checks only allows text_type or binary_type
            image = vision.types.Image(content=element)
        request = vision.types.AnnotateImageRequest(
            image=image, features=self.features, image_context=image_context)
        yield request
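Stripped of the Vision API specifics, the pattern in expand and _create_image_annotation_pairs reduces to a FlatMap that takes an AsDict side input and probes it with .get(). A minimal sketch of just that pattern, with illustrative names:

import apache_beam as beam

def pair_with_context(element, contexts):
    # 'contexts' is the materialized AsDict view; .get returns None if absent.
    yield (element, contexts.get(element))

with beam.Pipeline() as p:
    contexts = p | 'Contexts' >> beam.Create([('img1', 'en'), ('img2', 'fr')])
    images = p | 'Images' >> beam.Create(['img1', 'img2', 'img3'])
    paired = images | beam.FlatMap(
        pair_with_context, beam.pvalue.AsDict(contexts))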
Note, in Java you use it as .asMap().
Comments:
Thanks for the detailed answer. What I'd specifically like to know is how the side input gets stored on the worker: does it flow into setup() or start_bundle() inside the ParDo, or does it only land in process()? And does that mean the whole lookup list gets sent along with every element?

Answer 2:

Sorry, I initially misunderstood the question. Actually, I don't think it is possible to get a side input in start_bundle; it is only accessible in process. But you can instead do that work on the first call to process in a bundle and get a similar result.
class DoFnMethods(beam.DoFn):
    def __init__(self):
        self.first_element_processed = False
        self.once_retrieved_side_input_data = None

    def called_once(self, side_input):
        if self.first_element_processed:
            return
        self.once_retrieved_side_input_data = side_input.get(...)
        self.first_element_processed = True

    def process(self, element, side_input):
        self.called_once(side_input)
        ...
Note: you do need to be careful about the fact that start_bundle and finish_bundle are called once per bundle across all windows, while the side input handed to process is different for each window being computed. So if you are using windowing, you probably want to key the self.first_element_processed and self.once_retrieved_side_input_data variables by window (a dict), so that the called_once work happens once per window, as in the sketch below.
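A sketch of that per-window bookkeeping (my own illustrative adaptation of the DoFn above, using beam.DoFn.WindowParam to get the current window):

import apache_beam as beam

class DoFnMethods(beam.DoFn):
    """Per-window variant of the caching DoFn above (illustrative sketch)."""

    def __init__(self):
        # Keyed by window, since each window sees a different side input.
        self.first_element_processed = {}
        self.once_retrieved_side_input_data = {}

    def process(self, element, side_input, window=beam.DoFn.WindowParam):
        if not self.first_element_processed.get(window):
            # One-time work for this window, done on its first element
            # (store the side input itself, or whatever you derive from it).
            self.once_retrieved_side_input_data[window] = side_input
            self.first_element_processed[window] = True
        data = self.once_retrieved_side_input_data[window]
        # ... use 'data' to process 'element' ...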