Add serialize

master
J. Fernando Sánchez 5 years ago
parent 40a8b45231
commit 80b58541e7

@ -1 +1 @@
0.9.7 0.10.0

@ -8,6 +8,8 @@ import time
import sqlalchemy.types import sqlalchemy.types
import threading import threading
import sqlite3 import sqlite3
import operator
from functools import reduce
from tqdm import tqdm from tqdm import tqdm
from sqlalchemy import exists from sqlalchemy import exists
@ -17,6 +19,7 @@ from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following from bitter.models import make_session, User, ExtractorEntry, Following
import sys import sys
import csv as tsv
if sys.version_info <= (3, 0): if sys.version_info <= (3, 0):
from contextlib2 import ExitStack from contextlib2 import ExitStack
else: else:
@ -26,6 +29,50 @@ else:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--csv', help='Print each object as a csv row. Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--json', '--jsonlines', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(csv, header, jsonlines, indented, outfile, **kwargs):
if header:
print(header)
it = function(**kwargs)
def do(out):
if csv:
writer = tsv.writer(out, quoting=tsv.QUOTE_ALL, delimiter='\t')
if header is None:
# Print fields as header unless told otherwise
print(csv, file=out)
fields = list(token.strip().split('.') for token in csv.split(','))
for obj in it:
writer.writerow(list(reduce(operator.getitem, field, obj) for field in fields))
elif jsonlines:
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif indented:
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else:
for obj in it:
print(obj, file=out)
if outfile is sys.stdout:
return do(sys.stdout)
with open(outfile, 'w') as out:
return do(out)
return decorated
@click.group() @click.group()
@click.option("--verbose", is_flag=True) @click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN') @click.option("--logging_level", required=False, default='WARN')
@ -125,9 +172,10 @@ def tweet(ctx):
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
@serialize
def get_tweet(tweetid, write, folder, update): def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
utils.download_tweet(wq, tweetid, write, folder, update) yield from utils.download_tweet(wq, tweetid, write, folder, update)
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''') The result is stored as individual json files in your folder of choice.''')
@ -136,12 +184,13 @@ The result is stored as individual json files in your folder of choice.''')
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!') @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",") @click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)', @click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
is_flag=True, default=False) @click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"') @click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0) @click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column): def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing tweets. Continue?'): if update and not click.confirm('This may overwrite existing tweets. Continue?'):
click.echo('Cancelling') click.echo('Cancelling')
return return
@ -151,12 +200,15 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec
failed = 0 failed = 0
for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter, for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
batch_method=utils.tweet_download_batch, batch_method=utils.tweet_download_batch,
header=header, quotechar=quotechar, skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry): column=column, update=update, retry_failed=retry):
status.update(1) status.update(1)
if not obj: if not obj:
failed += 1 failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True) status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@ -220,12 +272,14 @@ def get_user(user, write, folder, update):
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!') @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",") @click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)', @click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False) is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"') @click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0) @click.option('-c', '--column', type=int, default=0)
@click.pass_context @click.pass_context
def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column): @serialize
def get_users(ctx, usersfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'): if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling') click.echo('Cancelling')
return return
@ -233,9 +287,10 @@ def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotecha
for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter, for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
batch_method=utils.user_download_batch, batch_method=utils.user_download_batch,
update=update, retry_failed=retry, update=update, retry_failed=retry,
header=header, quotechar=quotechar, skip=skip, quotechar=quotechar,
commentchar=commentchar,
column=column): column=column):
pass yield i
@users.command('crawl') @users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')

@ -59,7 +59,14 @@ def chunk(iterable, n):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus*2) p = ThreadPool(numcpus*2)
results = p.imap_unordered(func, source)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results): for i in chain.from_iterable(results):
yield i yield i
@ -106,7 +113,7 @@ def read_config(conffile):
raise Exception('No config file or BITTER_CONFIG env variable.') raise Exception('No config file or BITTER_CONFIG env variable.')
else: else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f) or {'credentials': []} return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def write_config(conf, conffile=None): def write_config(conf, conffile=None):
@ -419,8 +426,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
pending = pending_entries(dburi) pending = pending_entries(dburi)
session.close() session.close()
for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
logger.info("Got %s" % i) for i in tq:
tq.write('Got {}'.format(i))
logger.info("Got %s" % i)
def pending_entries(dburi): def pending_entries(dburi):
@ -465,16 +475,14 @@ def get_user(c, user):
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_id(tweetid, folder) tweet = cached_id(tweetid, folder)
tweet = None if update or not tweet:
if update or not cached:
tweet = get_tweet(wq, tweetid) tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet)
if write: if write:
if tweet: if tweet:
js = json.dumps(tweet)
write_json(js, folder) write_json(js, folder)
else: yield tweet
print(js)
def cached_id(oid, folder): def cached_id(oid, folder):
@ -581,7 +589,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
def filter_list(lst, done, down): def filter_list(lst, done, down):
print('filtering') print('filtering')
for oid in lst: for oid in lst:
# print('Checking {}'.format(line))
cached = cached_id(oid, folder) cached = cached_id(oid, folder)
if (cached and not update): if (cached and not update):
done.put((oid, cached)) done.put((oid, cached))
@ -635,12 +642,15 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
wait.join() wait.join()
def download_file(wq, csvfile, folder, column=0, delimiter=',', def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0,
header=False, quotechar='"', batch_method=tweet_download_batch, quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs): **kwargs):
with open(csvfile) as f: with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
if header: for n in range(skip):
next(csvreader) next(csvreader)
def reader(r): def reader(r):

Loading…
Cancel
Save