CoGroupByKey 没有给出想要的结果 Apache Beam(python)

Posted

技术标签:

【中文标题】CoGroupByKey 没有给出想要的结果 Apache Beam(python)【英文标题】:CoGroupByKey not giving desired results Apache Beam(python) 【发布时间】:2021-09-25 07:25:45 【问题描述】:

我一直在测试将 pub/sub 读取数据与自创数据连接起来。下面是主要的流水线方法。

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    
    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with Pipeline(options=pipeline_options) as pipeline:
        # reading from pub/sub and creating a fixed window of 1 min.
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
        | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
        #creating sample data 
        p2 = pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
        ('Hello_world','sh 2'),
        ('Hello_everyone','sh 3'),
        ('Hello_cloud','sh 4')])
    
        ("schdedule":p2,"timestamp":p1) | "merging" >> CoGroupByKey()| "merge print">> Map(print)

下面是window和addtimestamp的转换方法。

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 30 seconds.
        self.window_size = int(window_size * 30)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
                                    
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam, window=DoFn.WindowParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (element.decode("utf-8"),datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S"))

我得到的结果如下所示。

('Hello', 'schdedule': [], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_world', 'schdedule': [], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_everyone', 'schdedule': [], 'timestamp': ['2021-07-16 13:19:00'])

schedule 列表打印为空,因为它没有加入。

期待

('Hello', 'schdedule': ['sh 1','sh 1.1'], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_world', 'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_everyone', 'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00'])

我尝试在 p2 上单独执行 GroupByKey,效果很好,结果如下。

('Hello', ['sh 1','sh 1.1'])
('Hello_world', ['sh 2'])
('Hello_everyone', ['sh 3'])

还尝试了带有侧面输入的静态字典,它工作正常,但是一旦我这样做 CoGroupByKey 它就不会从 p2 管道产生任何结果。建议我在这里做错什么。

【问题讨论】:

为了了解您的两个 Pcollections 未正确合并的原因,您能否提供一些 p1 集合 的示例数据?我创建了一个示例代码来解释 CoGroupByKey 的工作原理,here。可以看到,合并是基于每个 Pcollection 的主键(元素[1])完成的,即 'Hello'、'Hello_world' 等。您用于合并的密钥是什么?我创建的代码对您有帮助吗? 这个例子工作得很好,可能与 window.不过不确定。 您能提供 p1 的示例数据吗? 通过打印 P1. ('Hello_world', '2021-07-19 12:08:00') ('Hello_everyone', '2021-07-19 12:08:00') ('Hello', '2021-07-19 12:08:00') 第二个列表(p2)是否已修复?换句话说,当key是Hello时,你会一直加'schdedule': ['sh 1','sh 1.1']吗? 【参考方案1】:

所以只是在这里贡献。 这个问题的真正目的是将来自维度表或静态数据存储的数据与流数据连接起来。 从问题中可以明显看出 CoGroupByKey 没有加入时间窗口和全局窗口数据。什么是窗口数据和全局窗口数据?

windowed :换句话说,应用了窗口化的数据组。这反过来将时间边界应用于不断流式传输的数据。因此行数永远不会是无限的。

全局窗口化:没有时间戳边界。它可以是流式或批处理,也可以是维度表或静态数据存储。

所以我们在这里遇到了冲突,因为我们将窗口数据与全局窗口数据组合在一起。

那么如何解决这种情况?

有不同的方法可以做到这一点。下面列出了其中的几个。

1.使两个数据流进入同一个窗口。

2. 使用侧输入。阅读this。更多信息here

3.在Pardo变换中使用setup方法。

在我的情况下,我寻求不需要为静态数据生成窗口,因此我使用解决方案 23 实现了这一点。

解决方案2

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
#     pipeline = Pipeline(options=pipeline_options)
    with Pipeline(options=pipeline_options) as pipeline:
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
                 | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)\
                 |"adding time stamp value ">> Map(lambda x : (x[0],datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))\
                 |"p1 group by">>GroupByKey()

        p2 = pipeline |"generating data">> Create([('Hello','sh 1'),('Hello','sh 1.1'),
        ('Hello_world','sh 2'),
        ('Hello_everyone','sh 3'),
        ('Hello_cloud','sh 4')])\
         |"p2 group by">> GroupByKey()      
        p1|"perfomring join">> Map(join_data,beam.pvalue.AsDict(p2))| Map(print)

解决方案3

class join_data(DoFn):
    def setup(self):
        self.sample_data_dict = 'Hello':['sh 1','sh 1.1'],
    'Hello_world':'sh 2',
    'Hello_everyone':'sh 3',
    'Hello_cloud':'sh 4'
        return
    def process(self,ele):
        yield ((ele[0],ele[1],self.sample_data_dict[ele[0]]))

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
#     pipeline = Pipeline(options=pipeline_options)
    with Pipeline(options=pipeline_options) as pipeline:
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
                 | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)\
|"adding time stamp value ">> Map(lambda x : (x[0],datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))\
|"p1 group by">>GroupByKey()
        p1|"perfomring transformation">> ParDo(join_data())| Map(print)

