From 653487e2d7d5ed3c898dfd9fc1675f1bea37f484 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?=
Date: Tue, 30 Apr 2019 19:15:15 +0200
Subject: [PATCH] Improve download_list

---
 bitter/VERSION   |   2 +-
 bitter/cli.py    | 120 ++++++++++++++++++++++++++-----------------
 bitter/config.py |   2 +-
 bitter/utils.py  | 129 ++++++++++++++++++++++++++++++++---------------
 4 files changed, 164 insertions(+), 89 deletions(-)

diff --git a/bitter/VERSION b/bitter/VERSION
index 965065d..b0bb878 100644
--- a/bitter/VERSION
+++ b/bitter/VERSION
@@ -1 +1 @@
-0.9.3
+0.9.5
diff --git a/bitter/cli.py b/bitter/cli.py
index 49fb35f..2f2d4d8 100644
--- a/bitter/cli.py
+++ b/bitter/cli.py
@@ -21,7 +21,7 @@ if sys.version_info <= (3, 0):
     from contextlib2 import ExitStack
 else:
     from contextlib import ExitStack
-    
+
 
 logger = logging.getLogger(__name__)
 
@@ -42,10 +42,58 @@ def main(ctx, verbose, logging_level, config, credentials):
     utils.copy_credentials_to_config(credentials, config)
 
 
-@main.group()
-@click.pass_context 
+@main.group(invoke_without_command=True)
+@click.pass_context
 def credentials(ctx):
-    pass
+    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
+    for worker in wq.queue:
+        print('#'*20)
+        try:
+            resp = worker.client.application.rate_limit_status()
+            print(worker.name)
+        except Exception as ex:
+            print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
+
+
+@credentials.command('limits')
+@click.option('--all', type=bool, default=False, required=False,
+              help=('Print all limits. By default, it only limits that '
+                    'have been consumed will be shown.'))
+@click.argument('url', required=False)
+@click.pass_context
+def get_limits(ctx, all, url):
+    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
+    total = {}
+    for worker in wq.queue:
+        resp = worker.client.application.rate_limit_status()
+        print('#'*20)
+        print(worker.name)
+        if url:
+            limit = 'NOT FOUND'
+            print('URL is: {}'.format(url))
+            cat = url.split('/')[1]
+            if cat in resp['resources']:
+                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
+            else:
+                print('Cat {} not found'.format(cat))
+                continue
+            for k in limit:
+                total[k] = total.get(k, 0) + limit[k]
+            print('{}: {}'.format(url, limit))
+            continue
+        nres = {}
+        if not all:
+            for res, urls in resp['resources'].items():
+                nurls = {}
+                for u, limits in urls.items():
+                    if limits['limit'] != limits['remaining']:
+                        nurls[u] = limits
+                if nurls:
+                    nres[res] = nurls
+            resp = nres
+        print(json.dumps(resp, indent=2))
+    if url:
+        print('Total for {}: {}'.format(url, total))
 
 
 @credentials.command('add')
 @click.option('--consumer_key', default=None)
@@ -68,7 +116,7 @@ def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
 
 
 @main.group()
-@click.pass_context 
+@click.pass_context
 def tweet(ctx):
     pass
 
@@ -98,15 +146,21 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec
         click.echo('Cancelling')
         return
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    for i in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
-                                 batch_method=utils.tweet_download_batch,
-                                 header=header, quotechar=quotechar,
-                                 column=column, update=update, retry_failed=retry):
-        pass
+
+    status = tqdm('Queried')
+    failed = 0
+    for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
+                                        batch_method=utils.tweet_download_batch,
+                                        header=header, quotechar=quotechar,
+                                        column=column, update=update, retry_failed=retry):
+        status.update(1)
+        if not obj:
+            failed += 1
+        status.set_description('Failed: %s. Queried' % failed, refresh=True)
 
 @tweet.command('search')
 @click.argument('query')
-@click.pass_context 
+@click.pass_context
 def search(ctx, query):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     t = utils.search_tweet(wq, query)
@@ -114,7 +168,7 @@ def search(ctx, query):
 
 @tweet.command('timeline')
 @click.argument('user')
-@click.pass_context 
+@click.pass_context
 def timeline(ctx, user):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     t = utils.user_timeline(wq, user)
@@ -293,7 +347,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
             speed = (collected-lastcollected)/10
             with statslock:
                 lastcollected = collected
-    
+
     logger.info('Done!')
 
 @main.group('extractor')
@@ -344,7 +398,7 @@ def network_extractor(ctx, as_json):
     if as_json:
         import json
         print(json.dumps(follower_map, indent=4))
-    
+
 
 @extractor.command('users')
 @click.pass_context
@@ -383,34 +437,6 @@ def reset_extractor(ctx):
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
 
-@main.command('limits')
-@click.argument('url', required=False)
-@click.pass_context
-def get_limits(ctx, url):
-    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    total = {}
-    for worker in wq.queue:
-        resp = worker.client.application.rate_limit_status()
-        print('#'*20)
-        print(worker.name)
-        if url:
-            limit = 'NOT FOUND'
-            print('URL is: {}'.format(url))
-            cat = url.split('/')[1]
-            if cat in resp['resources']:
-                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
-            else:
-                print('Cat {} not found'.format(cat))
-                continue
-            for k in limit:
-                total[k] = total.get(k, 0) + limit[k]
-            print('{}: {}'.format(url, limit))
-        else:
-            print(json.dumps(resp, indent=2))
-    if url:
-        print('Total for {}: {}'.format(url, total))
-
-
 @main.command(context_settings=dict(ignore_unknown_options=True,
                                     allow_extra_args=False),
               help='''Issue a call to an endpoint of the Twitter API.''')
@@ -454,7 +480,7 @@ def run_server(ctx, consumer_key, consumer_secret):
     app.run(host='0.0.0.0')
 
 @main.group()
-@click.pass_context 
+@click.pass_context
 def stream(ctx):
     pass
 
@@ -463,7 +489,7 @@ def stream(ctx):
 @click.option('-t', '--track', default=None)
 @click.option('-f', '--file', default=None, help='File to store the stream of tweets')
 @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
-@click.pass_context 
+@click.pass_context
 def get_stream(ctx, locations, track, file, politelyretry):
     wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
 
@@ -505,7 +531,7 @@ def get_stream(ctx, locations, track, file, politelyretry):
 @stream.command('read')
 @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
 @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
-@click.pass_context 
+@click.pass_context
 def read_stream(ctx, file, tail):
     for tweet in utils.read_file(file, tail=tail):
         try:
@@ -516,12 +542,12 @@ def read_stream(ctx, file, tail):
 @stream.command('tags')
 @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
 @click.argument('limit', required=False, default=None, type=int)
-@click.pass_context 
+@click.pass_context
 def tags_stream(ctx, file, limit):
     c = utils.get_hashtags(utils.read_file(file))
     for count, tag in c.most_common(limit):
         print(u'{} - {}'.format(count, tag))
-    
+
 
 if __name__ == '__main__':
     main()
diff --git a/bitter/config.py b/bitter/config.py
index 1a8a9e7..a519fde 100644
--- a/bitter/config.py
+++ b/bitter/config.py
@@ -3,7 +3,7 @@ Common configuration for other modules.
 It is not elegant, but it works with flask and the oauth decorators.
 Using this module allows you to change the config before loading any other module.
 
-E.g.: 
+E.g.:
 
 import bitter.config as c
 c.CREDENTIALS="/tmp/credentials"
diff --git a/bitter/utils.py b/bitter/utils.py
index 13eabe8..856afa1 100644
--- a/bitter/utils.py
+++ b/bitter/utils.py
@@ -13,6 +13,11 @@ import sqlalchemy
 import os
 import multiprocessing
 from multiprocessing.pool import ThreadPool
+from multiprocessing import Queue
+
+import queue
+import threading
+from select import select
 
 from functools import partial
 
@@ -22,6 +27,7 @@ from itertools import islice, chain
 from contextlib import contextmanager
 
 from collections import Counter
+from random import choice
 
 from builtins import map, filter
 
@@ -53,7 +59,7 @@ def chunk(iterable, n):
 def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
     source = chunk(source, chunksize)
     p = ThreadPool(numcpus*2)
-    results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
+    results = p.imap_unordered(func, source)
     for i in chain.from_iterable(results):
         yield i
 
@@ -507,7 +513,8 @@ def id_failed(oid, folder):
 
 def tweet_download_batch(wq, batch):
     tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
-    return tweets.items()
+    for tid, tweet in tweets.items():
+        yield tid, tweet
 
 def user_download_batch(wq, batch):
     screen_names = []
@@ -547,45 +554,81 @@ def user_download_batch(wq, batch):
         yield (name, None)
 
 
-def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=True,
+def dump_result(oid, obj, folder, ignore_fails=True):
+    if obj:
+        try:
+            write_json(obj, folder=folder, oid=oid)
+            failed = fail_file(oid, folder)
+            if os.path.exists(failed):
+                os.remove(failed)
+        except Exception as ex:
+            logger.error('%s: %s' % (oid, ex))
+            if not ignore_fails:
+                raise
+    else:
+        logger.info('Object not recovered: {}'.format(oid))
+        with open(fail_file(oid, folder), 'w') as f:
+            print('Object not found', file=f)
+
+def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
                   batch_method=tweet_download_batch):
-    def filter_lines(line):
-        # print('Checking {}'.format(line))
-        oid = line[0]
-        if (cached_id(oid, folder) and not update) or (id_failed(oid, folder) and not retry_failed):
-            yield None
-        else:
-            yield str(oid)
-
-    def print_result(res):
-        for oid, obj in res:
-            if obj:
-                try:
-                    write_json(obj, folder=folder, oid=oid)
-                    failed = fail_file(oid, folder)
-                    if os.path.exists(failed):
-                        os.remove(failed)
-                    yield 1
-                except Exception as ex:
-                    logger.error('%s: %s' % (oid, ex))
-                    if not ignore_fails:
-                        raise
-            else:
-                logger.info('Object not recovered: {}'.format(oid))
-                with open(fail_file(oid, folder), 'w') as f:
-                    print('Object not found', file=f)
-                yield -1
+    done = Queue()
 
-    objects_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_lines, lst), desc='Total objects'))
-    batch_method = partial(batch_method, wq)
-    objects = parallel(batch_method, objects_to_crawl, 100)
-    failed = 0
-    pbar = tqdm(parallel(print_result, objects), desc='Queried')
-    for res in pbar:
-        if res < 0:
-            failed += 1
-        pbar.set_description('Failed: %s. Queried' % failed, refresh=True)
-        yield res
+    down = Queue()
+
+
+    def filter_list(lst, done, down):
+        print('filtering')
+        for oid in lst:
+            # print('Checking {}'.format(line))
+            cached = cached_id(oid, folder)
+            if (cached and not update):
+                done.put((oid, cached))
+            elif (id_failed(oid, folder) and not retry_failed):
+                done.put((oid, None))
+            else:
+                down.put(oid)
+        down.put(None)
+
+    def download_results(batch_method, down, done):
+        def gen():
+            while True:
+                r = down.get()
+                if not r:
+                    return
+                yield r
+
+        for t in parallel(batch_method, gen(), 100):
+            done.put(t)
+
+    def batch(*args, **kwargs):
+        return batch_method(wq, *args, **kwargs)
+
+    tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
+    tc.start()
+    td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
+    td.start()
+
+    def check_threads(ts, done):
+        for t in ts:
+            t.join()
+        done.put(None)
+
+    wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
+    wait.start()
+
+    while True:
+        rec = done.get()
+
+        if rec is None:
+            break
+
+        oid, obj = rec
+        dump_result(oid, obj, folder, ignore_fails)
+        yield rec
+
+    wait.join()
 
 
 def download_file(wq, csvfile, folder, column=0, delimiter=',',
@@ -595,8 +638,14 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',',
         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
         if header:
             next(csvreader)
-        tweets = map(lambda row: row[0].strip(), csvreader)
-        for res in download_list(wq, tweets, folder, batch_method=batch_method,
+
+        def reader(r):
+            for row in csvreader:
+                if len(row) > column:
+                    yield row[column].strip()
+
+
+        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
                                  **kwargs):
             yield res
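
Note (not part of the patch): the reworked download_list drops the old filter/print generator pipeline in favour of a producer/consumer layout. One thread filters IDs into a `down` queue, a second thread batches them through the API and pushes (id, object) pairs onto a `done` queue, and the generator body drains `done` until a None sentinel arrives. A minimal, self-contained sketch of that queue-and-sentinel pattern, with illustrative names only (pipeline, producer, consumer, work) and no Twitter calls:

    import threading
    from queue import Queue

    def pipeline(items, work):
        # down carries pending work, done carries finished (item, result) pairs
        down, done = Queue(), Queue()

        def producer():
            for item in items:
                down.put(item)
            down.put(None)              # sentinel: no more work

        def consumer():
            while True:
                item = down.get()
                if item is None:        # forward the sentinel and stop
                    done.put(None)
                    return
                done.put((item, work(item)))

        threading.Thread(target=producer, daemon=True).start()
        threading.Thread(target=consumer, daemon=True).start()

        # drain results until the sentinel shows up
        while True:
            rec = done.get()
            if rec is None:
                break
            yield rec

    # usage: list(pipeline([1, 2, 3], lambda x: x * x)) == [(1, 1), (2, 4), (3, 9)]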