记录所有 presto 查询

Posted

技术标签:

【中文标题】记录所有 presto 查询【英文标题】:Logging all presto queries 【发布时间】:2018-04-27 10:59:06 【问题描述】:

如何将提交给 presto 集群的所有查询存储在一个文件(ORC 文件)或其他数据库中。目的是记录所有在 presto worker 上执行的查询。

我知道我需要覆盖 queryCompleted 方法,我也尝试关注this 和那里提到的其他链接,但我无法使用 maven 创建正确的 jar。放置maven生成的presto jar文件后,我的presto就停止工作了。

我是 presto 和 maven 的新手。如果有人可以帮助我,那就太好了。

【问题讨论】:

【参考方案1】:

这是我的方式,它适用于 EMR5.9 (presto 0.184)。

首先,如您所知,您可以使用事件监听器。 就我而言,我使用https://github.com/wyukawa/presto-fluentd 来收集查询日志,因为 fluentd 很方便。(重试容易,发送容易 到多个数据存储) 如果你想创建新的事件监听器插件,你也可以参考这个,因为它非常简单。 (或者https://github.com/zz22394/presto-audit也可以用)

接下来,您必须安装事件监听器插件。 如果您使用 EMR,您可以使用此脚本在 bootstrap actions 上安装 presto-fluentd

# cf. https://github.com/mozilla/emr-bootstrap-presto/blob/master/files/bootstrap/presto-plugins.sh
#!/bin/bash

set -exo pipefail

# re-exec with sudo into background
if [ $(whoami) != root ]; then
  sudo "$0" "$@" &
  exit 0
fi

# set variables
s3uri=$1
fluentd_endpoint=$2

# wait until presto is installed and running
until test -s /var/run/presto/presto-server.pid; do sleep 1; done

# make symbolic link
sudo mkdir -p /usr/lib/presto/etc 2>/dev/null
sudo ln -s /usr/lib/presto/etc /mnt/var/lib/presto/data

# download presto plugins
aws s3 sync $s3uri/jar/ /usr/lib/presto/plugin/
aws s3 sync $s3uri/properties /usr/lib/presto/etc/

# make sure all plugins are owned by presto user
chown -R presto:presto /usr/lib/presto/plugin
chown -R presto:presto /usr/lib/presto/etc

# set event-listner.properties endpoint parameter
echo "event-listener.fluentd-host=$fluentd_endpoint" >> 
/usr/lib/presto/etc/event-listener.properties

# restart presto
stop  presto-server
start presto-server

事件监听器.properties:

event-listener.name=presto-fluentd
event-listener.fluentd-port=24224
event-listener.fluentd-tag=presto.query

在 s3 目录内:

$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/jar/presto-fluentd/
2017-10-30 19:12:59      90318 fluency-1.3.0.jar
2017-10-30 19:12:59    2521113 guava-21.0.jar
2017-10-30 19:12:59      55783 jackson-annotations-2.8.1.jar
2017-10-30 19:12:59     252303 jackson-core-2.7.1.jar
2017-10-30 19:12:59    1199160 jackson-databind-2.7.1.jar
2017-10-30 19:12:59      30488 jackson-dataformat-msgpack-0.8.12.jar
2017-10-30 19:12:59       3907 log-0.148.jar
2017-10-30 19:12:59     116125 msgpack-core-0.8.12.jar
2017-10-30 19:12:59       5509 phi-accural-failure-detector-0.0.4.jar
2017-10-30 19:12:59       6130 presto-fluentd-0.0.1.jar
2017-10-30 19:12:59      41077 slf4j-api-1.7.22.jar

$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/properties/
2017-10-30 19:12:59        109 event-listener.properties

并且只需通过 fluentd 在另一台主机上工作来接收查询日志,如下所示

<match presto.query>
  @type copy
  <store>
    # another data store
  </store>

  <store>
    @type relabel
    @label @presto-query-storage
  </store>
</match>

# In my case, I use bigquery for storing query log
<label @presto-query-storage>
  <match **>
    @label @presto-bigquery-out
    @type record_reformer
    renew_record true
    tag presto.query_storage.big_query
    <record>
      query_id $record["queryId"]
      user_name $record["user"]
      elapsed_time $(record["endTime"] - record["createTime"]) / 1000.0
      start_at 
