from __future__ import print_function
import click
import json
import os
import logging
import time
import sqlalchemy.types
import threading
import sqlite3
import operator
from functools import reduce
from tqdm import tqdm
from sqlalchemy import exists
from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following
import sys
import csv as tsv
# ExitStack is part of contextlib on Python 3 only; Python 2 needs the
# contextlib2 backport. The two imports are alternatives, so exactly one of
# them must run.
if sys.version_info <= (3, 0):
    from contextlib2 import ExitStack
else:
    from contextlib import ExitStack

# Module-level logger shared by the commands in this file.
logger = logging.getLogger(__name__)
def serialize(function):
    '''Common options to serialize output to CSV or other formats.

    Wraps ``function`` (a callable returning an iterable of objects) in a
    callback that adds the --csv, --header, --jsonlines/--json, --indented
    and --outfile options. Every remaining keyword argument is forwarded to
    ``function`` untouched, and each object it produces is written in the
    selected format (plain ``print`` when no format is selected).
    '''

    @click.option('--csv', help='Print each object as a csv row. Provide a list of comma-separated fields to print.', default='', type=str)
    @click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
    @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
    @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
    @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
    def decorated(csv, header, jsonlines, indented, outfile, **kwargs):
        it = function(**kwargs)

        def do(out):
            if header:
                # An explicit header goes first, into the same stream as the
                # data, as promised by the --header help text.
                print(header, file=out)
            if csv:
                delimiter = '\t'
                writer = tsv.writer(out, quoting=tsv.QUOTE_ALL, delimiter=delimiter)
                if header is None:
                    # Print fields as header unless told otherwise
                    print(csv.replace(',', delimiter), file=out)
                # Each field is a dotted path (e.g. "user.id") that is
                # resolved by successive item lookups on the object.
                fields = [token.strip().split('.') for token in csv.split(',')]
                for obj in it:
                    writer.writerow([reduce(operator.getitem, field, obj) for field in fields])
            elif jsonlines:
                for obj in it:
                    print(json.dumps(obj, sort_keys=True), file=out)
            elif indented:
                for obj in it:
                    print(json.dumps(obj, indent=4, sort_keys=True), file=out)
            else:
                # No format requested: fall back to the plain representation.
                for obj in it:
                    print(obj, file=out)

        # An empty value is treated like the default (STDOUT).
        if not outfile or outfile is sys.stdout:
            return do(sys.stdout)

        with open(outfile, 'w') as out:
            return do(out)

    return decorated
# Root callback of the command line interface: configures logging and stores
# the global settings (verbosity, configuration and credentials paths).
# NOTE(review): no group/pass_context decorator is visible above these options
# in this view, although `ctx` and `@main.command` elsewhere imply them;
# confirm none was lost.
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS)
def main(ctx, verbose, logging_level, config, credentials):
    # Accept level names in any case, and reject anything that is not a real
    # logging level instead of failing with a bare AttributeError.
    level = getattr(logging, str(logging_level).upper(), None)
    if not isinstance(level, int):
        raise click.BadParameter('Unknown logging level: {}'.format(logging_level),
                                 param_hint='--logging_level')
    logging.basicConfig(level=level)

    ctx.obj = {}
    ctx.obj['VERBOSE'] = verbose
    bconf.CONFIG_FILE = config
    bconf.CREDENTIALS = credentials
    # Migrate the deprecated credentials file only when it actually exists.
    if os.path.exists(utils.get_config_path(credentials)):
        utils.copy_credentials_to_config(credentials, config)
# Checks every configured worker by requesting its rate limit status and
# reports the ones whose request fails.
# NOTE(review): no command/group/pass_context decorator is visible for this
# callback in this view; confirm none was lost.
def credentials(ctx):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    for worker in wq.queue:
        try:
            worker.client.application.rate_limit_status()
        except Exception as ex:
            # Deliberately broad: any failure of the call is reported for that
            # worker and the remaining workers are still checked.
            # NOTE(review): the first format argument is missing in the
            # original; the worker's `name` (or the worker itself when it has
            # none) is assumed here.
            print('{}: AUTHENTICATION ERROR: {}'.format(getattr(worker, 'name', worker), ex))
