Is there an efficient way to outer join several (more than 2) Kafka topics?

Posted 2017-12-02 08:14:26

Question:

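I would like to outer join several (typically 2-10) Kafka topics by key, preferably using the Streams API. All topics will have the same keys and partitioning. One way to perform this join is to create a KStream for each topic and chain calls to KStream.outerJoin: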

stream1
    .outerJoin(stream2, ...)
    .outerJoin(stream3, ...)
    .outerJoin(stream4, ...)

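However, the documentation for KStream.outerJoin indicates that each call to outerJoin materializes both of its input streams. The example above would therefore materialize not only streams 1 to 4 but also stream1.outerJoin(stream2, ...) and stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Compared with joining the 4 streams directly, that is a lot of unnecessary serialization, deserialization, and I/O.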

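Another problem with this approach is that the JoinWindow is not consistent across all 4 input streams: one JoinWindow is used to join streams 1 and 2, but a separate join window is then used to join that result with stream 3, and so on. For example, suppose I specify a 10-second join window for each join, and entries with a particular key appear in stream 1 at 0 s, in stream 2 at 6 s, in stream 3 at 12 s, and in stream 4 at 18 s. The joined item is output only after 18 seconds, which is an excessively high latency. The result also depends on the order of the joins, which seems unnatural.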
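To make the order dependence concrete, here is a small plain-Java model (not Kafka Streams code) of the example above. It rests on two simplifying assumptions: a pairwise windowed join matches when the two timestamps differ by at most the window size, and the joined record carries the later of its two timestamps. The class ChainedWindowModel and its methods are hypothetical names used only for illustration.

```java
// Simplified model of chained pairwise windowed joins vs. one shared window.
// Assumptions (not Kafka Streams code): a pairwise join matches when the two
// timestamps differ by at most the window size, and the joined record carries
// the later of the two timestamps.
class ChainedWindowModel {

    /** Timestamp of the fully joined record, or -1 if some pairwise join finds no match. */
    static long chainedJoin(long windowSec, long... timestamps) {
        long joinedTs = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            long next = timestamps[i];
            if (Math.abs(next - joinedTs) > windowSec) {
                return -1; // this pairwise join has no partner inside its window
            }
            joinedTs = Math.max(joinedTs, next); // joined record keeps the later timestamp
        }
        return joinedTs;
    }

    /** True if all timestamps fall inside one window shared by every stream. */
    static boolean singleWindow(long windowSec, long... timestamps) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long t : timestamps) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return max - min <= windowSec;
    }

    public static void main(String[] args) {
        // Arrival times (seconds) of one key in streams 1-4, as in the question.
        System.out.println(chainedJoin(10, 0, 6, 12, 18)); // order 1,2,3,4: prints 18
        System.out.println(chainedJoin(10, 0, 18, 6, 12)); // order 1,4,2,3: prints -1
        System.out.println(singleWindow(10, 0, 6, 12, 18)); // prints false
    }
}
```

Under these assumptions, joining in the order 1, 2, 3, 4 succeeds because each step only compares adjacent timestamps, while joining stream 4 second fails, and a single 10-second window over all four streams would reject the key entirely.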

Is there a better way to do a multi-way join with Kafka?

Answer 1:

I don't currently know of a better way in Kafka Streams, but one is in the works:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
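The idea behind co-grouping is that every input stream updates one shared per-key aggregate in a single store, instead of materializing each intermediate pairwise join result. (KIP-150 was later adopted and shipped as KGroupedStream#cogroup in Apache Kafka 2.5.) The sketch below is a plain-Java model of that idea, not the Kafka Streams API: CogroupModel and apply are hypothetical names, the aggregator is assumed to record the latest value per source stream, and windowing is ignored.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of co-grouping (hypothetical, not the Kafka Streams API):
// all input streams update the same per-key aggregate in one store, so no
// intermediate join results are materialized.
class CogroupModel<K, V> {

    // The single store: key -> (source stream name -> latest value from that stream).
    private final Map<K, Map<String, V>> store = new HashMap<>();

    /**
     * Applies one record from the named stream and returns a snapshot of the
     * updated aggregate. Streams not yet seen for this key are simply absent,
     * which is what gives the result its outer-join flavour.
     */
    Map<String, V> apply(String streamName, K key, V value) {
        Map<String, V> agg = store.computeIfAbsent(key, k -> new LinkedHashMap<>());
        agg.put(streamName, value);
        return new LinkedHashMap<>(agg); // copy so callers cannot mutate the store
    }

    public static void main(String[] args) {
        CogroupModel<String, Integer> cg = new CogroupModel<>();
        cg.apply("s1", "k", 1);
        cg.apply("s3", "k", 3);
        System.out.println(cg.apply("s2", "k", 2)); // prints {s1=1, s3=3, s2=2}
    }
}
```

However many streams are added, the model keeps exactly one store, and each record costs one lookup and one update.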

Answer 2:

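In the end I decided to write a custom, lightweight joiner that avoids materialization and strictly respects the expiry time. It should be O(1) on average. It fits the Consumer API better than the Streams API: for each consumer, repeatedly poll and update the joiner with any data received; if the joiner returns a complete set of attributes, forward it. Here is the code: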

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Inner joins multiple streams of data by key into one stream. It is assumed
 * that a key will appear in a stream exactly once. The values associated with
 * each key are collected and if all values are received within a certain
 * maximum wait time, the joiner returns all values corresponding to that key.
 * If not all values are received in time, the joiner never returns any values
 * corresponding to that key.
 * <p>
 * This class is not thread safe: all calls to
 * {@link #update(Object, Object, long)} must be synchronized.
 * @param <K> The type of key.
 * @param <V> The type of value.
 */
class StreamInnerJoiner<K, V> {

    // Insertion-ordered, so the keys seen earliest are always at the head.
    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    /**
     * Creates a stream inner joiner.
     * @param joinCount The number of streams being joined.
     * @param maxWait The maximum amount of time after an item has been seen in
     * one stream to wait for it to be seen in the remaining streams.
     */
    StreamInnerJoiner(final int joinCount, final long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    // Values collected so far for one key, plus the time the key was first seen.
    private static class Vals<A> {
        final long firstSeen;
        final Collection<A> vals = new ArrayList<>();
        private Vals(final long firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    /**
     * Updates this joiner with a value corresponding to a key.
     * @param key The key.
     * @param val The value.
     * @param now The current time.
     * @return If all values for the specified key have been received, the
     * complete collection of values for that key; otherwise
     * {@link Optional#empty()}.
     */
    Optional<Collection<V>> update(final K key, final V val, final long now) {
        expireOld(now - maxWait);
        final Vals<V> curVals = getOrCreate(key, now);
        curVals.vals.add(val);
        return expireAndGetIffFull(key, curVals);
    }

    private Vals<V> getOrCreate(final K key, final long now) {
        final Vals<V> existingVals = idToVals.get(key);
        if (existingVals != null)
            return existingVals;
        else {
            /*
            Note: we assume that the item with the specified ID has not already
            been seen and timed out, and therefore that its first seen time is
            now. If the item has in fact already timed out, it is doomed and
            will time out again with no ill effect.
             */
            final Vals<V> curVals = new Vals<>(now);
            idToVals.put(key, curVals);
            return curVals;
        }
    }

    private void expireOld(final long expireBefore) {
        // Walk from the oldest entry and stop at the first one that is still live.
        final Iterator<Vals<V>> i = idToVals.values().iterator();
        while (i.hasNext() && i.next().firstSeen < expireBefore)
            i.remove();
    }

    private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) {
        if (vals.vals.size() == joinCount) {
            // as all expired entries were already removed, this entry is valid
            idToVals.remove(key);
            return Optional.of(vals.vals);
        } else
            return Optional.empty();
    }
}
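The expiry step in expireOld relies on LinkedHashMap iterating in insertion order: if keys are inserted with non-decreasing "first seen" times, the oldest entries are always at the head, so the loop can stop at the first live entry. Each entry is removed at most once, which is presumably what makes the cost O(1) per update on average. The standalone sketch below isolates that technique; InsertionOrderExpiry is a hypothetical name, and it assumes the timestamps passed as `now` never go backwards. If they can (for example, with out-of-order event times), a newer entry at the head can shield an older one behind it, which then expires later than maxWait.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Isolates the expiry technique used by StreamInnerJoiner.expireOld:
// entries are inserted in non-decreasing "first seen" order (assumption),
// so expired entries are always a prefix of the map's iteration order.
class InsertionOrderExpiry {

    /** Removes head entries first seen before the cutoff; returns how many were removed. */
    static int expireOld(Map<String, Long> firstSeen, long expireBefore) {
        int removed = 0;
        Iterator<Long> it = firstSeen.values().iterator();
        // Stop at the first live entry: everything after it was inserted later.
        while (it.hasNext() && it.next() < expireBefore) {
            it.remove();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> firstSeen = new LinkedHashMap<>();
        firstSeen.put("a", 0L);
        firstSeen.put("b", 5L);
        firstSeen.put("c", 12L);
        firstSeen.put("d", 15L);
        int removed = expireOld(firstSeen, 10L);
        System.out.println(removed + " " + firstSeen.keySet()); // prints 2 [c, d]
    }
}
```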