$Time.at(record["executionStartTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")
      end_at $Time.at(record["endTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S")
      query $record["query"]
      status $record["state"]
    </record>
  </match>
</label>

提示

我使用这个脚本来收集 presto-fluentd 的依赖。

require 'fileutils'
require 'open3'
include FileUtils

TMP_PATH = File.expand_path('../../tmp', __FILE__)
JAR_PATH = File.expand_path('../bootstrap_actions/plugins/jar', __FILE__)
CLONE_URI = 'https://github.com/wyukawa/presto-fluentd'

NEEDED_JAR = %w(
  fluency-1.3.0.jar
  guava-21.0.jar
  jackson-annotations-2.8.1.jar
  jackson-core-2.7.1.jar
  jackson-databind-2.7.1.jar
  jackson-dataformat-msgpack-0.8.12.jar
  log-0.148.jar
  msgpack-core-0.8.12.jar
  phi-accural-failure-detector-0.0.4.jar
  presto-fluentd-0.0.1.jar
  slf4j-api-1.7.22.jar
)

def cleanup_dir
  puts "Clean up #TMP_PATH/presto-fluentd ..."
  rm_r(Dir.glob("#TMP_PATH/presto-fluentd"))
  mkdir_p("#JAR_PATH/presto-fluentd")

  puts "Clean up #JAR_PATH/presto-fluentd ..."
  rm(Dir.glob("#JAR_PATH/presto-fluentd/*.jar"))
end

def clone
  cd(TMP_PATH)

  puts "Download presto-fluentd repo ..."
  out, err, status = Open3.capture2("git clone #CLONE_URI #TMP_PATH/presto-fluentd")
  puts out
end

def mvn
  cd("#TMP_PATH/presto-fluentd")

  puts "Build presto-fluentd ..."
  out, err, status = Open3.capture2("mvn clean package")
  puts out

  out, err, status = Open3.capture2("mvn dependency:copy-dependencies -DoutputDirectory=target -DincludeScope=runtime")
  puts out
end

def copy_dependencies
  cd("#TMP_PATH/presto-fluentd/target")
  puts "Copy jar files to #JAR_PATH ..."

  # FIXME: it's better to fix actual pom.xml for assign scope
  mv(Dir.glob("*.jar").select|file| NEEDED_JAR.include?(file), "#JAR_PATH/presto-fluentd")
  puts "done !!"
end


cleanup_dir
clone
mvn
copy_dependencies

【讨论】:

你有查询创建和查询完成对象返回的信息列表吗?我试图使用反射 api 得到同样的结果,但现在卡住了。 对不起,我不知道你想知道什么......如果你写下你的情况细节(presto 版本、目录结构、启动器日志和服务器环境(在 aws 或你的本地主机上) ,也许问题变得清晰了。(仅供参考:首先我在本地测试了 presto-fluentd,并比较了 AWS(失败情况)和本地(成功情况)之间的启动器日志,如下所示:gist.github.com/reizist/b1a07d6326c48b5ecfaa9aec62182188 我试图获取与 queryCompletedEvent 对象关联的所有字段和方法。类> clazz = queryCompletedEvent.getMetadata().getClass(); for(Field field : clazz.getDeclaredFields()) writer.append(field.getName()+"\r\n ");这给了我这个对象的字段。但我也需要所有嵌套对象。例如:queryCompletedEvent.getFailureInfo().get();有更多的细节,嵌套在它下面。 谁能帮助我在 loki-fluentbit 部署在 kubernetes 上的情况下完成这项工作,我需要从在 K8s 上的另一个容器上运行的另一个应用程序获取日志,但到目前为止我无法找出在这种情况下我应该在 event-listener.properties 文件中保留什么 URL 和端口。

以上是关于记录所有 presto 查询的主要内容,如果未能解决你的问题,请参考以下文章

presto访问 Azure blob storage

无法使用 presto 从 mongodb 获取记录

Presto Web UI

Athena (Hive/Presto) Parquet vs ORC 计数查询

Presto系列 | 四Presto Query Planner And Optimizer

Presto系列 | 四Presto Query Planner And Optimizer