# Prints the rate limit status of every worker. With a URL, only that
# endpoint is reported per worker and the numeric fields are added up across
# workers; without one, the whole status (or only the consumed limits) is
# dumped as JSON.
# NOTE(review): no command/pass_context decorator is visible for this callback
# in this view; confirm none was lost.
@click.option('--all', type=bool, default=False, required=False,
              help=('Print all limits. By default, it only limits that '
                    'have been consumed will be shown.'))
@click.argument('url', required=False)
def get_limits(ctx, all, url):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    total = {}
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        if url:
            print('URL is: {}'.format(url))
            # The category is the second '/'-separated component of the URL
            # (the first one when the URL starts with '/').
            parts = url.split('/')
            cat = parts[1] if len(parts) > 1 else ''
            if cat not in resp['resources']:
                print('Cat {} not found'.format(cat))
                # Nothing to add up for this worker.
                continue
            limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
            for k, v in limit.items():
                # Only numbers can be totalled; when the whole category is
                # returned its values are nested dicts and are skipped.
                if isinstance(v, (int, float)):
                    total[k] = total.get(k, 0) + v
            print('{}: {}'.format(url, limit))
            continue
        if not all:
            # Keep only the endpoints whose quota has been (partly) used.
            consumed = {}
            for res, urls in resp['resources'].items():
                nurls = {u: limits for u, limits in urls.items()
                         if limits['limit'] != limits['remaining']}
                if nurls:
                    consumed[res] = nurls
            resp = consumed
        print(json.dumps(resp, indent=2))
    if url:
        print('Total for {}: {}'.format(url, total))
# Stores a set of credentials for `user_name` in the configuration file,
# prompting for every value that was not given as an option.
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
    # `x or prompt(...)` only prompts when the option is missing or empty.
    consumer_key = consumer_key or click.prompt('Please, enter your YOUR CONSUMER KEY')
    consumer_secret = consumer_secret or click.prompt('Please, enter your CONSUMER SECRET')
    token_key = token_key or click.prompt('Please, enter your ACCESS TOKEN')
    token_secret = token_secret or click.prompt('Please, enter your ACCESS TOKEN SECRET')

    utils.add_credentials(
        conffile=bconf.CONFIG_FILE,
        user=user_name,
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        token_key=token_key,
        token_secret=token_secret,
    )
    click.echo('Credentials added for {}'.format(user_name))
# Container for the tweet sub-commands (see `@tweet.command` below); it does
# nothing by itself.
# NOTE(review): no group/pass_context decorator is visible for this function
# in this view; confirm none was lost.
def tweet(ctx):
    pass
# Generator that downloads a single tweet through utils.download_tweet and
# yields whatever that helper produces.
# NOTE(review): no command/argument decorator (e.g. for `tweetid`) is visible
# for this callback in this view; confirm none was lost.
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
def get_tweet(tweetid, write, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    yield from utils.download_tweet(wq, tweetid, write, folder, update)
# Generator that downloads every tweet listed in `tweetsfile` and yields the
# ones that could be retrieved, while a progress bar counts queries/failures.
# NOTE(review): no argument decorator for `tweetsfile` nor pass_context is
# visible in this view; confirm none was lost.
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column):
    if update and not click.confirm('This may overwrite existing tweets. Continue?'):
        # The user declined to overwrite: stop before touching anything.
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)

    # `desc=` labels the bar; passing the text positionally would make tqdm
    # treat the string itself as the iterable to track.
    status = tqdm(desc='Queried')
    failed = 0
    for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
                                        skip=skip, quotechar=quotechar, commentchar=commentchar,
                                        column=column, update=update, retry_failed=retry):
        status.update(1)
        if not obj:
            failed += 1
            status.set_description('Failed: %s. Queried' % failed, refresh=True)
            # Failed downloads are counted, not emitted.
            continue
        yield obj
