Improved tweet downloader (CLI and API)

master
J. Fernando Sánchez 8 years ago
parent e3a78968da
commit 0a0d8fd5f1

@@ -1 +1 @@
-0.7.0
+0.7.1

@@ -7,4 +7,10 @@ import os
 from .version import __version__

+
+def easy(*args, **kwargs):
+    from .crawlers import TwitterQueue
+    return TwitterQueue.from_credentials(*args, **kwargs)
+
+
 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
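Side note on the new API surface: easy() simply forwards its arguments to TwitterQueue.from_credentials, giving library users a one-line entry point. A minimal sketch of how it might be called; the no-argument form assumes from_credentials can locate default credentials, and the lookup call mirrors get_user() later in this commit:

import bitter

# easy(*args, **kwargs) forwards everything to TwitterQueue.from_credentials,
# so it accepts whatever that constructor accepts.
wq = bitter.easy()

# The queue proxies the Twitter API, as in get_user() below.
user = wq.users.lookup(screen_name='twitter')[0]
print(user['id'])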

@@ -52,34 +52,15 @@ def tweet(ctx):
 @click.argument('tweetid')
 def get_tweet(tweetid, write, folder, update):
     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
-    if not write:
-        t = utils.get_tweet(wq, tweetid)
-        js = json.dumps(t, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % tweetid)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('%s: Tweet exists' % tweetid)
-        return
-    try:
-        t = utils.get_tweet(wq, tweetid)
-        with open(file, 'w') as f:
-            js = json.dumps(t, indent=2)
-            print(js, file=f)
-    except Exception as ex:
-        print('%s: %s' % (tweetid, ex), file=sys.stderr)
+    utils.download_tweet(wq, tweetid, write, folder, update)

 @tweet.command('get_all')
 @click.argument('tweetsfile', 'File with a list of tweets to look up')
 @click.option('-f', '--folder', default="tweets")
 @click.pass_context
 def get_tweets(ctx, tweetsfile, folder):
-    with open(tweetsfile) as f:
-        for line in f:
-            tid = line.strip()
-            ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    utils.download_tweets(wq, tweetsfile, folder)

 @tweet.command('search')
 @click.argument('query')
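With both commands reduced to thin wrappers, the same behaviour is reachable from Python without going through click. A sketch of the equivalent direct calls; the `from bitter import config as bconf` import path is an assumption matching the CLI's bconf reference, and the id and file names are placeholders:

from bitter import utils, crawlers
from bitter import config as bconf  # assumed location of the CLI's bconf

wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)

# Single tweet: saved under folder/ when write=True, printed otherwise.
utils.download_tweet(wq, '1234567890', write=True, folder='tweets', update=False)

# Bulk download: one tweet id per line in the input file.
utils.download_tweets(wq, 'tweet_ids.txt', 'tweets')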

@@ -1,3 +1,5 @@
+from __future__ import print_function
+
 import logging
 import time
 import json
@@ -9,11 +11,16 @@ import os
 import multiprocessing
 from multiprocessing.pool import ThreadPool
-from itertools import islice
+
+from tqdm import tqdm
+
+from itertools import islice, chain
 from contextlib import contextmanager
 from future.moves.itertools import zip_longest
 from collections import Counter
+from builtins import map, filter

 from twitter import TwitterHTTPError

 from bitter.models import Following, User, ExtractorEntry, make_session
@@ -27,15 +34,14 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)

-def chunk(iterable, n, fillvalue=None):
-    args = [iter(iterable)] * n
-    return zip_longest(*args, fillvalue=fillvalue)
+def chunk(iterable, n):
+    it = iter(iterable)
+    return iter(lambda: tuple(islice(it, n)), ())

-def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
-    if chunksize:
-        source = chunk(source, chunksize)
-    p = ThreadPool(numcpus)
-    for i in p.imap(func, source):
+def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
+    source = chunk(source, chunksize)
+    p = ThreadPool(numcpus*2)
+    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
         yield i

 def get_credentials_path(credfile=None):
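The rewritten parallel() changes its contract: func now receives a whole chunk (a tuple) rather than a single item, and must return an iterable, whose elements chain.from_iterable flattens back into one stream in whatever order the worker threads finish. A self-contained sketch of that behaviour using only the standard library:

from itertools import islice, chain
from multiprocessing.pool import ThreadPool

def chunk(iterable, n):
    # Same recipe as above: yield n-sized tuples until the iterator is empty.
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())

def double_all(batch):  # each worker gets a tuple such as (0, 1)
    return [x * 2 for x in batch]

source = chunk(range(6), 2)  # -> (0, 1), (2, 3), (4, 5)
with ThreadPool(4) as p:
    results = list(chain.from_iterable(p.imap_unordered(double_all, source)))
print(sorted(results))  # [0, 2, 4, 6, 8, 10]; arrival order may vary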
@@ -155,12 +161,12 @@ def add_user(session, user, enqueue=False):
     user = User(**user)
     session.add(user)
     if extract:
-        logging.debug('Adding entry')
+        logger.debug('Adding entry')
         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
         if not entry:
             entry = ExtractorEntry(user=user.id)
             session.add(entry)
-        logging.debug(entry.pending)
+        logger.debug(entry.pending)
         entry.pending = True
         entry.cursor = -1
         session.commit()
@@ -209,10 +215,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
             add_user(session, i, enqueue=True)

     total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logging.info('Total users: {}'.format(total_users))
+    logger.info('Total users: {}'.format(total_users))

     def pending_entries():
         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logging.info('Pending: {}'.format(pending))
+        logger.info('Pending: {}'.format(pending))
         return pending

     while pending_entries() > 0:
@@ -276,7 +282,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
             entry.pending = pending
             entry.cursor = cursor
-            logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+            logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))

         session.add(candidate)
         session.commit()
@@ -302,3 +308,85 @@ def get_user(c, user):
         return c.users.lookup(user_id=user)[0]
     except ValueError:
         return c.users.lookup(screen_name=user)[0]
+
+
+def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
+    cached = cached_tweet(tweetid, folder)
+    newtweet = None
+    if update or not cached:
+        newtweet = get_tweet(wq, tweetid)
+    if write:
+        # Only write when something new was actually fetched.
+        if newtweet:
+            write_tweet_json(newtweet, folder)
+    else:
+        print(json.dumps(newtweet or cached, indent=2))
+
+
+def cached_tweet(tweetid, folder):
+    tweet = None
+    file = tweet_file(tweetid, folder)
+    if os.path.isfile(file):
+        try:
+            with open(file) as f:
+                tweet = json.load(f)
+        except Exception as ex:
+            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
+    return tweet
+
+
+def write_tweet_json(tweet, folder):
+    tweetid = tweet['id']
+    file = tweet_file(tweetid, folder)
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    with open(file, 'w') as f:
+        json.dump(tweet, f, indent=2)
+    logger.info('Written {} to file {}'.format(tweetid, file))
+
+
+def tweet_file(tweetid, folder):
+    return os.path.join(folder, '%s.json' % tweetid)
+
+
+def tweet_fail_file(tweetid, folder):
+    failsfolder = os.path.join(folder, 'failed')
+    if not os.path.exists(failsfolder):
+        os.makedirs(failsfolder)
+    return os.path.join(failsfolder, '%s.failed' % tweetid)
+
+
+def tweet_failed(tweetid, folder):
+    return os.path.isfile(tweet_fail_file(tweetid, folder))
+
+
+def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
+
+    def filter_lines(lines):
+        # parallel() hands each worker a batch (tuple) of lines.
+        for line in lines:
+            tweetid = int(line)
+            if (cached_tweet(tweetid, folder) and not update) or \
+               (tweet_failed(tweetid, folder) and not retry_failed):
+                yield None
+            else:
+                yield line
+
+    def print_result(results):
+        for tid, tweet in results:
+            if tweet:
+                try:
+                    write_tweet_json(tweet, folder=folder)
+                    yield 1
+                except Exception as ex:
+                    logger.error('%s: %s' % (tid, ex))
+                    if not ignore_fails:
+                        raise
+            else:
+                logger.info('Tweet not recovered: {}'.format(tid))
+                with open(tweet_fail_file(tid, folder), 'w') as f:
+                    print('Tweet not found', file=f)
+                yield -1
+
+    def download_batch(batch):
+        # statuses/lookup with map=True returns {'id': {tweetid: tweet-or-None}}.
+        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
+        return tweets.items()
+
+    with open(tweetsfile) as f:
+        lines = map(lambda x: x.strip(), f)
+        lines_to_crawl = filter(lambda x: x is not None,
+                                tqdm(parallel(filter_lines, lines), desc='Total lines'))
+        tweets = parallel(download_batch, lines_to_crawl, 100)
+        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
+            pass
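Net effect of the new helpers: download_tweets reads one numeric tweet id per line, checks the cache and the failure markers, fetches the remainder in batches of 100 via statuses/lookup, writes each hit to <folder>/<id>.json, and drops a marker in <folder>/failed/<id>.failed for ids the API would not return, so re-runs skip both unless update or retry_failed is set. A usage sketch; the credentials path and file names are placeholders, and from_credentials is assumed to accept the same credentials source the CLI passes in:

from bitter import utils, crawlers

wq = crawlers.TwitterQueue.from_credentials('~/.bitter-credentials.json')

# ids.txt holds one tweet id per line, e.g.:
# 20
# 1234567890
utils.download_tweets(wq, 'ids.txt', 'tweets', update=False, retry_failed=False)

# Resulting layout:
#   tweets/20.json              downloaded tweets
#   tweets/failed/<id>.failed   markers for unrecoverable ids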

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
         assert list(resp) == [1,2,3]
         toc = time.time()
         assert (tic-toc) < 6000
+        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
+        assert list(resp2) == [1,2,3,4]
