Clojure:实现有状态的 Java 接口
Posted
技术标签:
【中文标题】Clojure:实现有状态的 Java 接口【英文标题】:Clojure: implementing stateful Java interface 【发布时间】:2017-05-02 20:46:55 【问题描述】:Kafka Streams 有一个接口Processor
,它的实现是有状态的。开发者指南中给出的example implementation 是:
public class WordCountProcessor implements Processor<String, String>
private ProcessorContext context;
private KeyValueStore<String, Long> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context)
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// call this processor's punctuate() method every 1000 time units.
this.context.schedule(1000);
// retrieve the key-value store named "Counts"
kvStore = (KeyValueStore) context.getStateStore("Counts");
@Override
public void process(String dummy, String line)
String[] words = line.toLowerCase().split(" ");
for (String word : words)
Long oldValue = kvStore.get(word);
if (oldValue == null)
kvStore.put(word, 1L);
else
kvStore.put(word, oldValue + 1L);
@Override
public void punctuate(long timestamp)
KeyValueIterator<String, Long> iter = this.kvStore.all();
while (iter.hasNext())
KeyValue<String, Long> entry = iter.next();
context.forward(entry.key, entry.value.toString());
iter.close();
// commit the current processing progress
context.commit();
@Override
public void close()
// close the key-value store
kvStore.close();
init
方法初始化WordCountProcessor
的内部状态,例如检索键值存储。其他方法,如process
和close
,利用了这种状态。
我不清楚如何在 Clojure 中 reify
这样的接口。我们如何将init
检索到的状态传递给process
、close
等?
使用闭包?
我的一个想法是使用闭包:
(let [ctx (atom nil)]
(reify Processor
(close [this]
;; Do something w/ ctx
)
(init [this context]
(reset! ctx context))
(process [this k v]
;; Do something w/ ctx
)
(punctuate [this timestamp]
;; Do something w/ ctx
)))
令人讨厌的是,我们每次都必须从 ProcessorContext
对象开始,因此键值存储代码将在所有需要键值存储的方法中重复。
我没有看到解决此问题的(一般)方法,但根据具体情况,我们可以将 ctx
原子替换为方法所需的更具体的状态。
有没有更好的办法?
【问题讨论】:
【参考方案1】:关闭一个原子将是实现它的主要方法。您的原始类有两个字段,因此您可以关闭两个原子以获得相同的效果
(let [ctx (atom nil)
kv-store (atom nil)]
(reify Processor
,,,
(init [this context]
(reset! ctx context)
(reset! kv-store (.getStateStore context "Counts")))
,,,))
如果这仍然太乏味,那么您可以添加一些也关闭原子的便利函数
(let [ctx (atom nil)
kv-store (atom nil)]
(def kv-get [key]
(.get @kv-store key))
(def kv-all []
(iterator-seq (.all @kv-store)))
(def kv-put [key value]
(.put @kv-store key value))
(reify Processor
,,,
(init [this context]
(reset! ctx context)
(reset! kv-store (.getStateStore context "Counts")))
,,,
(punctuate [this timestamp]
(do-seq [x (kv-all)]
,,,)
)))
替代方案是使用gen-class,但认为你会更好地使用 reify。
【讨论】:
以上是关于Clojure:实现有状态的 Java 接口的主要内容,如果未能解决你的问题,请参考以下文章