SnappyData - 创建 Kafka 流表时出错
Posted
技术标签:
【中文标题】SnappyData - 创建 Kafka 流表时出错【英文标题】:SnappyData - Error creating Kafka streaming table 【发布时间】:2016-08-09 00:26:35 【问题描述】:我在使用 snappy shell 中的 kafka 创建 spark 流表时遇到问题。
'异常'无效输入'C',预期dmlOperation,insert,withIdentifier,select或put(第1行,第1列):'
参考:http://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview
这是我的sql:
CREATE STREAM TABLE if not exists sensor_data_stream
(sensor_id string, metric string)
using kafka_stream
options (
storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer',
topics 'test:01');
shell 似乎不喜欢第一个字符“C”处的脚本。我正在尝试使用以下命令执行脚本:
snappy> run '/scripts/my_test_sensor_script.sql';
任何帮助表示赞赏!
【问题讨论】:
嗨,迈克,我们会尽快为您解答 【参考方案1】:文档和实际语法有些不一致。正确的语法是:
CREATE STREAM TABLE sensor_data_stream if not exists (sensor_id string,
metric string) using kafka_stream
options (storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer', topics 'test:01');
您需要做的另一件事是为您的数据编写行转换器
【讨论】:
谢谢萨钦!更新的语法有效。正如您对编写行转换器的评论所预期的那样,我在 io.snappydata.app.streaming.KafkaStreamToRowsConverter 上遇到了 ClassNotFound 异常。我将搜索文档以解决该问题。【参考方案2】:Mike,你需要通过实现以下 trait 来创建自己的 rowConverter 类 -
trait StreamToRowsConverter extends Serializable
def toRows(message: Any): Seq[Row]
然后在 DDL 中指定 rowConverter 完全限定的类名。 rowConverter 特定于模式。 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' 只是一个占位符类名,应替换为您自己的 rowConverter 类。
【讨论】:
感谢 Yogesh。我正在尝试转换我发送到 Kafka 的 Java 对象。我试图了解如何使用行转换器转换此对象。您是否推荐任何文档来解释此过程?以上是关于SnappyData - 创建 Kafka 流表时出错的主要内容,如果未能解决你的问题,请参考以下文章
DBVisualizer 和 SnappyData 的数据库配置文件?