From 80b58541e76ac3f6d82906c771741b902679a3f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?= Date: Tue, 12 Nov 2019 17:15:26 +0100 Subject: [PATCH] Add serialize --- bitter/VERSION | 2 +- bitter/cli.py | 73 +++++++++++++++++++++++++++++++++++++++++++------ bitter/utils.py | 38 +++++++++++++++---------- 3 files changed, 89 insertions(+), 24 deletions(-) diff --git a/bitter/VERSION b/bitter/VERSION index c81aa44..78bc1ab 100644 --- a/bitter/VERSION +++ b/bitter/VERSION @@ -1 +1 @@ -0.9.7 +0.10.0 diff --git a/bitter/cli.py b/bitter/cli.py index c01174f..4afd722 100644 --- a/bitter/cli.py +++ b/bitter/cli.py @@ -8,6 +8,8 @@ import time import sqlalchemy.types import threading import sqlite3 +import operator +from functools import reduce from tqdm import tqdm from sqlalchemy import exists @@ -17,6 +19,7 @@ from bitter import config as bconf from bitter.models import make_session, User, ExtractorEntry, Following import sys +import csv as tsv if sys.version_info <= (3, 0): from contextlib2 import ExitStack else: @@ -26,6 +29,50 @@ else: logger = logging.getLogger(__name__) + + +def serialize(function): + '''Common options to serialize output to CSV or other formats''' + + @click.option('--csv', help='Print each object as a csv row. Provide a list of comma-separated fields to print.', default='', type=str) + @click.option('--header', help='Header that will be printed at the beginning of the file', default=None) + @click.option('--json', '--jsonlines', help='Print each object as JSON in a new line.', is_flag=True) + @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True) + @click.option('--outfile', help='Output file. 
It defaults to STDOUT', default=sys.stdout) + def decorated(csv, header, jsonlines, indented, outfile, **kwargs): + if header: + print(header) + + it = function(**kwargs) + + def do(out): + + if csv: + writer = tsv.writer(out, quoting=tsv.QUOTE_ALL, delimiter='\t') + if header is None: + # Print fields as header unless told otherwise + print(csv, file=out) + fields = list(token.strip().split('.') for token in csv.split(',')) + for obj in it: + writer.writerow(list(reduce(operator.getitem, field, obj) for field in fields)) + elif jsonlines: + for obj in it: + print(json.dumps(obj, sort_keys=True), file=out) + elif indented: + for obj in it: + print(json.dumps(obj, indent=4, sort_keys=True), file=out) + else: + for obj in it: + print(obj, file=out) + + if outfile is sys.stdout: + return do(sys.stdout) + + with open(outfile, 'w') as out: + return do(out) + return decorated + + @click.group() @click.option("--verbose", is_flag=True) @click.option("--logging_level", required=False, default='WARN') @@ -125,9 +172,10 @@ def tweet(ctx): @click.option('-f', '--folder', default="tweets") @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) @click.argument('tweetid') +@serialize def get_tweet(tweetid, write, folder, update): wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) - utils.download_tweet(wq, tweetid, write, folder, update) + yield from utils.download_tweet(wq, tweetid, write, folder, update) @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. The result is stored as individual json files in your folder of choice.''') @@ -136,12 +184,13 @@ The result is stored as individual json files in your folder of choice.''') @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. 
WARNING: it will overwrite existing files!') @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') @click.option('-d', '--delimiter', default=",") -@click.option('-h', '--header', help='Discard the first line (use it as a header)', - is_flag=True, default=False) +@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0) +@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) @click.option('-q', '--quotechar', default='"') @click.option('-c', '--column', type=int, default=0) +@serialize @click.pass_context -def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column): +def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column): if update and not click.confirm('This may overwrite existing tweets. Continue?'): click.echo('Cancelling') return @@ -151,12 +200,15 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec failed = 0 for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter, batch_method=utils.tweet_download_batch, - header=header, quotechar=quotechar, + skip=skip, quotechar=quotechar, commentchar=commentchar, column=column, update=update, retry_failed=retry): status.update(1) if not obj: failed += 1 status.set_description('Failed: %s. Queried' % failed, refresh=True) + continue + yield obj + @tweet.command('search') @click.argument('query') @@ -220,12 +272,14 @@ def get_user(user, write, folder, update): @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. 
WARNING: it will overwrite existing files!') @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') @click.option('-d', '--delimiter', default=",") -@click.option('-h', '--header', help='Discard the first line (use it as a header)', +@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)', is_flag=True, default=False) @click.option('-q', '--quotechar', default='"') +@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) @click.option('-c', '--column', type=int, default=0) @click.pass_context -def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column): +@serialize +def get_users(ctx, usersfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column): if update and not click.confirm('This may overwrite existing users. Continue?'): click.echo('Cancelling') return @@ -233,9 +287,10 @@ def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotecha for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter, batch_method=utils.user_download_batch, update=update, retry_failed=retry, - header=header, quotechar=quotechar, + skip=skip, quotechar=quotechar, + commentchar=commentchar, column=column): - pass + yield i @users.command('crawl') @click.option('--db', required=True, help='Database to save all users.') diff --git a/bitter/utils.py b/bitter/utils.py index ccb605c..08b9eb7 100644 --- a/bitter/utils.py +++ b/bitter/utils.py @@ -59,7 +59,14 @@ def chunk(iterable, n): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): source = chunk(source, chunksize) p = ThreadPool(numcpus*2) - results = p.imap_unordered(func, source) + + def wrapped_func(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as ex: + print('Exception on parallel thread: {}'.format(ex), file=sys.stderr) + + results = p.imap_unordered(wrapped_func, source) for i in 
chain.from_iterable(results): yield i @@ -106,7 +113,7 @@ def read_config(conffile): raise Exception('No config file or BITTER_CONFIG env variable.') else: f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) - return yaml.load(f) or {'credentials': []} + return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []} def write_config(conf, conffile=None): @@ -419,8 +426,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor pending = pending_entries(dburi) session.close() - for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): - logger.info("Got %s" % i) + with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq: + for i in tq: + tq.write('Got {}'.format(i)) + logger.info("Got %s" % i) + def pending_entries(dburi): @@ -465,16 +475,14 @@ def get_user(c, user): return c.users.lookup(screen_name=user)[0] def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): - cached = cached_id(tweetid, folder) - tweet = None - if update or not cached: + tweet = cached_id(tweetid, folder) + if update or not tweet: tweet = get_tweet(wq, tweetid) - js = json.dumps(tweet) if write: if tweet: + js = json.dumps(tweet) write_json(js, folder) - else: - print(js) + yield tweet def cached_id(oid, folder): @@ -581,7 +589,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail def filter_list(lst, done, down): print('filtering') for oid in lst: - # print('Checking {}'.format(line)) cached = cached_id(oid, folder) if (cached and not update): done.put((oid, cached)) @@ -635,12 +642,15 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail wait.join() -def download_file(wq, csvfile, folder, column=0, delimiter=',', - header=False, quotechar='"', batch_method=tweet_download_batch, +def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, + quotechar='"', 
commentchar=None, batch_method=tweet_download_batch,
                   **kwargs):
     with open(csvfile) as f:
+        if commentchar:
+            f = (line for line in f if not line.startswith(commentchar))
+
         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
-        if header:
+        for n in range(skip):
             next(csvreader)
 
         def reader(r):