# Generator that yields the results of utils.search_tweet for `query`.
# NOTE(review): no command/argument/pass_context decorator is visible for this
# callback in this view; confirm none was lost.
def search(ctx, query):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    yield from utils.search_tweet(wq, query)
# Prints the result of utils.user_timeline for `user` as indented JSON.
# NOTE(review): no command/argument/pass_context decorator is visible for this
# callback in this view; confirm none was lost.
def timeline(ctx, user):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    t = utils.user_timeline(wq, user)
    print(json.dumps(t, indent=2))
# Container for the user sub-commands (see `@users.command` below); it does
# nothing by itself.
# NOTE(review): no group/pass_context decorator is visible for this function
# in this view; confirm none was lost.
def users(ctx):
    pass
# Dumps every attribute found in the instance __dict__ of each User stored in
# the SQLite database file `db`, one tab-indented "name: value" line each.
@click.option('--db', required=True, help='Database of users.')
def list_users(ctx, db):
    session = make_session('sqlite:///{}'.format(db))
    for user in session.query(User):
        for attr in user.__dict__:
            print('\t{}: {}'.format(attr, getattr(user, attr)))
# Downloads one user and either prints it as indented JSON (default) or, with
# --write, stores it as <folder>/<user>.json.
# NOTE(review): no command/argument decorator (e.g. for `user`) is visible for
# this callback in this view; confirm none was lost.
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    if not write:
        print(json.dumps(utils.get_user(wq, user), indent=2))
        return

    if not os.path.exists(folder):
        os.makedirs(folder)
    path = os.path.join(folder, '%s.json' % user)
    if not update and os.path.isfile(path):
        # Without --update an existing file is left untouched.
        print('User exists: %s' % user)
        return

    # Download before opening the file, so a failed request cannot leave an
    # empty or truncated file behind.
    js = json.dumps(utils.get_user(wq, user), indent=2)
    with open(path, 'w') as f:
        print(js, file=f)
# Generator that downloads every user listed in `usersfile` and yields each
# item produced by utils.download_file.
# NOTE(review): no argument decorator for `usersfile` nor pass_context is
# visible in this view; confirm none was lost.
# NOTE(review): --skip is declared as a flag here although its help text (and
# the sibling tweet command) describe a number of lines; left unchanged so
# existing invocations keep working.
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
              is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
def get_users(ctx, usersfile, folder, update, retry, delimiter, skip, quotechar, commentchar, column):
    if update and not click.confirm('This may overwrite existing users. Continue?'):
        # The user declined to overwrite: stop before touching anything.
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    # commentchar/column are forwarded like in the tweet variant of this
    # command; the tail of the original call is not visible in this view.
    for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
                                 update=update, retry_failed=retry,
                                 skip=skip, quotechar=quotechar,
                                 commentchar=commentchar, column=column):
        yield i
