clojure.java.jdbc / Querying large result sets lazily
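I am trying to read millions of rows from a database and write them to a text file.
This is a continuation of my question "database dump to text file with side effects".
My problem now seems to be that logging does not happen until the program completes. Another indicator that I am not processing lazily is that the text file is not written at all until the program finishes.
Based on tips from IRC, my issue may be related to :result-set-fn defaulting to doall in the clojure.java.jdbc/query area of the code.
I tried replacing it with a for expression, but memory consumption is still high because the whole result set is pulled into memory.
How can I have a :result-set-fn that does not pull everything in the way doall does? How can I write the log file progressively while the program runs, rather than dumping everything once -main has finished executing?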
(let [
      db-spec local-postgres
      sql "select * from public.f_5500_sf "
      log-report-interval 1000
      fetch-size 100
      field-delim "\t"
      row-delim "\n"
      db-connection (doto (j/get-connection db-spec) (.setAutoCommit false))
      statement (j/prepare-statement db-connection sql :fetch-size fetch-size)
      joiner (fn [v] (str (join field-delim v) row-delim))
      start (System/currentTimeMillis)
      ;; elapsed time is in milliseconds, so divide by 1000 to get rows per second
      rate-calc (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 1000))))
      row-count (atom 0)
      result-set-fn (fn [rs] (lazy-seq rs))
      lazy-results (rest (j/query db-connection [statement] :as-arrays? true :row-fn joiner :result-set-fn result-set-fn))
      ]; }}}
  (.setAutoCommit db-connection false)
  (info "Started dbdump session...")
  (with-open [^java.io.Writer wrtr (io/writer "output.txt")]
    (info "Running query...")
    (doseq [row lazy-results]
      (.write wrtr row)
      ))
  (info (format "Completed write with %d rows" @row-count))
  )
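I picked up the latest clojure.java.jdbc fixes by putting [org.clojure/java.jdbc "0.3.0-beta1"] in my project.clj dependency list. That release enhances/corrects the :as-arrays? true functionality of clojure.java.jdbc/query.
I think this helped somewhat, although I may still have been able to override :result-set-fn to vec.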
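The core problem was solved by putting all of the per-row logic into :row-fn. The original OutOfMemory problems came from iterating over the result set returned by j/query instead of defining a specific :row-fn.
The new (working) code is below: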
(defn -main []
  (let [; {{{
        db-spec local-postgres
        source-sql "select * from public.f_5500 "
        log-report-interval 1000
        fetch-size 1000
        row-count (atom 0)
        field-delim "\u0001" ; unlikely to be in source feed,
                             ; although i should still check in
                             ; replace-newline below (for when "\t"
                             ; is used especially)
        row-delim "\n" ; unless fixed-width, target doesn't
                       ; support non-printable chars for recDelim like
        db-connection (doto (j/get-connection db-spec) (.setAutoCommit false))
        statement (j/prepare-statement db-connection source-sql :fetch-size fetch-size :concurrency :read-only)
        start (System/currentTimeMillis)
        ;; elapsed time is in milliseconds, so divide by 1000 to get rows per second
        rate-calc (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 1000))))
        replace-newline (fn [s] (if (string? s) (clojure.string/replace s #"\n" " ") s))
        row-fn (fn [v]
                 (swap! row-count inc)
                 (when (zero? (mod @row-count log-report-interval))
                   (info (format "wrote %d rows" @row-count))
                   (info (format " rows/s %.2f" (rate-calc @row-count)))
                   (info (format " Percent Mem used %s " (memory-percent-used))))
                 (str (join field-delim (doall (map #(replace-newline %) v))) row-delim))
        ]; }}}
    (info "Started database table dump session...")
    (with-open [^java.io.Writer wrtr (io/writer "./sql/output.txt")]
      ;; each row is written as it is read, so no row strings accumulate
      (j/query db-connection [statement] :as-arrays? true :row-fn
               #(.write wrtr (row-fn %))))
    (info (format " Completed with %d rows" @row-count))
    (info (format " Completed in %s seconds" (float (/ (- (System/currentTimeMillis) start) 1000))))
    (info (format " Average rows/s %.2f" (rate-calc @row-count)))
    nil)
  )
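Why writing inside :row-fn behaves differently from iterating over a fully realized result can be seen without a database. The sketch below is not from the original post: fake-rows, consume-eagerly and consume-incrementally are hypothetical names, and the unchunked lazy sequence only stands in for a JDBC result set. It records the order of events so the difference between "fetch everything, then write" and "fetch one, write one" is visible.

```clojure
(defn fake-rows
  "Hypothetical stand-in for a result set: an unchunked lazy seq of n rows
   that appends [:fetched i] to the log atom each time a row is realized."
  [n log]
  (letfn [(step [i]
            (lazy-seq
              (when (< i n)
                (swap! log conj [:fetched i])
                (cons i (step (inc i))))))]
    (step 0)))

(defn consume-eagerly
  "Realizes every row with doall before writing any of them,
   which mirrors what a doall-style :result-set-fn does."
  [n]
  (let [log  (atom [])
        rows (doall (fake-rows n log))]
    (doseq [r rows]
      (swap! log conj [:wrote r]))
    @log))

(defn consume-incrementally
  "Writes each row as soon as it is realized, so no more than
   one row needs to be held at a time."
  [n]
  (let [log (atom [])]
    (doseq [r (fake-rows n log)]
      (swap! log conj [:wrote r]))
    @log))

(consume-eagerly 2)
;; => [[:fetched 0] [:fetched 1] [:wrote 0] [:wrote 1]]

(consume-incrementally 2)
;; => [[:fetched 0] [:wrote 0] [:fetched 1] [:wrote 1]]
```

With millions of rows, the first ordering means every row is in memory before the first write happens, which matches the symptoms described in the question.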
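Other things I experimented with (with limited success) involved timbre logging and turning off standard out; I wondered whether, when using a REPL, the results might be cached before being displayed back in my editor (vim fireplace), and I was not sure whether that was using a lot of the memory.
Also, I added the memory logging parts using (.freeMemory (java.lang.Runtime/getRuntime)). I was not familiar enough with VisualVM to pinpoint exactly where my problem was.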
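The working code calls a memory-percent-used helper that the post never shows. Below is a minimal sketch of what such a helper might look like, assuming it is built on the same java.lang.Runtime calls mentioned above; the body is a guess for illustration, not the author's code.

```clojure
(defn memory-percent-used
  "Hypothetical helper: percentage of the currently allocated JVM heap
   that is in use, computed from Runtime's totalMemory and freeMemory."
  []
  (let [rt    (Runtime/getRuntime)
        total (.totalMemory rt)   ; bytes currently reserved by the JVM
        free  (.freeMemory rt)]   ; bytes of that reservation still unused
    (* 100.0 (/ (- total free) (double total)))))
```

Note that totalMemory is the heap the JVM has reserved so far, not the -Xmx ceiling, so the percentage can drop when the heap grows.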
I am happy with how it works now. Thanks, everyone, for your help.
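You can use prepare-statement with the :fetch-size option. Otherwise, the query itself is eager even though the results are delivered as a lazy sequence.
prepare-statement requires a connection object, so you will need to create one explicitly. Here is an example of how your usage might look: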
(let [db-spec local-postgres
      sql "select * from big_table limit 500000 "
      fetch-size 10000 ;; or whatever's appropriate
      cnxn (doto (j/get-connection db-spec)
             (.setAutoCommit false))
      stmt (j/prepare-statement cnxn sql :fetch-size fetch-size)
      results (rest (j/query cnxn [stmt]))]
  ;; ...
  )
Another option
Since the problem seems to be with query, try with-query-results. It is considered deprecated, but it is still there and works. Here is an example usage:
(let [db-spec local-postgres
      sql "select * from big_table limit 500000 "
      fetch-size 100 ;; or whatever's appropriate
      cnxn (doto (j/get-connection db-spec)
             (.setAutoCommit false))
      stmt (j/prepare-statement cnxn sql :fetch-size fetch-size)]
  (j/with-query-results results [stmt] ;; binds the results to `results`
    (doseq [row results]
      ;;
      )))
I have found a better solution: declare a cursor inside a transaction and fetch chunks of data from it. Example:
(db/with-tx
  (db/execute! "declare cur cursor for select * from huge_table")
  (loop []
    (when-let [rows (-> "fetch 10 from cur" db/query not-empty)]
      (doseq [row rows]
        (process-a-row row))
      (recur))))
Here, db/with-tx, db/execute! and db/query are my own shortcuts, declared in the db namespace:
(def ^:dynamic
  *db* {:dbtype "postgresql"
        :connection-uri <some db url>})

(defn query [& args]
  (apply jdbc/query *db* args))

(defn execute! [& args]
  (apply jdbc/execute! *db* args))

(defmacro with-tx
  "Runs a series of queries inside a transaction."
  [& body]
  `(jdbc/with-db-transaction [tx# *db*]
     (binding [*db* tx#]
       ~@body)))
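The fetch loop in the cursor example can be factored into a small reusable function. The following is only a sketch: process-in-chunks is a hypothetical name that does not appear in the answer, and it takes the fetch step as a plain function so the loop can be exercised without a database.

```clojure
(defn process-in-chunks
  "Calls (fetch-chunk!) repeatedly and passes every row to process-row!
   until a chunk comes back empty or nil. Returns the number of rows handled.
   Only one chunk is held in memory at a time."
  [fetch-chunk! process-row!]
  (loop [total 0]
    (if-let [rows (not-empty (fetch-chunk!))]
      (do (run! process-row! rows)
          (recur (+ total (count rows))))
      total)))

;; Assumed usage with the shortcuts above (requires a live database):
;; (db/with-tx
;;   (db/execute! "declare cur cursor for select * from huge_table")
;;   (process-in-chunks #(db/query "fetch 10 from cur") process-a-row))
```

Passing the fetch step in as a function keeps the SQL and chunk size at the call site, where they can be tuned per table.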