Trying to use Tweepy/Twitter's Streaming API and psycopg2 to populate a PostgreSQL database. Very close, one line off

【Posted】: 2012-11-15 19:59:57 【Question】:

I have been trying to use Tweepy and Twitter's Streaming API to populate a table in a PostgreSQL database. I am very close; I believe I am one line away from getting it. I have looked at many examples, including: http://andrewbrobinson.com/2011/07/15/using-tweepy-to-access-the-twitter-stream/ http://blog.creapptives.com/post/14062057061/the-key-value-store-everyone-ignored-postgresql "Python tweepy writing to sqlite3 db", "tweepy stream to sqlite database - invalid synatx", "Using tweepy to access Twitter's Streaming API", and so on.

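I can stream tweets with Tweepy quite easily, so I know my consumer key, consumer secret, access key, and access secret are correct. I also have Postgres set up and have connected successfully to the database I created. I tested inserting hard-coded values into a table in my database using psycopg2 from a .py file, and that works too. I am receiving tweets based on the keywords I chose, and I am connecting successfully to the table in the database. Now I just need to stream the tweets into the table in my Postgres database. Like I said, I am so close, and any help would be greatly appreciated.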

This stripped-down script inserts data into the table I want:

import psycopg2

try:
    conn = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
    print "connected"
except:
    print "unable to connect"

namedict = (
    {"first_name":"Joshua", "last_name":"Drake"},
    {"first_name":"Steven", "last_name":"Foo"},
    {"first_name":"David", "last_name":"Bar"}
    )

cur = conn.cursor()

cur.executemany("""INSERT INTO testdata(first_name, last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict);

conn.commit()

Below is the script I have been editing for a while now, trying to get it to work:

import psycopg2
import time
import json
from getpass import getpass
import tweepy

consumer_key = 'x'
consumer_secret = 'x'
access_key = 'x'
access_secret = 'x'

connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
cursor = connection.cursor()

#always use this step to begin clean
def reset_cursor():
    cursor = connection.cursor()

class StreamWatcherListener(tweepy.StreamListener):
    def on_data(self, data):
        try:
            print 'before cursor' + data
            connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
            cur = connection.cursor()
            print 'status is: ' + str(connection.status)
            #cur.execute("INSERT INTO tweet_list VALUES (%s)" % (data.text))
            cur.executemany("""INSERT INTO tweets(tweet) VALUES (%(text)s)""", data);
            connection.commit()
            print '---------'
            print type(data)
            #print data
        except Exception as e:
            connection.rollback()
            reset_cursor()
            print "not saving"
            return 
        if cursor.lastrowid == None:
            print "Unable to save"

    def on_error(self, status_code):
        print 'Error code = %s' % status_code
        return True

    def on_timeout(self):
        print 'timed out.....'

print 'welcome'
auth1 = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth1.set_access_token(access_key, access_secret)
api = tweepy.API(auth1)

l = StreamWatcherListener()
print 'about to stream'
stream = tweepy.Stream(auth = auth1, listener = l)

setTerms = ['microsoft']
#stream.sample()
stream.filter(track = setTerms)

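Sorry if the code is a bit messy, but I have been trying a lot of options. As I said, any suggestions, links to useful examples, and so on would be greatly appreciated, as I have tried everything I can think of and am now resorting to a long walk. Thanks a ton.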

【Answer 1】:

Well, I'm not sure why you are using classes for this, or why your class doesn't define __init__. It seems complicated.

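Here is a basic version of the function I use to do this kind of thing. I have only used sqlite, but the syntax looks basically the same. Maybe you can get something out of it.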

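import time
import tweetstream

# username and password (Twitter credentials) and conn and curs (an open
# sqlite connection and its cursor) are assumed to be defined elsewhere.

def retrieve_tweets(numtweets=10, *args):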
    """
    This function optionally takes one or more arguments as keywords to filter tweets.
    It iterates through tweets from the stream that meet the given criteria and sends them 
    to the database population function on a per-instance basis, so as to avoid disaster 
    if the stream is disconnected.

    Both SampleStream and FilterStream methods access Twitter's stream of status elements.
    """   
    filters = []
    for key in args:
        filters.append(str(key))
    if len(filters) == 0:
        stream = tweetstream.SampleStream(username, password)  
    else:
        stream = tweetstream.FilterStream(username, password, track=filters)
    try:
        count = 0
        while count < numtweets:       
            for tweet in stream:
                # a check is needed on text as some "tweets" are actually just API operations
                # the language selection doesn't really work but it's better than nothing(?)
                if tweet.get('text') and tweet['user']['lang'] == 'en':   
                    if tweet['retweet_count'] == 0:
                        # bundle up the features I want and send them to the db population function
                        bundle = (tweet['id'], tweet['user']['screen_name'], tweet['retweet_count'], tweet['text'])
                        db_initpop(bundle)
                        break
                    else:
                        # a RT has a different structure.  This bundles the original tweet.  Getting  the
                        # retweets comes later, after the stream is de-accessed.
                        bundle = (tweet['retweeted_status']['id'], tweet['retweeted_status']['user']['screen_name'], \
                                  tweet['retweet_count'], tweet['retweeted_status']['text'])
                        db_initpop(bundle)
                        break
            count += 1
    except tweetstream.ConnectionError, e:
        print 'Disconnected from Twitter at '+time.strftime("%d %b %Y %H:%M:%S", time.localtime()) \
        +'.  Reason: ', e.reason

def db_initpop(bundle):
    """
    This function places basic tweet features in the database.  Note the placeholder values:
    these can act as a check to verify that no further expansion was available for that method.
    """
    #unpack the bundle 
    tweet_id, user_sn, retweet_count, tweet_text = bundle
    curs.execute("""INSERT INTO tblTweets VALUES (null,?,?,?,?,?,?)""", \
        (tweet_id, user_sn, retweet_count, tweet_text, 'cleaned text', 'cleaned retweet text'))
    conn.commit()
    print 'Database populated with tweet '+str(tweet_id)+' at '+time.strftime("%d %b %Y %H:%M:%S", time.localtime())
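
For the psycopg2 script in the question, the likely culprit is the `executemany` line. As the string concatenation in `print 'before cursor' + data` suggests, `on_data` receives each message as a raw JSON string, whereas `executemany` expects a sequence of parameter mappings. Below is a minimal sketch of one possible fix, assuming the `tweets(tweet)` table from the question. The helper names `tweet_params` and `save_tweet` are hypothetical, and `cursor` stands for any psycopg2 cursor.

```python
import json

# Same statement as in the question, used with execute() for a single row.
INSERT_SQL = "INSERT INTO tweets(tweet) VALUES (%(text)s)"


def tweet_params(raw):
    """Parse one raw stream message and return {'text': ...}, or None.

    Hypothetical helper: returns None for payloads that are not valid JSON
    or that carry no 'text' field (for example delete or limit notices).
    """
    try:
        message = json.loads(raw)
    except ValueError:
        return None
    if not isinstance(message, dict) or "text" not in message:
        return None
    return {"text": message["text"]}


def save_tweet(cursor, raw):
    """Insert one tweet through a DB-API cursor; return True if a row was sent."""
    params = tweet_params(raw)
    if params is None:
        return False
    # One mapping per execute() call; executemany() would need a list of them.
    cursor.execute(INSERT_SQL, params)
    return True
```

Inside `on_data`, this would replace the `executemany` line with `save_tweet(cur, data)` followed by `connection.commit()`. Note that psycopg2 uses `%s` and `%(name)s` placeholders, whereas the sqlite code above uses `?`.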

Good luck!
