1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 04:38:25 +00:00

3 Commits
0.5.4 ... 0.5.6

Author SHA1 Message Date
J. Fernando Sánchez
3f42879751 Bumped to v0.5.6 2016-09-28 06:31:33 +02:00
J. Fernando Sánchez
35f0c6376d Fixed limits bug, added tests 2016-09-28 06:30:49 +02:00
J. Fernando Sánchez
2036d51d96 Added limits to every call 2016-09-28 05:06:12 +02:00
4 changed files with 152 additions and 45 deletions

View File

@@ -1 +1 @@
0.5.4 0.5.6

View File

@@ -10,6 +10,7 @@ import sqlite3
from sqlalchemy import exists from sqlalchemy import exists
from bitter import utils, models, crawlers from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following from bitter.models import make_session, User, ExtractorEntry, Following
import sys import sys
@@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials):
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) utils.create_credentials(credentials)
@main.group() @main.group()
@@ -45,7 +46,7 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context @click.pass_context
def get_tweet(ctx, tweetid): def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid):
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def get_tweet(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.search_tweet(c.client, query) t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -63,7 +64,7 @@ def get_tweet(ctx, query):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def get_tweet(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.user_timeline(c.client, user) t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -88,7 +89,7 @@ def list_users(ctx, db):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_user(ctx, user): def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
u = utils.get_user(c.client, user) u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2)) print(json.dumps(u, indent=2))
@@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@@ -281,7 +282,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@@ -302,7 +303,7 @@ def reset_extractor(ctx):
@click.argument('url', required=False) @click.argument('url', required=False)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue: for worker in wq.queue:
resp = worker.client.application.rate_limit_status() resp = worker.client.application.rate_limit_status()
print('#'*20) print('#'*20)
@@ -324,11 +325,10 @@ def get_limits(ctx, url):
@click.argument('CONSUMER_SECRET', required=True) @click.argument('CONSUMER_SECRET', required=True)
@click.pass_context @click.pass_context
def run_server(ctx, consumer_key, consumer_secret): def run_server(ctx, consumer_key, consumer_secret):
from . import config bconf.CONSUMER_KEY = consumer_key
config.CONSUMER_KEY = consumer_key bconf.CONSUMER_SECRET = consumer_secret
config.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run() app.run(host='0.0.0.0')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -41,24 +41,50 @@ class TwitterWorker(object):
def __init__(self, name, client): def __init__(self, name, client):
self.name = name self.name = name
self.client = client self.client = client
self.throttled_time = False
self._lock = Lock() self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property def is_limited(self, uriparts):
def throttled(self): return self.get_wait(uriparts)>0
if not self.throttled_time:
return False
t = time.time()
delta = self.throttled_time - t
if delta > 0:
return True
return False
def throttle_until(self, epoch=None): def get_wait(self, uriparts):
self.throttled_time = int(epoch) limits = self.get_limit(uriparts)
logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) if limits['remaining'] > 0:
return 0
reset = limits.get('reset', 0)
now = time.time()
return max(0, (reset-now))
def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts)
if 'resources' not in self.limits:
self.limits['resources'] = {}
resources = self.limits['resources']
if uriparts[0] not in resources:
resources[uriparts[0]] = {}
resource = resources[uriparts[0]]
resource[uri] = value
def update_limits(self, uriparts, remaining, reset, limit):
self.set_limit(uriparts, {'remaining': remaining,
'reset': reset,
'limit': limit})
def update_limits_from_headers(self, uriparts, headers):
reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
limit = int(headers.get('X-Rate-Limit-Limit', -1))
self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
class TwitterQueueException(BaseException):
pass
class TwitterQueue(AttrToFunc): class TwitterQueue(AttrToFunc):
def __init__(self, wait=True): def __init__(self, wait=True):
@@ -74,38 +100,37 @@ class TwitterQueue(AttrToFunc):
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
while True: patience = 1
while patience:
c = None c = None
try: try:
c = self.next() c = self.next(uriparts)
c._lock.acquire() c._lock.acquire()
c.busy = True c.busy = True
logger.debug('Next: {}'.format(c.name)) logger.debug('Next: {}'.format(c.name))
ping = time.time() ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
pong = time.time() pong = time.time()
c.update_limits_from_headers(uriparts, resp.headers)
logger.debug('Took: {}'.format(pong-ping)) logger.debug('Took: {}'.format(pong-ping))
return resp return resp
except TwitterHTTPError as ex: except TwitterHTTPError as ex:
if ex.e.code in (429, 502, 503, 504): if ex.e.code in (429, 502, 503, 504):
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
logger.info('{} limited'.format(c.name)) logger.info('{} limited'.format(c.name))
c.throttle_until(limit) c.update_limits_from_headers(uriparts, ex.e.headers)
continue continue
else: else:
raise raise
except urllib.error.URLError as ex: except urllib.error.URLError as ex:
time.sleep(5) time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex)) logger.info('Something fishy happened: {}'.format(ex))
raise
finally: finally:
if c: if c:
c.busy = False c.busy = False
c._lock.release() c._lock.release()
if not self.wait:
patience -= 1
@property
def client(self):
return self.next().client
@classmethod @classmethod
def from_credentials(self, cred_file=None): def from_credentials(self, cred_file=None):
@@ -119,26 +144,33 @@ class TwitterQueue(AttrToFunc):
wq.ready(TwitterWorker(cred["user"], c)) wq.ready(TwitterWorker(cred["user"], c))
return wq return wq
def _next(self):
def get_wait(self, uriparts):
available = filter(lambda x: not x.busy, self.queue)
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.get_wait(uriparts)
return diff
def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
s = list(self.queue) s = list(self.queue)
random.shuffle(s) random.shuffle(s)
for worker in s: for worker in s:
if not worker.throttled and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise TwitterQueueException('No worker is available')
def next(self): def next(self, uriparts):
if not self.wait: if not self.wait:
return self._next() return self._next(uriparts)
while True: while True:
try: try:
return self._next() return self._next(uriparts)
except Exception: except TwitterQueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.throttled_time) diff = self.get_wait(uriparts)
diff = first_worker.throttled_time - time.time()
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5

