汇合:Hdfs 沉入 avro 格式,但在 hive 中读取 avro 文件时,我的时间比“时区”提前 5:30 小时:“亚洲/加尔各答”

Posted

技术标签:

【中文标题】汇合:Hdfs 沉入 avro 格式,但在 hive 中读取 avro 文件时,我的时间比“时区”提前 5:30 小时:“亚洲/加尔各答”【英文标题】:confluent: Hdfs sink to avro format, but while reading the avro file in hive my time is 5:30 hours ahead of "timezone": "Asia/Kolkata" 【发布时间】:2019-05-13 15:39:25 【问题描述】:

我的 HDFS-Sink 连接:


  "name":"hdfs-sink1",
  "config":
    "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max":"3",
    "topics":"mysql-prod-registrations-",
    "hadoop.conf.dir":"/usr/hdp/current/hadoop-client/conf",
    "hadoop.home":"/usr/hdp/current/hadoop-client",
    "hdfs.url":"hdfs://HACluster:8020",
    "topics.dir":"/topics",
    "logs.dir":"/logs",
    "flush.size":"100",
    "rotate.interval.ms":"60000",
    "format.class":"io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "true",
    "partitioner.class":"io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms":"1800000",
    "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale":"kor",
    "timezone":"Asia/Kolkata"
  

但是在 hive 中阅读时,我比 timezone":"Asia/Kolkata" 提前 5:30 小时。如何获取印度时区的时间戳值?

连接正常运行了两天,但现在出现如下错误:

 ERROR WorkerSinkTaskid=hdfs-sink1-2 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
        at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2018-12-14 12:22:39,670] ERROR WorkerSinkTaskid=hdfs-sink1-2 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

【问题讨论】:

【参考方案1】:

Asia/Kolkata 在 UTC 之前是 +05:30,所以这很有意义......而且timezone 配置仅适用于 path.format 值,而不适用于 Kafka 记录的内部值。

我不确定您使用哪个工具进行查询,但这可能是个问题,因为我有一些工具假设数据仅以 UTC 时间写入,然后该工具将“移位”和“显示” ” 格式化的本地时间戳...因此,我建议让 HDFS 接收器连接器实际上以 UTC 时间写入,然后让您的 SQL 工具和操作系统自己处理实际的 TZ 转换。

【讨论】:

我上面的连接开始抛出如上所述的错误。它运行良好一段时间并且能够在 HDFS 中获取数据 我不确定。我建议在 connect-log4j.properties 文件中启用 TRACE 日志记录,然后获取输出,并将其作为 Github 问题粘贴到 HDFS 连接器项目中 仍然收到与 Task 被终止相同的错误,并且在手动重新启动之前不会恢复 (org.apache.kafka.connect.runtime.WorkerTask:178)。这实际上工作正常,现在开始抛出错误。我很困惑,无法追踪问题是什么 我看到了那个错误,但是 TRACE 日志记录是为了诊断目的,而不是解决您的问题,我不确定

以上是关于汇合:Hdfs 沉入 avro 格式,但在 hive 中读取 avro 文件时,我的时间比“时区”提前 5:30 小时:“亚洲/加尔各答”的主要内容,如果未能解决你的问题,请参考以下文章

从 HIVE 表加载到 HDFS 作为 AVRO 文件

配置 Spark 写入 HDFS 的 Avro 文件大小

使用 PIG 查询 Avro 数据时出错,Utf8 无法转换为 java.lang.String

将数据保存到HDFS的格式是什么?

HDFS 中的 Avro 模式生成

使用 AVRO 格式的 BigQuery 流式插入