# Crawls the users listed (one per line) in `usersfile` with a pool of
# threads: one thread filters the ids and feeds a bounded queue, `threads`
# consumers download the users and store them in the database, and the main
# thread logs progress once per second until everybody is done.
# NOTE(review): this block was reconstructed from a damaged view. Queue
# puts/sentinels, thread starts, session add/commit and the loop exit are
# implied by the surrounding code but were not visible; verify them against
# the project history. No command/argument decorator for `usersfile` is
# visible either.
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
def crawl_users(ctx, usersfile, skip, until, threads, db):
    global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock

    # `queue` is not imported at module level in the visible file.
    try:
        import queue
    except ImportError:  # Python 2 name of the module
        import Queue as queue

    # The option is declared with type=str, but it is used as a count.
    threads = int(threads)

    if '://' not in db:
        dburl = 'sqlite:///{}'.format(db)
        # A plain file path is opened through SQLite: serialize the commits.
        db_lock = threading.Lock()
    else:
        dburl = db
        # No serialization for other URLs: an empty ExitStack is a no-op
        # context manager, so `with db_lock:` works in both cases.
        db_lock = ExitStack()

    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    # NOTE(review): the second format argument is cut off in the original;
    # the number of workers in the queue is assumed.
    logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
                                                                                     len(wq.queue)))

    ids_queue = queue.Queue(1000)
    skipped = skip
    enqueued = 0
    collected = 0
    statslock = threading.Lock()
    lastid = -1

    def fill_queue():
        # Producer: enqueue every id that is not stored yet, then one None
        # sentinel per consumer so that each of them terminates.
        global enqueued, skipped
        try:
            engine = sqlalchemy.create_engine(dburl)
            # Bound parameter instead of string interpolation: ids come from
            # a user-supplied file.
            lookup = sqlalchemy.text('SELECT 1 FROM users WHERE id = :uid')

            def user_filter(x):
                global skipped
                keep = not list(engine.execute(lookup, uid=x))
                if not keep:
                    skipped += 1
                return keep

            with open(usersfile, 'r') as f:
                for _ in range(skip):
                    next(f, None)
                ilist = (line.strip() for line in f)
                if not skip and until:
                    logger.info('Skipping until {}'.format(until))
                    for uid in ilist:
                        if uid == until:
                            break
                        skipped += 1
                for uid in filter(user_filter, ilist):
                    ids_queue.put(uid)
                    enqueued += 1
        finally:
            # Always release the consumers, even if the producer failed.
            for _ in range(threads):
                ids_queue.put(None)

    def consume_queue():
        # Consumer: download users for the queued ids and store them,
        # committing every 100 users and once more at the end.
        global collected, lastid
        local_collected = 0
        session = make_session(dburl)
        q_iter = iter(ids_queue.get, None)  # stops at the None sentinel
        for user in utils.get_users(wq, q_iter):
            session.add(User(**user))
            local_collected += 1
            with statslock:
                collected += 1
                lastid = user['id']
            if local_collected % 100 == 0:
                with db_lock:
                    session.commit()
        with db_lock:
            session.commit()
        logger.debug('Done consuming')

    filler = threading.Thread(target=fill_queue)
    filler.start()
    consumers = [threading.Thread(target=consume_queue) for _ in range(threads)]
    logger.debug('Starting consumers')
    for c in consumers:
        c.start()
    logger.debug('Joining filler')

    counter = 0
    speed = 0
    lastcollected = collected
    while True:
        logger.info(' Collected: {}\n'
                    ' Speed: ~ {} profiles/s\n'
                    ' Skipped: {}\n'
                    ' Enqueued: {}\n'
                    ' Queue size: {}\n'
                    ' Last ID: {}'.format(collected, speed, skipped, enqueued, ids_queue.qsize(), lastid))
        # is_alive() replaces isAlive(), which no longer exists in Python 3.9+.
        if not filler.is_alive() and not any(c.is_alive() for c in consumers):
            break
        # One iteration per second, so that 10 iterations ~ 10 seconds below.
        time.sleep(1)
        counter += 1
        if counter % 10 == 0:
            speed = (collected - lastcollected) / 10
            with statslock:
                lastcollected = collected

    logger.info('Done!')
# Stores the database URI and a session for it in the click context object,
# for the extractor sub-commands to pick up. A value without '://' is treated
# as the path of a SQLite file.
@click.option('--db', required=True, help='Database of users.')
def extractor(ctx, db):
    dburi = db if '://' in db else 'sqlite:///{}'.format(db)
    ctx.obj['DBURI'] = dburi
    ctx.obj['SESSION'] = make_session(dburi)