75
tests/test_crawlers.py Normal file
View File

@@ -0,0 +1,75 @@
from unittest import TestCase
import os
import types
import datetime
import time
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
from bitter import config as c
class TestUtils(TestCase):
    """Integration tests for TwitterQueue / TwitterWorker rate-limit tracking.

    NOTE(review): these tests talk to the live Twitter API using the
    credentials stored in ``tests/credentials.json`` — they are integration
    tests, not isolated unit tests, and will consume real rate-limit quota.
    """

    def setUp(self):
        # Build a one-worker queue from the credentials file that lives
        # next to this test module.
        credentials_path = os.path.join(os.path.dirname(__file__), 'credentials.json')
        self.wq = TwitterQueue.from_credentials(credentials_path)

    def _first_worker(self):
        # Convenience: the single worker created in setUp.
        return next(iter(self.wq.queue))

    def test_create_worker(self):
        # One credential entry in the file -> exactly one worker queued.
        assert len(self.wq.queue) == 1

    def test_get_limits(self):
        worker = self._first_worker()
        print(worker.limits)
        # A freshly created worker has consumed nothing, so the remaining
        # quota for statuses/lookup equals the endpoint's full limit.
        lookup = worker.get_limit(['statuses', 'lookup'])
        assert lookup['remaining'] == lookup['limit']

    def test_set_limits(self):
        worker = self._first_worker()
        # set_limit stores the value verbatim; get_limit round-trips it.
        worker.set_limit(['test', 'test2'], {'remaining': 0})
        assert worker.get_limit(['test', 'test2']) == {'remaining': 0}

    def test_await(self):
        worker = self._first_worker()
        # An exhausted endpoint whose window resets ~2s from now should
        # report a wait of more than one second...
        worker.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time() + 2})
        assert worker.get_wait(['test', 'wait']) > 1
        time.sleep(2)
        # ...and no wait at all once the reset time has passed.
        assert worker.get_wait(['test', 'wait']) == 0
        # An untouched endpoint never requires waiting.
        assert worker.get_wait(['statuses', 'lookup']) == 0

    def test_is_limited(self):
        worker = self._first_worker()
        assert not worker.is_limited(['statuses', 'lookup'])
        # Zero remaining with a reset far in the future -> limited.
        worker.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time() + 100})
        assert worker.is_limited(['test', 'limited'])

    def test_call(self):
        worker = self._first_worker()
        before = worker.get_limit(['users', 'lookup'])
        resp = self.wq.users.lookup(screen_name='balkian')
        after = worker.get_limit(['users', 'lookup'])
        # One successful API call consumes exactly one unit of quota
        # (the worker updates its limits from the response headers).
        assert before['remaining'] - after['remaining'] == 1

    def test_consume(self):
        worker = self._first_worker()
        initial = worker.get_limit(['friends', 'list'])
        # Disable waiting so exhaustion raises instead of blocking.
        self.wq.wait = False
        # Burn through every remaining call on friends/list.
        for call_number in range(initial['remaining']):
            print(call_number)
            resp = self.wq.friends.list(screen_name='balkian')
        # With the quota exhausted and wait disabled, asking for the next
        # available worker must raise TwitterQueueException.
        failed = False
        try:
            self.wq.next(['friends', 'list'])
        except TwitterQueueException:
            failed = True
        assert failed
        exhausted = worker.get_limit(['friends', 'list'])
        # The queue-level wait estimate should bracket the time left until
        # this worker's reset (allowing a couple of seconds of slack).
        assert self.wq.get_wait(['friends', 'list']) > (exhausted['reset'] - time.time())
        assert self.wq.get_wait(['friends', 'list']) < (exhausted['reset'] - time.time() + 2)
        time.sleep(worker.get_wait(['friends', 'list']))