Tweepy流媒体套接字无法发送预处理的文本

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tweepy流媒体套接字无法发送预处理的文本相关的知识,希望对你有一定的参考价值。

我有两个程序,它们通过套接字连接。一种是蠕动的StreamListener,我还使用“ tweet-preprocessor”库对数据进行了预处理。另一个程序应连接到该套接字并通过Spark结构化流分析数据。问题是,当我在发送数据之前对数据进行预处理时,Spark无法得到批处理。

这是StreamListener

import tweepy
import socket
import json
import preprocessor as p

CONSUMER_KEY = ""
CONSUMER_SECRET = ""
ACCESS_TOKEN = ""
ACCESS_TOKEN_SECRET = ""
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

p.set_options(p.OPT.URL, p.OPT.EMOJI, p.OPT.SMILEY)

class MyStreamListener(tweepy.StreamListener):
    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, raw_data):
        try:
            data = json.loads(raw_data)
            clean_text = p.clean(data["text"])
            print(clean_text)
            self.client_socket.send(clean_text.encode("utf-8"))
            return True
        except BaseException as e:
            print("Error: " + str(e))
        return True

    def on_error(self, status_code):
        print(status_code)
        return True


skt = socket.socket()
host = "localhost"
port = 5555
skt.bind((host, port))
skt.listen()
client, address = skt.accept()

myStreamListener = MyStreamListener(csocket=client)
myStream = tweepy.Stream(auth=auth, listener=myStreamListener, )
myStream.filter(track=["Trump"], languages=["en"])

以及简单的Spark代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, size

spark = SparkSession.builder.appName("TwitterSpark").getOrCreate()

lines = spark.readStream.format("socket").option("host", "localhost").option("port", 5555).load()

#tweetlength = lines.select(
#        size(split(lines.value, " ")).alias("tweetlength")
#)

query = lines.writeStream.outputMode("update").format("console").start()

query.awaitTermination()
答案

[最有可能clean_text末尾没有换行符(\n)。与自动添加新行的print(clean_text)不同,socket.send()照原样从clean_text.encode("utf-8")发送字节,您需要显式添加\n

self.client_socket.send((clean_text + "\n").encode("utf-8"))

没有\n来分隔套接字数据中的行,Spark会将输入视为一条增长的行,除非tweet文本本身中有新行。

以上是关于Tweepy流媒体套接字无法发送预处理的文本的主要内容,如果未能解决你的问题,请参考以下文章

Tweepy [ Twitter API v2 ] 无法上传照片/媒体 [重复]

Python API:使用没有文件的媒体发推文

从 twitter 流中排除回复 - tweepy

一起发送 JSON 和 blob

UWP物联网核心RTSP流媒体音频

为非文本媒体类型发送带有 Content-Type 标头的字符集参数是不是绝对错误?