Mirror of https://github.com/balkian/bitter.git (synced 2025-10-25 04:38:25 +00:00)

11 Commits
0.5.4 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
J. Fernando Sánchez
67ef307cce Improved tweet extraction @ CLI 2016-11-23 10:50:01 +01:00
J. Fernando Sánchez
cb054ac365 Added (bulk) user download 2016-11-22 20:07:18 +01:00
J. Fernando Sánchez
bdc4690240 Fixed tail argument 2016-11-19 21:42:08 +01:00
J. Fernando Sánchez
c0309a1e52 Fixed python2 compatibility issues (print!) 2016-11-19 20:38:44 +01:00
J. Fernando Sánchez
4afdd6807d Fixed MANIFEST error 2016-11-19 20:24:02 +01:00
J. Fernando Sánchez
38605ba2c8 Added stream to CLI
* Save stream to file
* Parse file and get the most important hashtags
2016-11-19 20:16:56 +01:00
J. Fernando Sánchez
738823c8a2 Added Streaming workers/queues 2016-11-18 16:08:29 +01:00
J. Fernando Sánchez
3f42879751 Bumped to v0.5.6 2016-09-28 06:31:33 +02:00
J. Fernando Sánchez
35f0c6376d Fixed limits bug, added tests 2016-09-28 06:30:49 +02:00
J. Fernando Sánchez
2036d51d96 Added limits to every call 2016-09-28 05:06:12 +02:00
9 changed files with 414 additions and 86 deletions

MANIFEST.in

@@ -1,4 +1,5 @@
 include requirements.txt
+include requirements-py2.txt
 include test-requirements.txt
 include README.md
 include bitter/VERSION

bitter/VERSION

@@ -1 +1 @@
-0.5.4
+0.7.0

bitter/__init__.py

@@ -3,13 +3,6 @@ Bitter module. A library and cli for Twitter using python-twitter.
 http://github.com/balkian/bitter
 """
-try:
-    from future.standard_library import install_aliases
-    install_aliases()
-except ImportError:
-    # Avoid problems at setup.py and py3.x
-    pass
 import os
 from .version import __version__

bitter/cli.py

@@ -1,3 +1,5 @@
+from __future__ import print_function
 import click
 import json
 import os
@@ -6,10 +8,12 @@ import time
 import sqlalchemy.types
 import threading
 import sqlite3
+from tqdm import tqdm
 from sqlalchemy import exists
 from bitter import utils, models, crawlers
+from bitter import config as bconf
 from bitter.models import make_session, User, ExtractorEntry, Following
 import sys
@@ -33,7 +37,7 @@ def main(ctx, verbose, logging_level, config, credentials):
     ctx.obj = {}
     ctx.obj['VERBOSE'] = verbose
     ctx.obj['CONFIG'] = config
-    ctx.obj['CREDENTIALS'] = credentials
+    bconf.CREDENTIALS = credentials
     utils.create_credentials(credentials)
 @main.group()
@@ -42,30 +46,55 @@ def tweet(ctx):
     pass
 @tweet.command('get')
+@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-f', '--folder', default="tweets")
+@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
 @click.argument('tweetid')
-@click.pass_context
-def get_tweet(ctx, tweetid):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
-    t = utils.get_tweet(wq, tweetid)
-    print(json.dumps(t, indent=2))
+def get_tweet(tweetid, write, folder, update):
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    if not write:
+        t = utils.get_tweet(wq, tweetid)
+        js = json.dumps(t, indent=2)
+        print(js)
+        return
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    file = os.path.join(folder, '%s.json' % tweetid)
+    if not update and os.path.exists(file) and os.path.isfile(file):
+        print('%s: Tweet exists' % tweetid)
+        return
+    try:
+        t = utils.get_tweet(wq, tweetid)
+        with open(file, 'w') as f:
+            js = json.dumps(t, indent=2)
+            print(js, file=f)
+    except Exception as ex:
+        print('%s: %s' % (tweetid, ex), file=sys.stderr)
+@tweet.command('get_all')
+@click.argument('tweetsfile', 'File with a list of tweets to look up')
+@click.option('-f', '--folder', default="tweets")
+@click.pass_context
+def get_tweets(ctx, tweetsfile, folder):
+    with open(tweetsfile) as f:
+        for line in f:
+            tid = line.strip()
+            ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
 @tweet.command('search')
 @click.argument('query')
 @click.pass_context
-def get_tweet(ctx, query):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
-    c = wq.next()
-    t = utils.search_tweet(c.client, query)
+def search(ctx, query):
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    t = utils.search_tweet(wq, query)
     print(json.dumps(t, indent=2))
 @tweet.command('timeline')
 @click.argument('user')
 @click.pass_context
-def get_tweet(ctx, user):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
-    c = wq.next()
-    t = utils.user_timeline(c.client, user)
+def timeline(ctx, user):
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    t = utils.user_timeline(wq, user)
     print(json.dumps(t, indent=2))
 @main.group()
@@ -84,23 +113,47 @@ def list_users(ctx, db):
         for j in i.__dict__:
             print('\t{}: {}'.format(j, getattr(i,j)))
-@users.command('get_one')
-@click.argument('user')
-@click.pass_context
-def get_user(ctx, user):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
-    c = wq.next()
-    u = utils.get_user(c.client, user)
-    print(json.dumps(u, indent=2))
 @users.command('get')
+@click.argument('user')
+@click.option('-w', '--write', is_flag=True, default=False)
+@click.option('-f', '--folder', default="users")
+@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
+def get_user(user, write, folder, update):
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    if not write:
+        u = utils.get_user(wq, user)
+        js = json.dumps(u, indent=2)
+        print(js)
+        return
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    file = os.path.join(folder, '%s.json' % user)
+    if not update and os.path.exists(file) and os.path.isfile(file):
+        print('User exists: %s' % user)
+        return
+    with open(file, 'w') as f:
+        u = utils.get_user(wq, user)
+        js = json.dumps(u, indent=2)
+        print(js, file=f)
+@users.command('get_all')
+@click.argument('usersfile', 'File with a list of users to look up')
+@click.option('-f', '--folder', default="users")
+@click.pass_context
+def get_users(ctx, usersfile, folder):
+    with open(usersfile) as f:
+        for line in f:
+            uid = line.strip()
+            ctx.invoke(get_user, folder=folder, user=uid, write=True)
+@users.command('crawl')
 @click.option('--db', required=True, help='Database to save all users.')
 @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
 @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
 @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
 @click.argument('usersfile', 'File with a list of users to look up')
 @click.pass_context
-def get_users(ctx, usersfile, skip, until, threads, db):
+def crawl_users(ctx, usersfile, skip, until, threads, db):
     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
     if '://' not in db:
@@ -112,7 +165,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
         return ExitStack()
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
                                                                                       len(wq.queue)))
@@ -281,7 +334,7 @@ def users_extractor(ctx):
 @click.pass_context
 def extract(ctx, recursive, user, name, initfile):
     print(locals())
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
     dburi = ctx.obj['DBURI']
     utils.extract(wq,
                   recursive=recursive,
@@ -293,7 +346,7 @@ def extract(ctx, recursive, user, name, initfile):
 @extractor.command('reset')
 @click.pass_context
 def reset_extractor(ctx):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
     db = ctx.obj['DBURI']
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@@ -302,7 +355,7 @@ def reset_extractor(ctx):
 @click.argument('url', required=False)
 @click.pass_context
 def get_limits(ctx, url):
-    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
     for worker in wq.queue:
         resp = worker.client.application.rate_limit_status()
         print('#'*20)
@@ -324,11 +377,75 @@ def get_limits(ctx, url):
 @click.argument('CONSUMER_SECRET', required=True)
 @click.pass_context
 def run_server(ctx, consumer_key, consumer_secret):
-    from . import config
-    config.CONSUMER_KEY = consumer_key
-    config.CONSUMER_SECRET = consumer_secret
+    bconf.CONSUMER_KEY = consumer_key
+    bconf.CONSUMER_SECRET = consumer_secret
     from .webserver import app
-    app.run()
+    app.run(host='0.0.0.0')
+@main.group()
+@click.pass_context
+def stream(ctx):
+    pass
+@stream.command('get')
+@click.option('-l', '--locations', default=None)
+@click.option('-t', '--track', default=None)
+@click.option('-f', '--file', help='File to store the stream of tweets')
+@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
+@click.pass_context
+def get_stream(ctx, locations, track, file, politelyretry):
+    wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
+    query_args = {}
+    if locations:
+        query_args['locations'] = locations
+    if track:
+        query_args['track'] = track
+    if not file:
+        file = sys.stdout
+    else:
+        file = open(file, 'a')
+    def insist():
+        lasthangup = time.time()
+        while True:
+            if not query_args:
+                iterator = wq.statuses.sample()
+            else:
+                iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
+            for i in iterator:
+                yield i
+            if not politelyretry:
+                return
+            thishangup = time.time()
+            if thishangup - lasthangup < 60:
+                raise Exception('Too many hangups in a row.')
+            time.sleep(3)
+    for tweet in tqdm(insist()):
+        print(json.dumps(tweet), file=file)
+    if file != sys.stdout:
+        file.close()
+@stream.command('read')
+@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
+@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
+@click.pass_context
+def read_stream(ctx, file, tail):
+    for tweet in utils.read_file(file, tail=tail):
+        try:
+            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
+        except (KeyError, TypeError):
+            print('Raw tweet: {}'.format(tweet))
+@stream.command('tags')
+@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
+@click.argument('limit', required=False, default=None, type=int)
+@click.pass_context
+def tags_stream(ctx, file, limit):
+    c = utils.get_hashtags(utils.read_file(file))
+    for count, tag in c.most_common(limit):
+        print(u'{} - {}'.format(count, tag))
 if __name__ == '__main__':
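
As a quick orientation to the reworked command tree, here is a minimal sketch that drives the CLI through click's test runner. The tweet id, track term, and output file are placeholders, and a configured credentials file is assumed:

from click.testing import CliRunner
from bitter.cli import main

runner = CliRunner()
# Equivalent to `bitter tweet get 123456`: print one tweet as JSON.
print(runner.invoke(main, ['tweet', 'get', '123456']).output)
# Equivalent to `bitter stream get -t python -f stream.json`: append one
# JSON-encoded tweet per line to stream.json until interrupted.
runner.invoke(main, ['stream', 'get', '-t', 'python', '-f', 'stream.json'])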

bitter/crawlers.py

@@ -9,6 +9,12 @@ logger = logging.getLogger(__name__)
 from twitter import *
 from collections import OrderedDict
 from threading import Lock
+from itertools import islice
+try:
+    import itertools.ifilter as filter
+except ImportError:
+    pass
 from . import utils
 from . import config
@@ -37,30 +43,96 @@ class AttrToFunc(object):
         # kwargs[i] = a
         return self.handler(self.__uriparts, *args, **kwargs)
+class FromCredentialsMixin(object):
+    @classmethod
+    def from_credentials(cls, cred_file=None, max_workers=None):
+        wq = cls()
+        for cred in islice(utils.get_credentials(cred_file), max_workers):
+            wq.ready(cls.worker_class(cred["user"], cred))
+        return wq
 class TwitterWorker(object):
-    def __init__(self, name, client):
+    api_class = None
+    def __init__(self, name, creds):
         self.name = name
-        self.client = client
-        self.throttled_time = False
+        self._client = None
+        self.cred = creds
         self._lock = Lock()
         self.busy = False
     @property
-    def throttled(self):
-        if not self.throttled_time:
-            return False
-        t = time.time()
-        delta = self.throttled_time - t
-        if delta > 0:
-            return True
-        return False
-    def throttle_until(self, epoch=None):
-        self.throttled_time = int(epoch)
-        logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time())))
+    def client(self):
+        if not self._client:
+            auth=OAuth(self.cred['token_key'],
+                       self.cred['token_secret'],
+                       self.cred['consumer_key'],
+                       self.cred['consumer_secret'])
+            self._client = self.api_class(auth=auth)
+        return self._client
+class RestWorker(TwitterWorker):
+    api_class = Twitter
+    def __init__(self, *args, **kwargs):
+        super(RestWorker, self).__init__(*args, **kwargs)
+        self._limits = None
+    @property
+    def limits(self):
+        if not self._limits:
+            self._limits = self.client.application.rate_limit_status()
+        return self._limits
+    def is_limited(self, uriparts):
+        return self.get_wait(uriparts)>0
+    def get_wait(self, uriparts):
+        limits = self.get_limit(uriparts)
+        if limits['remaining'] > 0:
+            return 0
+        reset = limits.get('reset', 0)
+        now = time.time()
+        return max(0, (reset-now))
+    def get_limit(self, uriparts):
+        uri = '/'+'/'.join(uriparts)
+        for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
+            if ix.startswith(uri):
+                return i
+        return {}
+    def set_limit(self, uriparts, value):
+        uri = '/'+'/'.join(uriparts)
+        if 'resources' not in self.limits:
+            self.limits['resources'] = {}
+        resources = self.limits['resources']
+        if uriparts[0] not in resources:
+            resources[uriparts[0]] = {}
+        resource = resources[uriparts[0]]
+        resource[uri] = value
+    def update_limits(self, uriparts, remaining, reset, limit):
+        self.set_limit(uriparts, {'remaining': remaining,
+                                  'reset': reset,
+                                  'limit': limit})
+    def update_limits_from_headers(self, uriparts, headers):
+        reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
+        remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
+        limit = int(headers.get('X-Rate-Limit-Limit', -1))
+        self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
-class TwitterQueue(AttrToFunc):
+class QueueException(BaseException):
+    pass
+class QueueMixin(AttrToFunc, FromCredentialsMixin):
     def __init__(self, wait=True):
         logger.debug('Creating worker queue')
         self.queue = set()
@@ -71,77 +143,119 @@ class TwitterQueue(AttrToFunc):
     def ready(self, worker):
         self.queue.add(worker)
+class TwitterQueue(QueueMixin):
+    worker_class = RestWorker
     def handle_call(self, uriparts, *args, **kwargs):
         logger.debug('Called: {}'.format(uriparts))
         logger.debug('With: {} {}'.format(args, kwargs))
-        while True:
+        patience = 1
+        while patience:
             c = None
             try:
-                c = self.next()
+                c = self.next(uriparts)
                 c._lock.acquire()
                 c.busy = True
                 logger.debug('Next: {}'.format(c.name))
                 ping = time.time()
                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
                 pong = time.time()
+                c.update_limits_from_headers(uriparts, resp.headers)
                 logger.debug('Took: {}'.format(pong-ping))
                 return resp
             except TwitterHTTPError as ex:
                 if ex.e.code in (429, 502, 503, 504):
-                    limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
                     logger.info('{} limited'.format(c.name))
-                    c.throttle_until(limit)
+                    c.update_limits_from_headers(uriparts, ex.e.headers)
                     continue
                 else:
                     raise
             except urllib.error.URLError as ex:
                 time.sleep(5)
                 logger.info('Something fishy happened: {}'.format(ex))
+                raise
             finally:
                 if c:
                     c.busy = False
                     c._lock.release()
+            if not self.wait:
+                patience -= 1
-    @property
-    def client(self):
-        return self.next().client
+    def get_wait(self, uriparts):
+        # Stop as soon as one is available to avoid initiating the rest
+        for i in self.queue:
+            if not i.busy and i.get_wait(uriparts) == 0:
+                return 0
+        # If None is available, let's see how much we have to wait
+        available = filter(lambda x: not x.busy, self.queue)
+        diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
+        return diff
-    @classmethod
-    def from_credentials(self, cred_file=None):
-        wq = TwitterQueue()
-        for cred in utils.get_credentials(cred_file):
-            c = Twitter(auth=OAuth(cred['token_key'],
-                                   cred['token_secret'],
-                                   cred['consumer_key'],
-                                   cred['consumer_secret']))
-            wq.ready(TwitterWorker(cred["user"], c))
-        return wq
-    def _next(self):
+    def _next(self, uriparts):
         logger.debug('Getting next available')
         s = list(self.queue)
         random.shuffle(s)
         for worker in s:
-            if not worker.throttled and not worker.busy:
+            if not worker.is_limited(uriparts) and not worker.busy:
                 return worker
-        raise Exception('No worker is available')
+        raise QueueException('No worker is available')
-    def next(self):
+    def next(self, uriparts):
         if not self.wait:
-            return self._next()
+            return self._next(uriparts)
         while True:
             try:
-                return self._next()
-            except Exception:
+                return self._next(uriparts)
+            except QueueException:
                 available = filter(lambda x: not x.busy, self.queue)
                 if available:
-                    first_worker = min(available, key=lambda x: x.throttled_time)
-                    diff = first_worker.throttled_time - time.time()
+                    diff = self.get_wait(uriparts)
                     logger.info("All workers are throttled. Waiting %s seconds" % diff)
                 else:
                     diff = 5
                     logger.info("All workers are busy. Waiting %s seconds" % diff)
                 time.sleep(diff)
+class StreamWorker(TwitterWorker):
+    api_class = TwitterStream
+    def __init__(self, *args, **kwargs):
+        super(StreamWorker, self).__init__(*args, **kwargs)
+class StreamQueue(QueueMixin):
+    worker_class = StreamWorker
+    def __init__(self, wait=True):
+        logger.debug('Creating worker queue')
+        self.queue = set()
+        self.index = 0
+        self.wait = wait
+        AttrToFunc.__init__(self, handler=self.handle_call)
+    def handle_call(self, uriparts, *args, **kwargs):
+        logger.debug('Called: {}'.format(uriparts))
+        logger.debug('With: {} {}'.format(args, kwargs))
+        c = None
+        c = self.next(uriparts)
+        c._lock.acquire()
+        c.busy = True
+        logger.debug('Next: {}'.format(c.name))
+        ping = time.time()
+        resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
+        for i in resp:
+            yield i
+        pong = time.time()
+        logger.debug('Listening for: {}'.format(pong-ping))
+        c.busy = False
+        c._lock.release()
+    def next(self, uriparts):
+        logger.debug('Getting next available')
+        s = list(self.queue)
+        random.shuffle(s)
+        for worker in s:
+            if not worker.busy:
+                return worker
+        raise QueueException('No worker is available')
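
The net effect of the refactor above: credential loading and worker construction are shared through FromCredentialsMixin, while each RestWorker tracks rate limits per endpoint. A minimal usage sketch (the credentials path is a placeholder):

from bitter.crawlers import TwitterQueue, StreamQueue

# One RestWorker per credential entry; AttrToFunc proxies attribute
# access, so wq.users.lookup(...) is dispatched to a non-limited worker.
wq = TwitterQueue.from_credentials('credentials.json')
user = wq.users.lookup(screen_name='balkian')

# The streaming queue shares the same constructor; the CLI caps it at a
# single worker (max_workers=1).
sq = StreamQueue.from_credentials('credentials.json', max_workers=1)
for tweet in sq.statuses.sample():
    print(tweet)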

bitter/utils.py

@@ -11,7 +11,8 @@ from multiprocessing.pool import ThreadPool
 from itertools import islice
 from contextlib import contextmanager
-from itertools import zip_longest
+from future.moves.itertools import zip_longest
+from collections import Counter
 from twitter import TwitterHTTPError
@@ -86,6 +87,26 @@ def add_credentials(credfile=None, **creds):
         f.write('\n')
+def get_hashtags(iter_tweets, best=None):
+    c = Counter()
+    for tweet in iter_tweets:
+        c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
+    return c
+def read_file(filename, tail=False):
+    with open(filename) as f:
+        while True:
+            line = f.readline()
+            if line not in (None, '', '\n'):
+                tweet = json.loads(line.strip())
+                yield tweet
+            else:
+                if tail:
+                    time.sleep(1)
+                else:
+                    return
 def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
     t = 'name' if by_name else 'uid'
     logger.debug('Getting users by {}: {}'.format(t, ulist))
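
read_file and get_hashtags are the helpers behind the new `stream read` and `stream tags` commands, and they compose directly. A short sketch, assuming stream.json was captured with `bitter stream get`:

from bitter import utils

# Counter.most_common returns (tag, count) pairs, most frequent first.
counts = utils.get_hashtags(utils.read_file('stream.json'))
for tag, n in counts.most_common(10):
    print('{} - {}'.format(n, tag))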

requirements.txt

@@ -1,3 +1,4 @@
 sqlalchemy
 twitter
 click
+tqdm

setup.py

@@ -43,5 +43,14 @@ setup(
       entry_points="""
       [console_scripts]
       bitter=bitter.cli:main
-      """
+      """,
+      classifiers=[
+          'Development Status :: 4 - Beta',
+          'Intended Audience :: Developers',
+          'Intended Audience :: Science/Research',
+          'License :: OSI Approved :: Apache 2 License',
+          'Programming Language :: Python :: 2',
+          'Programming Language :: Python :: 2.7',
+          'Programming Language :: Python :: 3',
+      ]
 )

tests/test_crawlers.py (new file)

@@ -0,0 +1,72 @@
+from unittest import TestCase
+import os
+import types
+import datetime
+import time
+from bitter import utils
+from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
+from bitter import config as c
+class TestUtils(TestCase):
+    def setUp(self):
+        self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
+    def test_create_worker(self):
+        assert len(self.wq.queue)==1
+    def test_get_limits(self):
+        w1 = list(self.wq.queue)[0]
+        print(w1.limits)
+        limitslook = w1.get_limit(['statuses', 'lookup'])
+        assert limitslook['remaining'] == limitslook['limit']
+    def test_set_limits(self):
+        w1 = list(self.wq.queue)[0]
+        w1.set_limit(['test', 'test2'], {'remaining': 0})
+        assert w1.get_limit(['test', 'test2']) == {'remaining': 0}
+    def test_await(self):
+        w1 = list(self.wq.queue)[0]
+        w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
+        assert w1.get_wait(['test', 'wait']) > 1
+        time.sleep(2)
+        assert w1.get_wait(['test', 'wait']) == 0
+        assert w1.get_wait(['statuses', 'lookup']) == 0
+    def test_is_limited(self):
+        w1 = list(self.wq.queue)[0]
+        assert not w1.is_limited(['statuses', 'lookup'])
+        w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
+        assert w1.is_limited(['test', 'limited'])
+    def test_call(self):
+        w1 = list(self.wq.queue)[0]
+        l1 = w1.get_limit(['users', 'lookup'])
+        resp = self.wq.users.lookup(screen_name='balkian')
+        l2 = w1.get_limit(['users', 'lookup'])
+        assert l1['remaining']-l2['remaining'] == 1
+    def test_consume(self):
+        w1 = list(self.wq.queue)[0]
+        l1 = w1.get_limit(['friends', 'list'])
+        self.wq.wait = False
+        for i in range(l1['remaining']):
+            print(i)
+            resp = self.wq.friends.list(screen_name='balkian')
+        # l2 = w1.get_limit(['users', 'lookup'])
+        # assert l2['remaining'] == 0
+        # self.wq.users.lookup(screen_name='balkian')
+        failed = False
+        try:
+            # resp = self.wq.friends.list(screen_name='balkian')
+            self.wq.next(['friends', 'list'])
+        except QueueException:
+            failed = True
+        assert failed
+        l2 = w1.get_limit(['friends', 'list'])
+        assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
+        assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
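
Note that the suite exercises the live API, so it expects a tests/credentials.json next to the test module. A hedged sketch of registering such an entry, with field names taken from the worker constructor above and placeholder values:

from bitter import utils

# add_credentials appends one JSON object per line to the given file;
# every value below is a placeholder for a real OAuth credential.
utils.add_credentials('tests/credentials.json',
                      user='some_account',
                      consumer_key='xxx', consumer_secret='xxx',
                      token_key='xxx', token_secret='xxx')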