mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-25 04:38:25 +00:00 
			
		
		
		
	Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 3f42879751 | ||
|  | 35f0c6376d | ||
|  | 2036d51d96 | ||
|  | 09feb050a6 | 
| @@ -1,6 +1,7 @@ | ||||
| include requirements.txt | ||||
| include test-requirements.txt | ||||
| include README.md | ||||
| include bitter/VERSION | ||||
| graft bitter/templates | ||||
| graft bitter/static | ||||
| graft test | ||||
|   | ||||
							
								
								
									
										1
									
								
								bitter/VERSION
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								bitter/VERSION
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| 0.5.6 | ||||
| @@ -3,8 +3,15 @@ Bitter module. A library and cli for Twitter using python-twitter. | ||||
| http://github.com/balkian/bitter | ||||
| """ | ||||
|  | ||||
| from future.standard_library import install_aliases | ||||
| install_aliases() | ||||
| try: | ||||
|     from future.standard_library import install_aliases | ||||
|     install_aliases() | ||||
| except ImportError: | ||||
|     # Avoid problems at setup.py and py3.x | ||||
|     pass | ||||
|  | ||||
| import os | ||||
|  | ||||
| from .version import __version__ | ||||
|  | ||||
| __version__ = '0.5.0' | ||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import sqlite3 | ||||
| from sqlalchemy import exists | ||||
|  | ||||
| from bitter import utils, models, crawlers | ||||
| from bitter import config as bconf | ||||
| from bitter.models import make_session, User, ExtractorEntry, Following | ||||
|  | ||||
| import sys | ||||
| @@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials): | ||||
|     ctx.obj = {} | ||||
|     ctx.obj['VERBOSE'] = verbose | ||||
|     ctx.obj['CONFIG'] = config | ||||
|     ctx.obj['CREDENTIALS'] = credentials | ||||
|     bconf.CREDENTIALS = credentials | ||||
|     utils.create_credentials(credentials) | ||||
|  | ||||
| @main.group() | ||||
| @@ -45,7 +46,7 @@ def tweet(ctx): | ||||
| @click.argument('tweetid') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, tweetid): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     t = utils.get_tweet(wq, tweetid) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|          | ||||
| @@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid): | ||||
| @click.argument('query') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, query): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     t = utils.search_tweet(c.client, query) | ||||
|     print(json.dumps(t, indent=2)) | ||||
| @@ -63,7 +64,7 @@ def get_tweet(ctx, query): | ||||
| @click.argument('user') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, user): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     t = utils.user_timeline(c.client, user) | ||||
|     print(json.dumps(t, indent=2)) | ||||
| @@ -88,7 +89,7 @@ def list_users(ctx, db): | ||||
| @click.argument('user') | ||||
| @click.pass_context  | ||||
| def get_user(ctx, user): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     u = utils.get_user(c.client, user) | ||||
|     print(json.dumps(u, indent=2)) | ||||
| @@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db): | ||||
|             return ExitStack() | ||||
|  | ||||
|  | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, | ||||
|                                                                                       len(wq.queue))) | ||||
|  | ||||
| @@ -281,7 +282,7 @@ def users_extractor(ctx): | ||||
| @click.pass_context | ||||
| def extract(ctx, recursive, user, name, initfile): | ||||
|     print(locals()) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     dburi = ctx.obj['DBURI'] | ||||
|     utils.extract(wq, | ||||
|                   recursive=recursive, | ||||
| @@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile): | ||||
| @extractor.command('reset') | ||||
| @click.pass_context | ||||
| def reset_extractor(ctx): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     db = ctx.obj['DBURI'] | ||||
|     session = make_session(db) | ||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||
| @@ -302,7 +303,7 @@ def reset_extractor(ctx): | ||||
| @click.argument('url', required=False) | ||||
| @click.pass_context | ||||
| def get_limits(ctx, url): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     for worker in wq.queue: | ||||
|         resp = worker.client.application.rate_limit_status() | ||||
|         print('#'*20) | ||||
| @@ -324,11 +325,10 @@ def get_limits(ctx, url): | ||||
| @click.argument('CONSUMER_SECRET', required=True) | ||||
| @click.pass_context | ||||
| def run_server(ctx, consumer_key, consumer_secret): | ||||
|     from . import config | ||||
|     config.CONSUMER_KEY = consumer_key | ||||
|     config.CONSUMER_SECRET = consumer_secret | ||||
|     bconf.CONSUMER_KEY = consumer_key | ||||
|     bconf.CONSUMER_SECRET = consumer_secret | ||||
|     from .webserver import app | ||||
|     app.run() | ||||
|     app.run(host='0.0.0.0') | ||||
|      | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|   | ||||
| @@ -8,6 +8,7 @@ logger = logging.getLogger(__name__) | ||||
|  | ||||
| from twitter import * | ||||
| from collections import OrderedDict | ||||
| from threading import Lock | ||||
| from . import utils | ||||
| from . import config | ||||
|  | ||||
| @@ -40,23 +41,50 @@ class TwitterWorker(object): | ||||
|     def __init__(self, name, client): | ||||
|         self.name = name | ||||
|         self.client = client | ||||
|         self.throttled_time = False | ||||
|         self._lock = Lock() | ||||
|         self.busy = False | ||||
|         self.limits = self.client.application.rate_limit_status() | ||||
|  | ||||
|     @property | ||||
|     def throttled(self): | ||||
|         if not self.throttled_time: | ||||
|             return False | ||||
|         t = time.time() | ||||
|         delta = self.throttled_time - t | ||||
|         if delta > 0: | ||||
|             return True | ||||
|         return False | ||||
|     def is_limited(self, uriparts): | ||||
|         return self.get_wait(uriparts)>0 | ||||
|  | ||||
|     def throttle_until(self, epoch=None): | ||||
|         self.throttled_time = int(epoch) | ||||
|         logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) | ||||
|     def get_wait(self, uriparts): | ||||
|         limits = self.get_limit(uriparts) | ||||
|         if limits['remaining'] > 0: | ||||
|             return 0 | ||||
|         reset = limits.get('reset', 0) | ||||
|         now = time.time() | ||||
|         return max(0, (reset-now)) | ||||
|  | ||||
|     def get_limit(self, uriparts): | ||||
|         uri = '/'+'/'.join(uriparts) | ||||
|         return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) | ||||
|  | ||||
|     def set_limit(self, uriparts, value): | ||||
|         uri = '/'+'/'.join(uriparts) | ||||
|         if 'resources' not in self.limits: | ||||
|             self.limits['resources'] = {} | ||||
|         resources = self.limits['resources'] | ||||
|         if uriparts[0] not in resources: | ||||
|             resources[uriparts[0]] = {} | ||||
|         resource = resources[uriparts[0]] | ||||
|         resource[uri] = value | ||||
|  | ||||
|     def update_limits(self, uriparts, remaining, reset, limit): | ||||
|         self.set_limit(uriparts, {'remaining': remaining, | ||||
|                                   'reset': reset, | ||||
|                                   'limit': limit}) | ||||
|          | ||||
|     def update_limits_from_headers(self, uriparts, headers): | ||||
|         reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30)) | ||||
|         remaining = int(headers.get('X-Rate-Limit-Remaining', 0)) | ||||
|         limit = int(headers.get('X-Rate-Limit-Limit', -1)) | ||||
|         self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit) | ||||
|  | ||||
|  | ||||
|  | ||||
| class TwitterQueueException(BaseException): | ||||
|     pass | ||||
|  | ||||
| class TwitterQueue(AttrToFunc): | ||||
|     def __init__(self, wait=True): | ||||
| @@ -72,36 +100,37 @@ class TwitterQueue(AttrToFunc): | ||||
|     def handle_call(self, uriparts, *args, **kwargs): | ||||
|         logger.debug('Called: {}'.format(uriparts)) | ||||
|         logger.debug('With: {} {}'.format(args, kwargs)) | ||||
|         while True: | ||||
|         patience = 1 | ||||
|         while patience: | ||||
|             c = None | ||||
|             try: | ||||
|                 c = self.next() | ||||
|                 c = self.next(uriparts) | ||||
|                 c._lock.acquire() | ||||
|                 c.busy = True | ||||
|                 logger.debug('Next: {}'.format(c.name)) | ||||
|                 ping = time.time() | ||||
|                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) | ||||
|                 pong = time.time() | ||||
|                 c.update_limits_from_headers(uriparts, resp.headers) | ||||
|                 logger.debug('Took: {}'.format(pong-ping)) | ||||
|                 return resp | ||||
|             except TwitterHTTPError as ex: | ||||
|                 if ex.e.code in (429, 502, 503, 504): | ||||
|                     limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30) | ||||
|                     logger.info('{} limited'.format(c.name)) | ||||
|                     c.throttle_until(limit) | ||||
|                     c.update_limits_from_headers(uriparts, ex.e.headers) | ||||
|                     continue | ||||
|                 else: | ||||
|                     raise | ||||
|             except urllib.error.URLError as ex: | ||||
|                 time.sleep(5) | ||||
|                 logger.info('Something fishy happened: {}'.format(ex))                 | ||||
|                 raise | ||||
|             finally: | ||||
|                 if c: | ||||
|                     c.busy = False | ||||
|                      | ||||
|  | ||||
|     @property | ||||
|     def client(self): | ||||
|         return self.next().client | ||||
|                     c._lock.release() | ||||
|                 if not self.wait: | ||||
|                     patience -= 1 | ||||
|  | ||||
|     @classmethod | ||||
|     def from_credentials(self, cred_file=None): | ||||
| @@ -115,26 +144,33 @@ class TwitterQueue(AttrToFunc): | ||||
|             wq.ready(TwitterWorker(cred["user"], c)) | ||||
|         return wq | ||||
|  | ||||
|     def _next(self): | ||||
|  | ||||
|     def get_wait(self, uriparts): | ||||
|         available = filter(lambda x: not x.busy, self.queue) | ||||
|         first_worker = min(available, key=lambda x: x.get_wait(uriparts)) | ||||
|         diff = first_worker.get_wait(uriparts) | ||||
|         return diff | ||||
|          | ||||
|  | ||||
|     def _next(self, uriparts): | ||||
|         logger.debug('Getting next available') | ||||
|         s = list(self.queue) | ||||
|         random.shuffle(s) | ||||
|         for worker in s: | ||||
|             if not worker.throttled and not worker.busy: | ||||
|             if not worker.is_limited(uriparts) and not worker.busy: | ||||
|                 return worker | ||||
|         raise Exception('No worker is available') | ||||
|         raise TwitterQueueException('No worker is available') | ||||
|  | ||||
|     def next(self): | ||||
|     def next(self, uriparts): | ||||
|         if not self.wait: | ||||
|             return self._next() | ||||
|             return self._next(uriparts) | ||||
|         while True: | ||||
|             try: | ||||
|                 return self._next() | ||||
|             except Exception: | ||||
|                 return self._next(uriparts) | ||||
|             except TwitterQueueException: | ||||
|                 available = filter(lambda x: not x.busy, self.queue) | ||||
|                 if available: | ||||
|                     first_worker = min(available, key=lambda x: x.throttled_time) | ||||
|                     diff = first_worker.throttled_time - time.time() | ||||
|                     diff = self.get_wait(uriparts) | ||||
|                     logger.info("All workers are throttled. Waiting %s seconds" % diff) | ||||
|                 else: | ||||
|                     diff = 5 | ||||
|   | ||||
							
								
								
									
										4
									
								
								bitter/version.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								bitter/version.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| import os | ||||
|  | ||||
| with open(os.path.join(os.path.dirname(__file__), 'VERSION')) as f: | ||||
|     __version__ = f.read().strip() | ||||
| @@ -1 +1,2 @@ | ||||
| contextlib2 | ||||
| future | ||||
|   | ||||
| @@ -1,4 +1,3 @@ | ||||
| sqlalchemy | ||||
| twitter | ||||
| click | ||||
| future | ||||
|   | ||||
							
								
								
									
										8
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								setup.py
									
									
									
									
									
								
							| @@ -23,16 +23,12 @@ if sys.version_info <= (3, 0): | ||||
| install_reqs = [str(ir.req) for ir in install_reqs] | ||||
| test_reqs = [str(ir.req) for ir in test_reqs] | ||||
|  | ||||
| with open('bitter/__init__.py') as f: | ||||
|     exec(f.read()) | ||||
| from bitter import __version__ | ||||
|  | ||||
| setup( | ||||
|     name="bitter", | ||||
|     packages=['bitter'], | ||||
|     description=''' | ||||
|     Simplifying how researchers access Data. | ||||
|     It includes a CLI and a library. | ||||
|     ''', | ||||
|     description=" Simplifying how researchers access Data. It includes a CLI and a library.", | ||||
|     author='J. Fernando Sanchez', | ||||
|     author_email='balkian@gmail.com', | ||||
|     url="http://balkian.com", | ||||
|   | ||||
							
								
								
									
										75
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | ||||
| from unittest import TestCase | ||||
|  | ||||
| import os | ||||
| import types | ||||
| import datetime | ||||
| import time | ||||
|  | ||||
| from bitter import utils | ||||
| from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException | ||||
| from bitter import config as c | ||||
|  | ||||
| class TestUtils(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|  | ||||
|     def test_create_worker(self): | ||||
|         assert len(self.wq.queue)==1 | ||||
|  | ||||
|     def test_get_limits(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         print(w1.limits) | ||||
|         limitslook = w1.get_limit(['statuses', 'lookup']) | ||||
|         assert limitslook['remaining'] == limitslook['limit'] | ||||
|  | ||||
|     def test_set_limits(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         w1.set_limit(['test', 'test2'], {'remaining': 0}) | ||||
|         assert w1.get_limit(['test', 'test2']) == {'remaining': 0} | ||||
|  | ||||
|     def test_await(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2}) | ||||
|         assert w1.get_wait(['test', 'wait']) > 1 | ||||
|         time.sleep(2) | ||||
|         assert w1.get_wait(['test', 'wait']) == 0 | ||||
|         assert w1.get_wait(['statuses', 'lookup']) == 0 | ||||
|  | ||||
|     def test_is_limited(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         assert not w1.is_limited(['statuses', 'lookup']) | ||||
|         w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100}) | ||||
|         assert  w1.is_limited(['test', 'limited']) | ||||
|  | ||||
|     def test_call(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         l1 = w1.get_limit(['users', 'lookup']) | ||||
|         resp = self.wq.users.lookup(screen_name='balkian') | ||||
|         l2 = w1.get_limit(['users', 'lookup']) | ||||
|         assert l1['remaining']-l2['remaining'] == 1 | ||||
|  | ||||
|     def test_consume(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         l1 = w1.get_limit(['friends', 'list']) | ||||
|         self.wq.wait = False | ||||
|         for i in range(l1['remaining']): | ||||
|             print(i) | ||||
|             resp = self.wq.friends.list(screen_name='balkian') | ||||
|         # l2 = w1.get_limit(['users', 'lookup']) | ||||
|         # assert l2['remaining'] == 0 | ||||
|         # self.wq.users.lookup(screen_name='balkian') | ||||
|          | ||||
|         failed = False | ||||
|         try: | ||||
|             # resp = self.wq.friends.list(screen_name='balkian') | ||||
|             self.wq.next(['friends', 'list']) | ||||
|         except TwitterQueueException: | ||||
|             failed = True | ||||
|         assert failed | ||||
|         l2 = w1.get_limit(['friends', 'list']) | ||||
|         assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time()) | ||||
|         assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2) | ||||
|         time.sleep(w1.get_wait(['friends', 'list'])) | ||||
|  | ||||
|          | ||||
		Reference in New Issue
	
	Block a user