Clojure:实现有状态的 Java 接口

Posted

技术标签:

【中文标题】Clojure:实现有状态的 Java 接口【英文标题】:Clojure: implementing stateful Java interface 【发布时间】:2017-05-02 20:46:55 【问题描述】:

Kafka Streams 有一个接口Processor,它的实现是有状态的。开发者指南中给出的example implementation 是:

public class WordCountProcessor implements Processor<String, String> 

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) 
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // call this processor's punctuate() method every 1000 time units.
      this.context.schedule(1000);

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");
  

  @Override
  public void process(String dummy, String line) 
      String[] words = line.toLowerCase().split(" ");

      for (String word : words) 
          Long oldValue = kvStore.get(word);
          if (oldValue == null) 
              kvStore.put(word, 1L);
           else 
              kvStore.put(word, oldValue + 1L);
          
      
  

  @Override
  public void punctuate(long timestamp) 
      KeyValueIterator<String, Long> iter = this.kvStore.all();
      while (iter.hasNext()) 
          KeyValue<String, Long> entry = iter.next();
          context.forward(entry.key, entry.value.toString());
      
      iter.close();
      // commit the current processing progress
      context.commit();
  

  @Override
  public void close() 
      // close the key-value store
      kvStore.close();
  


init 方法初始化WordCountProcessor 的内部状态,例如检索键值存储。其他方法,如processclose,利用了这种状态。

我不清楚如何在 Clojure 中 reify 这样的接口。我们如何将init 检索到的状态传递给processclose 等?

使用闭包?

我的一个想法是使用闭包:

(let [ctx (atom nil)]
  (reify Processor
    (close [this]
      ;; Do something w/ ctx
      )
    (init [this context]
      (reset! ctx context))
    (process [this k v]
      ;; Do something w/ ctx
      )
    (punctuate [this timestamp]
      ;; Do something w/ ctx
      )))

令人讨厌的是,我们每次都必须从 ProcessorContext 对象开始,因此键值存储代码将在所有需要键值存储的方法中重复。

我没有看到解决此问题的(一般)方法,但根据具体情况,我们可以将 ctx 原子替换为方法所需的更具体的状态。

有没有更好的办法?

【问题讨论】:

【参考方案1】:

关闭一个原子将是实现它的主要方法。您的原始类有两个字段,因此您可以关闭两个原子以获得相同的效果

(let [ctx (atom nil)
      kv-store (atom nil)]
  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,))

如果这仍然太乏味,那么您可以添加一些也关闭原子的便利函数

(let [ctx (atom nil)
      kv-store (atom nil)]

  (def kv-get [key]
    (.get @kv-store key))

  (def kv-all []
    (iterator-seq (.all @kv-store)))

  (def kv-put [key value]
    (.put @kv-store key value))

  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,
  (punctuate [this timestamp]
    (do-seq [x (kv-all)]
      ,,,)
  )))

替代方案是使用gen-class,但认为你会更好地使用 reify。

【讨论】:

以上是关于Clojure:实现有状态的 Java 接口的主要内容,如果未能解决你的问题,请参考以下文章

Clojure 中纸牌游戏的状态

在 Clojure 中扩展 Java 接口时出错

如何Clojure.Spec引用类型(如原子)?

如何从 Clojure 调用 C++ 程序以使程序保持打开状态?

如何在 clojure 中映射很少使用的状态?

关于java中的对象序列化