From 0a0d8fd5f104e5d64cbc2efe49039372c1111b50 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?=
Date: Mon, 5 Dec 2016 23:02:26 +0100
Subject: [PATCH] Improved tweet downloader (CLI and API)

---
 bitter/VERSION      |   2 +-
 bitter/__init__.py  |   6 +++
 bitter/cli.py       |  25 ++-------
 bitter/utils.py     | 120 ++++++++++++++++++++++++++++++++++++++------
 tests/test_utils.py |   2 +
 5 files changed, 118 insertions(+), 37 deletions(-)

diff --git a/bitter/VERSION b/bitter/VERSION
index faef31a..39e898a 100644
--- a/bitter/VERSION
+++ b/bitter/VERSION
@@ -1 +1 @@
-0.7.0
+0.7.1
diff --git a/bitter/__init__.py b/bitter/__init__.py
index 8bc6daf..1041afa 100644
--- a/bitter/__init__.py
+++ b/bitter/__init__.py
@@ -7,4 +7,10 @@ import os
 
 from .version import __version__
 
+def easy(*args, **kwargs):
+    from .crawlers import TwitterQueue
+    return TwitterQueue.from_credentials(*args, **kwargs)
+
 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
+
+
diff --git a/bitter/cli.py b/bitter/cli.py
index 8d205d8..1d7f228 100644
--- a/bitter/cli.py
+++ b/bitter/cli.py
@@ -52,34 +52,15 @@ def tweet(ctx):
 @click.argument('tweetid')
 def get_tweet(tweetid, write, folder, update):
     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
-    if not write:
-        t = utils.get_tweet(wq, tweetid)
-        js = json.dumps(t, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % tweetid)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('%s: Tweet exists' % tweetid)
-        return
-    try:
-        t = utils.get_tweet(wq, tweetid)
-        with open(file, 'w') as f:
-            js = json.dumps(t, indent=2)
-            print(js, file=f)
-    except Exception as ex:
-        print('%s: %s' % (tweetid, ex), file=sys.stderr)
+    utils.download_tweet(wq, tweetid, write, folder, update)
 
 @tweet.command('get_all')
 @click.argument('tweetsfile', 'File with a list of tweets to look up')
 @click.option('-f', '--folder', default="tweets")
 @click.pass_context
 def get_tweets(ctx, tweetsfile, folder):
-    with open(tweetsfile) as f:
-        for line in f:
-            tid = line.strip()
-            ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    utils.download_tweets(wq, tweetsfile, folder)
 
 @tweet.command('search')
 @click.argument('query')
diff --git a/bitter/utils.py b/bitter/utils.py
index aa85d47..f621a29 100644
--- a/bitter/utils.py
+++ b/bitter/utils.py
@@ -1,3 +1,5 @@
+from __future__ import print_function
+
 import logging
 import time
 import json
@@ -9,11 +11,16 @@ import os
 
 import multiprocessing
 from multiprocessing.pool import ThreadPool
-from itertools import islice
+from tqdm import tqdm
+
+from itertools import islice, chain
+
 from contextlib import contextmanager
 
 from future.moves.itertools import zip_longest
 from collections import Counter
+from builtins import map, filter
+
 from twitter import TwitterHTTPError
 
 from bitter.models import Following, User, ExtractorEntry, make_session
@@ -27,15 +34,16 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)
 
-def chunk(iterable, n, fillvalue=None):
-    args = [iter(iterable)] * n
-    return zip_longest(*args, fillvalue=fillvalue)
+def chunk(iterable, n):
+    it = iter(iterable)
+    return iter(lambda: tuple(islice(it, n)), ())
 
-def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
-    if chunksize:
-        source = chunk(source, chunksize)
-    p = ThreadPool(numcpus)
-    for i in p.imap(func, source):
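+# NOTE: each worker receives a chunk (a tuple of up to `chunksize` items)
+# rather than a single item, and must return an iterable of results, which
+# chain.from_iterable flattens below.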
+def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
+    source = chunk(source, chunksize)
+    p = ThreadPool(numcpus*2)
+    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
         yield i
 
 def get_credentials_path(credfile=None):
@@ -155,12 +163,12 @@ def add_user(session, user, enqueue=False):
         user = User(**user)
         session.add(user)
     if extract:
-        logging.debug('Adding entry')
+        logger.debug('Adding entry')
         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
         if not entry:
             entry = ExtractorEntry(user=user.id)
             session.add(entry)
-        logging.debug(entry.pending)
+        logger.debug(entry.pending)
         entry.pending = True
         entry.cursor = -1
         session.commit()
@@ -209,10 +217,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
             add_user(session, i, enqueue=True)
 
     total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logging.info('Total users: {}'.format(total_users))
+    logger.info('Total users: {}'.format(total_users))
     def pending_entries():
         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logging.info('Pending: {}'.format(pending))
+        logger.info('Pending: {}'.format(pending))
         return pending
 
     while pending_entries() > 0:
@@ -276,7 +284,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
 
         entry.pending = pending
         entry.cursor = cursor
-        logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+        logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
 
         session.add(candidate)
         session.commit()
@@ -302,3 +310,87 @@ def get_user(c, user):
         return c.users.lookup(user_id=user)[0]
     except ValueError:
         return c.users.lookup(screen_name=user)[0]
+
+def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
+    cached = cached_tweet(tweetid, folder)
+    newtweet = None
+    if update or not cached:
+        newtweet = get_tweet(wq, tweetid)
+    if write:
+        if newtweet:
+            write_tweet_json(newtweet, folder)
+    else:
+        print(json.dumps(newtweet or cached, indent=2))
+
+
+def cached_tweet(tweetid, folder):
+    tweet = None
+    file = os.path.join(folder, '%s.json' % tweetid)
+    if os.path.exists(file) and os.path.isfile(file):
+        try:
+            # print('%s: Tweet exists' % tweetid)
+            with open(file) as f:
+                tweet = json.load(f)
+        except Exception as ex:
+            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
+    return tweet
+
+def write_tweet_json(tweet, folder):
+    tweetid = tweet['id']
+    file = tweet_file(tweetid, folder)
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    with open(file, 'w') as f:
+        json.dump(tweet, f, indent=2)
+    logger.info('Written {} to file {}'.format(tweetid, file))
+
+def tweet_file(tweetid, folder):
+    return os.path.join(folder, '%s.json' % tweetid)
+
+def tweet_fail_file(tweetid, folder):
+    failsfolder = os.path.join(folder, 'failed')
+    if not os.path.exists(failsfolder):
+        os.makedirs(failsfolder)
+    return os.path.join(failsfolder, '%s.failed' % tweetid)
+
+def tweet_failed(tweetid, folder):
+    return os.path.isfile(tweet_fail_file(tweetid, folder))
+
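+# Bulk download: ids already cached (or marked as failed) are skipped, the
+# rest are hydrated in batches of 100 via statuses/lookup and saved to disk.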
+def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
+    def filter_line(lines):
+        # parallel() hands each worker a chunk (tuple) of lines
+        for line in lines:
+            tweetid = int(line)
+            if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
+                yield None
+            else:
+                yield line
+
+    def print_result(results):
+        for tid, tweet in results:
+            if tweet:
+                try:
+                    write_tweet_json(tweet, folder=folder)
+                    yield 1
+                except Exception as ex:
+                    logger.error('%s: %s' % (tid, ex))
+                    if not ignore_fails:
+                        raise
+            else:
+                logger.info('Tweet not recovered: {}'.format(tid))
+                with open(tweet_fail_file(tid, folder), 'w') as f:
+                    print('Tweet not found', file=f)
+                yield -1
+
+    def download_batch(batch):
+        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
+        return tweets.items()
+
+    with open(tweetsfile) as f:
+        lines = map(lambda x: x.strip(), f)
+        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
+        tweets = parallel(download_batch, lines_to_crawl, 100)
+        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
+            pass
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 7e71b73..4494f55 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -58,4 +58,6 @@ class TestUtils(TestCase):
         assert list(resp) == [1,2,3]
         toc = time.time()
         assert (tic-toc) < 6000
+        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
+        assert list(resp2) == [1,2,3,4]
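
--
Usage sketch of the new API (assumptions: easy() can fall back to the default
credentials file when called with no arguments, and the tweet id and file
names below are placeholders):

    import bitter
    from bitter import utils

    # The new easy() helper builds a TwitterQueue from stored credentials.
    wq = bitter.easy()

    # Single tweet: with write=False the JSON is printed instead of saved.
    utils.download_tweet(wq, '123456', write=False)

    # Bulk download: reads one id per line from tweets.txt, hydrates them in
    # batches of 100 via statuses/lookup, and writes tweets/<id>.json files.
    utils.download_tweets(wq, 'tweets.txt', 'tweets')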