From 2036d51d96b8aadf5ef2077c95ff1a5adf61dbd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?= Date: Wed, 28 Sep 2016 05:06:12 +0200 Subject: [PATCH] Added limits to every call --- bitter/VERSION | 2 +- bitter/cli.py | 26 +++++++------- bitter/crawlers.py | 81 +++++++++++++++++++++++++++--------------- tests/test_crawlers.py | 48 +++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 42 deletions(-) create mode 100644 tests/test_crawlers.py diff --git a/bitter/VERSION b/bitter/VERSION index 167b000..389facc 100644 --- a/bitter/VERSION +++ b/bitter/VERSION @@ -1 +1 @@ -0.5.4 \ No newline at end of file +0.5.5 \ No newline at end of file diff --git a/bitter/cli.py b/bitter/cli.py index dbd5fac..938cc41 100644 --- a/bitter/cli.py +++ b/bitter/cli.py @@ -10,6 +10,7 @@ import sqlite3 from sqlalchemy import exists from bitter import utils, models, crawlers +from bitter import config as bconf from bitter.models import make_session, User, ExtractorEntry, Following import sys @@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials): ctx.obj = {} ctx.obj['VERBOSE'] = verbose ctx.obj['CONFIG'] = config - ctx.obj['CREDENTIALS'] = credentials + bconf.CREDENTIALS = credentials utils.create_credentials(credentials) @main.group() @@ -45,7 +46,7 @@ def tweet(ctx): @click.argument('tweetid') @click.pass_context def get_tweet(ctx, tweetid): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) t = utils.get_tweet(wq, tweetid) print(json.dumps(t, indent=2)) @@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid): @click.argument('query') @click.pass_context def get_tweet(ctx, query): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) c = wq.next() t = utils.search_tweet(c.client, query) print(json.dumps(t, indent=2)) @@ -63,7 +64,7 @@ def get_tweet(ctx, query): 
@click.argument('user') @click.pass_context def get_tweet(ctx, user): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) c = wq.next() t = utils.user_timeline(c.client, user) print(json.dumps(t, indent=2)) @@ -88,7 +89,7 @@ def list_users(ctx, db): @click.argument('user') @click.pass_context def get_user(ctx, user): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) c = wq.next() u = utils.get_user(c.client, user) print(json.dumps(u, indent=2)) @@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db): return ExitStack() - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, len(wq.queue))) @@ -281,7 +282,7 @@ def users_extractor(ctx): @click.pass_context def extract(ctx, recursive, user, name, initfile): print(locals()) - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) dburi = ctx.obj['DBURI'] utils.extract(wq, recursive=recursive, @@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile): @extractor.command('reset') @click.pass_context def reset_extractor(ctx): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) db = ctx.obj['DBURI'] session = make_session(db) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) @@ -302,7 +303,7 @@ def reset_extractor(ctx): @click.argument('url', required=False) @click.pass_context def get_limits(ctx, url): - wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) for worker in wq.queue: 
resp = worker.client.application.rate_limit_status() print('#'*20) @@ -324,11 +325,10 @@ def get_limits(ctx, url): @click.argument('CONSUMER_SECRET', required=True) @click.pass_context def run_server(ctx, consumer_key, consumer_secret): - from . import config - config.CONSUMER_KEY = consumer_key - config.CONSUMER_SECRET = consumer_secret + bconf.CONSUMER_KEY = consumer_key + bconf.CONSUMER_SECRET = consumer_secret from .webserver import app - app.run() + app.run(host='0.0.0.0') if __name__ == '__main__': diff --git a/bitter/crawlers.py b/bitter/crawlers.py index 59f2976..0976f94 100644 --- a/bitter/crawlers.py +++ b/bitter/crawlers.py @@ -1,4 +1,5 @@ import time +import datetime import urllib import random import json @@ -41,23 +42,52 @@ class TwitterWorker(object): def __init__(self, name, client): self.name = name self.client = client - self.throttled_time = False self._lock = Lock() self.busy = False - - @property - def throttled(self): - if not self.throttled_time: - return False - t = time.time() - delta = self.throttled_time - t - if delta > 0: - return True + self.limits = self.client.application.rate_limit_status() + + def is_limited(self, uriparts): + limit = self.get_limit(uriparts) + if limit and limit['remaining'] <=0: + t = datetime.datetime.now() + delta = datetime.datetime.fromtimestamp(limit['reset']) - t + if delta > datetime.timedelta(seconds=1): + return True return False - def throttle_until(self, epoch=None): - self.throttled_time = int(epoch) - logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) + def get_wait(self, uriparts): + limits = self.get_limit(uriparts) + if limits.get('remaining', 1) > 0: + return 0 + reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) + now = datetime.datetime.now() + return max(0, (reset-now).total_seconds()) + + def get_limit(self, uriparts): + uri = '/'+'/'.join(uriparts) + return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) + + def set_limit(self, uriparts, value): + uri = 
'/'+'/'.join(uriparts) + if 'resources' not in self.limits: + self.limits['resources'] = {} + resources = self.limits['resources'] + if uriparts[0] not in resources: + resources[uriparts[0]] = {} + resource = resources[uriparts[0]] + resource[uri] = value + + def update_limits(self, uriparts, remaining, reset, limit): + self.set_limit(uriparts, {'remaining': remaining, + 'reset': reset, + 'limit': limit}) + + def update_limits_from_headers(self, uriparts, headers): + reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30)) + remaining = int(headers.get('X-Rate-Limit-Remaining', 0)) + limit = int(headers.get('X-Rate-Limit-Limit', -1)) + self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit) + class TwitterQueue(AttrToFunc): @@ -77,20 +107,20 @@ class TwitterQueue(AttrToFunc): while True: c = None try: - c = self.next() + c = self.next(uriparts) c._lock.acquire() c.busy = True logger.debug('Next: {}'.format(c.name)) ping = time.time() resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) pong = time.time() + c.update_limits_from_headers(uriparts, resp.headers) logger.debug('Took: {}'.format(pong-ping)) return resp except TwitterHTTPError as ex: if ex.e.code in (429, 502, 503, 504): - limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30) logger.info('{} limited'.format(c.name)) - c.throttle_until(limit) + c.update_limits_from_headers(uriparts, ex.e.headers) continue else: raise @@ -101,11 +131,6 @@ class TwitterQueue(AttrToFunc): if c: c.busy = False c._lock.release() - - - @property - def client(self): - return self.next().client @classmethod def from_credentials(self, cred_file=None): @@ -119,26 +144,26 @@ class TwitterQueue(AttrToFunc): wq.ready(TwitterWorker(cred["user"], c)) return wq - def _next(self): + def _next(self, uriparts): logger.debug('Getting next available') s = list(self.queue) random.shuffle(s) for worker in s: - if not worker.throttled and not worker.busy: + if not 
worker.is_limited(uriparts) and not worker.busy: return worker raise Exception('No worker is available') - def next(self): + def next(self, uriparts): if not self.wait: - return self._next() + return self._next(uriparts) while True: try: - return self._next() + return self._next(uriparts) except Exception: available = filter(lambda x: not x.busy, self.queue) if available: - first_worker = min(available, key=lambda x: x.throttled_time) - diff = first_worker.throttled_time - time.time() + first_worker = min(available, key=lambda x: x.get_wait(uriparts)) + diff = first_worker.get_wait(uriparts) logger.info("All workers are throttled. Waiting %s seconds" % diff) else: diff = 5 diff --git a/tests/test_crawlers.py b/tests/test_crawlers.py new file mode 100644 index 0000000..146cb32 --- /dev/null +++ b/tests/test_crawlers.py @@ -0,0 +1,48 @@ +from unittest import TestCase + +import os +import types +import datetime +import time + +from bitter import utils +from bitter.crawlers import TwitterQueue, TwitterWorker +from bitter import config as c + +class TestUtils(TestCase): + + def setUp(self): + self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) + + def test_create_worker(self): + assert len(self.wq.queue)==1 + + def test_get_limits(self): + w1 = list(self.wq.queue)[0] + print(w1.limits) + limitslook = w1.get_limit(['statuses', 'lookup']) + assert limitslook['remaining'] == limitslook['limit'] + + def test_set_limits(self): + w1 = list(self.wq.queue)[0] + w1.set_limit(['test', 'test2'], {'remaining': 0}) + assert w1.get_limit(['test', 'test2']) == {'remaining': 0} + + def test_await(self): + w1 = list(self.wq.queue)[0] + w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2}) + assert w1.get_wait(['test', 'wait']) > 1 + time.sleep(2) + assert w1.get_wait(['test', 'wait']) == 0 + assert w1.get_wait(['statuses', 'lookup']) == 0 + + def test_is_limited(self): + w1 = list(self.wq.queue)[0] + assert not 
w1.is_limited(['statuses', 'lookup']) + + def test_call(self): + w1 = list(self.wq.queue)[0] + l1 = w1.get_limit(['users', 'lookup']) + resp = self.wq.users.lookup(screen_name='balkian') + l2 = w1.get_limit(['users', 'lookup']) + assert l1['remaining']-l2['remaining'] == 1