Mirror of https://github.com/balkian/bitter.git (synced 2025-10-25 12:48:23 +00:00)
Compare commits
7 Commits
| Author | SHA1 | Date |
|---|---|---|
| | ea848f1a78 | |
| | 030c41b826 | |
| | bba73091e4 | |
| | 80b58541e7 | |
| | 40a8b45231 | |
| | fadeced761 | |
| | bdb844d75f | |
Dockerfile

@@ -2,6 +2,6 @@
 From python:2.7-onbuild
 Maintainer J. Fernando Sánchez @balkian

-RUN pip install -e "/usr/src/app/[server]"
+RUN pip install ".[server]"

 ENTRYPOINT ["bitter"]
2 Makefile

@@ -1,4 +1,4 @@
-PYVERSIONS=3.4 2.7
+PYVERSIONS=3.5
 PYMAIN=$(firstword $(PYVERSIONS))
 NAME=bitter
 REPO=balkian
@@ -1 +1 @@
-0.9.5
+0.10.3
bitter/__init__.py

@@ -6,10 +6,12 @@ http://github.com/balkian/bitter
 import os

 from .version import __version__
+from . import config as bconf


-def easy(*args, **kwargs):
+def easy(conffile=bconf.CONFIG_FILE):
     from .crawlers import TwitterQueue
-    return TwitterQueue.from_credentials(*args, **kwargs)
+    return TwitterQueue.from_config(conffile=conffile)


 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
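Note: with this change easy() no longer takes explicit credentials; it builds the queue from a config file (bconf.CONFIG_FILE by default). A minimal usage sketch, not part of the diff (the explicit file name below is only illustrative):

import bitter

wq = bitter.easy()                         # read the default config location
# wq = bitter.easy(conffile='bitter.yml')  # or point to an explicit config file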
172 bitter/cli.py

@@ -26,6 +26,32 @@ else:

 logger = logging.getLogger(__name__)

+
+def serialize(function):
+    '''Common options to serialize output to CSV or other formats'''
+
+    @click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
+    @click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
+    @click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
+    @click.option('--csv', help='Print each object as a csv row.', is_flag=True)
+    @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
+    @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
+    @click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
+    @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
+    def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
+        it = function(**kwargs)
+        outformat = 'json'
+        if csv:
+            outformat = 'csv'
+        elif jsonlines:
+            outformat = 'jsonlines'
+        elif indented:
+            outformat = 'indented'
+
+        return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
+
+    return decorated
+
+
 @click.group()
 @click.option("--verbose", is_flag=True)
 @click.option("--logging_level", required=False, default='WARN')
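Note: a command wrapped with this decorator is written as a generator that yields plain dicts; output handling (--csv, --jsonlines, --indented, --fields, --outfile, ...) is then done by utils.serialized. A hedged sketch of a hypothetical command using it, not part of the diff:

@tweet.command('show')            # hypothetical command name
@click.argument('tweetid')
@serialize
def show_tweet(tweetid):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    # Only yield objects here; the decorator picks the output format.
    yield utils.get_tweet(wq, tweetid)

Assuming the console entry point is named bitter (as the Dockerfile ENTRYPOINT suggests), this would allow invocations such as `bitter tweet show <id> --csv --fields id,text`.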
@@ -45,6 +71,8 @@ def main(ctx, verbose, logging_level, config, credentials):

 @main.group(invoke_without_command=True)
 @click.pass_context
 def credentials(ctx):
+    if ctx.invoked_subcommand is not None:
+        return
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     for worker in wq.queue:
         print('#'*20)
@@ -56,44 +84,47 @@ def credentials(ctx):

 @credentials.command('limits')
-@click.option('--all', type=bool, default=False, required=False,
-              help=('Print all limits. By default, it only limits that '
+@click.option('--no_aggregate', is_flag=True, default=False,
+              help=('Print limits from all workers. By default, limits are '
+                    'aggregated (summed).'))
+@click.option('--no_diff', is_flag=True, default=False,
+              help=('Print all limits. By default, only limits that '
                     'have been consumed will be shown.'))
 @click.argument('url', required=False)
 @click.pass_context
-def get_limits(ctx, all, url):
+def get_limits(ctx, no_aggregate, no_diff, url):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    total = {}
-    if url:
-        print('URL is: {}'.format(url))
+    limits = {}
     for worker in wq.queue:
         resp = worker.client.application.rate_limit_status()
-        print('#'*20)
-        print(worker.name)
         if url:
-            limit = 'NOT FOUND'
+            print('URL is: {}'.format(url))
             cat = url.split('/')[1]
             if cat in resp['resources']:
                 limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
             else:
                 print('Cat {} not found'.format(cat))
                 continue
-            for k in limit:
-                total[k] = total.get(k, 0) + limit[k]
-            print('{}: {}'.format(url, limit))
-            continue
-        nres = {}
-        if not all:
-            for res, urls in resp['resources'].items():
-                nurls = {}
-                for u, limits in urls.items():
-                    if limits['limit'] != limits['remaining']:
-                        nurls[u] = limits
-                if nurls:
-                    nres[res] = nurls
-            resp = nres
-        print(json.dumps(resp, indent=2))
-    if url:
-        print('Total for {}: {}'.format(url, total))
+        for urlimits in resp['resources'].values():
+            for url, value in urlimits.items():
+                if url not in limits:
+                    limits[url] = {}
+                glob = limits[url].get('global', {})
+                limits[url][worker.name] = value
+                for k in ['limit', 'remaining']:
+                    if k not in glob:
+                        glob[k] = 0
+                    glob[k] += value[k]
+                limits[url]['global'] = glob
+    for url, lims in limits.items():
+        worker_list = lims.keys() if no_aggregate else ['global', ]
+
+        url_printed = False
+
+        for worker in worker_list:
+            vals = lims[worker]
+            consumed = vals['limit'] - vals['remaining']
+            if no_diff or consumed:
+                if not url_printed:
+                    print(url)
+                    url_printed = True
+                print('\t', worker, ':')
+                print('\t\t', vals)


 @credentials.command('add')
 @click.option('--consumer_key', default=None)
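Note: a hedged sketch of exercising the reworked limits command (flag names come from the diff; using click's test runner here is an assumption, any shell invocation of the bitter entry point would do):

from click.testing import CliRunner
from bitter.cli import main

runner = CliRunner()
# Default view: one aggregated 'global' entry per URL, only limits that have been consumed.
print(runner.invoke(main, ['credentials', 'limits']).output)
# Per-worker breakdown, including limits that have not been touched:
print(runner.invoke(main, ['credentials', 'limits', '--no_aggregate', '--no_diff']).output)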
@@ -121,27 +152,30 @@ def tweet(ctx):
     pass

 @tweet.command('get')
-@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-d', '--dry_run', is_flag=True, default=False)
 @click.option('-f', '--folder', default="tweets")
 @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
 @click.argument('tweetid')
-def get_tweet(tweetid, write, folder, update):
+@serialize
+def get_tweet(tweetid, dry_run, folder, update):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    utils.download_tweet(wq, tweetid, write, folder, update)
+    yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)

 @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.

 The result is stored as individual json files in your folder of choice.''')
-@click.argument('tweetsfile', 'File with a list of tweets to look up')
+@click.argument('tweetsfile')
 @click.option('-f', '--folder', default="tweets")
 @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
 @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
 @click.option('-d', '--delimiter', default=",")
-@click.option('-h', '--header', help='Discard the first line (use it as a header)',
-              is_flag=True, default=False)
+@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
+@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
+@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
 @click.option('-q', '--quotechar', default='"')
 @click.option('-c', '--column', type=int, default=0)
+@serialize
 @click.pass_context
-def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column):
+def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
     if update and not click.confirm('This may overwrite existing tweets. Continue?'):
         click.echo('Cancelling')
         return
@@ -149,22 +183,24 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec

     status = tqdm('Queried')
     failed = 0
-    for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
-                                        batch_method=utils.tweet_download_batch,
-                                        header=header, quotechar=quotechar,
-                                        column=column, update=update, retry_failed=retry):
+    for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
+                                               skip=skip, quotechar=quotechar, commentchar=commentchar,
+                                               column=column, update=update, retry_failed=retry):
         status.update(1)
         if not obj:
             failed += 1
             status.set_description('Failed: %s. Queried' % failed, refresh=True)
             continue
+        yield obj


 @tweet.command('search')
 @click.argument('query')
+@serialize
 @click.pass_context
 def search(ctx, query):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    t = utils.search_tweet(wq, query)
-    print(json.dumps(t, indent=2))
+    yield from utils.search_tweet(wq, query)

 @tweet.command('timeline')
 @click.argument('user')
@@ -192,57 +228,48 @@ def list_users(ctx, db):

 @users.command('get')
 @click.argument('user')
-@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-d', '--dry_run', is_flag=True, default=False)
 @click.option('-f', '--folder', default="users")
 @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
-def get_user(user, write, folder, update):
+@serialize
+def get_user(user, dry_run, folder, update):
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    if not write:
-        u = utils.get_user(wq, user)
-        js = json.dumps(u, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % user)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('User exists: %s' % user)
-        return
-    with open(file, 'w') as f:
-        u = utils.get_user(wq, user)
-        js = json.dumps(u, indent=2)
-        print(js, file=f)
+    yield from utils.download_user(wq, user, not dry_run, folder, update)

 @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.

 The result is stored as individual json files in your folder of choice.''')
-@click.argument('usersfile', 'File with a list of users to look up')
+@click.argument('usersfile')
 @click.option('-f', '--folder', default="users")
 @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
 @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
+@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
 @click.option('-d', '--delimiter', default=",")
-@click.option('-h', '--header', help='Discard the first line (use it as a header)',
+@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
               is_flag=True, default=False)
 @click.option('-q', '--quotechar', default='"')
+@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
 @click.option('-c', '--column', type=int, default=0)
+@serialize
 @click.pass_context
-def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column):
+def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
     if update and not click.confirm('This may overwrite existing users. Continue?'):
         click.echo('Cancelling')
         return
     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
-    for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
-                                 batch_method=utils.user_download_batch,
-                                 update=update, retry_failed=retry,
-                                 header=header, quotechar=quotechar,
-                                 column=column):
-        pass
+    for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
+                                       update=update, retry_failed=retry,
+                                       skip=skip, quotechar=quotechar,
+                                       cache=not nocache,
+                                       commentchar=commentchar,
+                                       column=column):
+        yield i

 @users.command('crawl')
 @click.option('--db', required=True, help='Database to save all users.')
 @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
 @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
 @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
-@click.argument('usersfile', 'File with a list of users to look up')
+@click.argument('usersfile')
 @click.pass_context
 def crawl_users(ctx, usersfile, skip, until, threads, db):
     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@@ -432,7 +459,6 @@ def extract(ctx, recursive, user, name, initfile):

 @extractor.command('reset')
 @click.pass_context
 def reset_extractor(ctx):
-    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
     db = ctx.obj['DBURI']
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
128 bitter/utils.py

@@ -19,7 +19,9 @@ import queue
 import threading
 from select import select

-from functools import partial
+import operator
+
+from functools import partial, reduce

 from tqdm import tqdm
@@ -59,7 +61,14 @@ def chunk(iterable, n):
 def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
     source = chunk(source, chunksize)
     p = ThreadPool(numcpus*2)
-    results = p.imap_unordered(func, source)
+
+    def wrapped_func(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except Exception as ex:
+            print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
+
+    results = p.imap_unordered(wrapped_func, source)
     for i in chain.from_iterable(results):
         yield i
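Note: parallel() still yields flattened results as they complete, so ordering is not guaranteed; this mirrors the updated test at the end of this diff (a sketch, not part of the change):

from bitter import utils

# func receives each chunk and the per-chunk results are flattened, so identity works as an echo:
assert sorted(utils.parallel(lambda x: x, [1, 2, 3])) == [1, 2, 3]
assert sorted(utils.parallel(lambda x: x, [1, 2, 3, 4], chunksize=2)) == [1, 2, 3, 4]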
@@ -106,7 +115,7 @@ def read_config(conffile):
             raise Exception('No config file or BITTER_CONFIG env variable.')
         else:
             f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
-    return yaml.load(f) or {'credentials': []}
+    return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}


 def write_config(conf, conffile=None):
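Note: passing Loader=yaml.SafeLoader restricts parsing to plain YAML types (mappings, lists, scalars) and avoids arbitrary object construction, as well as the PyYAML warning about calling yaml.load without an explicit loader. Equivalent shorthand, shown only for illustration (the file name is made up):

import yaml

with open('bitter.yml') as f:   # illustrative path
    config = yaml.safe_load(f) or {'credentials': []}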
@@ -270,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False):
         download_user(wq, session, user, entry, recursive)
     session.close()


-def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):

     total_followers = user.followers_count
@@ -419,8 +427,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
     pending = pending_entries(dburi)
     session.close()

-    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
-        logger.info("Got %s" % i)
+    with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
+        for i in tq:
+            tq.write('Got {}'.format(i))
+            logger.info("Got %s" % i)


 def pending_entries(dburi):
@@ -449,7 +460,7 @@ def get_tweet(c, tid):
     return c.statuses.show(id=tid)


 def search_tweet(c, query):
-    return c.search.tweets(q=query)
+    yield from c.search.tweets(q=query)['statuses']


 def user_timeline(c, query):
     try:
@@ -464,17 +475,25 @@ def get_user(c, user):
     except ValueError:
         return c.users.lookup(screen_name=user)[0]

-def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
-    cached = cached_id(tweetid, folder)
-    tweet = None
-    if update or not cached:
-        tweet = get_tweet(wq, tweetid)
-        js = json.dumps(tweet)
-    if write:
-        write_json(js, folder)
-    else:
-        print(js)
+def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
+    tweet = cached_id(tweetid, folder)
+    if update or not tweet:
+        tweet = get_tweet(wq, tweetid)
+        if cache and update:
+            if tweet:
+                js = json.dumps(tweet)
+                write_json(js, folder)
+    yield tweet


+def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
+    user = cached_id(userid, folder)
+    if update or not user:
+        user = get_user(wq, userid)
+        if cache and update:
+            if user:
+                write_json(user, folder, aliases=[user['screen_name'], ])
+    yield user


 def cached_id(oid, folder):
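Note: both helpers are now generators, so a caller iterates over them (or uses next()) instead of relying on the old print/write side effects. A hedged sketch (the identifiers passed in are placeholders):

from bitter import utils, crawlers
from bitter import config as bconf

wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
tweet = next(utils.download_tweet(wq, '1234', cache=False))    # placeholder tweet id
user = next(utils.download_user(wq, 'balkian', cache=False))   # placeholder screen name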
@@ -489,7 +508,7 @@ def cached_id(oid, folder):
             logger.error('Error getting cached version of {}: {}'.format(oid, ex))
     return tweet

-def write_json(js, folder, oid=None):
+def write_json(js, folder, oid=None, aliases=[]):
     if not oid:
         oid = js['id']
     file = id_file(oid, folder)
@@ -498,6 +517,8 @@ def write_json(js, folder, oid=None):
     with open(file, 'w') as f:
         json.dump(js, f)
     logger.info('Written {} to file {}'.format(oid, file))
+    for alias in aliases:
+        os.symlink('%s.json' % oid, id_file(alias, folder))

 def id_file(oid, folder):
     return os.path.join(folder, '%s.json' % oid)
@@ -570,7 +591,8 @@ def dump_result(oid, obj, folder, ignore_fails=True):
         with open(fail_file(oid, folder), 'w') as f:
             print('Object not found', file=f)


-def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
+def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
                   batch_method=tweet_download_batch):

     done = Queue()

@@ -581,7 +603,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
     def filter_list(lst, done, down):
-        print('filtering')
         for oid in lst:
             # print('Checking {}'.format(line))
             cached = cached_id(oid, folder)
             if (cached and not update):
                 done.put((oid, cached))

@@ -595,7 +616,9 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
     def gen():
         while True:
             r = down.get()
-            if not r:
+            if r is None:
+                down.close()
+                down.join_thread()
                 return
             yield r
@@ -622,21 +645,37 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
         rec = done.get()

         if rec is None:
             done.close()
             done.join_thread()
             break

         oid, obj = rec
-        dump_result(oid, obj, folder, ignore_fails)
+        if cache or (not obj):
+            dump_result(oid, obj, folder, ignore_fails)
         yield rec

     wait.join()


-def download_file(wq, csvfile, folder, column=0, delimiter=',',
-                  header=False, quotechar='"', batch_method=tweet_download_batch,
+def download_tweets_file(*args, **kwargs):
+    kwargs['batch_method'] = tweet_download_batch
+    yield from download_file(*args, **kwargs)
+
+
+def download_users_file(*args, **kwargs):
+    kwargs['batch_method'] = user_download_batch
+    yield from download_file(*args, **kwargs)
+
+
+def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
+                  quotechar='"', commentchar=None, batch_method=tweet_download_batch,
                   **kwargs):
     with open(csvfile) as f:
+        if commentchar:
+            f = (line for line in f if not line.startswith('#'))
+
         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
-        if header:
+        for n in range(skip):
             next(csvreader)
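Note: the two thin wrappers only pin batch_method and forward everything else to download_file, so callers (as in cli.py above) no longer pass batch_method themselves. A hedged sketch of direct use (file names and argument values are illustrative):

from bitter import utils, crawlers
from bitter import config as bconf

wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for tid, obj in utils.download_tweets_file(wq, 'tweet_ids.csv', 'tweets',
                                           skip=1, commentchar='#', column=0):
    print(tid, bool(obj))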
@@ -645,7 +684,7 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',',
             yield row[column].strip()


-        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
+        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
                                  **kwargs):
            yield res
@@ -723,3 +762,42 @@ def _users_control(func, apiargs, remaining=0, **kwargs):
     if int(cursor) != -1:
         stop = False
     return resp['users'], stop
+
+
+def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
+    outformat = outformat.lower()
+
+    def do(out):
+
+        if outformat == 'csv':
+            writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
+            if header != '':
+                h = header
+                if h is None:
+                    h = delimiter.join(fields)
+                print(h, file=out)
+            attrs = list(token.strip().split('.') for token in fields)
+            for obj in it:
+                values = []
+                for attr in attrs:
+                    try:
+                        values.append(reduce(operator.getitem, attr, obj))
+                    except KeyError:
+                        if not ignore_missing:
+                            print('Key not present: {}'.format(attr), file=sys.stderr)
+                        values.append(None)
+                writer.writerow(values)
+        elif outformat == 'jsonlines':
+            for obj in it:
+                print(json.dumps(obj, sort_keys=True), file=out)
+        elif outformat == 'indented':
+            for obj in it:
+                print(json.dumps(obj, indent=4, sort_keys=True), file=out)
+        else:
+            for obj in it:
+                print(obj, file=out)
+
+    if outfile is sys.stdout:
+        return do(sys.stdout)
+
+    with open(outfile, 'w') as out:
+        return do(out)
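Note: a hedged sketch of calling the new helper directly (the data is made up); dotted field paths walk into nested keys, which is what the CLI --fields option maps to:

import sys
from bitter import utils

tweets = [{'id': 1, 'user': {'screen_name': 'balkian'}, 'text': 'hello'}]
utils.serialized(iter(tweets), sys.stdout, outformat='csv',
                 fields=['id', 'user.screen_name', 'text'], header='')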
@@ -48,13 +48,13 @@ class TestUtils(TestCase):
             time.sleep(0.5)
             return i
         tic = time.time()
-        resp = utils.parallel(echo, [1,2,3])
+        resp = utils.parallel(echo, [1, 2, 3])
         assert isinstance(resp, types.GeneratorType)
-        assert list(resp) == [1,2,3]
+        assert sorted(list(resp)) == [1, 2, 3]
         toc = time.time()
         assert (tic-toc) < 600
         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
-        assert list(resp2) == [1,2, 3,4]
+        assert sorted(list(resp2)) == [1, 2, 3, 4]


 class TestUtilsEnv(TestUtils):