flink run -py wordcount.py导致NullPointerException
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink run -py wordcount.py导致NullPointerException相关的知识,希望对你有一定的参考价值。
我想在Windows上使用flink的python api处理数据。但是,当我使用命令将作业提交到本地群集时,它将引发NullPointerException。
bin/flink run -py D:\workspace\python-test\flink-test.py
flink-test.py:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
t_env.connect(FileSystem().path('D:\\workspace\\python-test\\data.txt')) \
.with_format(OldCsv()
.line_delimiter(' ')
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.register_table_source('mySource')
t_env.connect(FileSystem().path('D:\\workspace\\python-test\\result.txt')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.register_table_sink('mySink')
t_env.scan('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
t_env.execute("tutorial_job")
答案
我已经解决了这个问题。我通过错误消息阅读了源代码。
NullPointerException是由flinkOptPath为空引起的。我使用flink.bat提交作业,而flink.bat不设置flinkOptPath。所以我像这样在flink.bat中添加了一些代码。 flink.bat目前不完整。我们应该在Linux上运行flink。
以上是关于flink run -py wordcount.py导致NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章