1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

4 Commits
0.5.0 ... 0.5.6

Author SHA1 Message Date
J. Fernando Sánchez
3f42879751 Bumped to v0.5.6 2016-09-28 06:31:33 +02:00
J. Fernando Sánchez
35f0c6376d Fixed limits bug, added tests 2016-09-28 06:30:49 +02:00
J. Fernando Sánchez
2036d51d96 Added limits to every call 2016-09-28 05:06:12 +02:00
J. Fernando Sánchez
09feb050a6 Changed versioning, added locks 2016-09-28 01:10:10 +02:00
10 changed files with 174 additions and 54 deletions

View File

@@ -1,6 +1,7 @@
include requirements.txt include requirements.txt
include test-requirements.txt include test-requirements.txt
include README.md include README.md
include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
graft test graft test

1
bitter/VERSION Normal file
View File

@@ -0,0 +1 @@
0.5.6

View File

@@ -3,8 +3,15 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter http://github.com/balkian/bitter
""" """
from future.standard_library import install_aliases try:
install_aliases() from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os
from .version import __version__
__version__ = '0.5.0'
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -10,6 +10,7 @@ import sqlite3
from sqlalchemy import exists from sqlalchemy import exists
from bitter import utils, models, crawlers from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following from bitter.models import make_session, User, ExtractorEntry, Following
import sys import sys
@@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials):
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) utils.create_credentials(credentials)
@main.group() @main.group()
@@ -45,7 +46,7 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context @click.pass_context
def get_tweet(ctx, tweetid): def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid):
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def get_tweet(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.search_tweet(c.client, query) t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -63,7 +64,7 @@ def get_tweet(ctx, query):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def get_tweet(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.user_timeline(c.client, user) t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -88,7 +89,7 @@ def list_users(ctx, db):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_user(ctx, user): def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
u = utils.get_user(c.client, user) u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2)) print(json.dumps(u, indent=2))
@@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@@ -281,7 +282,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@@ -302,7 +303,7 @@ def reset_extractor(ctx):
@click.argument('url', required=False) @click.argument('url', required=False)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue: for worker in wq.queue:
resp = worker.client.application.rate_limit_status() resp = worker.client.application.rate_limit_status()
print('#'*20) print('#'*20)
@@ -324,11 +325,10 @@ def get_limits(ctx, url):
@click.argument('CONSUMER_SECRET', required=True) @click.argument('CONSUMER_SECRET', required=True)
@click.pass_context @click.pass_context
def run_server(ctx, consumer_key, consumer_secret): def run_server(ctx, consumer_key, consumer_secret):
from . import config bconf.CONSUMER_KEY = consumer_key
config.CONSUMER_KEY = consumer_key bconf.CONSUMER_SECRET = consumer_secret
config.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run() app.run(host='0.0.0.0')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -8,6 +8,7 @@ logger = logging.getLogger(__name__)
from twitter import * from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock
from . import utils from . import utils
from . import config from . import config
@@ -40,23 +41,50 @@ class TwitterWorker(object):
def __init__(self, name, client): def __init__(self, name, client):
self.name = name self.name = name
self.client = client self.client = client
self.throttled_time = False self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property def is_limited(self, uriparts):
def throttled(self): return self.get_wait(uriparts)>0
if not self.throttled_time:
return False
t = time.time()
delta = self.throttled_time - t
if delta > 0:
return True
return False
def throttle_until(self, epoch=None): def get_wait(self, uriparts):
self.throttled_time = int(epoch) limits = self.get_limit(uriparts)
logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) if limits['remaining'] > 0:
return 0
reset = limits.get('reset', 0)
now = time.time()
return max(0, (reset-now))
def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts)
if 'resources' not in self.limits:
self.limits['resources'] = {}
resources = self.limits['resources']
if uriparts[0] not in resources:
resources[uriparts[0]] = {}
resource = resources[uriparts[0]]
resource[uri] = value
def update_limits(self, uriparts, remaining, reset, limit):
self.set_limit(uriparts, {'remaining': remaining,
'reset': reset,
'limit': limit})
def update_limits_from_headers(self, uriparts, headers):
reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
limit = int(headers.get('X-Rate-Limit-Limit', -1))
self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
class TwitterQueueException(BaseException):
pass
class TwitterQueue(AttrToFunc): class TwitterQueue(AttrToFunc):
def __init__(self, wait=True): def __init__(self, wait=True):
@@ -72,36 +100,37 @@ class TwitterQueue(AttrToFunc):
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
while True: patience = 1
while patience:
c = None c = None
try: try:
c = self.next() c = self.next(uriparts)
c._lock.acquire()
c.busy = True c.busy = True
logger.debug('Next: {}'.format(c.name)) logger.debug('Next: {}'.format(c.name))
ping = time.time() ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
pong = time.time() pong = time.time()
c.update_limits_from_headers(uriparts, resp.headers)
logger.debug('Took: {}'.format(pong-ping)) logger.debug('Took: {}'.format(pong-ping))
return resp return resp
except TwitterHTTPError as ex: except TwitterHTTPError as ex:
if ex.e.code in (429, 502, 503, 504): if ex.e.code in (429, 502, 503, 504):
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
logger.info('{} limited'.format(c.name)) logger.info('{} limited'.format(c.name))
c.throttle_until(limit) c.update_limits_from_headers(uriparts, ex.e.headers)
continue continue
else: else:
raise raise
except urllib.error.URLError as ex: except urllib.error.URLError as ex:
time.sleep(5) time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex)) logger.info('Something fishy happened: {}'.format(ex))
raise
finally: finally:
if c: if c:
c.busy = False c.busy = False
c._lock.release()
if not self.wait:
@property patience -= 1
def client(self):
return self.next().client
@classmethod @classmethod
def from_credentials(self, cred_file=None): def from_credentials(self, cred_file=None):
@@ -115,26 +144,33 @@ class TwitterQueue(AttrToFunc):
wq.ready(TwitterWorker(cred["user"], c)) wq.ready(TwitterWorker(cred["user"], c))
return wq return wq
def _next(self):
def get_wait(self, uriparts):
available = filter(lambda x: not x.busy, self.queue)
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.get_wait(uriparts)
return diff
def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
s = list(self.queue) s = list(self.queue)
random.shuffle(s) random.shuffle(s)
for worker in s: for worker in s:
if not worker.throttled and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise TwitterQueueException('No worker is available')
def next(self): def next(self, uriparts):
if not self.wait: if not self.wait:
return self._next() return self._next(uriparts)
while True: while True:
try: try:
return self._next() return self._next(uriparts)
except Exception: except TwitterQueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.throttled_time) diff = self.get_wait(uriparts)
diff = first_worker.throttled_time - time.time()
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5

