Problem trying to stream geotagged tweets into PostgreSQL using Python Jupyter Notebook
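【Published】: 2021-12-14 19:39:02
【Question】: I am trying to stream tweets into a PostgreSQL database with the PostGIS extension, using Python code written in a Jupyter Notebook, without success. I used many tutorials as references, and on the first attempts the code seemed to work and raised no errors. It even printed the message saying I was connected to the Twitter API. However, no tweets were uploaded to the PostgreSQL database.
I thought the problem might be the filter (perhaps I was using a filter that matched no tweets at that moment), but after several runs with the filter removed or with other filters, I found that this was not the problem. I don't think the connection to PostgreSQL is the problem either, because I tried printing the tweets directly to the Jupyter Notebook and there were no errors.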
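After making some changes based on guides and checking the format of the PostgreSQL table, I can see that the code connects to the Twitter API, but I keep getting this message: 'str' object is not callable
The PostgreSQL table was created with the following code, with the aim of storing each tweet's coordinates as a point geometry: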
CREATE TABLE tweets (tweet_id VARCHAR PRIMARY KEY, user_id VARCHAR, username TEXT, tweet TEXT, hashtags TEXT, lang TEXT, created_at TIMESTAMP, coordinates GEOMETRY);
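Because the coordinates column is a PostGIS GEOMETRY, the value sent from Python has to be something PostGIS can parse as a point. Here is a minimal sketch, assuming the tweet's coordinates arrive as a GeoJSON-style [longitude, latitude] pair and that SRID 4326 (WGS 84) is wanted. The helper name to_ewkt_point is hypothetical, and the table definition itself does not fix an SRID.

```python
def to_ewkt_point(coordinates, srid=4326):
    """Build an EWKT point string from a [longitude, latitude] pair.

    The default SRID of 4326 is an assumption, not something the
    CREATE TABLE statement above specifies.
    """
    lon, lat = coordinates
    return f"SRID={srid};POINT({lon} {lat})"


# Example with a point near Madrid (longitude first, then latitude).
print(to_ewkt_point([-3.7038, 40.4168]))
```

A string of this shape can be passed as an ordinary query parameter and turned into a geometry on the database side, for example with the PostGIS function ST_GeomFromEWKT(%s).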
The Python code used is the following:
#!/usr/bin/env python
# coding: utf-8

#Import libraries
import tweepy
import pandas as pd
import json
import psycopg2
import time
from html.parser import HTMLParser

#Insert Twitter keys
ckey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
csecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
atoken = "xxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
asecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

#Authorize the Twitter API
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

#Call the Twitter API
api = tweepy.API(auth)

#Define Listener block
class MyStreamListener(tweepy.StreamListener):

    def __init__(self, time_limit=300):
        self.start_time = time.time()
        self.limit = time_limit
        super(MyStreamListener, self).__init__()

    def on_connect(self):
        print("Connected to Twitter API.")

    def on_status(self, status):
        print(status.text)

    def on_data(self, raw_data):
        try:
            datos = json.loads(raw_data)
            #Filter only tweets with coordinates
            if datos["coordinates"] != None:
                #Obtain all the variables to store in each column
                tweet_id = datos['id_str']
                user_id = datos['user']['id']
                user_name = datos['user']['name']
                tweet = datos['text']
                hashtags = datos["entities"]["hashtags"]
                lang = datos['user']['lang']
                created_at = datos['created_at']
                coordinates = datos["coordinates"]["coordinates"]
                # Connect to database
                dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates)
            if (time.time() - self.start_time) > self.limit:
                print(time.time(), self.start_time, self.limit)
                return False
        except Exception as e:
            print(e)

    def on_error(self, status_code):
        if status_code == 420:
            # Returning False in on_data disconnects the stream
            return False

def dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates):
    #Connect to Twitter database created in PostgreSQL
    conn = psycopg2.connect(host="localhost",database="datos_twitter",port=5433,user="xxxxxxx",password="xxxxxxx")
    #Create a cursor to perform database operations
    cur = conn.cursor()
    #With the cursor, insert tweets into a PostgreSQL table
    command = "INSERT INTO tweets (tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
    cur.execute(command(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates))
    #Commit changes
    conn.commit()
    #Close cursor and the connection
    cur.close()
    conn.close()

#Streaming of tweets
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")

#Filtering of tweets by spatial box and keywords
myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Madrid', 'madrid'])
【Comments】:
In on_data(), print status and see what is inside?
【Answer 1】:
The argument of on_data is the raw data (a JSON string received from the Twitter API endpoint):

    def on_data(self, raw_data):
        """This is called when raw data is received from the stream.
        This method handles sending the data to other methods based on the
        message type.

        Parameters
        ----------
        raw_data : JSON
            The raw data from the stream
        """

Use on_status instead. In that method the argument is a Status object, and you can access its fields, such as status.text:

    def on_status(self, status):
        """This is called when a status is received.

        Parameters
        ----------
        status : Status
            The Status received
        """
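To make the difference concrete, the following sketch shows what handling the raw JSON string yourself involves. It assumes the classic v1.1 tweet payload layout used in the question (id_str, user, entities, coordinates, created_at); the function name extract_row and the returned dictionary keys are hypothetical.

```python
import json
from datetime import datetime


def extract_row(raw_data):
    """Return the fields of a geotagged tweet, or None if there is nothing to store."""
    data = json.loads(raw_data)
    # Messages that are not tweets (limit notices, deletions) carry no
    # "coordinates" key, and most tweets carry it with a null value.
    coords = data.get("coordinates")
    if not coords:
        return None
    lon, lat = coords["coordinates"]  # GeoJSON order: longitude, latitude
    return {
        "tweet_id": data["id_str"],
        "user_id": str(data["user"]["id"]),
        "username": data["user"]["name"],
        "tweet": data["text"],
        # Flatten the list of hashtag objects into one comma-separated string.
        "hashtags": ",".join(h["text"] for h in data["entities"]["hashtags"]),
        "lang": data["user"].get("lang"),
        # Twitter timestamps look like "Tue Dec 14 19:39:02 +0000 2021".
        "created_at": datetime.strptime(data["created_at"], "%a %b %d %H:%M:%S %z %Y"),
        "lon": lon,
        "lat": lat,
    }


sample = json.dumps({
    "id_str": "1",
    "created_at": "Tue Dec 14 19:39:02 +0000 2021",
    "text": "Hola Madrid",
    "user": {"id": 42, "name": "ana", "lang": "es"},
    "entities": {"hashtags": [{"text": "madrid"}]},
    "coordinates": {"type": "Point", "coordinates": [-3.7, 40.4]},
})
print(extract_row(sample))
print(extract_row(json.dumps({"limit": {"track": 5}})))
```

Using data.get("coordinates") instead of data["coordinates"] means that non-tweet messages are skipped quietly rather than raising a KeyError inside the listener.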
These method names may differ between versions of tweepy, so read up on how to install a specific version of a Python package.
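As an illustration of why the version matters: the StreamListener class used in the question exists in tweepy 3.x but was merged into Stream in tweepy 4.0. The small check below is only a sketch; the helper name has_stream_listener is hypothetical, and it assumes a plain dotted version string.

```python
def has_stream_listener(tweepy_version):
    """Return True for tweepy releases before 4.0, which still ship StreamListener."""
    major = int(tweepy_version.split(".")[0])
    return major < 4


print(has_stream_listener("3.10.0"))
print(has_stream_listener("4.4.0"))
```

In a notebook, the installed version can be read from tweepy.__version__ and passed to this helper before defining the listener class.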
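【Answer 2】: I have edited the code, fixing the error pointed out in the comments. I had def on_status(self, status) based on an earlier example that worked for me (in this case I had mistakenly swapped it for data).
The connection is established and I receive the corresponding message, but after 10 seconds I get the following error: TypeError: 'str' object is not callable
The full error traceback is: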
TypeError                                 Traceback (most recent call last)
<ipython-input-14-249caad94bfb> in <module>
      4                     tweet_mode="extended")
      5 #Filtering by spatial box and keywords
----> 6 myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Covid','covid-19'])

~\Anaconda3\lib\site-packages\tweepy\streaming.py in filter(self, follow, track, is_async, locations, stall_warnings, languages, encoding, filter_level)
    472             self.body['filter_level'] = filter_level.encode(encoding)
    473         self.session.params = {'delimited': 'length'}
--> 474         self._start(is_async)
    475
    476     def sitestream(self, follow, stall_warnings=False,

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _start(self, is_async)
    387             self._thread.start()
    388         else:
--> 389             self._run()
    390
    391     def on_closed(self, resp):

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
    318             # call a handler first so that the exception can be logged.
    319             self.listener.on_exception(exc_info[1])
--> 320             six.reraise(*exc_info)
    321
    322     def _data(self, data):

~\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
    287                     self.snooze_time = self.snooze_time_step
    288                     self.listener.on_connect()
--> 289                     self._read_loop(resp)
    290             except (Timeout, ssl.SSLError) as exc:
    291                 # This is still necessary, as a SSLError can actually be

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _read_loop(self, resp)
    349             next_status_obj = buf.read_len(length)
    350             if self.running and next_status_obj:
--> 351                 self._data(next_status_obj)
    352
    353             # # Note: keep-alive newlines might be inserted before each length value.

~\Anaconda3\lib\site-packages\tweepy\streaming.py in _data(self, data)
    321
    322     def _data(self, data):
--> 323         if self.listener.on_data(data) is False:
    324             self.running = False
    325

~\Anaconda3\lib\site-packages\tweepy\streaming.py in on_data(self, raw_data)
     52         if 'in_reply_to_status_id' in data:
     53             status = Status.parse(self.api, data)
---> 54             if self.on_status(status) is False:
     55                 return False
     56         elif 'delete' in data:

<ipython-input-12-3460245af936> in on_status(self, status)
     34         if not hasattr(status, "retweeted_status") and coordinates!= None:
     35             # Connect to database
---> 36             dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
     37
     38         if (time.time() - self.start_time) > self.limit:

<ipython-input-13-d7acfb1cce67> in dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
      6
      7     command = "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
----> 8     cur.execute(command(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates))
      9
     10     conn.commit()

TypeError: 'str' object is not callable
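I am not sure, but the error seems to come from the line where I insert the tweets into PostgreSQL.
I have now edited the code to add the function def on_data(self, raw_data), and then def on_status(self, status) as mentioned in the comments. I still get the error TypeError: 'str' object is not callable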
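The last frame of the traceback shows the cause: in cur.execute(command(...)), command is a string, so writing command(...) tries to call it like a function. A DB-API cursor expects the SQL text and the parameters as two separate arguments. The sketch below shows that shape. It is not the poster's final code: insert_tweet and RecordingCursor are hypothetical names, the stand-in cursor exists only so the example runs without a database, and building the point with ST_SetSRID(ST_MakePoint(...), 4326) is an assumption about how the geometry should be stored.

```python
INSERT_SQL = (
    "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, "
    "created_at, coordinates) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))"
)


def insert_tweet(cur, tweet_id, user_id, username, tweet, hashtags, lang,
                 created_at, lon, lat):
    """Run the INSERT with the SQL and the parameter tuple as separate arguments."""
    params = (tweet_id, user_id, username, tweet, hashtags, lang, created_at, lon, lat)
    cur.execute(INSERT_SQL, params)


class RecordingCursor:
    """Stand-in for a psycopg2 cursor; it only records what execute() receives."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params=None):
        self.calls.append((sql, params))


# Reproduce the original mistake: calling a string raises the reported TypeError.
try:
    INSERT_SQL("1", "42")
except TypeError as exc:
    print(exc)

# The corrected call shape: SQL first, then one tuple of values.
cur = RecordingCursor()
insert_tweet(cur, "1", "42", "ana", "Hola Madrid", "madrid", "es",
             "2021-12-14 19:39:02", -3.7, 40.4)
print(cur.calls[0][1])
```

With a real connection, cur would come from psycopg2's conn.cursor(), followed by conn.commit() as in the question. The hashtags value is passed as a plain string here because the column is TEXT, while the tweet payload delivers a list of objects that would need flattening first.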