1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 20:58:24 +00:00

7 Commits

Author SHA1 Message Date
J. Fernando Sánchez
ea848f1a78 Version 0.10.3
* Ability to include "optional" fields from tweets (e.g., retweeted_status).
* Optional caching (for very large datasets)
2020-06-24 11:01:02 +02:00
J. Fernando Sánchez
030c41b826 Changes to user and tweet search: Cache by default
* Improved printing of credential limits
* Tweet and user searchers cache by default. Write has been changed to dry_run
2020-01-07 20:36:19 +01:00
J. Fernando Sánchez
bba73091e4 Fixes for serialize
Some commands in the CLI did not properly include serialize (get_tweet,
get_users and search)
2020-01-07 19:36:18 +01:00
J. Fernando Sánchez
80b58541e7 Add serialize 2019-11-12 17:15:26 +01:00
J. Fernando Sánchez
40a8b45231 Fix concurrency issue
Download_list sometimes failed with:
BrokenPipeError: [Errno 32] Broken pipe
2019-09-20 13:39:51 +02:00
J. Fernando Sánchez
fadeced761 Fix tests py2 2019-09-19 12:18:56 +02:00
J. Fernando Sánchez
bdb844d75f Fix compatibility click >=7 2019-09-19 11:45:12 +02:00
7 changed files with 212 additions and 106 deletions

View File

@@ -2,6 +2,6 @@
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,4 +1,4 @@
PYVERSIONS=3.4 2.7
PYVERSIONS=3.5
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian

View File

@@ -1 +1 @@
0.9.5
0.10.3

View File

