mirror of
https://github.com/balkian/bitter.git
synced 2024-12-22 08:28:12 +00:00
Added Streaming workers/queues
This commit is contained in:
parent
3f42879751
commit
738823c8a2
@ -1 +1 @@
|
|||||||
0.5.6
|
0.6.0
|
@ -46,7 +46,7 @@ def tweet(ctx):
|
|||||||
@click.argument('tweetid')
|
@click.argument('tweetid')
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweet(ctx, tweetid):
|
def get_tweet(ctx, tweetid):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS, 1)
|
||||||
t = utils.get_tweet(wq, tweetid)
|
t = utils.get_tweet(wq, tweetid)
|
||||||
print(json.dumps(t, indent=2))
|
print(json.dumps(t, indent=2))
|
||||||
|
|
||||||
@ -56,8 +56,7 @@ def get_tweet(ctx, tweetid):
|
|||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweet(ctx, query):
|
def get_tweet(ctx, query):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
c = wq.next()
|
t = utils.search_tweet(wq, query)
|
||||||
t = utils.search_tweet(c.client, query)
|
|
||||||
print(json.dumps(t, indent=2))
|
print(json.dumps(t, indent=2))
|
||||||
|
|
||||||
@tweet.command('timeline')
|
@tweet.command('timeline')
|
||||||
@ -65,8 +64,7 @@ def get_tweet(ctx, query):
|
|||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweet(ctx, user):
|
def get_tweet(ctx, user):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
c = wq.next()
|
t = utils.user_timeline(wq, user)
|
||||||
t = utils.user_timeline(c.client, user)
|
|
||||||
print(json.dumps(t, indent=2))
|
print(json.dumps(t, indent=2))
|
||||||
|
|
||||||
@main.group()
|
@main.group()
|
||||||
@ -90,8 +88,7 @@ def list_users(ctx, db):
|
|||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_user(ctx, user):
|
def get_user(ctx, user):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
c = wq.next()
|
u = utils.get_user(wq, user)
|
||||||
u = utils.get_user(c.client, user)
|
|
||||||
print(json.dumps(u, indent=2))
|
print(json.dumps(u, indent=2))
|
||||||
|
|
||||||
@users.command('get')
|
@users.command('get')
|
||||||
@ -330,6 +327,20 @@ def run_server(ctx, consumer_key, consumer_secret):
|
|||||||
from .webserver import app
|
from .webserver import app
|
||||||
app.run(host='0.0.0.0')
|
app.run(host='0.0.0.0')
|
||||||
|
|
||||||
|
@main.group()
|
||||||
|
@click.pass_context
|
||||||
|
def stream(ctx):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@stream.command('get')
|
||||||
|
@click.pass_context
|
||||||
|
def get_stream(ctx):
|
||||||
|
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
|
||||||
|
|
||||||
|
iterator = wq.statuses.sample()
|
||||||
|
|
||||||
|
for tweet in iterator:
|
||||||
|
print(tweet)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
@ -9,6 +9,7 @@ logger = logging.getLogger(__name__)
|
|||||||
from twitter import *
|
from twitter import *
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
from itertools import islice
|
||||||
from . import utils
|
from . import utils
|
||||||
from . import config
|
from . import config
|
||||||
|
|
||||||
@ -37,13 +38,50 @@ class AttrToFunc(object):
|
|||||||
# kwargs[i] = a
|
# kwargs[i] = a
|
||||||
return self.handler(self.__uriparts, *args, **kwargs)
|
return self.handler(self.__uriparts, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class FromCredentialsMixin(object):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_credentials(cls, cred_file=None, max_workers=None):
|
||||||
|
wq = cls()
|
||||||
|
|
||||||
|
for cred in islice(utils.get_credentials(cred_file), max_workers):
|
||||||
|
wq.ready(cls.worker_class(cred["user"], cred))
|
||||||
|
return wq
|
||||||
|
|
||||||
|
|
||||||
class TwitterWorker(object):
|
class TwitterWorker(object):
|
||||||
def __init__(self, name, client):
|
api_class = None
|
||||||
|
|
||||||
|
def __init__(self, name, creds):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.client = client
|
self._client = None
|
||||||
|
self.cred = creds
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
self.busy = False
|
self.busy = False
|
||||||
self.limits = self.client.application.rate_limit_status()
|
|
||||||
|
@property
|
||||||
|
def client(self):
|
||||||
|
if not self._client:
|
||||||
|
auth=OAuth(self.cred['token_key'],
|
||||||
|
self.cred['token_secret'],
|
||||||
|
self.cred['consumer_key'],
|
||||||
|
self.cred['consumer_secret'])
|
||||||
|
self._client = self.api_class(auth=auth)
|
||||||
|
return self._client
|
||||||
|
|
||||||
|
class RestWorker(TwitterWorker):
|
||||||
|
api_class = Twitter
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._limits = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def limits(self):
|
||||||
|
if not self._limits:
|
||||||
|
self._limits = self.client.application.rate_limit_status()
|
||||||
|
return self._limits
|
||||||
|
|
||||||
def is_limited(self, uriparts):
|
def is_limited(self, uriparts):
|
||||||
return self.get_wait(uriparts)>0
|
return self.get_wait(uriparts)>0
|
||||||
@ -83,10 +121,10 @@ class TwitterWorker(object):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TwitterQueueException(BaseException):
|
class QueueException(BaseException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class TwitterQueue(AttrToFunc):
|
class QueueMixin(AttrToFunc, FromCredentialsMixin):
|
||||||
def __init__(self, wait=True):
|
def __init__(self, wait=True):
|
||||||
logger.debug('Creating worker queue')
|
logger.debug('Creating worker queue')
|
||||||
self.queue = set()
|
self.queue = set()
|
||||||
@ -97,6 +135,10 @@ class TwitterQueue(AttrToFunc):
|
|||||||
def ready(self, worker):
|
def ready(self, worker):
|
||||||
self.queue.add(worker)
|
self.queue.add(worker)
|
||||||
|
|
||||||
|
class TwitterQueue(QueueMixin):
|
||||||
|
|
||||||
|
worker_class = RestWorker
|
||||||
|
|
||||||
def handle_call(self, uriparts, *args, **kwargs):
|
def handle_call(self, uriparts, *args, **kwargs):
|
||||||
logger.debug('Called: {}'.format(uriparts))
|
logger.debug('Called: {}'.format(uriparts))
|
||||||
logger.debug('With: {} {}'.format(args, kwargs))
|
logger.debug('With: {} {}'.format(args, kwargs))
|
||||||
@ -132,21 +174,8 @@ class TwitterQueue(AttrToFunc):
|
|||||||
if not self.wait:
|
if not self.wait:
|
||||||
patience -= 1
|
patience -= 1
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_credentials(self, cred_file=None):
|
|
||||||
wq = TwitterQueue()
|
|
||||||
|
|
||||||
for cred in utils.get_credentials(cred_file):
|
|
||||||
c = Twitter(auth=OAuth(cred['token_key'],
|
|
||||||
cred['token_secret'],
|
|
||||||
cred['consumer_key'],
|
|
||||||
cred['consumer_secret']))
|
|
||||||
wq.ready(TwitterWorker(cred["user"], c))
|
|
||||||
return wq
|
|
||||||
|
|
||||||
|
|
||||||
def get_wait(self, uriparts):
|
def get_wait(self, uriparts):
|
||||||
available = filter(lambda x: not x.busy, self.queue)
|
available = next(lambda x: not x.busy, self.queue)
|
||||||
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
|
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
|
||||||
diff = first_worker.get_wait(uriparts)
|
diff = first_worker.get_wait(uriparts)
|
||||||
return diff
|
return diff
|
||||||
@ -159,7 +188,7 @@ class TwitterQueue(AttrToFunc):
|
|||||||
for worker in s:
|
for worker in s:
|
||||||
if not worker.is_limited(uriparts) and not worker.busy:
|
if not worker.is_limited(uriparts) and not worker.busy:
|
||||||
return worker
|
return worker
|
||||||
raise TwitterQueueException('No worker is available')
|
raise QueueException('No worker is available')
|
||||||
|
|
||||||
def next(self, uriparts):
|
def next(self, uriparts):
|
||||||
if not self.wait:
|
if not self.wait:
|
||||||
@ -167,7 +196,7 @@ class TwitterQueue(AttrToFunc):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return self._next(uriparts)
|
return self._next(uriparts)
|
||||||
except TwitterQueueException:
|
except QueueException:
|
||||||
available = filter(lambda x: not x.busy, self.queue)
|
available = filter(lambda x: not x.busy, self.queue)
|
||||||
if available:
|
if available:
|
||||||
diff = self.get_wait(uriparts)
|
diff = self.get_wait(uriparts)
|
||||||
@ -177,3 +206,44 @@ class TwitterQueue(AttrToFunc):
|
|||||||
logger.info("All workers are busy. Waiting %s seconds" % diff)
|
logger.info("All workers are busy. Waiting %s seconds" % diff)
|
||||||
time.sleep(diff)
|
time.sleep(diff)
|
||||||
|
|
||||||
|
class StreamWorker(TwitterWorker):
|
||||||
|
api_class = TwitterStream
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
class StreamQueue(QueueMixin):
|
||||||
|
worker_class = StreamWorker
|
||||||
|
|
||||||
|
def __init__(self, wait=True):
|
||||||
|
logger.debug('Creating worker queue')
|
||||||
|
self.queue = set()
|
||||||
|
self.index = 0
|
||||||
|
self.wait = wait
|
||||||
|
AttrToFunc.__init__(self, handler=self.handle_call)
|
||||||
|
|
||||||
|
def handle_call(self, uriparts, *args, **kwargs):
|
||||||
|
logger.debug('Called: {}'.format(uriparts))
|
||||||
|
logger.debug('With: {} {}'.format(args, kwargs))
|
||||||
|
c = None
|
||||||
|
c = self.next(uriparts)
|
||||||
|
c._lock.acquire()
|
||||||
|
c.busy = True
|
||||||
|
logger.debug('Next: {}'.format(c.name))
|
||||||
|
ping = time.time()
|
||||||
|
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
|
||||||
|
for i in resp:
|
||||||
|
yield i
|
||||||
|
pong = time.time()
|
||||||
|
logger.debug('Listening for: {}'.format(pong-ping))
|
||||||
|
c.busy = False
|
||||||
|
c._lock.release()
|
||||||
|
|
||||||
|
def next(self, uriparts):
|
||||||
|
logger.debug('Getting next available')
|
||||||
|
s = list(self.queue)
|
||||||
|
random.shuffle(s)
|
||||||
|
for worker in s:
|
||||||
|
if not worker.busy:
|
||||||
|
return worker
|
||||||
|
raise QueueException('No worker is available')
|
||||||
|
Loading…
Reference in New Issue
Block a user