Improve download_list

master 0.9.5
J. Fernando Sánchez 5 years ago
parent 02aec5eefa
commit 653487e2d7

@ -1 +1 @@
0.9.3 0.9.5

@ -21,7 +21,7 @@ if sys.version_info <= (3, 0):
from contextlib2 import ExitStack from contextlib2 import ExitStack
else: else:
from contextlib import ExitStack from contextlib import ExitStack
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,10 +42,58 @@ def main(ctx, verbose, logging_level, config, credentials):
utils.copy_credentials_to_config(credentials, config) utils.copy_credentials_to_config(credentials, config)
@main.group() @main.group(invoke_without_command=True)
@click.pass_context @click.pass_context
def credentials(ctx): def credentials(ctx):
pass wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--all', type=bool, default=False, required=False,
help=('Print all limits. By default, it only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, all, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
total = {}
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
continue
for k in limit:
total[k] = total.get(k, 0) + limit[k]
print('{}: {}'.format(url, limit))
continue
nres = {}
if not all:
for res, urls in resp['resources'].items():
nurls = {}
for u, limits in urls.items():
if limits['limit'] != limits['remaining']:
nurls[u] = limits
if nurls:
nres[res] = nurls
resp = nres
print(json.dumps(resp, indent=2))
if url:
print('Total for {}: {}'.format(url, total))
@credentials.command('add') @credentials.command('add')
@click.option('--consumer_key', default=None) @click.option('--consumer_key', default=None)
@ -68,7 +116,7 @@ def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
@main.group() @main.group()
@click.pass_context @click.pass_context
def tweet(ctx): def tweet(ctx):
pass pass
@ -98,15 +146,21 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec
click.echo('Cancelling') click.echo('Cancelling')
return return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
batch_method=utils.tweet_download_batch, status = tqdm('Queried')
header=header, quotechar=quotechar, failed = 0
column=column, update=update, retry_failed=retry): for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
pass batch_method=utils.tweet_download_batch,
header=header, quotechar=quotechar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def search(ctx, query): def search(ctx, query):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.search_tweet(wq, query) t = utils.search_tweet(wq, query)
@ -114,7 +168,7 @@ def search(ctx, query):
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def timeline(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.user_timeline(wq, user) t = utils.user_timeline(wq, user)
@ -293,7 +347,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
speed = (collected-lastcollected)/10 speed = (collected-lastcollected)/10
with statslock: with statslock:
lastcollected = collected lastcollected = collected
logger.info('Done!') logger.info('Done!')
@main.group('extractor') @main.group('extractor')
@ -344,7 +398,7 @@ def network_extractor(ctx, as_json):
if as_json: if as_json:
import json import json
print(json.dumps(follower_map, indent=4)) print(json.dumps(follower_map, indent=4))
@extractor.command('users') @extractor.command('users')
@click.pass_context @click.pass_context
@ -383,34 +437,6 @@ def reset_extractor(ctx):
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@main.command('limits')
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
total = {}
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
continue
for k in limit:
total[k] = total.get(k, 0) + limit[k]
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
if url:
print('Total for {}: {}'.format(url, total))
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False), @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
help='''Issue a call to an endpoint of the Twitter API.''') help='''Issue a call to an endpoint of the Twitter API.''')
@ -454,7 +480,7 @@ def run_server(ctx, consumer_key, consumer_secret):
app.run(host='0.0.0.0') app.run(host='0.0.0.0')
@main.group() @main.group()
@click.pass_context @click.pass_context
def stream(ctx): def stream(ctx):
pass pass
@ -463,7 +489,7 @@ def stream(ctx):
@click.option('-t', '--track', default=None) @click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets') @click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context @click.pass_context
def get_stream(ctx, locations, track, file, politelyretry): def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1) wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
@ -505,7 +531,7 @@ def get_stream(ctx, locations, track, file, politelyretry):
@stream.command('read') @stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True) @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False) @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context @click.pass_context
def read_stream(ctx, file, tail): def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail): for tweet in utils.read_file(file, tail=tail):
try: try:
@ -516,12 +542,12 @@ def read_stream(ctx, file, tail):
@stream.command('tags') @stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True) @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int) @click.argument('limit', required=False, default=None, type=int)
@click.pass_context @click.pass_context
def tags_stream(ctx, file, limit): def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file)) c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit): for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag)) print(u'{} - {}'.format(count, tag))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -3,7 +3,7 @@ Common configuration for other modules.
It is not elegant, but it works with flask and the oauth decorators. It is not elegant, but it works with flask and the oauth decorators.
Using this module allows you to change the config before loading any other module. Using this module allows you to change the config before loading any other module.
E.g.: E.g.:
import bitter.config as c import bitter.config as c
c.CREDENTIALS="/tmp/credentials" c.CREDENTIALS="/tmp/credentials"

