diff --git a/bitter/crawlers.py b/bitter/crawlers.py index 0976f94..686e472 100644 --- a/bitter/crawlers.py +++ b/bitter/crawlers.py @@ -1,5 +1,4 @@ import time -import datetime import urllib import random import json @@ -47,21 +46,15 @@ class TwitterWorker(object): self.limits = self.client.application.rate_limit_status() def is_limited(self, uriparts): - limit = self.get_limit(uriparts) - if limit and limit['remaining'] <=0: - t = datime.datetime.now() - delta = limit['reset'] - t - if delta < datetime.timedelta(seconds=1): - return True - return False + return self.get_wait(uriparts)>0 def get_wait(self, uriparts): limits = self.get_limit(uriparts) if limits['remaining'] > 0: return 0 - reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) - now = datetime.datetime.now() - return max(0, (reset-now).total_seconds()) + reset = limits.get('reset', 0) + now = time.time() + return max(0, (reset-now)) def get_limit(self, uriparts): uri = '/'+'/'.join(uriparts) @@ -90,6 +83,9 @@ class TwitterWorker(object): +class TwitterQueueException(BaseException): + pass + class TwitterQueue(AttrToFunc): def __init__(self, wait=True): logger.debug('Creating worker queue') @@ -104,7 +100,8 @@ class TwitterQueue(AttrToFunc): def handle_call(self, uriparts, *args, **kwargs): logger.debug('Called: {}'.format(uriparts)) logger.debug('With: {} {}'.format(args, kwargs)) - while True: + patience = 1 + while patience: c = None try: c = self.next(uriparts) @@ -127,10 +124,13 @@ class TwitterQueue(AttrToFunc): except urllib.error.URLError as ex: time.sleep(5) logger.info('Something fishy happened: {}'.format(ex)) + raise finally: if c: c.busy = False c._lock.release() + if not self.wait: + patience -= 1 @classmethod def from_credentials(self, cred_file=None): @@ -144,6 +144,14 @@ class TwitterQueue(AttrToFunc): wq.ready(TwitterWorker(cred["user"], c)) return wq + + def get_wait(self, uriparts): + available = filter(lambda x: not x.busy, self.queue) + first_worker = min(available, key=lambda x: x.get_wait(uriparts)) + diff = first_worker.get_wait(uriparts) + return diff + + def _next(self, uriparts): logger.debug('Getting next available') s = list(self.queue) @@ -151,7 +159,7 @@ class TwitterQueue(AttrToFunc): for worker in s: if not worker.is_limited(uriparts) and not worker.busy: return worker - raise Exception('No worker is available') + raise TwitterQueueException('No worker is available') def next(self, uriparts): if not self.wait: @@ -159,11 +167,10 @@ class TwitterQueue(AttrToFunc): while True: try: return self._next(uriparts) - except Exception: + except TwitterQueueException: available = filter(lambda x: not x.busy, self.queue) if available: - first_worker = min(available, key=lambda x: x.get_wait(uriparts)) - diff = first_worker.get_wait(uriparts) + diff = self.get_wait(uriparts) logger.info("All workers are throttled. Waiting %s seconds" % diff) else: diff = 5 diff --git a/tests/test_crawlers.py b/tests/test_crawlers.py index 146cb32..165ac12 100644 --- a/tests/test_crawlers.py +++ b/tests/test_crawlers.py @@ -6,7 +6,7 @@ import datetime import time from bitter import utils -from bitter.crawlers import TwitterQueue, TwitterWorker +from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException from bitter import config as c class TestUtils(TestCase): @@ -39,6 +39,8 @@ class TestUtils(TestCase): def test_is_limited(self): w1 = list(self.wq.queue)[0] assert not w1.is_limited(['statuses', 'lookup']) + w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100}) + assert w1.is_limited(['test', 'limited']) def test_call(self): w1 = list(self.wq.queue)[0] @@ -46,3 +48,28 @@ class TestUtils(TestCase): resp = self.wq.users.lookup(screen_name='balkian') l2 = w1.get_limit(['users', 'lookup']) assert l1['remaining']-l2['remaining'] == 1 + + def test_consume(self): + w1 = list(self.wq.queue)[0] + l1 = w1.get_limit(['friends', 'list']) + self.wq.wait = False + for i in range(l1['remaining']): + print(i) + resp = self.wq.friends.list(screen_name='balkian') + # l2 = w1.get_limit(['users', 'lookup']) + # assert l2['remaining'] == 0 + # self.wq.users.lookup(screen_name='balkian') + + failed = False + try: + # resp = self.wq.friends.list(screen_name='balkian') + self.wq.next(['friends', 'list']) + except TwitterQueueException: + failed = True + assert failed + l2 = w1.get_limit(['friends', 'list']) + assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time()) + assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2) + time.sleep(w1.get_wait(['friends', 'list'])) + +