在生产管道中,我们可能会遇到这个问题,通过在其中添加维度信息来转换流数据,我们可以轻松地利用 setupstart_bundle 来创建数据库/bigquery 连接. 请注意:每个类实例/每个工作人员调用一次设置方法每个窗口或每组行调用一次 start_bundle 方法。 documentation。有关 ParDo 的更多信息here。

在这两种情况下,我都能得到上述问题中提到的预期结果。

【讨论】:

【参考方案2】:

为了进一步为社区做出贡献,我发布了这个答案。

我假设您的第二个 PCollection p2 是固定且不可变的。因此,对于来自 p1 的每条新记录,它将与来自 p2 的相应键合并。换句话说,每当一条记录有Hello作为主键时,schdedule': ['sh 1','sh 1.1']就会被添加到最终输出中。

如 cmets 中所述,CoGroupByKey 方法可以在没有窗口功能的情况下工作。如下例所示,

import apache_beam as beam
from apache_beam import Create, Map, ParDo, Flatten
from apache_beam import CoGroupByKey
from apache_beam import pvalue, window, WindowInto

with beam.Pipeline() as pipeline:
    
    timestamps= [('Hello','2021-07-16 13:19:00'),('Hello_world','2021-07-16 13:19:00'),('Hello_everyone','2021-07-16 13:19:00'),
                 ('Hello_cloud','2021-07-16 13:19:00')]
    p1 = pipeline | "Timestamps" >> Create(timestamps)
    
        #creating sample data 
    p2 = pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
    ('Hello_world','sh 2'),
    ('Hello_everyone','sh 3'),
    ('Hello_cloud','sh 4')])
    
    ("schdedule":p2,"timestamp":p1) | "merging" >> CoGroupByKey() | "merge print">> Map(print)
 

但是,当窗口化第一个 PCollection p1 时,不会合并第二个 PCollection。发生这种情况是因为第二个 PCollection 没有窗口化,元素没有时间戳以便与 p1 放在同一个窗口中(或不放在同一个窗口中)。根据documentation,

侧面输入和窗口

Beam 使用主输入元素的窗口来查找 侧输入元素的适当窗口。梁项目主要 输入元素的窗口到侧输入的窗口集,然后使用 来自结果窗口的侧面输入。如果主输入和侧 输入具有相同的窗口,投影提供了准确的 对应的窗口。但是,如果输入有不同的窗口, Beam 使用投影选择最合适的侧输入 窗口。

在您的情况下,p2p1 不在同一个窗口中,因为它没有时间戳。所以它不存在于输出中。但是,有一种解决方法。考虑到 p2 是不可变的,如前所述,我们可以:

    首先将 p1 的时间戳转换为 UNIX 将 p2 合并到 p1 根据 p1 的时间戳对输出进行窗口化

为简单起见,使用批处理模型的此代码的简化版本是,

import apache_beam as beam
from apache_beam import Create, Map, 
from apache_beam import  CoGroupByKey
from apache_beam import pvalue, window, WindowInto

with beam.Pipeline() as pipeline:
    
    timestamps= [('Hello','2021-07-16 13:19:00'), ('Hello','2021-07-16 13:19:05'),('Hello_world','2021-07-16 13:19:00'),('Hello_everyone','2021-07-16 13:19:00'),
                 ('Hello_cloud','2021-07-16 13:19:00')]
    p1 = pipeline | "Timestamps" >> Create(timestamps) | "Add timestamps" >> Map(lambda x: window.TimestampedValue(x, date2unix(x[1])))
    
        #creating sample data 
    p2 = (pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
    ('Hello_world','sh 2'),
    ('Hello_everyone','sh 3'),
    ('Hello_cloud','sh 4')]))
    
    (("schdedule":p2,"timestamp":p1) | "merging" >> CoGroupByKey() 
                                       | "FixedWindow2" >> WindowInto(window.FixedWindows(60)) #60 seconds windows
                                       | "merge print">> Map(print))

还有输出,

('Hello', 'schdedule': ['sh 1', 'sh 1.1'], 'timestamp': ['2021-07-16 13:19:00', '2021-07-16 13:19:05'])
('Hello_world', 'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_everyone', 'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00'])
('Hello_cloud', 'schdedule': ['sh 4'], 'timestamp': ['2021-07-16 13:19:00'])

请注意,对于键 Hello,在同一窗口中有两个时间戳,这证实了正确使用了窗口。

【讨论】:

一旦将时间戳与全局窗口分组,它如何能够将其窗口化为 1 分钟?我试过了,但没有任何结果。 你在某个地方接近这个......我用 SideInputs 实现了这个。请参考我稍后会发布的答案。 感谢您的所有帮助和时间。继续贡献。 谢谢,如果您发现有用的信息,如果您能更新答案,我将不胜感激。

以上是关于CoGroupByKey 没有给出想要的结果 Apache Beam(python)的主要内容,如果未能解决你的问题,请参考以下文章

使用 .stage 没有给出想要的结果

Sql Inner 连接没有给出想要的结果

Hive 自动增量 UDF 没有给出想要的结果

将 CoGroupByKey 与自定义类型一起使用会导致 Coder 错误

我的随机森林分类算法没有给出我想要的结果

RedShift to_char 格式没有给出想要的结果