@ -13,6 +13,11 @@ import sqlalchemy
import os import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
import queue
import threading
from select import select
from functools import partial from functools import partial
@ -22,6 +27,7 @@ from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from collections import Counter from collections import Counter
from random import choice
from builtins import map, filter from builtins import map, filter
@ -53,7 +59,7 @@ def chunk(iterable, n):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus*2) p = ThreadPool(numcpus*2)
results = p.imap_unordered(func, source, chunksize=int(1000/numcpus)) results = p.imap_unordered(func, source)
for i in chain.from_iterable(results): for i in chain.from_iterable(results):
yield i yield i
@ -507,7 +513,8 @@ def id_failed(oid, folder):
def tweet_download_batch(wq, batch): def tweet_download_batch(wq, batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items() for tid, tweet in tweets.items():
yield tid, tweet
def user_download_batch(wq, batch): def user_download_batch(wq, batch):
screen_names = [] screen_names = []
@ -547,45 +554,81 @@ def user_download_batch(wq, batch):
yield (name, None) yield (name, None)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=True, def dump_result(oid, obj, folder, ignore_fails=True):
if obj:
try:
write_json(obj, folder=folder, oid=oid)
failed = fail_file(oid, folder)
if os.path.exists(failed):
os.remove(failed)
except Exception as ex:
logger.error('%s: %s' % (oid, ex))
if not ignore_fails:
raise
else:
logger.info('Object not recovered: {}'.format(oid))
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
batch_method=tweet_download_batch): batch_method=tweet_download_batch):
def filter_lines(line):
# print('Checking {}'.format(line)) done = Queue()
oid = line[0]
if (cached_id(oid, folder) and not update) or (id_failed(oid, folder) and not retry_failed): down = Queue()
yield None
else:
yield str(oid) def filter_list(lst, done, down):
print('filtering')
def print_result(res): for oid in lst:
for oid, obj in res: # print('Checking {}'.format(line))
if obj: cached = cached_id(oid, folder)
try: if (cached and not update):
write_json(obj, folder=folder, oid=oid) done.put((oid, cached))
failed = fail_file(oid, folder) elif (id_failed(oid, folder) and not retry_failed):
if os.path.exists(failed): done.put((oid, None))
os.remove(failed) else:
yield 1 down.put(oid)
except Exception as ex: down.put(None)
logger.error('%s: %s' % (oid, ex))
if not ignore_fails: def download_results(batch_method, down, done):
raise def gen():
else: while True:
logger.info('Object not recovered: {}'.format(oid)) r = down.get()
with open(fail_file(oid, folder), 'w') as f: if not r:
print('Object not found', file=f) return
yield -1 yield r
objects_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_lines, lst), desc='Total objects')) for t in parallel(batch_method, gen(), 100):
batch_method = partial(batch_method, wq) done.put(t)
objects = parallel(batch_method, objects_to_crawl, 100)
failed = 0 def batch(*args, **kwargs):
pbar = tqdm(parallel(print_result, objects), desc='Queried') return batch_method(wq, *args, **kwargs)
for res in pbar:
if res < 0: tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
failed += 1 tc.start()
pbar.set_description('Failed: %s. Queried' % failed, refresh=True) td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
yield res td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
break
oid, obj = rec
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
def download_file(wq, csvfile, folder, column=0, delimiter=',', def download_file(wq, csvfile, folder, column=0, delimiter=',',
@ -595,8 +638,14 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',',
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
if header: if header:
next(csvreader) next(csvreader)
tweets = map(lambda row: row[0].strip(), csvreader)
for res in download_list(wq, tweets, folder, batch_method=batch_method, def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
**kwargs): **kwargs):
yield res yield res

Loading…
Cancel
Save