1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

2 Commits
0.5.0 ... 0.5.5

Author SHA1 Message Date
J. Fernando Sánchez
2036d51d96 Added limits to every call 2016-09-28 05:06:12 +02:00
J. Fernando Sánchez
09feb050a6 Changed versioning, added locks 2016-09-28 01:10:10 +02:00
10 changed files with 136 additions and 50 deletions

View File

@@ -1,6 +1,7 @@
include requirements.txt include requirements.txt
include test-requirements.txt include test-requirements.txt
include README.md include README.md
include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
graft test graft test

1
bitter/VERSION Normal file
View File

@@ -0,0 +1 @@
0.5.5

View File

@@ -3,8 +3,15 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter http://github.com/balkian/bitter
""" """
from future.standard_library import install_aliases try:
install_aliases() from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os
from .version import __version__
__version__ = '0.5.0'
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -10,6 +10,7 @@ import sqlite3
from sqlalchemy import exists from sqlalchemy import exists
from bitter import utils, models, crawlers from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following from bitter.models import make_session, User, ExtractorEntry, Following
import sys import sys
@@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials):
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config ctx.obj['CONFIG'] = config
ctx.obj['CREDENTIALS'] = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) utils.create_credentials(credentials)
@main.group() @main.group()
@@ -45,7 +46,7 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context @click.pass_context
def get_tweet(ctx, tweetid): def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid):
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def get_tweet(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.search_tweet(c.client, query) t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -63,7 +64,7 @@ def get_tweet(ctx, query):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def get_tweet(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
t = utils.user_timeline(c.client, user) t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -88,7 +89,7 @@ def list_users(ctx, db):
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_user(ctx, user): def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() c = wq.next()
u = utils.get_user(c.client, user) u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2)) print(json.dumps(u, indent=2))
@@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@@ -281,7 +282,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@@ -302,7 +303,7 @@ def reset_extractor(ctx):
@click.argument('url', required=False) @click.argument('url', required=False)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue: for worker in wq.queue:
resp = worker.client.application.rate_limit_status() resp = worker.client.application.rate_limit_status()
print('#'*20) print('#'*20)
@@ -324,11 +325,10 @@ def get_limits(ctx, url):
@click.argument('CONSUMER_SECRET', required=True) @click.argument('CONSUMER_SECRET', required=True)
@click.pass_context @click.pass_context
def run_server(ctx, consumer_key, consumer_secret): def run_server(ctx, consumer_key, consumer_secret):
from . import config bconf.CONSUMER_KEY = consumer_key
config.CONSUMER_KEY = consumer_key bconf.CONSUMER_SECRET = consumer_secret
config.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run() app.run(host='0.0.0.0')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,4 +1,5 @@
import time import time
import datetime
import urllib import urllib
import random import random
import json import json
@@ -8,6 +9,7 @@ logger = logging.getLogger(__name__)
from twitter import * from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock
from . import utils from . import utils
from . import config from . import config
@@ -40,22 +42,52 @@ class TwitterWorker(object):
def __init__(self, name, client): def __init__(self, name, client):
self.name = name self.name = name
self.client = client self.client = client
self.throttled_time = False self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property def is_limited(self, uriparts):
def throttled(self): limit = self.get_limit(uriparts)
if not self.throttled_time: if limit and limit['remaining'] <=0:
return False t = datime.datetime.now()
t = time.time() delta = limit['reset'] - t
delta = self.throttled_time - t if delta < datetime.timedelta(seconds=1):
if delta > 0: return True
return True
return False return False
def throttle_until(self, epoch=None): def get_wait(self, uriparts):
self.throttled_time = int(epoch) limits = self.get_limit(uriparts)
logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) if limits['remaining'] > 0:
return 0
reset = datetime.datetime.fromtimestamp(limits.get('reset', 0))
now = datetime.datetime.now()
return max(0, (reset-now).total_seconds())
def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts)
if 'resources' not in self.limits:
self.limits['resources'] = {}
resources = self.limits['resources']
if uriparts[0] not in resources:
resources[uriparts[0]] = {}
resource = resources[uriparts[0]]
resource[uri] = value
def update_limits(self, uriparts, remaining, reset, limit):
self.set_limit(uriparts, {'remaining': remaining,
'reset': reset,
'limit': limit})
def update_limits_from_headers(self, uriparts, headers):
reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
limit = int(headers.get('X-Rate-Limit-Limit', -1))
self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
class TwitterQueue(AttrToFunc): class TwitterQueue(AttrToFunc):
@@ -75,19 +107,20 @@ class TwitterQueue(AttrToFunc):
while True: while True:
c = None c = None
try: try:
c = self.next() c = self.next(uriparts)
c._lock.acquire()
c.busy = True c.busy = True
logger.debug('Next: {}'.format(c.name)) logger.debug('Next: {}'.format(c.name))
ping = time.time() ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
pong = time.time() pong = time.time()
c.update_limits_from_headers(uriparts, resp.headers)
logger.debug('Took: {}'.format(pong-ping)) logger.debug('Took: {}'.format(pong-ping))
return resp return resp
except TwitterHTTPError as ex: except TwitterHTTPError as ex:
if ex.e.code in (429, 502, 503, 504): if ex.e.code in (429, 502, 503, 504):
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
logger.info('{} limited'.format(c.name)) logger.info('{} limited'.format(c.name))
c.throttle_until(limit) c.update_limits_from_headers(uriparts, ex.e.headers)
continue continue
else: else:
raise raise
@@ -97,11 +130,7 @@ class TwitterQueue(AttrToFunc):
finally: finally:
if c: if c:
c.busy = False c.busy = False
c._lock.release()
@property
def client(self):
return self.next().client
@classmethod @classmethod
def from_credentials(self, cred_file=None): def from_credentials(self, cred_file=None):
@@ -115,26 +144,26 @@ class TwitterQueue(AttrToFunc):
wq.ready(TwitterWorker(cred["user"], c)) wq.ready(TwitterWorker(cred["user"], c))
return wq return wq
def _next(self): def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
s = list(self.queue) s = list(self.queue)
random.shuffle(s) random.shuffle(s)
for worker in s: for worker in s:
if not worker.throttled and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise Exception('No worker is available')
def next(self): def next(self, uriparts):
if not self.wait: if not self.wait:
return self._next() return self._next(uriparts)
while True: while True:
try: try:
return self._next() return self._next(uriparts)
except Exception: except Exception:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.throttled_time) first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.throttled_time - time.time() diff = first_worker.get_wait(uriparts)
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5

