Clojure + 狐猴

Posted

技术标签:

【中文标题】Clojure + 狐猴【英文标题】:Clojure + Lemur 【发布时间】:2013-07-09 23:02:35 【问题描述】:

我正在尝试使用 lemur+clojure 运行一些多步骤作业。

我在将多个输入作为参数传递给 clojure+lemur 时遇到问题。

作为我工作的第一步,我尝试运行 emr Streaming Job

狐猴运行 $CONF_DIR/run-pipeline.clj --master-instance-type $MASTER_INSTANCE_TYPE --slave-instance-type $SLAVE_INSTANCE_TYPE --num-instances $NUM_INSTANCES --ami-版本 $AMI_VERSION --hadoop-version $HADOOP_VERSION--bucket $BUCKET --jar-src-path $CONF_DIR/run-pipeline.clj --input_folder "$input_folder" --output -folder "$output_folder" --reduce-tasks "$REDUCE_TASKS" --map-tasks "$MAP_TASKS"

使用单个输入文件,我的代码如下所示

(import com.amazonaws.services.elasticmapreduce.util.StepFactory)
(import com.amazonaws.services.elasticmapreduce.model.StepConfig)
(import com.amazonaws.services.elasticmapreduce.util.StreamingStep)


(defn create-step-parsing [eopts]
 (def step (new StreamingStep))
 (.withInputs step (into-array [(str (:input-folder eopts) "/inputs/*")]))
 ...

这很好用,但是当我尝试传递输入文件列表时出现错误

狐猴运行 $CONF_DIR/run-pipeline.clj --master-instance-type $MASTER_INSTANCE_TYPE --slave-instance-type $SLAVE_INSTANCE_TYPE --num-instances $NUM_INSTANCES --ami-版本 $AMI_VERSION --hadoop-version $HADOOP_VERSION--bucket $BUCKET --jar-src-path $CONF_DIR/run-pipeline.clj --input_folder "$input_folder1" --input_folder "$input_folder2" --input_folder "$input_folder3" --input_folder "$input_folder" --output-folder "$output_folder" --reduce-tasks "$REDUCE_TASKS" --map-任务“$MAP_TASKS”

(defn create-normalizer-step [eopts]
  (def step (new StreamingStep))
  (.withInputs step (to-array (:input-folder eopts)))

这是我遇到的错误

15:44:05 Exception in thread "main" java.lang.ClassCastException
15:44:05    at java.lang.Class.cast(Class.java:2990)
15:44:05    at clojure.lang.Reflector.boxArg(Reflector.java:429)
15:44:05    at clojure.lang.Reflector.boxArgs(Reflector.java:462)
15:44:05    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:57)
15:44:05    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:30)
15:44:05    at run_pipeline17$create_normalizer_step.invoke(run-pipeline.clj:18)
15:44:05    at run_pipeline17$run_pipeline.invoke(run-pipeline.clj:209)
15:44:05    at lemur.core$fire_BANG_.doInvoke(core.clj:711)
15:44:05    at clojure.lang.RestFn.invoke(RestFn.java:423)
15:44:05    at run_pipeline17$eval178.invoke(run-pipeline.clj:222)
15:44:05    at clojure.lang.Compiler.eval(Compiler.java:6465)
15:44:05    at clojure.lang.Compiler.load(Compiler.java:6902)
15:44:05    at clojure.lang.Compiler.loadFile(Compiler.java:6863)
15:44:05    at clojure.lang.RT$3.invoke(RT.java:305)
15:44:05    at lemur.core$execute_jobdef.invoke(core.clj:742)
15:44:05    at lemur.core$_main$fn__1388.invoke(core.clj:929)
15:44:05    at lemur.core$_main.doInvoke(core.clj:924)
15:44:05    at clojure.lang.RestFn.applyTo(RestFn.java:137)
15:44:05    at lemur.core.main(Unknown Source)

我添加的代码是从第 17 行到第 19 行。

谢谢

【问题讨论】:

你还在努力解决这个问题吗? 【参考方案1】:

我认为问题出在您使用的数组函数上。在您使用的第一个工作示例中 (into-array)。

来自文档:

(入数组 aseq)

(入数组类型aseq)

返回一个数组,其中组件设置为 aseq 中的值。这 数组的组件类型是 type(如果提供),或者是第一个的类型 aseq 中的值(如果存在)或 Object. aseq 中的所有值必须是 与组件类型兼容。基元的类对象 可以使用例如 Integer/TYPE 来获取类型。

因为您正在创建字符串作为传入数组的参数,所以 clojure 正确地为您创建了一个“字符串”数组。

在第二个损坏的示例中,您使用 (to-array)。来自文档:

(到数组 coll)

返回一个包含 coll 内容的 Objects 数组,它可以 是任何集合。映射到 java.util.Collection.toArray()。

在本例中,您创建了一个“对象”数组。根据 StreamingStep 的 AWS Java API,这与“字符串”类型不兼容。

我的建议是使用 (into-array),类型规范为 String。例如:(into-array String ["hello" "goodby"])。在我看来,显式类型规范很好,以便将来阅读澄清。但正如您在文档中看到的那样,(into-array) 会为您猜测输入。

【讨论】:

以上是关于Clojure + 狐猴的主要内容,如果未能解决你的问题,请参考以下文章

Clojure 类型提示,无法解析类名 clojure.core$double

Clojure基础课程2-Clojure中的数据长啥样?

Clojure基础课程2-Clojure中的数据长啥样?

《Learn Clojure》直播第二期:Clojure 数据类型介绍

Lein Clojure 1.3 与 Clojure 1.2.1

Clojure使用Java方法