# Prints the extractor entries (only the pending ones unless
# --with_not_pending is given), the number of Following rows and, with
# --with_followers, every Following row as well.
# NOTE(review): no command/pass_context decorator is visible for this callback
# in this view; confirm none was lost.
@click.option('--with_followers', is_flag=True, default=False)
@click.option('--with_not_pending', is_flag=True, default=False)
def status_extractor(ctx, with_followers, with_not_pending):
    session = ctx.obj['SESSION']

    entries = session.query(ExtractorEntry)
    if not with_not_pending:
        # `== True` is intentional: it builds a SQL expression, not a bool.
        entries = entries.filter(ExtractorEntry.pending == True)  # noqa: E712
    for entry in entries:
        for attr in entry.__dict__:
            print('\t{}: {}'.format(attr, getattr(entry, attr)))

    followers = session.query(Following)
    print('Followers count: {}'.format(followers.count()))
    # The full listing is opt-in; the flag was previously accepted but ignored.
    if with_followers:
        for row in followers:
            for attr in row.__dict__:
                print('\t{}: {}'.format(attr, getattr(row, attr)))
# Prints the follower network stored in the session: one "follower ->
# followed" line per Following row, or a single indented JSON list of edges
# when --as_json is given.
@click.option('--as_json', is_flag=True, default=False)
def network_extractor(ctx, as_json):
    session = ctx.obj['SESSION']
    followers = session.query(Following)

    if not as_json:
        for link in followers:
            print('{} -> {}'.format(link.follower, link.isfollowed))
        return

    edges = [
        {'source_id': link.follower,
         'target_id': link.isfollowed,
         'following': True}
        for link in followers
    ]
    print(json.dumps(edges, indent=4))
# Prints every User found through the session stored in the click context as
# an indented JSON object, using the model's own as_dict() conversion.
def users_extractor(ctx):
    session = ctx.obj['SESSION']
    for user in session.query(User):
        print(json.dumps(user.as_dict(), indent=4))
# Entry point of the extraction command: builds the worker queue and reads the
# database URI from the click context.
# NOTE(review): the rest of this function (whatever consumes `wq`, `dburi`,
# `recursive`, `user`, `name` and `initfile`) is not visible in this view and
# has NOT been reconstructed. Restore it from the project history. No
# command/pass_context decorator is visible either.
@click.option('--recursive', is_flag=True, help='Get following/follower/info recursively.', default=False)
@click.option('-u', '--user', default=None)
@click.option('-n', '--name', show_default=True, default='extractor')
@click.option('-i', '--initfile', required=False, default=None, help='List of users to load')
def extract(ctx, recursive, user, name, initfile):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    dburi = ctx.obj['DBURI']
# Entry point of the extractor reset command: builds the worker queue and
# opens a session on the database URI stored in the click context.
# NOTE(review): the statements that actually reset anything are not visible in
# this view and have NOT been reconstructed. Restore them from the project
# history. No command/pass_context decorator is visible either.
def reset_extractor(ctx):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    db = ctx.obj['DBURI']
    session = make_session(db)
# Calls the API endpoint `cmd` with the extra `--key value` pairs given on the
# command line and prints the response as JSON. With --tweets/--users the call
# goes through the paginating helpers and each item is printed on its own line.
# NOTE(review): no pass_context decorator is visible in this view; confirm
# none was lost. The printing statements were not visible and are assumed.
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
              help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
def api(ctx, cmd, tweets, users, api_args):
    # Option names that must be renamed before they are passed to the call.
    mappings = {
        'id': '_id'
    }
    if len(api_args) % 2:
        # Arguments are consumed in (name, value) pairs; a dangling one would
        # otherwise be dropped without notice.
        raise click.UsageError('API arguments must be given as "--name value" pairs')

    opts = {}
    pairs = iter(api_args)
    for k, v in zip(pairs, pairs):
        k = k.replace('--', '')
        opts[mappings.get(k, k)] = v

    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    if tweets:
        resp = utils.consume_tweets(wq[cmd], **opts)
    elif users:
        resp = utils.consume_users(wq[cmd], **opts)
    else:
        # Single call: print the whole response at once.
        resp = wq[cmd](**opts)
        print(json.dumps(resp))
        return
    for item in resp:
        print(json.dumps(item))
