mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-26 05:08:22 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 2036d51d96 | ||
|  | 09feb050a6 | 
| @@ -1,6 +1,7 @@ | |||||||
| include requirements.txt | include requirements.txt | ||||||
| include test-requirements.txt | include test-requirements.txt | ||||||
| include README.md | include README.md | ||||||
|  | include bitter/VERSION | ||||||
| graft bitter/templates | graft bitter/templates | ||||||
| graft bitter/static | graft bitter/static | ||||||
| graft test | graft test | ||||||
|   | |||||||
							
								
								
									
										1
									
								
								bitter/VERSION
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								bitter/VERSION
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | |||||||
|  | 0.5.5 | ||||||
| @@ -3,8 +3,15 @@ Bitter module. A library and cli for Twitter using python-twitter. | |||||||
| http://github.com/balkian/bitter | http://github.com/balkian/bitter | ||||||
| """ | """ | ||||||
|  |  | ||||||
|  | try: | ||||||
|     from future.standard_library import install_aliases |     from future.standard_library import install_aliases | ||||||
|     install_aliases() |     install_aliases() | ||||||
|  | except ImportError: | ||||||
|  |     # Avoid problems at setup.py and py3.x | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  | import os | ||||||
|  |  | ||||||
|  | from .version import __version__ | ||||||
|  |  | ||||||
| __version__ = '0.5.0' |  | ||||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import sqlite3 | |||||||
| from sqlalchemy import exists | from sqlalchemy import exists | ||||||
|  |  | ||||||
| from bitter import utils, models, crawlers | from bitter import utils, models, crawlers | ||||||
|  | from bitter import config as bconf | ||||||
| from bitter.models import make_session, User, ExtractorEntry, Following | from bitter.models import make_session, User, ExtractorEntry, Following | ||||||
|  |  | ||||||
| import sys | import sys | ||||||
| @@ -33,7 +34,7 @@ def main(ctx, verbose, logging_level, config, credentials): | |||||||
|     ctx.obj = {} |     ctx.obj = {} | ||||||
|     ctx.obj['VERBOSE'] = verbose |     ctx.obj['VERBOSE'] = verbose | ||||||
|     ctx.obj['CONFIG'] = config |     ctx.obj['CONFIG'] = config | ||||||
|     ctx.obj['CREDENTIALS'] = credentials |     bconf.CREDENTIALS = credentials | ||||||
|     utils.create_credentials(credentials) |     utils.create_credentials(credentials) | ||||||
|  |  | ||||||
| @main.group() | @main.group() | ||||||
| @@ -45,7 +46,7 @@ def tweet(ctx): | |||||||
| @click.argument('tweetid') | @click.argument('tweetid') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_tweet(ctx, tweetid): | def get_tweet(ctx, tweetid): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     t = utils.get_tweet(wq, tweetid) |     t = utils.get_tweet(wq, tweetid) | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
|          |          | ||||||
| @@ -54,7 +55,7 @@ def get_tweet(ctx, tweetid): | |||||||
| @click.argument('query') | @click.argument('query') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_tweet(ctx, query): | def get_tweet(ctx, query): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     c = wq.next() |     c = wq.next() | ||||||
|     t = utils.search_tweet(c.client, query) |     t = utils.search_tweet(c.client, query) | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
| @@ -63,7 +64,7 @@ def get_tweet(ctx, query): | |||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_tweet(ctx, user): | def get_tweet(ctx, user): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     c = wq.next() |     c = wq.next() | ||||||
|     t = utils.user_timeline(c.client, user) |     t = utils.user_timeline(c.client, user) | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
| @@ -88,7 +89,7 @@ def list_users(ctx, db): | |||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_user(ctx, user): | def get_user(ctx, user): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     c = wq.next() |     c = wq.next() | ||||||
|     u = utils.get_user(c.client, user) |     u = utils.get_user(c.client, user) | ||||||
|     print(json.dumps(u, indent=2)) |     print(json.dumps(u, indent=2)) | ||||||
| @@ -112,7 +113,7 @@ def get_users(ctx, usersfile, skip, until, threads, db): | |||||||
|             return ExitStack() |             return ExitStack() | ||||||
|  |  | ||||||
|  |  | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, |     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, | ||||||
|                                                                                       len(wq.queue))) |                                                                                       len(wq.queue))) | ||||||
|  |  | ||||||
| @@ -281,7 +282,7 @@ def users_extractor(ctx): | |||||||
| @click.pass_context | @click.pass_context | ||||||
| def extract(ctx, recursive, user, name, initfile): | def extract(ctx, recursive, user, name, initfile): | ||||||
|     print(locals()) |     print(locals()) | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     dburi = ctx.obj['DBURI'] |     dburi = ctx.obj['DBURI'] | ||||||
|     utils.extract(wq, |     utils.extract(wq, | ||||||
|                   recursive=recursive, |                   recursive=recursive, | ||||||
| @@ -293,7 +294,7 @@ def extract(ctx, recursive, user, name, initfile): | |||||||
| @extractor.command('reset') | @extractor.command('reset') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def reset_extractor(ctx): | def reset_extractor(ctx): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     db = ctx.obj['DBURI'] |     db = ctx.obj['DBURI'] | ||||||
|     session = make_session(db) |     session = make_session(db) | ||||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) |     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||||
| @@ -302,7 +303,7 @@ def reset_extractor(ctx): | |||||||
| @click.argument('url', required=False) | @click.argument('url', required=False) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_limits(ctx, url): | def get_limits(ctx, url): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     for worker in wq.queue: |     for worker in wq.queue: | ||||||
|         resp = worker.client.application.rate_limit_status() |         resp = worker.client.application.rate_limit_status() | ||||||
|         print('#'*20) |         print('#'*20) | ||||||
| @@ -324,11 +325,10 @@ def get_limits(ctx, url): | |||||||
| @click.argument('CONSUMER_SECRET', required=True) | @click.argument('CONSUMER_SECRET', required=True) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def run_server(ctx, consumer_key, consumer_secret): | def run_server(ctx, consumer_key, consumer_secret): | ||||||
|     from . import config |     bconf.CONSUMER_KEY = consumer_key | ||||||
|     config.CONSUMER_KEY = consumer_key |     bconf.CONSUMER_SECRET = consumer_secret | ||||||
|     config.CONSUMER_SECRET = consumer_secret |  | ||||||
|     from .webserver import app |     from .webserver import app | ||||||
|     app.run() |     app.run(host='0.0.0.0') | ||||||
|      |      | ||||||
|  |  | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|   | |||||||
| @@ -1,4 +1,5 @@ | |||||||
| import time  | import time  | ||||||
|  | import datetime | ||||||
| import urllib | import urllib | ||||||
| import random | import random | ||||||
| import json | import json | ||||||
| @@ -8,6 +9,7 @@ logger = logging.getLogger(__name__) | |||||||
|  |  | ||||||
| from twitter import * | from twitter import * | ||||||
| from collections import OrderedDict | from collections import OrderedDict | ||||||
|  | from threading import Lock | ||||||
| from . import utils | from . import utils | ||||||
| from . import config | from . import config | ||||||
|  |  | ||||||
| @@ -40,22 +42,52 @@ class TwitterWorker(object): | |||||||
|     def __init__(self, name, client): |     def __init__(self, name, client): | ||||||
|         self.name = name |         self.name = name | ||||||
|         self.client = client |         self.client = client | ||||||
|         self.throttled_time = False |         self._lock = Lock() | ||||||
|         self.busy = False |         self.busy = False | ||||||
|  |         self.limits = self.client.application.rate_limit_status() | ||||||
|  |  | ||||||
|     @property |     def is_limited(self, uriparts): | ||||||
|     def throttled(self): |         limit = self.get_limit(uriparts) | ||||||
|         if not self.throttled_time: |         if limit and limit['remaining'] <=0: | ||||||
|             return False |             t = datime.datetime.now() | ||||||
|         t = time.time() |             delta = limit['reset'] -  t | ||||||
|         delta = self.throttled_time - t |             if delta < datetime.timedelta(seconds=1): | ||||||
|         if delta > 0: |  | ||||||
|                 return True |                 return True | ||||||
|         return False |         return False | ||||||
|  |  | ||||||
|     def throttle_until(self, epoch=None): |     def get_wait(self, uriparts): | ||||||
|         self.throttled_time = int(epoch) |         limits = self.get_limit(uriparts) | ||||||
|         logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) |         if limits['remaining'] > 0: | ||||||
|  |             return 0 | ||||||
|  |         reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) | ||||||
|  |         now = datetime.datetime.now() | ||||||
|  |         return max(0, (reset-now).total_seconds()) | ||||||
|  |  | ||||||
|  |     def get_limit(self, uriparts): | ||||||
|  |         uri = '/'+'/'.join(uriparts) | ||||||
|  |         return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) | ||||||
|  |  | ||||||
|  |     def set_limit(self, uriparts, value): | ||||||
|  |         uri = '/'+'/'.join(uriparts) | ||||||
|  |         if 'resources' not in self.limits: | ||||||
|  |             self.limits['resources'] = {} | ||||||
|  |         resources = self.limits['resources'] | ||||||
|  |         if uriparts[0] not in resources: | ||||||
|  |             resources[uriparts[0]] = {} | ||||||
|  |         resource = resources[uriparts[0]] | ||||||
|  |         resource[uri] = value | ||||||
|  |  | ||||||
|  |     def update_limits(self, uriparts, remaining, reset, limit): | ||||||
|  |         self.set_limit(uriparts, {'remaining': remaining, | ||||||
|  |                                   'reset': reset, | ||||||
|  |                                   'limit': limit}) | ||||||
|  |          | ||||||
|  |     def update_limits_from_headers(self, uriparts, headers): | ||||||
|  |         reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30)) | ||||||
|  |         remaining = int(headers.get('X-Rate-Limit-Remaining', 0)) | ||||||
|  |         limit = int(headers.get('X-Rate-Limit-Limit', -1)) | ||||||
|  |         self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class TwitterQueue(AttrToFunc): | class TwitterQueue(AttrToFunc): | ||||||
| @@ -75,19 +107,20 @@ class TwitterQueue(AttrToFunc): | |||||||
|         while True: |         while True: | ||||||
|             c = None |             c = None | ||||||
|             try: |             try: | ||||||
|                 c = self.next() |                 c = self.next(uriparts) | ||||||
|  |                 c._lock.acquire() | ||||||
|                 c.busy = True |                 c.busy = True | ||||||
|                 logger.debug('Next: {}'.format(c.name)) |                 logger.debug('Next: {}'.format(c.name)) | ||||||
|                 ping = time.time() |                 ping = time.time() | ||||||
|                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) |                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) | ||||||
|                 pong = time.time() |                 pong = time.time() | ||||||
|  |                 c.update_limits_from_headers(uriparts, resp.headers) | ||||||
|                 logger.debug('Took: {}'.format(pong-ping)) |                 logger.debug('Took: {}'.format(pong-ping)) | ||||||
|                 return resp |                 return resp | ||||||
|             except TwitterHTTPError as ex: |             except TwitterHTTPError as ex: | ||||||
|                 if ex.e.code in (429, 502, 503, 504): |                 if ex.e.code in (429, 502, 503, 504): | ||||||
|                     limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30) |  | ||||||
|                     logger.info('{} limited'.format(c.name)) |                     logger.info('{} limited'.format(c.name)) | ||||||
|                     c.throttle_until(limit) |                     c.update_limits_from_headers(uriparts, ex.e.headers) | ||||||
|                     continue |                     continue | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
| @@ -97,11 +130,7 @@ class TwitterQueue(AttrToFunc): | |||||||
|             finally: |             finally: | ||||||
|                 if c: |                 if c: | ||||||
|                     c.busy = False |                     c.busy = False | ||||||
|                      |                     c._lock.release() | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def client(self): |  | ||||||
|         return self.next().client |  | ||||||
|  |  | ||||||
|     @classmethod |     @classmethod | ||||||
|     def from_credentials(self, cred_file=None): |     def from_credentials(self, cred_file=None): | ||||||
| @@ -115,26 +144,26 @@ class TwitterQueue(AttrToFunc): | |||||||
|             wq.ready(TwitterWorker(cred["user"], c)) |             wq.ready(TwitterWorker(cred["user"], c)) | ||||||
|         return wq |         return wq | ||||||
|  |  | ||||||
|     def _next(self): |     def _next(self, uriparts): | ||||||
|         logger.debug('Getting next available') |         logger.debug('Getting next available') | ||||||
|         s = list(self.queue) |         s = list(self.queue) | ||||||
|         random.shuffle(s) |         random.shuffle(s) | ||||||
|         for worker in s: |         for worker in s: | ||||||
|             if not worker.throttled and not worker.busy: |             if not worker.is_limited(uriparts) and not worker.busy: | ||||||
|                 return worker |                 return worker | ||||||
|         raise Exception('No worker is available') |         raise Exception('No worker is available') | ||||||
|  |  | ||||||
|     def next(self): |     def next(self, uriparts): | ||||||
|         if not self.wait: |         if not self.wait: | ||||||
|             return self._next() |             return self._next(uriparts) | ||||||
|         while True: |         while True: | ||||||
|             try: |             try: | ||||||
|                 return self._next() |                 return self._next(uriparts) | ||||||
|             except Exception: |             except Exception: | ||||||
|                 available = filter(lambda x: not x.busy, self.queue) |                 available = filter(lambda x: not x.busy, self.queue) | ||||||
|                 if available: |                 if available: | ||||||
|                     first_worker = min(available, key=lambda x: x.throttled_time) |                     first_worker = min(available, key=lambda x: x.get_wait(uriparts)) | ||||||
|                     diff = first_worker.throttled_time - time.time() |                     diff = first_worker.get_wait(uriparts) | ||||||
|                     logger.info("All workers are throttled. Waiting %s seconds" % diff) |                     logger.info("All workers are throttled. Waiting %s seconds" % diff) | ||||||
|                 else: |                 else: | ||||||
|                     diff = 5 |                     diff = 5 | ||||||
|   | |||||||
							
								
								
									
										4
									
								
								bitter/version.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								bitter/version.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | |||||||
