Flink Twitter Streaming示例不适用于自定义端点

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink Twitter Streaming示例不适用于自定义端点相关的知识,希望对你有一定的参考价值。

我正在扩展github上的flink连接器以获取自定义URL的twitter流,尽管我能够获得示例代码中给出的随机推文,但是当我提供自定义URL时,推文未被提取(没有任何内容在控制台上打印出来文件)。我已经编写了一个customEndpoint,如下所示

public class CustomEndPoint implements EndpointInitializer, Serializable{
@Override
public StreamingEndpoint createEndpoint() {
    return new RawEndpoint("https://api.twitter.com/1.1/search/tweets.json?q=%23apple", "GET");     
}}

下面是我如何将自定义端点附加到连接器API中给出的TwitterSource类

TwitterSource twitterSource = new TwitterSource(params.getProperties());
    twitterSource.setCustomEndpointInitializer(new CustomEndPoint());
        streamSource = env.addSource(twitterSource);

当我直接修改TwitterSource类的代码(即将我的端点硬编码到字段变量)时,同样的端点也可以工作,但我不想这样做,因为这不是使用API​​的最佳方式,而且我失去了在没有代码更改的情况下提供不同端点的能力

答案

这是因为您必须设置twitterSource主机属性。您的主机不是Flink Twitter源客户端使用的默认主机。

props.setProperty(TwitterSource.CLIENT_HOSTS,"https://api.twitter.com");

使用的默认主机是“https://stream.twitter.com”,请参阅TwitterSource类,运行方法:

    client = new ClientBuilder()
        .name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
        .hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new HosebirdMessageProcessor() {

...

以上是关于Flink Twitter Streaming示例不适用于自定义端点的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming和Flink的Word Count对比

Spark Streaming和Flink的Word Count对比

Spark Streaming和Flink的Word Count对比

flink-streaming-java 在 Apache Flink 中不可用

Apache Flink -Streaming(DataStream API)

Flink SQL管理平台flink-streaming-platform-web安装搭建