# Stores the consumer key/secret in the shared configuration module and then
# starts the web application.
# NOTE(review): no command/pass_context decorator is visible for this callback
# in this view; confirm none was lost.
@click.argument('CONSUMER_KEY', required=True)
@click.argument('CONSUMER_SECRET', required=True)
def run_server(ctx, consumer_key, consumer_secret):
    bconf.CONSUMER_KEY = consumer_key
    bconf.CONSUMER_SECRET = consumer_secret
    # Imported only after the key and secret have been stored in bconf.
    from .webserver import app
    # NOTE(review): the statement that followed the import is garbled in this
    # view (only `'')` survives). A plain `app.run()` is assumed; restore the
    # original arguments (e.g. the host to bind to) from the project history.
    app.run()
# Placeholder callback with no behaviour of its own; presumably the container
# for the stream sub-commands — TODO confirm.
# NOTE(review): no group/pass_context decorator is visible for this function
# in this view; confirm none was lost.
def stream(ctx):
    pass
# Reads tweets from the streaming API (the sample stream, or a filtered one
# when --locations/--track are given) and writes each of them as one JSON line
# to `file` (appending) or to STDOUT.
# NOTE(review): no command/pass_context decorator is visible for this callback
# in this view; confirm none was lost.
# NOTE(review): --politelyretry is a flag whose default is already True, so it
# cannot be switched off from the command line; left unchanged.
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
def get_stream(ctx, locations, track, file, politelyretry):
    wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)

    query_args = {}
    if locations:
        query_args['locations'] = locations
    if track:
        query_args['track'] = track

    def insist():
        # Yield tweets forever, reconnecting after a hangup or error unless
        # retrying is disabled or hangups come less than a minute apart.
        lasthangup = time.time()
        while True:
            if not query_args:
                iterator = wq.statuses.sample()
            else:
                iterator = wq.statuses.filter(**query_args)
            try:
                for i in iterator:
                    yield i
                if not politelyretry:
                    return
            except Exception:
                if not politelyretry:
                    # Re-raise the active exception (the original referenced
                    # an undefined name here).
                    raise
            thishangup = time.time()
            if thishangup - lasthangup < 60:
                raise Exception('Too many hangups in a row.')
            # Measure the next interval from this hangup, so that only
            # consecutive hangups count as "in a row".
            lasthangup = thishangup

    out = open(file, 'a') if file else sys.stdout
    try:
        for status in tqdm(insist()):
            print(json.dumps(status), file=out)
    finally:
        # Close the file even when the stream ends with an exception.
        if out is not sys.stdout:
            out.close()
# Prints a one-line summary (creation date, screen name and text) of every
# tweet read from `file`; entries lacking those fields are printed raw.
# NOTE(review): no command/pass_context decorator is visible for this callback
# in this view; confirm none was lost.
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
def read_stream(ctx, file, tail):
    for tweet in utils.read_file(file, tail=tail):
        try:
            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
        except (KeyError, TypeError):
            # Not shaped like a regular tweet: show it as it is.
            print('Raw tweet: {}'.format(tweet))
# Prints the `limit` most common hashtags found in the tweets stored in
# `file` (all of them when no limit is given), one "first - second" line per
# pair returned by most_common().
# NOTE(review): presumably utils.get_hashtags returns a collections.Counter,
# in which case each pair is (tag, count) — verify against the helper.
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
def tags_stream(ctx, file, limit):
    tweets = utils.read_file(file)
    ranking = utils.get_hashtags(tweets)
    for first, second in ranking.most_common(limit):
        print(u'{} - {}'.format(first, second))
# Run the command line interface when the module is executed as a script.
if __name__ == '__main__':
    main()