@@ -6,10 +6,12 @@ http://github.com/balkian/bitter
import os
from .version import __version__
from . import config as bconf
def easy(*args, **kwargs):
def easy(conffile=bconf.CONFIG_FILE):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
return TwitterQueue.from_config(conffile=conffile)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -26,6 +26,32 @@ else:
logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--csv', help='Print each object as a csv row.', is_flag=True)
@click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
it = function(**kwargs)
outformat = 'json'
if csv:
outformat = 'csv'
elif jsonlines:
outformat = 'jsonlines'
elif indented:
outformat = 'indented'
return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
return decorated
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@@ -45,6 +71,8 @@ def main(ctx, verbose, logging_level, config, credentials):
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
if ctx.invoked_subcommand is not None:
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
@@ -56,44 +84,47 @@ def credentials(ctx):
@credentials.command('limits')
@click.option('--all', type=bool, default=False, required=False,
help=('Print all limits. By default, it only limits that '
@click.option('--no_aggregate', is_flag=True, default=False,
help=('Print limits from all workers. By default, limits are '
'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
help=('Print all limits. By default, only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, all, url):
def get_limits(ctx, no_aggregate, no_diff, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
total = {}
limits = {}
if url:
print('URL is: {}'.format(url))
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
continue
for k in limit:
total[k] = total.get(k, 0) + limit[k]
print('{}: {}'.format(url, limit))
continue
nres = {}
if not all:
for res, urls in resp['resources'].items():
nurls = {}
for u, limits in urls.items():
if limits['limit'] != limits['remaining']:
nurls[u] = limits
if nurls:
nres[res] = nurls
resp = nres
print(json.dumps(resp, indent=2))
if url:
print('Total for {}: {}'.format(url, total))
for urlimits in resp['resources'].values():
for url, value in urlimits.items():
if url not in limits:
limits[url] = {}
glob = limits[url].get('global', {})
limits[url][worker.name] = value
for k in ['limit', 'remaining']:
if k not in glob:
glob[k] = 0
glob[k] += value[k]
limits[url]['global'] = glob
for url, lims in limits.items():
worker_list = lims.keys() if no_aggregate else ['global', ]
url_printed = False
for worker in worker_list:
vals = lims[worker]
consumed = vals['limit'] - vals['remaining']
if no_diff or consumed:
if not url_printed:
print(url)
url_printed = True
print('\t', worker, ':')
print('\t\t', vals)
@credentials.command('add')
@click.option('--consumer_key', default=None)
@@ -121,27 +152,30 @@ def tweet(ctx):
pass
@tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
def get_tweet(tweetid, write, folder, update):
@serialize
def get_tweet(tweetid, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
utils.download_tweet(wq, tweetid, write, folder, update)
yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.argument('tweetsfile')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
is_flag=True, default=False)
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column):
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing tweets. Continue?'):
click.echo('Cancelling')
return
@@ -149,22 +183,24 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
batch_method=utils.tweet_download_batch,
header=header, quotechar=quotechar,
for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search')
@click.argument('query')
@serialize
@click.pass_context
def search(ctx, query):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
yield from utils.search_tweet(wq, query)
@tweet.command('timeline')
@click.argument('user')
@@ -192,57 +228,48 @@ def list_users(ctx, db):
@users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
@serialize
def get_user(user, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
yield from utils.download_user(wq, user, not dry_run, folder, update)
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile', 'File with a list of users to look up')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column):
def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
batch_method=utils.user_download_batch,
for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
update=update, retry_failed=retry,
header=header, quotechar=quotechar,
skip=skip, quotechar=quotechar,
cache=not nocache,
commentchar=commentchar,
column=column):
pass
yield i
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up')
@click.argument('usersfile')
@click.pass_context
def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@@ -432,7 +459,6 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
db = ctx.obj['DBURI']
session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})

View File

@@ -19,7 +19,9 @@ import queue
import threading
from select import select
from functools import partial
import operator
from functools import partial, reduce
from tqdm import tqdm
@@ -59,7 +61,14 @@ def chunk(iterable, n):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
results = p.imap_unordered(func, source)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results):
yield i
@@ -106,7 +115,7 @@ def read_config(conffile):
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f) or {'credentials': []}
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def write_config(conf, conffile=None):
@@ -270,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False):
download_user(wq, session, user, entry, recursive)
session.close()
def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
total_followers = user.followers_count
@@ -419,10 +427,13 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
pending = pending_entries(dburi)
session.close()
for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
for i in tq:
tq.write('Got {}'.format(i))
logger.info("Got %s" % i)
def pending_entries(dburi):
session = make_session(dburi)
while True:
@@ -449,7 +460,7 @@ def get_tweet(c, tid):
return c.statuses.show(id=tid)
def search_tweet(c, query):
return c.search.tweets(q=query)
yield from c.search.tweets(q=query)['statuses']
def user_timeline(c, query):
try:
@@ -464,17 +475,25 @@ def get_user(c, user):
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_id(tweetid, folder)
tweet = None
if update or not cached:
def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
tweet = cached_id(tweetid, folder)
if update or not tweet:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet)
if write:
if cache and update:
if tweet:
js = json.dumps(tweet)
write_json(js, folder)
else:
print(js)
yield tweet
def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
user = cached_id(userid, folder)
if update or not user:
user = get_user(wq, userid)
if cache and update:
if user:
write_json(user, folder, aliases=[user['screen_name'], ])
yield user
def cached_id(oid, folder):
@@ -489,7 +508,7 @@ def cached_id(oid, folder):
logger.error('Error getting cached version of {}: {}'.format(oid, ex))
return tweet
def write_json(js, folder, oid=None):
def write_json(js, folder, oid=None, aliases=[]):
if not oid:
oid = js['id']
file = id_file(oid, folder)
@@ -498,6 +517,8 @@ def write_json(js, folder, oid=None):
with open(file, 'w') as f:
json.dump(js, f)
logger.info('Written {} to file {}'.format(oid, file))
for alias in aliases:
os.symlink('%s.json' % oid, id_file(alias, folder))
def id_file(oid, folder):
return os.path.join(folder, '%s.json' % oid)
@@ -570,7 +591,8 @@ def dump_result(oid, obj, folder, ignore_fails=True):
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
batch_method=tweet_download_batch):
done = Queue()
@@ -581,7 +603,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
# print('Checking {}'.format(line))
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
@@ -595,7 +616,9 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
def gen():
while True:
r = down.get()
if not r:
if r is None:
down.close()
down.join_thread()
return
yield r
@@ -622,21 +645,37 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail
rec = done.get()
if rec is None:
done.close()
done.join_thread()
break
oid, obj = rec
if cache or (not obj):
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
def download_file(wq, csvfile, folder, column=0, delimiter=',',
header=False, quotechar='"', batch_method=tweet_download_batch,
def download_tweets_file(*args, **kwargs):
kwargs['batch_method'] = tweet_download_batch
yield from download_file(*args, **kwargs)
def download_users_file(*args, **kwargs):
kwargs['batch_method'] = user_download_batch
yield from download_file(*args, **kwargs)
def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
if header:
for n in range(skip):
next(csvreader)
def reader(r):
@@ -645,7 +684,7 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',',
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
**kwargs):
yield res
@@ -723,3 +762,42 @@ def _users_control(func, apiargs, remaining=0, **kwargs):
if int(cursor) != -1:
stop = False
return resp['users'], stop
def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
outformat = outformat.lower()
def do(out):
if outformat == 'csv':
writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
if header != '':
h = header
if h is None:
h = delimiter.join(fields)
print(h, file=out)
attrs = list(token.strip().split('.') for token in fields)
for obj in it:
values = []
for attr in attrs:
try:
values.append(reduce(operator.getitem, attr, obj))
except KeyError:
if not ignore_missing:
print('Key not present: {}'.format(attr), file=sys.stderr)
values.append(None)
writer.writerow(values)
elif outformat == 'jsonlines':
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif outformat == 'indented':
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else:
for obj in it:
print(obj, file=out)
if outfile is sys.stdout:
return do(sys.stdout)
with open(outfile, 'w') as out:
return do(out)

View File

@@ -48,13 +48,13 @@ class TestUtils(TestCase):
time.sleep(0.5)
return i
tic = time.time()
resp = utils.parallel(echo, [1,2,3])
resp = utils.parallel(echo, [1, 2, 3])
assert isinstance(resp, types.GeneratorType)
assert list(resp) == [1,2,3]
assert sorted(list(resp)) == [1, 2, 3]
toc = time.time()
assert (tic-toc) < 600
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2, 3,4]
assert sorted(list(resp2)) == [1, 2, 3, 4]
class TestUtilsEnv(TestUtils):