diff --git a/bitter/VERSION b/bitter/VERSION index ad83b1b..09a3acf 100644 --- a/bitter/VERSION +++ b/bitter/VERSION @@ -1 +1 @@ -0.5.6 \ No newline at end of file +0.6.0 \ No newline at end of file diff --git a/bitter/cli.py b/bitter/cli.py index 938cc41..28a77e2 100644 --- a/bitter/cli.py +++ b/bitter/cli.py @@ -46,7 +46,7 @@ def tweet(ctx): @click.argument('tweetid') @click.pass_context def get_tweet(ctx, tweetid): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS, 1) t = utils.get_tweet(wq, tweetid) print(json.dumps(t, indent=2)) @@ -56,8 +56,7 @@ def get_tweet(ctx, tweetid): @click.pass_context def get_tweet(ctx, query): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) - c = wq.next() - t = utils.search_tweet(c.client, query) + t = utils.search_tweet(wq, query) print(json.dumps(t, indent=2)) @tweet.command('timeline') @@ -65,8 +64,7 @@ def get_tweet(ctx, query): @click.pass_context def get_tweet(ctx, user): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) - c = wq.next() - t = utils.user_timeline(c.client, user) + t = utils.user_timeline(wq, user) print(json.dumps(t, indent=2)) @main.group() @@ -90,8 +88,7 @@ def list_users(ctx, db): @click.pass_context def get_user(ctx, user): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) - c = wq.next() - u = utils.get_user(c.client, user) + u = utils.get_user(wq, user) print(json.dumps(u, indent=2)) @users.command('get') @@ -330,6 +327,20 @@ def run_server(ctx, consumer_key, consumer_secret): from .webserver import app app.run(host='0.0.0.0') +@main.group() +@click.pass_context +def stream(ctx): + pass + +@stream.command('get') +@click.pass_context +def get_stream(ctx): + wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) + + iterator = wq.statuses.sample() + + for tweet in iterator: + print(tweet) if __name__ == '__main__': main() diff --git a/bitter/crawlers.py b/bitter/crawlers.py index 686e472..0cdf200 100644 --- a/bitter/crawlers.py +++ b/bitter/crawlers.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) from twitter import * from collections import OrderedDict from threading import Lock +from itertools import islice from . import utils from . import config @@ -37,13 +38,50 @@ class AttrToFunc(object): # kwargs[i] = a return self.handler(self.__uriparts, *args, **kwargs) + +class FromCredentialsMixin(object): + + @classmethod + def from_credentials(cls, cred_file=None, max_workers=None): + wq = cls() + + for cred in islice(utils.get_credentials(cred_file), max_workers): + wq.ready(cls.worker_class(cred["user"], cred)) + return wq + + class TwitterWorker(object): - def __init__(self, name, client): + api_class = None + + def __init__(self, name, creds): self.name = name - self.client = client + self._client = None + self.cred = creds self._lock = Lock() self.busy = False - self.limits = self.client.application.rate_limit_status() + + @property + def client(self): + if not self._client: + auth=OAuth(self.cred['token_key'], + self.cred['token_secret'], + self.cred['consumer_key'], + self.cred['consumer_secret']) + self._client = self.api_class(auth=auth) + return self._client + +class RestWorker(TwitterWorker): + api_class = Twitter + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._limits = None + + @property + def limits(self): + if not self._limits: + self._limits = self.client.application.rate_limit_status() + return self._limits def is_limited(self, uriparts): return self.get_wait(uriparts)>0 @@ -83,10 +121,10 @@ class TwitterWorker(object): -class TwitterQueueException(BaseException): +class QueueException(BaseException): pass -class TwitterQueue(AttrToFunc): +class QueueMixin(AttrToFunc, FromCredentialsMixin): def __init__(self, wait=True): logger.debug('Creating worker queue') self.queue = set() @@ -97,6 +135,10 @@ class TwitterQueue(AttrToFunc): def ready(self, worker): self.queue.add(worker) +class TwitterQueue(QueueMixin): + + worker_class = RestWorker + def handle_call(self, uriparts, *args, **kwargs): logger.debug('Called: {}'.format(uriparts)) logger.debug('With: {} {}'.format(args, kwargs)) @@ -132,21 +174,8 @@ class TwitterQueue(AttrToFunc): if not self.wait: patience -= 1 - @classmethod - def from_credentials(self, cred_file=None): - wq = TwitterQueue() - - for cred in utils.get_credentials(cred_file): - c = Twitter(auth=OAuth(cred['token_key'], - cred['token_secret'], - cred['consumer_key'], - cred['consumer_secret'])) - wq.ready(TwitterWorker(cred["user"], c)) - return wq - - def get_wait(self, uriparts): - available = filter(lambda x: not x.busy, self.queue) + available = next(lambda x: not x.busy, self.queue) first_worker = min(available, key=lambda x: x.get_wait(uriparts)) diff = first_worker.get_wait(uriparts) return diff @@ -159,7 +188,7 @@ class TwitterQueue(AttrToFunc): for worker in s: if not worker.is_limited(uriparts) and not worker.busy: return worker - raise TwitterQueueException('No worker is available') + raise QueueException('No worker is available') def next(self, uriparts): if not self.wait: @@ -167,7 +196,7 @@ class TwitterQueue(AttrToFunc): while True: try: return self._next(uriparts) - except TwitterQueueException: + except QueueException: available = filter(lambda x: not x.busy, self.queue) if available: diff = self.get_wait(uriparts) @@ -177,3 +206,44 @@ class TwitterQueue(AttrToFunc): logger.info("All workers are busy. Waiting %s seconds" % diff) time.sleep(diff) +class StreamWorker(TwitterWorker): + api_class = TwitterStream + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + +class StreamQueue(QueueMixin): + worker_class = StreamWorker + + def __init__(self, wait=True): + logger.debug('Creating worker queue') + self.queue = set() + self.index = 0 + self.wait = wait + AttrToFunc.__init__(self, handler=self.handle_call) + + def handle_call(self, uriparts, *args, **kwargs): + logger.debug('Called: {}'.format(uriparts)) + logger.debug('With: {} {}'.format(args, kwargs)) + c = None + c = self.next(uriparts) + c._lock.acquire() + c.busy = True + logger.debug('Next: {}'.format(c.name)) + ping = time.time() + resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) + for i in resp: + yield i + pong = time.time() + logger.debug('Listening for: {}'.format(pong-ping)) + c.busy = False + c._lock.release() + + def next(self, uriparts): + logger.debug('Getting next available') + s = list(self.queue) + random.shuffle(s) + for worker in s: + if not worker.busy: + return worker + raise QueueException('No worker is available')