|  | import os | ||||||
|  |  | ||||||
|  | with open(os.path.join(os.path.dirname(__file__), 'VERSION')) as f: | ||||||
|  |     __version__ = f.read().strip() | ||||||
| @@ -1 +1,2 @@ | |||||||
| contextlib2 | contextlib2 | ||||||
|  | future | ||||||
|   | |||||||
| @@ -1,4 +1,3 @@ | |||||||
| sqlalchemy | sqlalchemy | ||||||
| twitter | twitter | ||||||
| click | click | ||||||
| future |  | ||||||
|   | |||||||
							
								
								
									
										8
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								setup.py
									
									
									
									
									
								
							| @@ -23,16 +23,12 @@ if sys.version_info <= (3, 0): | |||||||
| install_reqs = [str(ir.req) for ir in install_reqs] | install_reqs = [str(ir.req) for ir in install_reqs] | ||||||
| test_reqs = [str(ir.req) for ir in test_reqs] | test_reqs = [str(ir.req) for ir in test_reqs] | ||||||
|  |  | ||||||
| with open('bitter/__init__.py') as f: | from bitter import __version__ | ||||||
|     exec(f.read()) |  | ||||||
|  |  | ||||||
| setup( | setup( | ||||||
|     name="bitter", |     name="bitter", | ||||||
|     packages=['bitter'], |     packages=['bitter'], | ||||||
|     description=''' |     description=" Simplifying how researchers access Data. It includes a CLI and a library.", | ||||||
|     Simplifying how researchers access Data. |  | ||||||
|     It includes a CLI and a library. |  | ||||||
|     ''', |  | ||||||
|     author='J. Fernando Sanchez', |     author='J. Fernando Sanchez', | ||||||
|     author_email='balkian@gmail.com', |     author_email='balkian@gmail.com', | ||||||
|     url="http://balkian.com", |     url="http://balkian.com", | ||||||
|   | |||||||
							
								
								
									
										48
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | |||||||
|  | from unittest import TestCase | ||||||
|  |  | ||||||
|  | import os | ||||||
|  | import types | ||||||
|  | import datetime | ||||||
|  | import time | ||||||
|  |  | ||||||
|  | from bitter import utils | ||||||
|  | from bitter.crawlers import TwitterQueue, TwitterWorker | ||||||
|  | from bitter import config as c | ||||||
|  |  | ||||||
|  | class TestUtils(TestCase): | ||||||
|  |  | ||||||
|  |     def setUp(self): | ||||||
|  |         self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||||
|  |  | ||||||
|  |     def test_create_worker(self): | ||||||
|  |         assert len(self.wq.queue)==1 | ||||||
|  |  | ||||||
|  |     def test_get_limits(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         print(w1.limits) | ||||||
|  |         limitslook = w1.get_limit(['statuses', 'lookup']) | ||||||
|  |         assert limitslook['remaining'] == limitslook['limit'] | ||||||
|  |  | ||||||
|  |     def test_set_limits(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         w1.set_limit(['test', 'test2'], {'remaining': 0}) | ||||||
|  |         assert w1.get_limit(['test', 'test2']) == {'remaining': 0} | ||||||
|  |  | ||||||
|  |     def test_await(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2}) | ||||||
|  |         assert w1.get_wait(['test', 'wait']) > 1 | ||||||
|  |         time.sleep(2) | ||||||
|  |         assert w1.get_wait(['test', 'wait']) == 0 | ||||||
|  |         assert w1.get_wait(['statuses', 'lookup']) == 0 | ||||||
|  |  | ||||||
|  |     def test_is_limited(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         assert not w1.is_limited(['statuses', 'lookup']) | ||||||
|  |  | ||||||
|  |     def test_call(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         l1 = w1.get_limit(['users', 'lookup']) | ||||||
|  |         resp = self.wq.users.lookup(screen_name='balkian') | ||||||
|  |         l2 = w1.get_limit(['users', 'lookup']) | ||||||
|  |         assert l1['remaining']-l2['remaining'] == 1 | ||||||
		Reference in New Issue
	
	Block a user