Fixed limits bug, added tests

master
J. Fernando Sánchez 8 years ago
parent 2036d51d96
commit 35f0c6376d

@ -1,5 +1,4 @@
import time import time
import datetime
import urllib import urllib
import random import random
import json import json
@ -47,21 +46,15 @@ class TwitterWorker(object):
self.limits = self.client.application.rate_limit_status() self.limits = self.client.application.rate_limit_status()
def is_limited(self, uriparts): def is_limited(self, uriparts):
limit = self.get_limit(uriparts) return self.get_wait(uriparts)>0
if limit and limit['remaining'] <=0:
t = datime.datetime.now()
delta = limit['reset'] - t
if delta < datetime.timedelta(seconds=1):
return True
return False
def get_wait(self, uriparts): def get_wait(self, uriparts):
limits = self.get_limit(uriparts) limits = self.get_limit(uriparts)
if limits['remaining'] > 0: if limits['remaining'] > 0:
return 0 return 0
reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) reset = limits.get('reset', 0)
now = datetime.datetime.now() now = time.time()
return max(0, (reset-now).total_seconds()) return max(0, (reset-now))
def get_limit(self, uriparts): def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
@ -90,6 +83,9 @@ class TwitterWorker(object):
class TwitterQueueException(BaseException):
pass
class TwitterQueue(AttrToFunc): class TwitterQueue(AttrToFunc):
def __init__(self, wait=True): def __init__(self, wait=True):
logger.debug('Creating worker queue') logger.debug('Creating worker queue')
@ -104,7 +100,8 @@ class TwitterQueue(AttrToFunc):
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
while True: patience = 1
while patience:
c = None c = None
try: try:
c = self.next(uriparts) c = self.next(uriparts)
@ -127,10 +124,13 @@ class TwitterQueue(AttrToFunc):
except urllib.error.URLError as ex: except urllib.error.URLError as ex:
time.sleep(5) time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex)) logger.info('Something fishy happened: {}'.format(ex))
raise
finally: finally:
if c: if c:
c.busy = False c.busy = False
c._lock.release() c._lock.release()
if not self.wait:
patience -= 1
@classmethod @classmethod
def from_credentials(self, cred_file=None): def from_credentials(self, cred_file=None):
@ -144,6 +144,14 @@ class TwitterQueue(AttrToFunc):
wq.ready(TwitterWorker(cred["user"], c)) wq.ready(TwitterWorker(cred["user"], c))
return wq return wq
def get_wait(self, uriparts):
available = filter(lambda x: not x.busy, self.queue)
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.get_wait(uriparts)
return diff
def _next(self, uriparts): def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
s = list(self.queue) s = list(self.queue)
@ -151,7 +159,7 @@ class TwitterQueue(AttrToFunc):
for worker in s: for worker in s:
if not worker.is_limited(uriparts) and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise TwitterQueueException('No worker is available')
def next(self, uriparts): def next(self, uriparts):
if not self.wait: if not self.wait:
@ -159,11 +167,10 @@ class TwitterQueue(AttrToFunc):
while True: while True:
try: try:
return self._next(uriparts) return self._next(uriparts)
except Exception: except TwitterQueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.get_wait(uriparts)) diff = self.get_wait(uriparts)
diff = first_worker.get_wait(uriparts)
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5

@ -6,7 +6,7 @@ import datetime
import time import time
from bitter import utils from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
from bitter import config as c from bitter import config as c
class TestUtils(TestCase): class TestUtils(TestCase):
@ -39,6 +39,8 @@ class TestUtils(TestCase):
def test_is_limited(self): def test_is_limited(self):
w1 = list(self.wq.queue)[0] w1 = list(self.wq.queue)[0]
assert not w1.is_limited(['statuses', 'lookup']) assert not w1.is_limited(['statuses', 'lookup'])
w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
assert w1.is_limited(['test', 'limited'])
def test_call(self): def test_call(self):
w1 = list(self.wq.queue)[0] w1 = list(self.wq.queue)[0]
@ -46,3 +48,28 @@ class TestUtils(TestCase):
resp = self.wq.users.lookup(screen_name='balkian') resp = self.wq.users.lookup(screen_name='balkian')
l2 = w1.get_limit(['users', 'lookup']) l2 = w1.get_limit(['users', 'lookup'])
assert l1['remaining']-l2['remaining'] == 1 assert l1['remaining']-l2['remaining'] == 1
def test_consume(self):
w1 = list(self.wq.queue)[0]
l1 = w1.get_limit(['friends', 'list'])
self.wq.wait = False
for i in range(l1['remaining']):
print(i)
resp = self.wq.friends.list(screen_name='balkian')
# l2 = w1.get_limit(['users', 'lookup'])
# assert l2['remaining'] == 0
# self.wq.users.lookup(screen_name='balkian')
failed = False
try:
# resp = self.wq.friends.list(screen_name='balkian')
self.wq.next(['friends', 'list'])
except TwitterQueueException:
failed = True
assert failed
l2 = w1.get_limit(['friends', 'list'])
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
time.sleep(w1.get_wait(['friends', 'list']))

Loading…
Cancel
Save