# bitter/bitter/cli.py -- mirror of https://github.com/balkian/bitter.git
import click
import json
import logging
import time
import sqlalchemy
import threading
from bitter import utils, models, crawlers
from bitter.models import make_session, User, ExtractorEntry, Following
import sys
if sys.version_info < (3, 0):
    from contextlib2 import ExitStack
    import Queue as queue  # Python 2 standard library name
else:
    from contextlib import ExitStack
    import queue
logger = logging.getLogger(__name__)
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {}
ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials
utils.create_credentials(credentials)
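
# Example invocations (a hedged sketch: this assumes the package exposes
# bitter.cli:main as a console script named ``bitter``):
#
#   bitter tweet get 1234567890
#   bitter tweet search 'python'
#   bitter users get_one balkian
#   bitter users get --db users.db usersfile.txt
#   bitter api limits /statuses/user_timeline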
@main.group()
@click.pass_context
def tweet(ctx):
pass
@tweet.command('get')
@click.argument('tweetid')
@click.pass_context
def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2))
@tweet.command('search')
@click.argument('query')
@click.pass_context
def search_tweets(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2))
@tweet.command('timeline')
@click.argument('user')
@click.pass_context
def get_timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2))
@main.group()
@click.pass_context
def users(ctx):
pass
@users.command('list')
@click.option('--db', required=True, help='Database of users.')
@click.pass_context
def list_users(ctx, db):
dburl = 'sqlite:///{}'.format(db)
session = make_session(dburl)
    for i in session.query(User):
        print(i.screen_name)
        for j in i.__dict__:
            if j.startswith('_'):
                continue  # skip SQLAlchemy internals such as _sa_instance_state
            print('\t{}: {}'.format(j, getattr(i, j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=None, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=int, default=20, help='Number of crawling threads.')
# click arguments take no help text; usersfile is a file with one user id/name per line
@click.argument('usersfile')
@click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db):
    global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
    if '://' not in db:
        dburl = 'sqlite:///{}'.format(db)
        # sqlite cannot handle concurrent writers, so commits are serialized
        db_lock = threading.Lock()
    else:
        dburl = db
        # other backends cope with concurrent writes; use a no-op context manager
        db_lock = ExitStack()
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue)))
ids_queue = queue.Queue(1000)
skipped = skip
enqueued = 0
collected = 0
statslock = threading.Lock()
lastid = -1
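    # Producer/consumer pipeline: fill_queue() pushes user ids from usersfile
    # into ids_queue; `threads` consumer threads drain it, fetch each profile
    # through the crawler queue, and persist it. A None sentinel per consumer
    # signals shutdown.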
def fill_queue():
global enqueued, skipped
        with open(usersfile, 'r') as f:
            engine = sqlalchemy.create_engine(dburl)

            def user_filter(x):
                global skipped
                # drop ids that are already present in the users table
                found = list(engine.execute(
                    sqlalchemy.text('SELECT 1 FROM users WHERE id = :id'), id=x))
                if found:
                    skipped += 1
                return not found
for i in range(skip):
next(f)
ilist = map(lambda x: x.strip(), f)
            if not skip and until:
                logger.info('Skipping until {}'.format(until))
                for uid in ilist:
                    if uid == until:
                        break
                    skipped += 1
ilist = filter(user_filter, ilist)
for uid in ilist:
ids_queue.put(uid)
enqueued += 1
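        # one None sentinel per consumer thread so each iter() loop below ends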
for i in range(threads):
ids_queue.put(None)
def consume_queue():
global dburl, collected, ids_queue, lastid
local_collected = 0
        logger.debug('Consuming!')
session = make_session(dburl)
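        # iter(get, None) keeps yielding ids until the None sentinel arrives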
q_iter = iter(ids_queue.get, None)
for user in utils.get_users(wq, q_iter):
dbuser = User(**user)
session.add(dbuser)
local_collected += 1
with statslock:
collected += 1
lastid = user['id']
if local_collected % 100 == 0:
with db_lock:
session.commit()
        with db_lock:
            session.commit()
logger.debug('Done consuming')
filler = threading.Thread(target=fill_queue)
filler.start()
consumers = [threading.Thread(target=consume_queue) for i in range(threads)]
    logger.debug('Starting consumers')
for c in consumers:
c.start()
    logger.debug('Joining filler')
counter = 0
speed = 0
lastcollected = collected
while True:
filler.join(1)
logger.info('########\n'
' Collected: {}\n'
' Speed: ~ {} profiles/s\n'
' Skipped: {}\n'
' Enqueued: {}\n'
' Queue size: {}\n'
' Last ID: {}'.format(collected, speed, skipped, enqueued, ids_queue.qsize(), lastid))
        if not filler.is_alive():
            if all(not i.is_alive() for i in consumers):
                break
break
else:
time.sleep(1)
counter += 1
if counter % 10 == 0:
            speed = (collected - lastcollected) / 10.0
with statslock:
lastcollected = collected
logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')
def extractor(ctx, db):
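    # Share the DB uri and a SQLAlchemy session with the subcommands below.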
if '://' not in db:
db = 'sqlite:///{}'.format(db)
ctx.obj['DBURI'] = db
ctx.obj['SESSION'] = make_session(db)
@extractor.command('status')
@click.option('--with_followers', is_flag=True, default=False)
@click.option('--with_not_pending', is_flag=True, default=False)
@click.pass_context
def status_extractor(ctx, with_followers, with_not_pending):
session = ctx.obj['SESSION']
entries = session.query(ExtractorEntry)
if not with_not_pending:
entries = entries.filter(ExtractorEntry.pending==True)
    for i in entries:
        print(i.id)
        for j in i.__dict__:
            if j.startswith('_'):
                continue  # skip SQLAlchemy internals
            print('\t{}: {}'.format(j, getattr(i, j)))
followers = session.query(Following)
print('Followers count: {}'.format(followers.count()))
    if with_followers:
        for i in followers:
            print(i.id)
            for j in i.__dict__:
                if j.startswith('_'):
                    continue  # skip SQLAlchemy internals
                print('\t{}: {}'.format(j, getattr(i, j)))
@extractor.command('network')
@click.option('--as_json', is_flag=True, default=False)
@click.pass_context
def network_extractor(ctx, as_json):
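    # Prints one "follower -> followed" edge per line, or the whole edge
    # list as JSON when --as_json is given.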
session = ctx.obj['SESSION']
followers = session.query(Following)
follower_map = []
for i in followers:
if not as_json:
print('{} -> {}'.format(i.follower, i.isfollowed))
else:
follower_map.append({'source_id': i.follower,
'target_id': i.isfollowed,
'following': True})
    if as_json:
        print(json.dumps(follower_map, indent=4))
@extractor.command('users')
@click.pass_context
def users_extractor(ctx):
session = ctx.obj['SESSION']
    users = session.query(User)
    for i in users:
        print(json.dumps(i.as_dict(), indent=4))
@extractor.command()
@click.option('--recursive', is_flag=True, help='Get following/follower/info recursively.', default=False)
@click.option('-u', '--user', default=None)
@click.option('-n', '--name', show_default=True, default='extractor')
@click.option('-i', '--initfile', required=False, default=None, help='List of users to load')
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
    logger.debug(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
dburi = ctx.obj['DBURI']
utils.extract(wq,
recursive=recursive,
user=user,
dburi=dburi,
initfile=initfile,
extractor_name=name)
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
    db = ctx.obj['DBURI']
    session = make_session(db)
    session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).update({'pending': False})
    session.commit()  # the update is not persisted without an explicit commit
@api.command('limits')
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, url):
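    # rate_limit_status() groups endpoints by resource family, e.g.
    # resp['resources']['statuses']['/statuses/user_timeline'], which is
    # what the category lookup below relies on.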
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
            else:
                print('Resource category {} not found'.format(cat))
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
@main.command('server')
@click.argument('CONSUMER_KEY', required=True)
@click.argument('CONSUMER_SECRET', required=True)
@click.pass_context
def run_server(ctx, consumer_key, consumer_secret):
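    # Inject the OAuth app credentials into bitter.config before importing
    # the web app, so that .webserver reads them at import time.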
from . import config
config.CONSUMER_KEY = consumer_key
config.CONSUMER_SECRET = consumer_secret
from .webserver import app
app.run()
if __name__ == '__main__':
main()