mirror of https://github.com/balkian/bitter.git synced 2025-10-25 20:58:24 +00:00

7 Commits

Author SHA1 Message Date
J. Fernando Sánchez
ea848f1a78 Version 0.10.3
* Ability to include "optional" fields from tweets (e.g., retweeted_status).
* Optional caching (for very large datasets)
2020-06-24 11:01:02 +02:00
J. Fernando Sánchez
030c41b826 Changes to user and tweet search: Cache by default
* Improved printing of credential limits
* Tweet and user searchers cache by default. Write has been changed to dry_run
2020-01-07 20:36:19 +01:00
J. Fernando Sánchez
bba73091e4 Fixes for serialize
Some commands in the CLI did not properly include serialize (get_tweet,
get_users and search)
2020-01-07 19:36:18 +01:00
J. Fernando Sánchez
80b58541e7 Add serialize 2019-11-12 17:15:26 +01:00
J. Fernando Sánchez
40a8b45231 Fix concurrency issue
Download_list sometimes failed with:
BrokenPipeError: [Errno 32] Broken pipe
2019-09-20 13:39:51 +02:00
J. Fernando Sánchez
fadeced761 Fix tests py2 2019-09-19 12:18:56 +02:00
J. Fernando Sánchez
bdb844d75f Fix compatibility click >=7 2019-09-19 11:45:12 +02:00
7 changed files with 212 additions and 106 deletions

View File

@@ -2,6 +2,6 @@
 From python:2.7-onbuild
 Maintainer J. Fernando Sánchez @balkian
-RUN pip install -e "/usr/src/app/[server]"
+RUN pip install ".[server]"
 ENTRYPOINT ["bitter"]
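Note: the old line installed the package in editable mode from a hard-coded path, while the new one installs the copy that python:2.7-onbuild places in the image's working directory, keeping the same "server" extra. bitter's actual setup.py is not part of this diff; a minimal sketch of how such an extra is declared (all package names below are illustrative, not bitter's real dependencies):

    # sketch_setup.py -- illustrative only, not bitter's real setup.py
    from setuptools import setup, find_packages

    setup(
        name='bitter',
        packages=find_packages(),
        install_requires=['click', 'PyYAML'],   # assumed core deps
        extras_require={
            # `pip install ".[server]"` adds these on top of install_requires
            'server': ['flask'],                # assumed extra dep
        },
    )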

View File

@@ -1,4 +1,4 @@
PYVERSIONS=3.4 2.7 PYVERSIONS=3.5
PYMAIN=$(firstword $(PYVERSIONS)) PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter NAME=bitter
REPO=balkian REPO=balkian

View File

@@ -1 +1 @@
-0.9.5
+0.10.3

View File

@@ -6,10 +6,12 @@ http://github.com/balkian/bitter
 import os

 from .version import __version__
+from . import config as bconf

-def easy(*args, **kwargs):
+def easy(conffile=bconf.CONFIG_FILE):
     from .crawlers import TwitterQueue
-    return TwitterQueue.from_credentials(*args, **kwargs)
+    return TwitterQueue.from_config(conffile=conffile)

 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
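With this change, easy() no longer takes raw credentials; it builds the crawler queue from a config file. A minimal usage sketch (the explicit path below is made up for the example):

    import bitter

    # Uses the default config file (bconf.CONFIG_FILE)...
    wq = bitter.easy()

    # ...or an explicit one.
    wq = bitter.easy(conffile='/home/user/.bitter.yaml')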

View File

@@ -26,6 +26,32 @@ else:
 logger = logging.getLogger(__name__)

+
+def serialize(function):
+    '''Common options to serialize output to CSV or other formats'''
+    @click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
+    @click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
+    @click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
+    @click.option('--csv', help='Print each object as a csv row.', is_flag=True)
+    @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
+    @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
+    @click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
+    @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
+    def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
+        it = function(**kwargs)
+        outformat = 'json'
+        if csv:
+            outformat = 'csv'
+        elif jsonlines:
+            outformat = 'jsonlines'
+        elif indented:
+            outformat = 'indented'
+        return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
+    return decorated
+
+
 @click.group()
 @click.option("--verbose", is_flag=True)
 @click.option("--logging_level", required=False, default='WARN')
@@ -45,6 +71,8 @@ def main(ctx, verbose, logging_level, config, credentials):
 @main.group(invoke_without_command=True)
 @click.pass_context
 def credentials(ctx):
+    if ctx.invoked_subcommand is not None:
+        return
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     for worker in wq.queue:
         print('#'*20)
@@ -56,44 +84,47 @@ def credentials(ctx):
 @credentials.command('limits')
-@click.option('--all', type=bool, default=False, required=False,
-              help=('Print all limits. By default, it only limits that '
+@click.option('--no_aggregate', is_flag=True, default=False,
+              help=('Print limits from all workers. By default, limits are '
+                    'aggregated (summed).'))
+@click.option('--no_diff', is_flag=True, default=False,
+              help=('Print all limits. By default, only limits that '
                     'have been consumed will be shown.'))
 @click.argument('url', required=False)
 @click.pass_context
-def get_limits(ctx, all, url):
+def get_limits(ctx, no_aggregate, no_diff, url):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    total = {}
+    limits = {}
+    if url:
+        print('URL is: {}'.format(url))
     for worker in wq.queue:
         resp = worker.client.application.rate_limit_status()
-        print('#'*20)
-        print(worker.name)
-        if url:
-            limit = 'NOT FOUND'
-            print('URL is: {}'.format(url))
-            cat = url.split('/')[1]
-            if cat in resp['resources']:
-                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
-            else:
-                print('Cat {} not found'.format(cat))
-                continue
-            for k in limit:
-                total[k] = total.get(k, 0) + limit[k]
-            print('{}: {}'.format(url, limit))
-            continue
-        nres = {}
-        if not all:
-            for res, urls in resp['resources'].items():
-                nurls = {}
-                for u, limits in urls.items():
-                    if limits['limit'] != limits['remaining']:
-                        nurls[u] = limits
-                if nurls:
-                    nres[res] = nurls
-            resp = nres
-        print(json.dumps(resp, indent=2))
-    if url:
-        print('Total for {}: {}'.format(url, total))
+        for urlimits in resp['resources'].values():
+            for url, value in urlimits.items():
+                if url not in limits:
+                    limits[url] = {}
+                glob = limits[url].get('global', {})
+                limits[url][worker.name] = value
+                for k in ['limit', 'remaining']:
+                    if k not in glob:
+                        glob[k] = 0
+                    glob[k] += value[k]
+                limits[url]['global'] = glob
+    for url, lims in limits.items():
+        worker_list = lims.keys() if no_aggregate else ['global', ]
+        url_printed = False
+        for worker in worker_list:
+            vals = lims[worker]
+            consumed = vals['limit'] - vals['remaining']
+            if no_diff or consumed:
+                if not url_printed:
+                    print(url)
+                    url_printed = True
+                print('\t', worker, ':')
+                print('\t\t', vals)

 @credentials.command('add')
 @click.option('--consumer_key', default=None)
@@ -121,27 +152,30 @@ def tweet(ctx):
     pass

 @tweet.command('get')
-@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-d', '--dry_run', is_flag=True, default=False)
 @click.option('-f', '--folder', default="tweets")
 @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
 @click.argument('tweetid')
-def get_tweet(tweetid, write, folder, update):
+@serialize
+def get_tweet(tweetid, dry_run, folder, update):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    utils.download_tweet(wq, tweetid, write, folder, update)
+    yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)

 @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
 The result is stored as individual json files in your folder of choice.''')
-@click.argument('tweetsfile', 'File with a list of tweets to look up')
+@click.argument('tweetsfile')
 @click.option('-f', '--folder', default="tweets")
 @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
 @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
 @click.option('-d', '--delimiter', default=",")
-@click.option('-h', '--header', help='Discard the first line (use it as a header)',
-              is_flag=True, default=False)
+@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
+@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
+@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
 @click.option('-q', '--quotechar', default='"')
 @click.option('-c', '--column', type=int, default=0)
+@serialize
 @click.pass_context
-def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column):
+def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
     if update and not click.confirm('This may overwrite existing tweets. Continue?'):
         click.echo('Cancelling')
         return
@@ -149,22 +183,24 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec
     status = tqdm('Queried')
     failed = 0
-    for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
-                                        batch_method=utils.tweet_download_batch,
-                                        header=header, quotechar=quotechar,
-                                        column=column, update=update, retry_failed=retry):
+    for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
+                                               skip=skip, quotechar=quotechar, commentchar=commentchar,
+                                               column=column, update=update, retry_failed=retry):
         status.update(1)
         if not obj:
             failed += 1
             status.set_description('Failed: %s. Queried' % failed, refresh=True)
+            continue
+        yield obj

 @tweet.command('search')
 @click.argument('query')
+@serialize
 @click.pass_context
 def search(ctx, query):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    t = utils.search_tweet(wq, query)
-    print(json.dumps(t, indent=2))
+    yield from utils.search_tweet(wq, query)

 @tweet.command('timeline')
 @click.argument('user')
@@ -192,57 +228,48 @@ def list_users(ctx, db):
 @users.command('get')
 @click.argument('user')
-@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-d', '--dry_run', is_flag=True, default=False)
 @click.option('-f', '--folder', default="users")
 @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
-def get_user(user, write, folder, update):
+@serialize
+def get_user(user, dry_run, folder, update):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    if not write:
-        u = utils.get_user(wq, user)
-        js = json.dumps(u, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % user)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('User exists: %s' % user)
-        return
-    with open(file, 'w') as f:
-        u = utils.get_user(wq, user)
-        js = json.dumps(u, indent=2)
-        print(js, file=f)
+    yield from utils.download_user(wq, user, not dry_run, folder, update)

 @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
 The result is stored as individual json files in your folder of choice.''')
-@click.argument('usersfile', 'File with a list of users to look up')
+@click.argument('usersfile')
 @click.option('-f', '--folder', default="users")
 @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
 @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
+@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
 @click.option('-d', '--delimiter', default=",")
-@click.option('-h', '--header', help='Discard the first line (use it as a header)',
+@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
               is_flag=True, default=False)
 @click.option('-q', '--quotechar', default='"')
+@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
 @click.option('-c', '--column', type=int, default=0)
+@serialize
 @click.pass_context
-def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column):
+def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
     if update and not click.confirm('This may overwrite existing users. Continue?'):
         click.echo('Cancelling')
         return
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
-                                 batch_method=utils.user_download_batch,
-                                 update=update, retry_failed=retry,
-                                 header=header, quotechar=quotechar,
-                                 column=column):
-        pass
+    for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
+                                       update=update, retry_failed=retry,
+                                       skip=skip, quotechar=quotechar,
+                                       cache=not nocache,
+                                       commentchar=commentchar,
+                                       column=column):
+        yield i

 @users.command('crawl')
 @click.option('--db', required=True, help='Database to save all users.')
 @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
 @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
 @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
-@click.argument('usersfile', 'File with a list of users to look up')
+@click.argument('usersfile')
 @click.pass_context
 def crawl_users(ctx, usersfile, skip, until, threads, db):
     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@@ -432,7 +459,6 @@ def extract(ctx, recursive, user, name, initfile):
 @extractor.command('reset')
 @click.pass_context
 def reset_extractor(ctx):
-    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     db = ctx.obj['DBURI']
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
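The serialize decorator added above follows a common click pattern: the extra options are stacked onto an inner wrapper, and whatever the wrapped command yields is post-processed in one place. A condensed, self-contained sketch of the same pattern (not bitter's actual code):

    import click

    def with_output_options(function):
        # Extra options land on the wrapper; the command's own
        # parameters arrive through **kwargs, as in `serialize` above.
        @click.option('--upper', is_flag=True)
        def decorated(upper, **kwargs):
            for item in function(**kwargs):
                print(item.upper() if upper else item)
        return decorated

    @click.command()
    @click.argument('name')
    @with_output_options
    def greet(name):
        yield 'hello, {}'.format(name)

    if __name__ == '__main__':
        greet()

Because decorators apply bottom-up, click attaches both the command's own parameters and the shared output options to the wrapper, so every command decorated this way gains the same serialization flags.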

View File

@@ -19,7 +19,9 @@ import queue
 import threading

 from select import select
-from functools import partial
+import operator
+from functools import partial, reduce

 from tqdm import tqdm
@@ -59,7 +61,14 @@ def chunk(iterable, n):
 def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
     source = chunk(source, chunksize)
     p = ThreadPool(numcpus*2)
-    results = p.imap_unordered(func, source)
+
+    def wrapped_func(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except Exception as ex:
+            print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
+
+    results = p.imap_unordered(wrapped_func, source)
     for i in chain.from_iterable(results):
         yield i
@@ -106,7 +115,7 @@ def read_config(conffile):
             raise Exception('No config file or BITTER_CONFIG env variable.')
         else:
             f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
-        return yaml.load(f) or {'credentials': []}
+        return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}

 def write_config(conf, conffile=None):
@@ -270,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False):
         download_user(wq, session, user, entry, recursive)
     session.close()

-
-def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
     total_followers = user.followers_count
@@ -419,8 +427,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
     pending = pending_entries(dburi)
     session.close()

-    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
-        logger.info("Got %s" % i)
+    with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
+        for i in tq:
+            tq.write('Got {}'.format(i))
+            logger.info("Got %s" % i)

 def pending_entries(dburi):
@@ -449,7 +460,7 @@ def get_tweet(c, tid):
     return c.statuses.show(id=tid)

 def search_tweet(c, query):
-    return c.search.tweets(q=query)
+    yield from c.search.tweets(q=query)['statuses']

 def user_timeline(c, query):
     try:
@@ -464,17 +475,25 @@ def get_user(c, user):
     except ValueError:
         return c.users.lookup(screen_name=user)[0]

-def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
-    cached = cached_id(tweetid, folder)
-    tweet = None
-    if update or not cached:
+def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
+    tweet = cached_id(tweetid, folder)
+    if update or not tweet:
         tweet = get_tweet(wq, tweetid)
-    js = json.dumps(tweet)
-    if write:
+    if cache and update:
         if tweet:
+            js = json.dumps(tweet)
             write_json(js, folder)
-    else:
-        print(js)
+    yield tweet
+
+
+def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
+    user = cached_id(userid, folder)
+    if update or not user:
+        user = get_user(wq, userid)
+    if cache and update:
+        if user:
+            write_json(user, folder, aliases=[user['screen_name'], ])
+    yield user

 def cached_id(oid, folder):
@@ -489,7 +508,7 @@ def cached_id(oid, folder):
         logger.error('Error getting cached version of {}: {}'.format(oid, ex))
     return tweet

-def write_json(js, folder, oid=None):
+def write_json(js, folder, oid=None, aliases=[]):
     if not oid:
         oid = js['id']
     file = id_file(oid, folder)
@@ -498,6 +517,8 @@ def write_json(js, folder, oid=None):
     with open(file, 'w') as f:
         json.dump(js, f)
     logger.info('Written {} to file {}'.format(oid, file))
+    for alias in aliases:
+        os.symlink('%s.json' % oid, id_file(alias, folder))

 def id_file(oid, folder):
     return os.path.join(folder, '%s.json' % oid)
@@ -570,7 +591,8 @@ def dump_result(oid, obj, folder, ignore_fails=True):
         with open(fail_file(oid, folder), 'w') as f:
             print('Object not found', file=f)

-def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
+def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
                   batch_method=tweet_download_batch):
     done = Queue()
@@ -581,7 +603,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
     def filter_list(lst, done, down):
         print('filtering')
         for oid in lst:
-            # print('Checking {}'.format(line))
             cached = cached_id(oid, folder)
             if (cached and not update):
                 done.put((oid, cached))
@@ -595,7 +616,9 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
     def gen():
         while True:
             r = down.get()
-            if not r:
+            if r is None:
+                down.close()
+                down.join_thread()
                 return
             yield r
@@ -622,21 +645,37 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
         rec = done.get()
         if rec is None:
+            done.close()
+            done.join_thread()
             break
         oid, obj = rec
-        dump_result(oid, obj, folder, ignore_fails)
+        if cache or (not obj):
+            dump_result(oid, obj, folder, ignore_fails)
         yield rec
     wait.join()

-def download_file(wq, csvfile, folder, column=0, delimiter=',',
-                  header=False, quotechar='"', batch_method=tweet_download_batch,
+def download_tweets_file(*args, **kwargs):
+    kwargs['batch_method'] = tweet_download_batch
+    yield from download_file(*args, **kwargs)
+
+
+def download_users_file(*args, **kwargs):
+    kwargs['batch_method'] = user_download_batch
+    yield from download_file(*args, **kwargs)
+
+
+def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
+                  quotechar='"', commentchar=None, batch_method=tweet_download_batch,
                   **kwargs):
     with open(csvfile) as f:
+        if commentchar:
+            f = (line for line in f if not line.startswith('#'))
         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
-        if header:
+        for n in range(skip):
             next(csvreader)

         def reader(r):
@@ -645,7 +684,7 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',',
                 yield row[column].strip()

-        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
+        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
                                  **kwargs):
             yield res
@@ -723,3 +762,42 @@ def _users_control(func, apiargs, remaining=0, **kwargs):
     if int(cursor) != -1:
         stop = False
     return resp['users'], stop
+
+
+def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
+    outformat = outformat.lower()
+
+    def do(out):
+        if outformat == 'csv':
+            writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
+            if header != '':
+                h = header
+                if h is None:
+                    h = delimiter.join(fields)
+                print(h, file=out)
+            attrs = list(token.strip().split('.') for token in fields)
+            for obj in it:
+                values = []
+                for attr in attrs:
+                    try:
+                        values.append(reduce(operator.getitem, attr, obj))
+                    except KeyError:
+                        if not ignore_missing:
+                            print('Key not present: {}'.format(attr), file=sys.stderr)
+                        values.append(None)
+                writer.writerow(values)
+        elif outformat == 'jsonlines':
+            for obj in it:
+                print(json.dumps(obj, sort_keys=True), file=out)
+        elif outformat == 'indented':
+            for obj in it:
+                print(json.dumps(obj, indent=4, sort_keys=True), file=out)
+        else:
+            for obj in it:
+                print(obj, file=out)
+
+    if outfile is sys.stdout:
+        return do(sys.stdout)
+    with open(outfile, 'w') as out:
+        return do(out)
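The new serialized helper resolves dotted field names (e.g. --fields user.screen_name) by folding operator.getitem over each path segment, which is why operator and reduce were added to the imports above. The lookup in isolation (the tweet dict here is made up for the example):

    import operator
    from functools import reduce

    tweet = {'id': 1, 'user': {'screen_name': 'balkian'}}

    for field in ('id', 'user.screen_name'):
        path = field.split('.')
        # Walks nested dicts one key at a time, as serialized() does.
        print(field, '=', reduce(operator.getitem, path, tweet))
    # id = 1
    # user.screen_name = balkian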

View File

@@ -48,13 +48,13 @@ class TestUtils(TestCase):
             time.sleep(0.5)
             return i
         tic = time.time()
-        resp = utils.parallel(echo, [1,2,3])
+        resp = utils.parallel(echo, [1, 2, 3])
         assert isinstance(resp, types.GeneratorType)
-        assert list(resp) == [1,2,3]
+        assert sorted(list(resp)) == [1, 2, 3]
         toc = time.time()
         assert (tic-toc) < 600
         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
-        assert list(resp2) == [1,2, 3,4]
+        assert sorted(list(resp2)) == [1, 2, 3, 4]

 class TestUtilsEnv(TestUtils):
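The sorted() calls were needed because parallel() is built on ThreadPool.imap_unordered, which yields results in completion order rather than input order. A minimal illustration of that behavior (the timings are invented):

    import time
    from multiprocessing.pool import ThreadPool

    def slow_echo(i):
        time.sleep(0.3 - 0.1 * i)   # later inputs finish sooner
        return i

    with ThreadPool(3) as p:
        # Likely prints [3, 2, 1]: completion order, not input order.
        print(list(p.imap_unordered(slow_echo, [1, 2, 3])))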