4
bitter/version.py Normal file
View File

@@ -0,0 +1,4 @@
import os

# Single source of truth for the package version: the plain-text VERSION
# file shipped alongside this module (see MANIFEST.in / package data).
_version_file = os.path.join(os.path.dirname(__file__), 'VERSION')
with open(_version_file) as _fh:
    __version__ = _fh.read().strip()

View File

@@ -1 +1,2 @@
contextlib2 contextlib2
future

View File

@@ -1,4 +1,3 @@
sqlalchemy sqlalchemy
twitter twitter
click click
future

View File

@@ -23,16 +23,12 @@ if sys.version_info <= (3, 0):
install_reqs = [str(ir.req) for ir in install_reqs] install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs] test_reqs = [str(ir.req) for ir in test_reqs]
with open('bitter/__init__.py') as f: from bitter import __version__
exec(f.read())
setup( setup(
name="bitter", name="bitter",
packages=['bitter'], packages=['bitter'],
description=''' description=" Simplifying how researchers access Data. It includes a CLI and a library.",
Simplifying how researchers access Data.
It includes a CLI and a library.
''',
author='J. Fernando Sanchez', author='J. Fernando Sanchez',
author_email='balkian@gmail.com', author_email='balkian@gmail.com',
url="http://balkian.com", url="http://balkian.com",

75
tests/test_crawlers.py Normal file
View File

@@ -0,0 +1,75 @@
from unittest import TestCase
import os
import types
import datetime
import time
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
from bitter import config as c
class TestUtils(TestCase):
    """Integration tests for TwitterQueue/TwitterWorker rate-limit tracking.

    NOTE(review): these talk to the live Twitter API using the single
    credential in tests/credentials.json -- they are integration tests,
    not unit tests, and several of them consume real rate-limit quota.
    """

    def setUp(self):
        # Fresh queue per test, built from the bundled credentials file.
        self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))

    def test_create_worker(self):
        # The credentials file is expected to define exactly one account.
        assert len(self.wq.queue)==1

    def test_get_limits(self):
        w1 = list(self.wq.queue)[0]
        print(w1.limits)
        # On a fresh worker no calls have been made yet, so the remaining
        # quota reported by rate_limit_status equals the full limit.
        limitslook = w1.get_limit(['statuses', 'lookup'])
        assert limitslook['remaining'] == limitslook['limit']

    def test_set_limits(self):
        # set_limit/get_limit round-trip for an arbitrary (fake) endpoint.
        w1 = list(self.wq.queue)[0]
        w1.set_limit(['test', 'test2'], {'remaining': 0})
        assert w1.get_limit(['test', 'test2']) == {'remaining': 0}

    def test_await(self):
        # An exhausted endpoint whose reset is ~2s away should report a
        # positive wait, and zero once the reset time has passed.
        w1 = list(self.wq.queue)[0]
        w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
        assert w1.get_wait(['test', 'wait']) > 1
        time.sleep(2)
        assert w1.get_wait(['test', 'wait']) == 0
        # Untouched endpoints never require waiting.
        assert w1.get_wait(['statuses', 'lookup']) == 0

    def test_is_limited(self):
        w1 = list(self.wq.queue)[0]
        assert not w1.is_limited(['statuses', 'lookup'])
        # Zero remaining with a future reset marks the endpoint as limited.
        w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
        assert w1.is_limited(['test', 'limited'])

    def test_call(self):
        # A real API call must decrement the tracked remaining count by one
        # (limits are refreshed from the response headers).
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['users', 'lookup'])
        resp = self.wq.users.lookup(screen_name='balkian')
        l2 = w1.get_limit(['users', 'lookup'])
        assert l1['remaining']-l2['remaining'] == 1

    def test_consume(self):
        # Exhaust the friends/list quota for the only worker; with wait
        # disabled, asking for the next available worker must raise.
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['friends', 'list'])
        self.wq.wait = False
        for i in range(l1['remaining']):
            print(i)
            resp = self.wq.friends.list(screen_name='balkian')
        # l2 = w1.get_limit(['users', 'lookup'])
        # assert l2['remaining'] == 0
        # self.wq.users.lookup(screen_name='balkian')
        failed = False
        try:
            # resp = self.wq.friends.list(screen_name='balkian')
            self.wq.next(['friends', 'list'])
        except TwitterQueueException:
            failed = True
        assert failed
        # The queue-level wait estimate must bracket the worker's reset
        # time (within a 2-second tolerance for clock drift).
        l2 = w1.get_limit(['friends', 'list'])
        assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
        assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
        time.sleep(w1.get_wait(['friends', 'list']))