flink run -py wordcount.py导致NullPointerException

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink run -py wordcount.py导致NullPointerException相关的知识,希望对你有一定的参考价值。

我想在Windows上使用flink的python api处理数据。但是,当我使用命令将作业提交到本地群集时,它将引发NullPointerException。

bin/flink run -py D:\workspace\python-test\flink-test.py

flink-test.py:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('D:\\workspace\\python-test\\data.txt')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('D:\\workspace\\python-test\\result.txt')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("tutorial_job")

enter image description here有人知道为什么吗?

答案

我已经解决了这个问题。我通过错误消息阅读了源代码。

enter image description here

NullPointerException是由flinkOptPath为空引起的。我使用flink.bat提交作业,而flink.bat不设置flinkOptPath。所以我像这样在flink.bat中添加了一些代码。 flink.bat目前不完整。我们应该在Linux上运行flink。enter image description here

以上是关于flink run -py wordcount.py导致NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

kafkakafka 执行 多个脚本 kafka-run-class.sh 导致 server 节点 时不时挂掉

Flink SourceFunction 初了解

Flink SourceFunction 初了解

Flink Flink 应用资源分配问题排查思路

flink作业提交源码解析-命令行解析及运行

flink作业提交源码解析-命令行解析及运行