Added limits to every call

master 0.5.5
J. Fernando Sánchez 8 years ago
parent 09feb050a6
commit 2036d51d96

@ -1 +1 @@
0.5.4 0.5.5

@ -10,6 +10,7 @@ import sqlite3
from sqlalchemy import exists from sqlalchemy import exists
from bitter import utils, models, crawlers from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following from bitter.models import make_session, User, ExtractorEntry, Following
import sys import sys
@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials):
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) utils.create_credentials(credentials)
@main.group() @main.group()
@ -45,7 +46,7 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context @click.pass_context
def get_tweet(ctx, tweetid): def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid):
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def get_tweet(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.search_tweet(c.client, query) t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@ -63,7 +64,7 @@ def get_tweet(ctx, query):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def get_tweet(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.user_timeline(c.client, user) t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@ -88,7 +89,7 @@ def list_users(ctx, db):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_user(ctx, user): def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
u = utils.get_user(c.client, user) u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2)) print(json.dumps(u, indent=2))
@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@ -281,7 +282,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@ -302,7 +303,7 @@ def reset_extractor(ctx):
@click.argument('url', required=False) @click.argument('url', required=False)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue: for worker in wq.queue:
resp = worker.client.application.rate_limit_status() resp = worker.client.application.rate_limit_status()
print('#'*20) print('#'*20)
@ -324,11 +325,10 @@ def get_limits(ctx, url):
@click.argument('CONSUMER_SECRET', required=True) @click.argument('CONSUMER_SECRET', required=True)
@click.pass_context @click.pass_context
def run_server(ctx, consumer_key, consumer_secret): def run_server(ctx, consumer_key, consumer_secret):
from . import config bconf.CONSUMER_KEY = consumer_key
config.CONSUMER_KEY = consumer_key bconf.CONSUMER_SECRET = consumer_secret
config.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run() app.run(host='0.0.0.0')
if __name__ == '__main__': if __name__ == '__main__':

@ -1,4 +1,5 @@
import time import time
import datetime
import urllib import urllib
import random import random
import json import json
@ -41,23 +42,52 @@ class TwitterWorker(object):
def __init__(self, name, client): def __init__(self, name, client):
self.name = name self.name = name
self.client = client self.client = client
self.throttled_time = False
self._lock = Lock() self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property
def throttled(self): def is_limited(self, uriparts):
if not self.throttled_time: limit = self.get_limit(uriparts)
return False if limit and limit['remaining'] <=0:
t = time.time() t = datime.datetime.now()
delta = self.throttled_time - t delta = limit['reset'] - t
if delta > 0: if delta < datetime.timedelta(seconds=1):
return True return True
return False return False
def throttle_until(self, epoch=None): def get_wait(self, uriparts):
self.throttled_time = int(epoch) limits = self.get_limit(uriparts)
logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) if limits['remaining'] > 0:
return 0
reset = datetime.datetime.fromtimestamp(limits.get('reset', 0))
now = datetime.datetime.now()
return max(0, (reset-now).total_seconds())
def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts)
if 'resources' not in self.limits:
self.limits['resources'] = {}
resources = self.limits['resources']
if uriparts[0] not in resources:
resources[uriparts[0]] = {}
resource = resources[uriparts[0]]
resource[uri] = value
def update_limits(self, uriparts, remaining, reset, limit):
self.set_limit(uriparts, {'remaining': remaining,
'reset': reset,
'limit': limit})
def update_limits_from_headers(self, uriparts, headers):
reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
limit = int(headers.get('X-Rate-Limit-Limit', -1))
self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
class TwitterQueue(AttrToFunc): class TwitterQueue(AttrToFunc):
@ -77,20 +107,20 @@ class TwitterQueue(AttrToFunc):
while True: while True:
c = None c = None
try: try:
c = self.next() c = self.next(uriparts)
c._lock.acquire() c._lock.acquire()
c.busy = True c.busy = True
logger.debug('Next: {}'.format(c.name)) logger.debug('Next: {}'.format(c.name))
ping = time.time() ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
pong = time.time() pong = time.time()
c.update_limits_from_headers(uriparts, resp.headers)
logger.debug('Took: {}'.format(pong-ping)) logger.debug('Took: {}'.format(pong-ping))
return resp return resp
except TwitterHTTPError as ex: except TwitterHTTPError as ex:
if ex.e.code in (429, 502, 503, 504): if ex.e.code in (429, 502, 503, 504):
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
logger.info('{} limited'.format(c.name)) logger.info('{} limited'.format(c.name))
c.throttle_until(limit) c.update_limits_from_headers(uriparts, ex.e.headers)
continue continue
else: else:
raise raise
@ -101,11 +131,6 @@ class TwitterQueue(AttrToFunc):
if c: if c:
c.busy = False c.busy = False
c._lock.release() c._lock.release()
@property
def client(self):
return self.next().client
@classmethod @classmethod
def from_credentials(self, cred_file=None): def from_credentials(self, cred_file=None):
@ -119,26 +144,26 @@ class TwitterQueue(AttrToFunc):
wq.ready(TwitterWorker(cred["user"], c)) wq.ready(TwitterWorker(cred["user"], c))
return wq return wq
def _next(self): def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
s = list(self.queue) s = list(self.queue)
random.shuffle(s) random.shuffle(s)
for worker in s: for worker in s:
if not worker.throttled and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise Exception('No worker is available')
def next(self): def next(self, uriparts):
if not self.wait: if not self.wait:
return self._next() return self._next(uriparts)
while True: while True:
try: try:
return self._next() return self._next(uriparts)
except Exception: except Exception:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.throttled_time) first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.throttled_time - time.time() diff = first_worker.get_wait(uriparts)
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5

@ -0,0 +1,48 @@
from unittest import TestCase
import os
import types
import datetime
import time
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker
from bitter import config as c
class TestUtils(TestCase):
def setUp(self):
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self):
assert len(self.wq.queue)==1
def test_get_limits(self):
w1 = list(self.wq.queue)[0]
print(w1.limits)
limitslook = w1.get_limit(['statuses', 'lookup'])
assert limitslook['remaining'] == limitslook['limit']
def test_set_limits(self):
w1 = list(self.wq.queue)[0]
w1.set_limit(['test', 'test2'], {'remaining': 0})
assert w1.get_limit(['test', 'test2']) == {'remaining': 0}
def test_await(self):
w1 = list(self.wq.queue)[0]
w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
assert w1.get_wait(['test', 'wait']) > 1
time.sleep(2)
assert w1.get_wait(['test', 'wait']) == 0
assert w1.get_wait(['statuses', 'lookup']) == 0
def test_is_limited(self):
w1 = list(self.wq.queue)[0]
assert not w1.is_limited(['statuses', 'lookup'])
def test_call(self):
w1 = list(self.wq.queue)[0]
l1 = w1.get_limit(['users', 'lookup'])
resp = self.wq.users.lookup(screen_name='balkian')
l2 = w1.get_limit(['users', 'lookup'])
assert l1['remaining']-l2['remaining'] == 1
Loading…
Cancel
Save