mirror of https://github.com/balkian/bitter.git

First commit

J. Fernando Sánchez 2016-01-14 21:41:14 +01:00
commit d0de6c2ea9
14 changed files with 823 additions and 0 deletions

.dockerignore Normal file

@@ -0,0 +1,4 @@
.*
env
*.egg-info
dist

.gitignore vendored Normal file

@@ -0,0 +1,6 @@
*.egg-info
dist
env
__*
.*
*.pyc

Dockerfile Normal file

@@ -0,0 +1,8 @@
# onbuild copies . to /usr/src/app/
FROM python:2.7.9-onbuild
MAINTAINER J. Fernando Sánchez @balkian
# RUN pip --cert cacert.pem install -r -v requirements.txt
RUN pip install --editable .;
ENTRYPOINT ["bitter"]

MANIFEST.in Normal file

@@ -0,0 +1,3 @@
include requirements.txt
include test-requirements.txt
include README.md

bitter/cli.py Normal file

@@ -0,0 +1,281 @@
import click
import json
import logging
import time
import sqlalchemy.types
import threading
import sqlite3
from six.moves import map, filter, queue
from sqlalchemy import exists
from bitter import utils, models, crawlers
from bitter.models import make_session, User, ExtractorEntry, Following
from contextlib import ExitStack
logger = logging.getLogger(__name__)
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='credentials.json')
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {}
ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials
@main.group()
@click.pass_context
def tweet(ctx):
pass
@tweet.command('get')
@click.argument('tweetid')
@click.pass_context
def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
t = crawlers.get_tweet(c.client, tweetid)
print(json.dumps(t, indent=2))
@tweet.command('search')
@click.argument('query')
@click.pass_context
def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2))
@main.group()
@click.pass_context
def users(ctx):
pass
@users.command('list')
@click.option('--db', required=True, help='Database of users.')
@click.pass_context
def list_users(ctx, db):
dburl = 'sqlite:///{}'.format(db)
session = make_session(dburl)
for i in session.query(User):
print(i.screen_name)
for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default='', help='Skip all lines until ID.')
@click.option('--threads', required=False, type=int, default=20, help='Number of crawling threads.')
@click.argument('usersfile')  # file with a list of users to look up
@click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
if '://' not in db:
dburl = 'sqlite:///{}'.format(db)
db_lock = threading.Lock()
else:
dburl = db
        db_lock = ExitStack()  # no-op context manager; non-sqlite backends need no serialised commits
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue)))
ids_queue = queue.Queue(1000)
skipped = skip
enqueued = 0
collected = 0
statslock = threading.Lock()
lastid = -1
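    # Producer: read user ids from usersfile, skip ids already stored in the database, and put the rest on ids_queue.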
def fill_queue():
global enqueued, skipped
with open(usersfile, 'r') as f:
sqlite = sqlite3.connect(db)
engine = sqlalchemy.create_engine(dburl)
def user_filter(x):
global skipped, dburl
# keep = data['users'].find_one(id=x) is None
#keep = not session.query(exists().where(User.id == x)).scalar()
# keep = session.engine.execute
keep = not list(engine.execute('SELECT 1 from users where id=\'%s\'' % x))
if not keep:
skipped += 1
return keep
for i in range(skip):
next(f)
ilist = map(lambda x: x.strip(), f)
logger.info('Skipping until {}'.format(until))
if not skip and until:
for uid in ilist:
if uid == until:
break
else:
skipped += 1
ilist = filter(user_filter, ilist)
for uid in ilist:
ids_queue.put(uid)
enqueued += 1
for i in range(threads):
ids_queue.put(None)
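    # Consumers: each thread reads ids from the queue (a None marks the end) and stores the fetched profiles; commits are batched every 100 users.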
def consume_queue():
global dburl, collected, ids_queue, lastid
local_collected = 0
logging.debug('Consuming!')
session = make_session(dburl)
q_iter = iter(ids_queue.get, None)
for user in utils.get_users(wq, q_iter):
            # entities are already serialised to JSON by utils.trim_user
dbuser = User(**user)
session.add(dbuser)
local_collected += 1
with statslock:
collected += 1
lastid = user['id']
if local_collected % 100 == 0:
with db_lock:
session.commit()
session.commit()
logger.debug('Done consuming')
filler = threading.Thread(target=fill_queue)
filler.start()
consumers = [threading.Thread(target=consume_queue) for i in range(threads)]
logging.debug('Starting consumers')
for c in consumers:
c.start()
logging.debug('Joining filler')
counter = 0
speed = 0
lastcollected = collected
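    # Progress loop: report collected/skipped/enqueued counts every second until the filler and all consumers have finished.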
while True:
filler.join(1)
logger.info('########\n'
' Collected: {}\n'
' Speed: ~ {} profiles/s\n'
' Skipped: {}\n'
' Enqueued: {}\n'
' Queue size: {}\n'
' Last ID: {}'.format(collected, speed, skipped, enqueued, ids_queue.qsize(), lastid))
if not filler.isAlive():
if all(not i.isAlive() for i in consumers):
break
else:
time.sleep(1)
counter += 1
if counter % 10 == 0:
speed = (collected-lastcollected)/10
with statslock:
lastcollected = collected
logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')
def extractor(ctx, db):
if '://' not in db:
db = 'sqlite:///{}'.format(db)
ctx.obj['DBURI'] = db
ctx.obj['SESSION'] = make_session(db)
@extractor.command('status')
@click.option('--with_followers', is_flag=True, default=False)
@click.option('--with_not_pending', is_flag=True, default=False)
@click.pass_context
def status_extractor(ctx, with_followers, with_not_pending):
session = ctx.obj['SESSION']
entries = session.query(ExtractorEntry)
if not with_not_pending:
entries = entries.filter(ExtractorEntry.pending==True)
for i in entries:
print(i.id)
for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j)))
followers = session.query(Following)
print('Followers count: {}'.format(followers.count()))
    if with_followers:
for i in followers:
print(i.id)
for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j)))
@extractor.command()
@click.option('--recursive', is_flag=True, help='Get following/follower/info recursively.', default=False)
@click.option('-u', '--user', default=None)
@click.option('-n', '--name', show_default=True, default='extractor')
@click.option('-i', '--initfile', required=False, default=None, help='List of users to load')
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
print(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
dburi = ctx.obj['DBURI']
utils.extract(wq,
recursive=recursive,
user=user,
dburi=dburi,
initfile=initfile,
extractor_name=name)
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
db = ctx.obj['DBURI']
session = make_session(db)
    session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending': False})
    session.commit()
@api.command('limits')
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
if __name__ == '__main__':
main()

bitter/crawlers.py Normal file

@@ -0,0 +1,143 @@
import time
import urllib
import random
import json
import logging
logger = logging.getLogger(__name__)
from twitter import *
from collections import OrderedDict
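# AttrToFunc accumulates chained attribute accesses (e.g. wq.users.lookup) as URI parts and, when finally called, hands them to the configured handler together with the call arguments.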
class AttrToFunc(object):
def __init__(self, uriparts=None, handler=None):
if uriparts:
self.__uriparts = uriparts
else:
self.__uriparts = []
#self.__uriparts = []
self.handler = handler
def __getattr__(self, k):
def extend_call(arg):
return AttrToFunc(
uriparts=self.__uriparts + [arg,],
handler=self.handler)
if k == "_":
return extend_call
else:
return extend_call(k)
def __call__(self, *args, **kwargs):
# for i, a in enumerate(args)e
# kwargs[i] = a
return self.handler(self.__uriparts, *args, **kwargs)
class TwitterWorker(object):
def __init__(self, name, client):
self.name = name
self.client = client
self.throttled_time = False
self.busy = False
@property
def throttled(self):
if not self.throttled_time:
return False
t = time.time()
delta = self.throttled_time - t
if delta > 0:
return True
return False
    def throttle_until(self, epoch=None):
        self.throttled_time = int(epoch)
        logger.info("Worker %s throttled for %s seconds" % (self.name, str(self.throttled_time - time.time())))
class TwitterQueue(AttrToFunc):
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
self.index = 0
self.wait = wait
AttrToFunc.__init__(self, handler=self.handle_call)
def ready(self, worker):
self.queue.add(worker)
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
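        # Rotate over workers until one call succeeds; a worker that hits a rate limit (429) or a transient 5xx error is throttled until the reset time and the call is retried on another worker.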
while True:
c = None
try:
c = self.next()
c.busy = True
logger.debug('Next: {}'.format(c.name))
ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
pong = time.time()
logger.debug('Took: {}'.format(pong-ping))
return resp
except TwitterHTTPError as ex:
if ex.e.code in (429, 502, 503, 504):
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
logger.info('{} limited'.format(c.name))
c.throttle_until(limit)
continue
else:
raise
except urllib.error.URLError as ex:
time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex))
finally:
if c:
c.busy = False
@property
def client(self):
return self.next().client
@classmethod
    def from_credentials(cls, cred_file):
wq = TwitterQueue()
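        # The credentials file contains one JSON object per line with the keys: user, consumer_key, consumer_secret, token_key and token_secret.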
with open(cred_file) as f:
for line in f:
cred = json.loads(line)
c = Twitter(auth=OAuth(cred['token_key'],
cred['token_secret'],
cred['consumer_key'],
cred['consumer_secret']))
wq.ready(TwitterWorker(cred["user"], c))
return wq
def _next(self):
logger.debug('Getting next available')
s = list(self.queue)
random.shuffle(s)
for worker in s:
if not worker.throttled and not worker.busy:
return worker
raise Exception('No worker is available')
def next(self):
if not self.wait:
return self._next()
while True:
try:
return self._next()
except Exception:
available = filter(lambda x: not x.busy, self.queue)
if available:
first_worker = min(available, key=lambda x: x.throttled_time)
diff = first_worker.throttled_time - time.time()
logger.info("All workers are throttled. Waiting %s seconds" % diff)
else:
diff = 5
logger.info("All workers are busy. Waiting %s seconds" % diff)
time.sleep(diff)

bitter/models.py Normal file

@@ -0,0 +1,105 @@
import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy import Column, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(BigInteger, primary_key=True, index=True, unique=True)
contributors_enabled = Column(Boolean)
created_at_stamp = Column(Text)
default_profile = Column(Boolean)
default_profile_image = Column(Boolean)
description = Column(Text)
entities = Column(Text)
favourites_count = Column(Integer)
followers_count = Column(Integer)
following = Column(Boolean)
friends_count = Column(Integer)
geo_enabled = Column(Boolean)
has_extended_profile = Column(Boolean)
id_str = Column(Text)
is_translation_enabled = Column(Boolean)
is_translator = Column(Boolean)
lang = Column(Text)
listed_count = Column(Integer)
location = Column(Text)
name = Column(Text)
notifications = Column(Boolean)
profile_background_color = Column(Text)
profile_background_image_url = Column(Text)
profile_background_image_url_https = Column(Text)
profile_background_tile = Column(Boolean)
profile_banner_url = Column(Text)
profile_image_url = Column(Text)
profile_image_url_https = Column(Text)
profile_link_color = Column(Text)
profile_sidebar_border_color = Column(Text)
profile_sidebar_fill_color = Column(Text)
profile_text_color = Column(Text)
profile_use_background_image = Column(Boolean)
protected = Column(Boolean)
screen_name = Column(Text)
statuses_count = Column(Integer)
time_zone = Column(Text)
url = Column(Text)
utc_offset = Column(Integer)
verified = Column(Boolean)
class Following(Base):
__tablename__ = 'followers'
id = Column(Integer, primary_key=True, autoincrement=True)
    isfollowed = Column(BigInteger)
    follower = Column(BigInteger)
created_at_stamp = Column(Text)
    __table_args__ = (Index('ix_followers_isfollowed_follower', 'isfollowed', 'follower'), )
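# ExtractorEntry keeps the follower-extraction state for a user: the last pagination cursor and whether the user is still pending.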
class ExtractorEntry(Base):
__tablename__ = 'extractor-cursor'
id = Column(Integer, primary_key=True, default=lambda x: int(time.time()*1000))
user = Column(BigInteger, index=True)
cursor = Column(BigInteger, default=-1)
pending = Column(Boolean, default=False)
def make_session(url):
engine = create_engine(url)#, echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
return session
def test(db='sqlite:///users.db'):
from sqlalchemy import exists
session = make_session(db)
our_user = session.query(User).first()
print(our_user.name)
print(session.query(User).count())
fake_user = User(name="Fake user")
session.add(fake_user)
session.commit()
print(session.query(User).count())
print(session.query(exists().where(User.name == "Fake user")).scalar())
fake_committed = session.query(User).filter_by(name="Fake user").first()
print(fake_committed.id)
print(fake_committed.name)
session.delete(fake_committed)
session.commit()
print(session.query(User).count())
print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))

bitter/utils.py Normal file

@@ -0,0 +1,209 @@
import logging
import time
import json
import signal
import sys
import sqlalchemy
from itertools import islice
from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session
logger = logging.getLogger(__name__)
def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!')
sys.exit(0)
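# Look up profiles in batches of up to max_users ids or screen names per users/lookup call, yielding trimmed user dicts (or pushing them onto the given queue).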
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
t = 'name' if by_name else 'uid'
logger.debug('Getting users by {}: {}'.format(t, ulist))
ilist = iter(ulist)
while True:
userslice = ",".join(islice(ilist, max_users))
if not userslice:
break
try:
if by_name:
resp = wq.users.lookup(screen_name=userslice)
else:
resp = wq.users.lookup(user_id=userslice)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
resp = []
else:
raise
if not resp:
logger.debug('Empty response')
for user in resp:
user = trim_user(user)
if queue:
queue.put(user)
else:
yield user
def trim_user(user):
if 'status' in user:
del user['status']
if 'follow_request_sent' in user:
del user['follow_request_sent']
if 'created_at' in user:
ts = time.strftime('%s', time.strptime(user['created_at'],'%a %b %d %H:%M:%S +0000 %Y'))
user['created_at_stamp'] = ts
del user['created_at']
user['entities'] = json.dumps(user['entities'])
return user
def add_user(session, user, enqueue=False):
user = trim_user(user)
    # delete any previously stored copy of this user (a no-op if there is none)
    session.query(User).filter(User.id == user['id']).delete()
user = User(**user)
session.add(user)
    if enqueue:
logging.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry:
entry = ExtractorEntry(user=user.id)
session.add(entry)
logging.debug(entry.pending)
entry.pending = True
entry.cursor = -1
session.commit()
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
w = wq.next()
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi)
if initfile:
screen_names = []
user_ids = []
if not user:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
try:
int(user)
user_ids.append(user)
except ValueError:
screen_names.append(user.split('@')[-1])
else:
try:
user_ids.append(int(user))
logger.info("Added id")
except Exception as ex:
logger.info("Exception: {}".format(ex))
logger.info("Added screen_name")
screen_names.append(user)
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers:
add_user(session, i, enqueue=True)
else:
logger.info('Using pending users from last session')
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logging.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logging.info('Pending: {}'.format(pending))
return pending
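    # Main crawl loop: pick the pending user with the fewest followers, page through followers/ids with the stored cursor, record each follower edge, and mark the entry done once the cursor is exhausted.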
while pending_entries() > 0:
logger.info("Using account: %s" % w.name)
        res = session.query(User, ExtractorEntry).\
            filter(ExtractorEntry.user == User.id).\
            filter(ExtractorEntry.pending == True).\
            order_by(User.followers_count).first()
        if not res:
            break
        candidate, entry = res
pending = True
cursor = entry.cursor
uid = candidate.id
uobject = session.query(User).filter(User.id==uid).first()
name = uobject.screen_name if uobject else None
logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor)
logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
try:
resp = wq.followers.ids(user_id=uid, cursor=cursor)
        except TwitterHTTPError as ex:
            if ex.e.code in (401, ):
                logger.info('Not authorized for user: {}'.format(uid))
                resp = {}
            else:
                raise
if 'ids' in resp:
logger.info("New followers: %s" % len(resp['ids']))
            if recursive:
                newusers = get_users(wq, resp['ids'])
                for newuser in newusers:
                    add_user(session, newuser, enqueue=True)
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed==uid).\
filter(Following.follower==i).first()
now = int(time.time())
if existing_user:
existing_user.created_at_stamp = now
else:
f = Following(isfollowed=uid,
follower=i,
created_at_stamp=now)
session.add(f)
total_followers = candidate.followers_count
fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
logger.info("Fetched: %s/%s followers" % (fetched_followers,
total_followers))
cursor = resp["next_cursor"]
if cursor > 0:
pending = True
logger.info("Getting more followers for %s" % uid)
else:
logger.info("Done getting followers for %s" % uid)
cursor = -1
pending = False
else:
logger.info("Error with id %s %s" % (uid, resp))
pending = False
entry.pending = pending
entry.cursor = cursor
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit()
sys.stdout.flush()
def get_tweet(c, tid):
return c.statuses.show(id=tid)
def search_tweet(c, query):
return c.search.tweets(q=query)
def get_user(c, user):
try:
int(user)
return c.users.lookup(user_id=user)[0]
except ValueError:
return c.users.lookup(screen_name=user)[0]

requirements.txt Normal file

@@ -0,0 +1,4 @@
sqlalchemy
sqlite3
twitter
click

setup.py Normal file

@@ -0,0 +1,38 @@
import pip
from setuptools import setup
from pip.req import parse_requirements
# parse_requirements() returns generator of pip.req.InstallRequirement objects
# pip 6 introduces the *required* session argument
try:
install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
except AttributeError:
install_reqs = parse_requirements("requirements.txt")
test_reqs = parse_requirements("test-requirements.txt")
# reqs is a list of requirement
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
setup(
name="bitter",
packages=['bitter'],
description='''
    Simplifying how researchers access data.
It includes a CLI and a library.
''',
author='J. Fernando Sanchez',
author_email='balkian@gmail.com',
url="http://balkian.com",
version="0.2",
install_requires=install_reqs,
tests_require=test_reqs,
include_package_data=True,
entry_points="""
[console_scripts]
bitter=bitter.cli:main
"""
)

test-requirements.txt Normal file (empty)

tests/test_crawlers.py Normal file

@@ -0,0 +1,9 @@
from unittest import TestCase
from bitter.crawlers import TwitterWorker, TwitterQueue
class TestWorker(TestCase):
def test_worker(self):
        w = TwitterWorker(name='test', client=None)
        assert not w.busy and not w.throttled

tests/test_models.py Normal file

@@ -0,0 +1,6 @@
from unittest import TestCase
class TestModels(TestCase):
def test_worker(self):
assert True

tests/test_utils.py Normal file

@@ -0,0 +1,7 @@
from unittest import TestCase
class TestUtils(TestCase):
def test_get_user(self):
assert True
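
Note on the credentials file: bitter/cli.py defaults to credentials.json, and TwitterQueue.from_credentials in bitter/crawlers.py expects one JSON object per line with the keys user, consumer_key, consumer_secret, token_key and token_secret. A minimal sketch of writing such a file, with placeholder values you would replace with your own application keys and access tokens:

import json

# Placeholder credentials; substitute the keys and tokens of your own Twitter app and account.
creds = [{
    "user": "some_account",
    "consumer_key": "YOUR_CONSUMER_KEY",
    "consumer_secret": "YOUR_CONSUMER_SECRET",
    "token_key": "YOUR_ACCESS_TOKEN",
    "token_secret": "YOUR_ACCESS_TOKEN_SECRET",
}]

with open('credentials.json', 'w') as f:
    for cred in creds:
        f.write(json.dumps(cred) + '\n')

from_credentials builds one TwitterWorker per line, so adding more accounts to the file increases the number of API workers the queue can rotate over.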