4
bitter/version.py Normal file
View File

@@ -0,0 +1,4 @@
import os
with open(os.path.join(os.path.dirname(__file__), 'VERSION')) as f:
__version__ = f.read().strip()

View File

@@ -1 +1,2 @@
contextlib2 contextlib2
future

View File

@@ -1,4 +1,3 @@
sqlalchemy sqlalchemy
twitter twitter
click click
future

View File

@@ -23,16 +23,12 @@ if sys.version_info <= (3, 0):
install_reqs = [str(ir.req) for ir in install_reqs] install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs] test_reqs = [str(ir.req) for ir in test_reqs]
with open('bitter/__init__.py') as f: from bitter import __version__
exec(f.read())
setup( setup(
name="bitter", name="bitter",
packages=['bitter'], packages=['bitter'],
description=''' description=" Simplifying how researchers access Data. It includes a CLI and a library.",
Simplifying how researchers access Data.
It includes a CLI and a library.
''',
author='J. Fernando Sanchez', author='J. Fernando Sanchez',
author_email='balkian@gmail.com', author_email='balkian@gmail.com',
url="http://balkian.com", url="http://balkian.com",

48
tests/test_crawlers.py Normal file
View File

@@ -0,0 +1,48 @@
from unittest import TestCase
import os
import types
import datetime
import time
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker
from bitter import config as c
class TestUtils(TestCase):
def setUp(self):
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self):
assert len(self.wq.queue)==1
def test_get_limits(self):
w1 = list(self.wq.queue)[0]
print(w1.limits)
limitslook = w1.get_limit(['statuses', 'lookup'])
assert limitslook['remaining'] == limitslook['limit']
def test_set_limits(self):
w1 = list(self.wq.queue)[0]
w1.set_limit(['test', 'test2'], {'remaining': 0})
assert w1.get_limit(['test', 'test2']) == {'remaining': 0}
def test_await(self):
w1 = list(self.wq.queue)[0]
w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
assert w1.get_wait(['test', 'wait']) > 1
time.sleep(2)
assert w1.get_wait(['test', 'wait']) == 0
assert w1.get_wait(['statuses', 'lookup']) == 0
def test_is_limited(self):
w1 = list(self.wq.queue)[0]
assert not w1.is_limited(['statuses', 'lookup'])
def test_call(self):
w1 = list(self.wq.queue)[0]
l1 = w1.get_limit(['users', 'lookup'])
resp = self.wq.users.lookup(screen_name='balkian')
l2 = w1.get_limit(['users', 'lookup'])
assert l1['remaining']-l2['remaining'] == 1