尝试使用 Python Jupyter Notebook 将带有地理标记的推文流式传输到 PostgreSQL 时出现问题

Posted

技术标签:

【中文标题】尝试使用 Python Jupyter Notebook 将带有地理标记的推文流式传输到 PostgreSQL 时出现问题【英文标题】:Problem trying to stream geotagged tweets into PostgreSQL using Python Jupyter Notebook 【发布时间】:2021-12-14 19:39:02 【问题描述】:

我正在尝试使用 Jupyter Notebook 中编写的 Python 代码在具有 PostGIS 扩展名的 PostgreSQL 数据库中流式传输推文,但没有成功。我使用了很多教程作为参考,在第一次尝试中,代码似乎可以工作并且没有错误。我什至打印了我已连接到 Twitter API 的消息。然而,没有推文被上传到 PostgreSQL 数据库中。我认为问题可能出在过滤器上(因为也许我正在使用暂时没有推文的过滤器),但经过一些运行删除过滤器或使用其他过滤器后,我发现这不是问题。我认为与 PostgreSQL 的连接也不是问题,因为我尝试将推文直接打印到 Jupyter Notebook 并且没有错误也没有错误。

在根据指南进行一些更改并检查 PostgreSQL 表的格式后,我看到代码连接到 Twitter API,但我一直收到此消息:'str' object is not callable

PostgreSQL 表是使用以下代码创建的,目的是将推文的坐标与点几何一起存储:

CREATE TABLE tweets (tweet_id VARCHAR PRIMARY KEY, user_id VARCHAR, username TEXT, tweet TEXT, hashtags TEXT, lang TEXT, created_at TIMESTAMP, coordinates GEOMETRY);

使用的Python代码是下一个:

#!/usr/bin/env python
# coding: utf-8

#Import libraries
import tweepy
import pandas as pd
import json
import psycopg2
import time
from html.parser import HTMLParser

#Insert Twitter keys
ckey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
csecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
atoken = "xxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
asecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

#Authorize the Twitter API
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

#Call the Twitter API
api = tweepy.API(auth)

#Define Listener block
class MyStreamListener(tweepy.StreamListener):
   
   def __init__(self, time_limit=300):
       self.start_time = time.time()
       self.limit = time_limit
       super(MyStreamListener, self).__init__()
   
   def on_connect(self):
       print("Connected to Twitter API.")
       
   def on_status(self, status):
       print(status.text)
       
   def on_data(self, raw_data):
       try:
           datos = json.loads(raw_data)
           #Filter only tweets with coordinates
           if datos["coordinates"] != None:
               #Obtain all the variables to store in each column
               tweet_id = datos['id_str']
               user_id = datos['user']['id']
               user_name = datos['user']['name']
               tweet = datos['text']
               hashtags = datos["entities"]["hashtags"]
               lang = datos['user']['lang']
               created_at = datos['created_at']
               coordinates = datos["coordinates"]["coordinates"]
           
               # Connect to database
               dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates)
           
           if (time.time() - self.start_time) > self.limit:
               print(time.time(), self.start_time, self.limit)
               return False
       
       except Exception as e:
           print(e)
           
   def on_error(self, status_code):
       if status_code == 420:
           # Returning False in on_data disconnects the stream
           return False

def dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates):
   
   #Connect to Twitter database created in PostgreSQL
   conn = psycopg2.connect(host="localhost",database="datos_twitter",port=5433,user="xxxxxxx",password="xxxxxxx")
   #Create a cursor to perform database operations
   cur = conn.cursor()

   #With the cursor, insert tweets into a PostgreSQL table
   command = "INSERT INTO tweets (tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)" 
   cur.execute(command(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates))
   
   #Commit changes
   conn.commit()
   
   #Close cursor and the connection
   cur.close()
   conn.close()

#Streaming of tweets
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")
#Filtering of tweets by spatial box and keywords
myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Madrid', 'madrid'])

【问题讨论】:

in on_data() 打印 status 看看里面有什么? 【参考方案1】:

on_data 中的参数是原始数据(从 twitter api 端点接收的字符串 json)

def on_data(self, raw_data):
    """This is called when raw data is received from the stream.
    This method handles sending the data to other methods based on the
    message type.

    Parameters
    ----------
    raw_data : JSON
        The raw data from the stream
    """

像this一样使用on_status,在这个函数中,参数是Status对象,你可以访问它的字段,比如status.text

def on_status(self, status):
    """This is called when a status is received.

    Parameters
    ----------
    status : Status
        The Status received
    """

这些函数名可能在tweepy的不同版本中发生变化,阅读this article安装指定版本的python包

【讨论】:

【参考方案2】:

我已经编辑了代码,修复了评论中指出的错误。我有 def on_status(self, status) 基于之前对我有用的一个例子(在那种情况下我错误地用数据替换了自己)。

连接已创建,我收到了相关消息,但 10 秒后我收到以下错误 TypeError: 'str' object is not callable

完整的错误跟踪是:

TypeError                                 Traceback (most recent call last)
<ipython-input-14-249caad94bfb> in <module>
      4                         tweet_mode="extended")
      5 #Filtering by spatial box and keywords

----> 6 myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Covid','covid-19'])

~\Anaconda3\lib\site-packages\tweepy\streaming.py in filter(self, follow, track, is_async, locations, stall_warnings, languages, encoding, filter_level)
    472             self.body['filter_level'] = filter_level.encode(encoding)
    473         self.session.params = 'delimited': 'length'
--> 474         self._start(is_async)
    475 
    476     def sitestream(self, follow, stall_warnings=False,

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _start(self, is_async)
    387             self._thread.start()
    388         else:
--> 389             self._run()
    390 
    391     def on_closed(self, resp):

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
    318             # call a handler first so that the exception can be logged.
    319             self.listener.on_exception(exc_info[1])
--> 320             six.reraise(*exc_info)
    321 
    322     def _data(self, data):

~\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
    287                     self.snooze_time = self.snooze_time_step
    288                     self.listener.on_connect()
--> 289                     self._read_loop(resp)
    290             except (Timeout, ssl.SSLError) as exc:
    291                 # This is still necessary, as a SSLError can actually be

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _read_loop(self, resp)
    349             next_status_obj = buf.read_len(length)
    350             if self.running and next_status_obj:
--> 351                 self._data(next_status_obj)
    352 
    353             # # Note: keep-alive newlines might be inserted before each length value.

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _data(self, data)
    321 
    322     def _data(self, data):
--> 323         if self.listener.on_data(data) is False:
    324             self.running = False
    325 

~\Anaconda3\lib\site-packages\tweepy\streaming.py in on_data(self, raw_data)
     52         if 'in_reply_to_status_id' in data:
     53             status = Status.parse(self.api, data)
---> 54             if self.on_status(status) is False:
     55                 return False
     56         elif 'delete' in data:

<ipython-input-12-3460245af936> in on_status(self, status)
     34         if not hasattr(status, "retweeted_status") and coordinates!= None:
     35             # Connect to database
---> 36             dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
     37 
     38         if (time.time() - self.start_time) > self.limit:

<ipython-input-13-d7acfb1cce67> in dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
      6 
      7     command = "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
----> 8     cur.execute(command(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates))
      9 
     10     conn.commit()

TypeError: 'str' object is not callable

我不确定,但似乎错误来自我将推文插入 PostgreSQL 的那一行。

我现在编辑了添加函数def on_data(self, raw_data) 的代码,然后像 cmets 中提到的那样输入def on_status(self, status)。我继续收到错误TypeError: 'str' object is not callable

【讨论】:

以上是关于尝试使用 Python Jupyter Notebook 将带有地理标记的推文流式传输到 PostgreSQL 时出现问题的主要内容,如果未能解决你的问题,请参考以下文章

使用Jupyter notebook的简单说明

Pyechart在Jupyter Lab下无法正确显示图形的问题

设置 jupyter notebook 可远程访问

从 jupyter-notebook 下载 HTML 文件到本地

解决Jupyter notebook报错:AssertionError: wrong color format ‘var(--jp-mirror-editor-variable-color)‘(代码片

将python文件导入jupyter笔记本