mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-24 20:28:24 +00:00 
			
		
		
		
	Compare commits
	
		
			14 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | c940709df8 | ||
|  | 1cb86abbdd | ||
|  | b212a46ab7 | ||
|  | 0a0d8fd5f1 | ||
|  | e3a78968da | ||
|  | 67ef307cce | ||
|  | cb054ac365 | ||
|  | bdc4690240 | ||
|  | c0309a1e52 | ||
|  | 4afdd6807d | ||
|  | 38605ba2c8 | ||
|  | 738823c8a2 | ||
|  | 3f42879751 | ||
|  | 35f0c6376d | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,3 +1,4 @@ | ||||
| __pycache__ | ||||
| *.egg-info | ||||
| dist | ||||
| env | ||||
|   | ||||
							
								
								
									
										7
									
								
								Dockerfile-2.7
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile-2.7
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:2.7-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
							
								
								
									
										7
									
								
								Dockerfile-3.4
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile-3.4
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:3.4-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
							
								
								
									
										7
									
								
								Dockerfile.template
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile.template
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:{{PYVERSION}}-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
| @@ -1,7 +1,10 @@ | ||||
| include requirements.txt | ||||
| include requirements-py2.txt | ||||
| include test-requirements.txt | ||||
| include README.md | ||||
| include bitter/VERSION | ||||
| graft bitter/templates | ||||
| graft bitter/static | ||||
| graft test | ||||
| include tests/test* | ||||
| global-exclude *.pyc | ||||
| global-exclude __pycache__ | ||||
							
								
								
									
										76
									
								
								Makefile
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								Makefile
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,76 @@ | ||||
| PYVERSIONS=3.4 2.7 | ||||
| PYMAIN=$(firstword $(PYVERSIONS)) | ||||
| NAME=bitter | ||||
| REPO=balkian | ||||
| VERSION=$(shell cat $(NAME)/VERSION) | ||||
| TARNAME=$(NAME)-$(VERSION).tar.gz  | ||||
| IMAGENAME=$(REPO)/$(NAME) | ||||
| IMAGEWTAG=$(IMAGENAME):$(VERSION) | ||||
|  | ||||
|  | ||||
| all: build run | ||||
|  | ||||
| dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS)) | ||||
|  | ||||
| Dockerfile-%: Dockerfile.template | ||||
| 	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$* | ||||
|  | ||||
|  | ||||
| dev-%: | ||||
| 	@docker start $(NAME)-dev$* || (\ | ||||
| 		$(MAKE) build-$*; \ | ||||
| 		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||
| 	)\ | ||||
|  | ||||
| 	docker exec -ti $(NAME)-dev$* bash | ||||
|  | ||||
| dev: dev-$(PYMAIN) | ||||
|  | ||||
| build: $(addprefix build-, $(PYMAIN)) | ||||
|  | ||||
| buildall: $(addprefix build-, $(PYVERSIONS)) | ||||
|  | ||||
| build-%: Dockerfile-% | ||||
| 	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .; | ||||
|  | ||||
| test: $(addprefix test-,$(PYMAIN)) | ||||
|  | ||||
| testall: $(addprefix test-,$(PYVERSIONS)) | ||||
|  | ||||
| test-%: build-% | ||||
| 	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||
|  | ||||
| pip_test-%: | ||||
| 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | ||||
|  | ||||
| dist/$(NAME)-$(VERSION).tar.gz: | ||||
| 	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist; | ||||
|  | ||||
| sdist: dist/$(NAME)-$(VERSION).tar.gz | ||||
|  | ||||
|  | ||||
| upload-%: test-% | ||||
| 	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
|  | ||||
| upload: testall $(addprefix upload-,$(PYVERSIONS)) | ||||
| 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)' | ||||
| 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)' | ||||
|  | ||||
| clean: | ||||
| 	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true | ||||
| 	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true | ||||
|  | ||||
| upload_git: | ||||
| 	git commit -a | ||||
| 	git tag ${VERSION} | ||||
| 	git push --tags origin master | ||||
|  | ||||
| pip_upload: | ||||
| 	python setup.py sdist upload ; | ||||
|  | ||||
| pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | ||||
|  | ||||
| run: build | ||||
| 	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
|  | ||||
| .PHONY: test test-% build-% build test test_pip run | ||||
| @@ -17,8 +17,8 @@ or | ||||
| Programmatically: | ||||
|  | ||||
| ```python | ||||
| from bitter.crawlers import TwitterQueue | ||||
| wq = TwitterQueue.from_credentials() | ||||
| from bitter import easy | ||||
| wq = easy() | ||||
| print(wq.users.show(user_name='balkian')) | ||||
| ``` | ||||
|  | ||||
|   | ||||
| @@ -1 +1 @@ | ||||
| 0.5.5 | ||||
| 0.7.1 | ||||
|   | ||||
| @@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter. | ||||
| http://github.com/balkian/bitter | ||||
| """ | ||||
|  | ||||
| try: | ||||
|     from future.standard_library import install_aliases | ||||
|     install_aliases() | ||||
| except ImportError: | ||||
|     # Avoid problems at setup.py and py3.x | ||||
|     pass | ||||
|  | ||||
| import os | ||||
|  | ||||
| from .version import __version__ | ||||
|  | ||||
| def easy(*args, **kwargs): | ||||
|     from .crawlers import TwitterQueue | ||||
|     return TwitterQueue.from_credentials(*args, **kwargs) | ||||
|  | ||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||
|  | ||||
|  | ||||
|   | ||||
							
								
								
									
										138
									
								
								bitter/cli.py
									
									
									
									
									
								
							
							
						
						
									
										138
									
								
								bitter/cli.py
									
									
									
									
									
								
							| @@ -1,3 +1,5 @@ | ||||
| from __future__ import print_function | ||||
|  | ||||
| import click | ||||
| import json | ||||
| import os | ||||
| @@ -6,6 +8,7 @@ import time | ||||
| import sqlalchemy.types | ||||
| import threading | ||||
| import sqlite3 | ||||
| from tqdm import tqdm | ||||
|  | ||||
| from sqlalchemy import exists | ||||
|  | ||||
| @@ -43,30 +46,36 @@ def tweet(ctx): | ||||
|     pass | ||||
|  | ||||
| @tweet.command('get') | ||||
| @click.option('-w', '--write', is_flag=True, default=False) | ||||
| @click.option('-f', '--folder', default="tweets") | ||||
| @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | ||||
| @click.argument('tweetid') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, tweetid): | ||||
| def get_tweet(tweetid, write, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     t = utils.get_tweet(wq, tweetid) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|     utils.download_tweet(wq, tweetid, write, folder, update) | ||||
|          | ||||
| @tweet.command('get_all') | ||||
| @click.argument('tweetsfile', 'File with a list of tweets to look up') | ||||
| @click.option('-f', '--folder', default="tweets") | ||||
| @click.pass_context | ||||
| def get_tweets(ctx, tweetsfile, folder): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     utils.download_tweets(wq, tweetsfile, folder) | ||||
|  | ||||
| @tweet.command('search') | ||||
| @click.argument('query') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, query): | ||||
| def search(ctx, query): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     t = utils.search_tweet(c.client, query) | ||||
|     t = utils.search_tweet(wq, query) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|  | ||||
| @tweet.command('timeline') | ||||
| @click.argument('user') | ||||
| @click.pass_context  | ||||
| def get_tweet(ctx, user): | ||||
| def timeline(ctx, user): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     t = utils.user_timeline(c.client, user) | ||||
|     t = utils.user_timeline(wq, user) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|  | ||||
| @main.group() | ||||
| @@ -85,23 +94,47 @@ def list_users(ctx, db): | ||||
|         for j in i.__dict__: | ||||
|             print('\t{}: {}'.format(j, getattr(i,j))) | ||||
|  | ||||
| @users.command('get_one') | ||||
| @click.argument('user') | ||||
| @click.pass_context  | ||||
| def get_user(ctx, user): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     c = wq.next() | ||||
|     u = utils.get_user(c.client, user) | ||||
|     print(json.dumps(u, indent=2)) | ||||
|  | ||||
| @users.command('get') | ||||
| @click.argument('user') | ||||
| @click.option('-w', '--write', is_flag=True, default=False) | ||||
| @click.option('-f', '--folder', default="users") | ||||
| @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | ||||
| def get_user(user, write, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     if not write: | ||||
|         u = utils.get_user(wq, user) | ||||
|         js = json.dumps(u, indent=2) | ||||
|         print(js) | ||||
|         return | ||||
|     if not os.path.exists(folder): | ||||
|         os.makedirs(folder) | ||||
|     file = os.path.join(folder, '%s.json' % user) | ||||
|     if not update and os.path.exists(file) and os.path.isfile(file): | ||||
|         print('User exists: %s' % user) | ||||
|         return | ||||
|     with open(file, 'w') as f: | ||||
|         u = utils.get_user(wq, user) | ||||
|         js = json.dumps(u, indent=2) | ||||
|         print(js, file=f) | ||||
|  | ||||
| @users.command('get_all') | ||||
| @click.argument('usersfile', 'File with a list of users to look up') | ||||
| @click.option('-f', '--folder', default="users") | ||||
| @click.pass_context | ||||
| def get_users(ctx, usersfile, folder): | ||||
|     with open(usersfile) as f: | ||||
|         for line in f: | ||||
|             uid = line.strip() | ||||
|             ctx.invoke(get_user, folder=folder, user=uid, write=True) | ||||
|  | ||||
| @users.command('crawl') | ||||
| @click.option('--db', required=True, help='Database to save all users.') | ||||
| @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | ||||
| @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | ||||
| @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | ||||
| @click.argument('usersfile', 'File with a list of users to look up') | ||||
| @click.pass_context | ||||
| def get_users(ctx, usersfile, skip, until, threads, db): | ||||
| def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||
|     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock | ||||
|  | ||||
|     if '://' not in db: | ||||
| @@ -330,6 +363,71 @@ def run_server(ctx, consumer_key, consumer_secret): | ||||
|     from .webserver import app | ||||
|     app.run(host='0.0.0.0') | ||||
|      | ||||
| @main.group() | ||||
| @click.pass_context  | ||||
| def stream(ctx): | ||||
|     pass | ||||
|  | ||||
| @stream.command('get') | ||||
| @click.option('-l', '--locations', default=None) | ||||
| @click.option('-t', '--track', default=None) | ||||
| @click.option('-f', '--file', help='File to store the stream of tweets') | ||||
| @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) | ||||
| @click.pass_context  | ||||
| def get_stream(ctx, locations, track, file, politelyretry): | ||||
|     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) | ||||
|  | ||||
|     query_args = {} | ||||
|     if locations: | ||||
|         query_args['locations'] = locations | ||||
|     if track: | ||||
|         query_args['track'] = track | ||||
|     if not file: | ||||
|         file = sys.stdout | ||||
|     else: | ||||
|         file = open(file, 'a') | ||||
|  | ||||
|     def insist(): | ||||
|         lasthangup = time.time() | ||||
|         while True: | ||||
|             if not query_args: | ||||
|                 iterator = wq.statuses.sample() | ||||
|             else: | ||||
|                 iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") | ||||
|             for i in iterator: | ||||
|                 yield i | ||||
|             if not politelyretry: | ||||
|                 return | ||||
|             thishangup = time.time() | ||||
|             if thishangup - lasthangup < 60: | ||||
|                 raise Exception('Too many hangups in a row.') | ||||
|             time.sleep(3) | ||||
|  | ||||
|     for tweet in tqdm(insist()): | ||||
|         print(json.dumps(tweet), file=file) | ||||
|     if file != sys.stdout: | ||||
|         file.close() | ||||
|  | ||||
| @stream.command('read') | ||||
| @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||
| @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False) | ||||
| @click.pass_context  | ||||
| def read_stream(ctx, file, tail): | ||||
|     for tweet in utils.read_file(file, tail=tail): | ||||
|         try: | ||||
|             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) | ||||
|         except (KeyError, TypeError): | ||||
|             print('Raw tweet: {}'.format(tweet)) | ||||
|  | ||||
| @stream.command('tags') | ||||
| @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||
| @click.argument('limit', required=False, default=None, type=int) | ||||
| @click.pass_context  | ||||
| def tags_stream(ctx, file, limit): | ||||
|     c = utils.get_hashtags(utils.read_file(file)) | ||||
|     for count, tag in c.most_common(limit): | ||||
|         print(u'{} - {}'.format(count, tag)) | ||||
|      | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     main() | ||||
|   | ||||
| @@ -1,5 +1,4 @@ | ||||
| import time  | ||||
| import datetime | ||||
| import urllib | ||||
| import random | ||||
| import json | ||||
| @@ -10,6 +9,12 @@ logger = logging.getLogger(__name__) | ||||
| from twitter import * | ||||
| from collections import OrderedDict | ||||
| from threading import Lock | ||||
| from itertools import islice | ||||
| try: | ||||
|     import itertools.ifilter as filter | ||||
| except ImportError: | ||||
|     pass | ||||
|  | ||||
| from . import utils | ||||
| from . import config | ||||
|  | ||||
| @@ -38,34 +43,68 @@ class AttrToFunc(object): | ||||
|         #     kwargs[i] = a | ||||
|         return self.handler(self.__uriparts, *args, **kwargs) | ||||
|  | ||||
|  | ||||
| class FromCredentialsMixin(object): | ||||
|  | ||||
|     @classmethod | ||||
|     def from_credentials(cls, cred_file=None, max_workers=None): | ||||
|         wq = cls() | ||||
|  | ||||
|         for cred in islice(utils.get_credentials(cred_file), max_workers): | ||||
|             wq.ready(cls.worker_class(cred["user"], cred)) | ||||
|         return wq | ||||
|      | ||||
|  | ||||
| class TwitterWorker(object): | ||||
|     def __init__(self, name, client): | ||||
|     api_class = None | ||||
|  | ||||
|     def __init__(self, name, creds): | ||||
|         self.name = name | ||||
|         self.client = client | ||||
|         self._client = None | ||||
|         self.cred = creds | ||||
|         self._lock = Lock() | ||||
|         self.busy = False | ||||
|         self.limits = self.client.application.rate_limit_status() | ||||
|  | ||||
|     @property | ||||
|     def client(self): | ||||
|         if not self._client: | ||||
|             auth=OAuth(self.cred['token_key'], | ||||
|                        self.cred['token_secret'], | ||||
|                        self.cred['consumer_key'], | ||||
|                        self.cred['consumer_secret']) | ||||
|             self._client = self.api_class(auth=auth) | ||||
|         return self._client | ||||
|  | ||||
| class RestWorker(TwitterWorker): | ||||
|     api_class = Twitter | ||||
|  | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         super(RestWorker, self).__init__(*args, **kwargs) | ||||
|         self._limits = None | ||||
|  | ||||
|     @property | ||||
|     def limits(self): | ||||
|         if not self._limits: | ||||
|             self._limits = self.client.application.rate_limit_status() | ||||
|         return self._limits | ||||
|  | ||||
|     def is_limited(self, uriparts): | ||||
|         limit = self.get_limit(uriparts) | ||||
|         if limit and limit['remaining'] <=0: | ||||
|             t = datime.datetime.now() | ||||
|             delta = limit['reset'] -  t | ||||
|             if delta < datetime.timedelta(seconds=1): | ||||
|                 return True | ||||
|         return False | ||||
|         return self.get_wait(uriparts)>0 | ||||
|  | ||||
|     def get_wait(self, uriparts): | ||||
|         limits = self.get_limit(uriparts) | ||||
|         if limits['remaining'] > 0: | ||||
|             return 0 | ||||
|         reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) | ||||
|         now = datetime.datetime.now() | ||||
|         return max(0, (reset-now).total_seconds()) | ||||
|         reset = limits.get('reset', 0) | ||||
|         now = time.time() | ||||
|         return max(0, (reset-now)) | ||||
|  | ||||
|     def get_limit(self, uriparts): | ||||
|         uri = '/'+'/'.join(uriparts) | ||||
|         return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) | ||||
|         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): | ||||
|             if ix.startswith(uri): | ||||
|                 return i | ||||
|         return {} | ||||
|  | ||||
|     def set_limit(self, uriparts, value): | ||||
|         uri = '/'+'/'.join(uriparts) | ||||
| @@ -90,7 +129,10 @@ class TwitterWorker(object): | ||||
|  | ||||
|  | ||||
|  | ||||
| class TwitterQueue(AttrToFunc): | ||||
| class QueueException(BaseException): | ||||
|     pass | ||||
|  | ||||
| class QueueMixin(AttrToFunc, FromCredentialsMixin): | ||||
|     def __init__(self, wait=True): | ||||
|         logger.debug('Creating worker queue') | ||||
|         self.queue = set() | ||||
| @@ -101,10 +143,15 @@ class TwitterQueue(AttrToFunc): | ||||
|     def ready(self, worker): | ||||
|         self.queue.add(worker) | ||||
|  | ||||
| class TwitterQueue(QueueMixin): | ||||
|  | ||||
|     worker_class = RestWorker | ||||
|  | ||||
|     def handle_call(self, uriparts, *args, **kwargs): | ||||
|         logger.debug('Called: {}'.format(uriparts)) | ||||
|         logger.debug('With: {} {}'.format(args, kwargs)) | ||||
|         while True: | ||||
|         patience = 1 | ||||
|         while patience: | ||||
|             c = None | ||||
|             try: | ||||
|                 c = self.next(uriparts) | ||||
| @@ -127,22 +174,24 @@ class TwitterQueue(AttrToFunc): | ||||
|             except urllib.error.URLError as ex: | ||||
|                 time.sleep(5) | ||||
|                 logger.info('Something fishy happened: {}'.format(ex))                 | ||||
|                 raise | ||||
|             finally: | ||||
|                 if c: | ||||
|                     c.busy = False | ||||
|                     c._lock.release() | ||||
|                 if not self.wait: | ||||
|                     patience -= 1 | ||||
|  | ||||
|     @classmethod | ||||
|     def from_credentials(self, cred_file=None): | ||||
|         wq = TwitterQueue() | ||||
|  | ||||
|         for cred in utils.get_credentials(cred_file): | ||||
|             c = Twitter(auth=OAuth(cred['token_key'], | ||||
|                                    cred['token_secret'], | ||||
|                                    cred['consumer_key'], | ||||
|                                    cred['consumer_secret'])) | ||||
|             wq.ready(TwitterWorker(cred["user"], c)) | ||||
|         return wq | ||||
|     def get_wait(self, uriparts): | ||||
|         # Stop as soon as one is available to avoid initiating the rest | ||||
|         for i in self.queue: | ||||
|             if not i.busy and i.get_wait(uriparts) == 0: | ||||
|                 return 0 | ||||
|         # If None is available, let's see how much we have to wait | ||||
|         available = filter(lambda x: not x.busy, self.queue) | ||||
|         diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy) | ||||
|         return diff | ||||
|          | ||||
|  | ||||
|     def _next(self, uriparts): | ||||
|         logger.debug('Getting next available') | ||||
| @@ -151,7 +200,7 @@ class TwitterQueue(AttrToFunc): | ||||
|         for worker in s: | ||||
|             if not worker.is_limited(uriparts) and not worker.busy: | ||||
|                 return worker | ||||
|         raise Exception('No worker is available') | ||||
|         raise QueueException('No worker is available') | ||||
|  | ||||
|     def next(self, uriparts): | ||||
|         if not self.wait: | ||||
| @@ -159,14 +208,54 @@ class TwitterQueue(AttrToFunc): | ||||
|         while True: | ||||
|             try: | ||||
|                 return self._next(uriparts) | ||||
|             except Exception: | ||||
|             except QueueException: | ||||
|                 available = filter(lambda x: not x.busy, self.queue) | ||||
|                 if available: | ||||
|                     first_worker = min(available, key=lambda x: x.get_wait(uriparts)) | ||||
|                     diff = first_worker.get_wait(uriparts) | ||||
|                     diff = self.get_wait(uriparts) | ||||
|                     logger.info("All workers are throttled. Waiting %s seconds" % diff) | ||||
|                 else: | ||||
|                     diff = 5 | ||||
|                     logger.info("All workers are busy. Waiting %s seconds" % diff) | ||||
|                 time.sleep(diff) | ||||
|  | ||||
| class StreamWorker(TwitterWorker): | ||||
|     api_class = TwitterStream | ||||
|  | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         super(StreamWorker, self).__init__(*args, **kwargs) | ||||
|  | ||||
| class StreamQueue(QueueMixin): | ||||
|     worker_class = StreamWorker | ||||
|  | ||||
|     def __init__(self, wait=True): | ||||
|         logger.debug('Creating worker queue') | ||||
|         self.queue = set() | ||||
|         self.index = 0 | ||||
|         self.wait = wait | ||||
|         AttrToFunc.__init__(self, handler=self.handle_call) | ||||
|  | ||||
|     def handle_call(self, uriparts, *args, **kwargs): | ||||
|         logger.debug('Called: {}'.format(uriparts)) | ||||
|         logger.debug('With: {} {}'.format(args, kwargs)) | ||||
|         c = None | ||||
|         c = self.next(uriparts) | ||||
|         c._lock.acquire() | ||||
|         c.busy = True | ||||
|         logger.debug('Next: {}'.format(c.name)) | ||||
|         ping = time.time() | ||||
|         resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) | ||||
|         for i in resp: | ||||
|             yield i | ||||
|         pong = time.time() | ||||
|         logger.debug('Listening for: {}'.format(pong-ping)) | ||||
|         c.busy = False | ||||
|         c._lock.release() | ||||
|  | ||||
|     def next(self, uriparts): | ||||
|         logger.debug('Getting next available') | ||||
|         s = list(self.queue) | ||||
|         random.shuffle(s) | ||||
|         for worker in s: | ||||
|             if not worker.busy: | ||||
|                 return worker | ||||
|         raise QueueException('No worker is available') | ||||
|   | ||||
							
								
								
									
										143
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										143
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -1,3 +1,5 @@ | ||||
| from __future__ import print_function | ||||
|  | ||||
| import logging | ||||
| import time | ||||
| import json | ||||
| @@ -9,9 +11,19 @@ import os | ||||
| import multiprocessing | ||||
| from multiprocessing.pool import ThreadPool | ||||
|  | ||||
| from itertools import islice | ||||
| from tqdm import tqdm | ||||
|  | ||||
| from itertools import islice, chain | ||||
| from contextlib import contextmanager | ||||
| from itertools import zip_longest | ||||
|  | ||||
| try: | ||||
|     from itertools import izip_longest | ||||
| except ImportError: | ||||
|     from itertools import zip_longest | ||||
|  | ||||
| from collections import Counter | ||||
|  | ||||
| from builtins import map, filter | ||||
|  | ||||
| from twitter import TwitterHTTPError | ||||
|  | ||||
| @@ -26,15 +38,14 @@ def signal_handler(signal, frame): | ||||
|     logger.info('You pressed Ctrl+C!') | ||||
|     sys.exit(0) | ||||
|  | ||||
| def chunk(iterable, n, fillvalue=None): | ||||
|     args = [iter(iterable)] * n | ||||
|     return zip_longest(*args, fillvalue=fillvalue) | ||||
| def chunk(iterable, n): | ||||
|     it = iter(iterable) | ||||
|     return iter(lambda: tuple(islice(it, n)), ()) | ||||
|  | ||||
| def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): | ||||
|     if chunksize: | ||||
|         source = chunk(source, chunksize) | ||||
|     p = ThreadPool(numcpus) | ||||
|     for i in p.imap(func, source): | ||||
| def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||
|     source = chunk(source, chunksize) | ||||
|     p = ThreadPool(numcpus*2) | ||||
|     for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))): | ||||
|         yield i | ||||
|  | ||||
| def get_credentials_path(credfile=None): | ||||
| @@ -86,6 +97,26 @@ def add_credentials(credfile=None, **creds): | ||||
|             f.write('\n') | ||||
|  | ||||
|  | ||||
| def get_hashtags(iter_tweets, best=None): | ||||
|     c = Counter() | ||||
|     for tweet in iter_tweets: | ||||
|         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) | ||||
|     return c | ||||
|  | ||||
| def read_file(filename, tail=False): | ||||
|     with open(filename) as f: | ||||
|         while True: | ||||
|             line = f.readline() | ||||
|             if line not in (None, '', '\n'): | ||||
|                 tweet = json.loads(line.strip()) | ||||
|                 yield tweet | ||||
|             else: | ||||
|                 if tail: | ||||
|                     time.sleep(1) | ||||
|                 else: | ||||
|                     return | ||||
|      | ||||
|  | ||||
| def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | ||||
|     t = 'name' if by_name else 'uid' | ||||
|     logger.debug('Getting users by {}: {}'.format(t, ulist)) | ||||
| @@ -134,12 +165,12 @@ def add_user(session, user, enqueue=False): | ||||
|     user = User(**user) | ||||
|     session.add(user) | ||||
|     if extract: | ||||
|         logging.debug('Adding entry') | ||||
|         logger.debug('Adding entry') | ||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() | ||||
|         if not entry: | ||||
|             entry = ExtractorEntry(user=user.id) | ||||
|             session.add(entry) | ||||
|         logging.debug(entry.pending) | ||||
|         logger.debug(entry.pending) | ||||
|         entry.pending = True | ||||
|         entry.cursor = -1 | ||||
|         session.commit() | ||||
| @@ -188,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor | ||||
|         add_user(session, i, enqueue=True) | ||||
|  | ||||
|     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() | ||||
|     logging.info('Total users: {}'.format(total_users)) | ||||
|     logger.info('Total users: {}'.format(total_users)) | ||||
|     def pending_entries(): | ||||
|         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() | ||||
|         logging.info('Pending: {}'.format(pending)) | ||||
|         logger.info('Pending: {}'.format(pending)) | ||||
|         return pending | ||||
|  | ||||
|     while pending_entries() > 0: | ||||
| @@ -255,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor | ||||
|  | ||||
|         entry.pending = pending | ||||
|         entry.cursor = cursor | ||||
|         logging.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||
|         logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||
|  | ||||
|         session.add(candidate) | ||||
|         session.commit() | ||||
| @@ -281,3 +312,85 @@ def get_user(c, user): | ||||
|         return c.users.lookup(user_id=user)[0] | ||||
|     except ValueError: | ||||
|         return c.users.lookup(screen_name=user)[0] | ||||
|  | ||||
| def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | ||||
|     cached = cached_tweet(tweetid, folder) | ||||
|     tweet = None | ||||
|     if update or not cached: | ||||
|         tweet = get_tweet(wq, tweetid) | ||||
|         js = json.dumps(tweet, indent=2) | ||||
|     if write: | ||||
|         if tweet: | ||||
|             write_tweet_json(js, folder) | ||||
|     else: | ||||
|         print(js) | ||||
|  | ||||
|  | ||||
| def cached_tweet(tweetid, folder): | ||||
|     tweet = None | ||||
|     file = os.path.join(folder, '%s.json' % tweetid) | ||||
|     if os.path.exists(file) and os.path.isfile(file): | ||||
|         try: | ||||
|             # print('%s: Tweet exists' % tweetid) | ||||
|             with open(file) as f: | ||||
|                 tweet = json.load(f) | ||||
|         except Exception as ex: | ||||
|             logger.error('Error getting cached version of {}: {}'.format(tweetid, ex)) | ||||
|     return tweet | ||||
|  | ||||
| def write_tweet_json(js, folder): | ||||
|     tweetid = js['id'] | ||||
|     file = tweet_file(tweetid, folder) | ||||
|     if not os.path.exists(folder): | ||||
|         os.makedirs(folder) | ||||
|     with open(file, 'w') as f: | ||||
|         json.dump(js, f, indent=2) | ||||
|         logger.info('Written {} to file {}'.format(tweetid, file)) | ||||
|  | ||||
| def tweet_file(tweetid, folder): | ||||
|     return os.path.join(folder, '%s.json' % tweetid) | ||||
|  | ||||
| def tweet_fail_file(tweetid, folder): | ||||
|     failsfolder = os.path.join(folder, 'failed') | ||||
|     if not os.path.exists(failsfolder): | ||||
|         os.makedirs(failsfolder) | ||||
|     return os.path.join(failsfolder, '%s.failed' % tweetid) | ||||
|  | ||||
| def tweet_failed(tweetid, folder): | ||||
|     return os.path.isfile(tweet_fail_file(tweetid, folder)) | ||||
|  | ||||
| def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True): | ||||
|     def filter_line(line): | ||||
|         tweetid = int(line) | ||||
|         # print('Checking {}'.format(tweetid)) | ||||
|         if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed): | ||||
|             yield None | ||||
|         else: | ||||
|             yield line | ||||
|  | ||||
|     def print_result(res): | ||||
|         tid, tweet = res | ||||
|         if tweet: | ||||
|             try: | ||||
|                 write_tweet_json(tweet, folder=folder) | ||||
|                 yield 1 | ||||
|             except Exception as ex: | ||||
|                 logger.error('%s: %s' % (tid, ex)) | ||||
|                 if not ignore_fails: | ||||
|                     raise | ||||
|         else: | ||||
|             logger.info('Tweet not recovered: {}'.format(tid)) | ||||
|             with open(tweet_fail_file(tid, folder), 'w') as f: | ||||
|                 print('Tweet not found', file=f) | ||||
|             yield -1 | ||||
|  | ||||
|     def download_batch(batch): | ||||
|         tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] | ||||
|         return tweets.items() | ||||
|  | ||||
|     with open(tweetsfile) as f: | ||||
|         lines = map(lambda x: x.strip(), f) | ||||
|         lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines')) | ||||
|         tweets = parallel(download_batch, lines_to_crawl, 100) | ||||
|         for res in tqdm(parallel(print_result, tweets), desc='Queried'): | ||||
|             pass | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| sqlalchemy | ||||
| twitter | ||||
| click | ||||
| tqdm | ||||
|   | ||||
							
								
								
									
										4
									
								
								setup.cfg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								setup.cfg
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| [metadata] | ||||
| description-file = README.md | ||||
| [aliases] | ||||
| test=pytest | ||||
							
								
								
									
										13
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								setup.py
									
									
									
									
									
								
							| @@ -38,10 +38,19 @@ setup( | ||||
|     extras_require = { | ||||
|         'server': ['flask', 'flask-oauthlib'] | ||||
|         }, | ||||
|     test_suite="tests", | ||||
|     setup_requires=['pytest-runner',], | ||||
|     include_package_data=True, | ||||
|     entry_points=""" | ||||
|         [console_scripts] | ||||
|         bitter=bitter.cli:main | ||||
|     """ | ||||
|     """, | ||||
|     classifiers=[ | ||||
|         'Development Status :: 4 - Beta', | ||||
|         'Intended Audience :: Developers', | ||||
|         'Intended Audience :: Science/Research', | ||||
|         'License :: OSI Approved :: Apache Software License', | ||||
|         'Programming Language :: Python :: 2', | ||||
|         'Programming Language :: Python :: 2.7', | ||||
|         'Programming Language :: Python :: 3', | ||||
|     ] | ||||
| ) | ||||
|   | ||||
| @@ -5,14 +5,14 @@ import types | ||||
| import datetime | ||||
| import time | ||||
|  | ||||
| from bitter import utils | ||||
| from bitter.crawlers import TwitterQueue, TwitterWorker | ||||
| from bitter import utils, easy | ||||
| from bitter.crawlers import QueueException | ||||
| from bitter import config as c | ||||
|  | ||||
| class TestUtils(TestCase): | ||||
| class TestCrawlers(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|         self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|  | ||||
|     def test_create_worker(self): | ||||
|         assert len(self.wq.queue)==1 | ||||
| @@ -39,6 +39,8 @@ class TestUtils(TestCase): | ||||
|     def test_is_limited(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         assert not w1.is_limited(['statuses', 'lookup']) | ||||
|         w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100}) | ||||
|         assert  w1.is_limited(['test', 'limited']) | ||||
|  | ||||
|     def test_call(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
| @@ -46,3 +48,25 @@ class TestUtils(TestCase): | ||||
|         resp = self.wq.users.lookup(screen_name='balkian') | ||||
|         l2 = w1.get_limit(['users', 'lookup']) | ||||
|         assert l1['remaining']-l2['remaining'] == 1 | ||||
|  | ||||
|     def test_consume(self): | ||||
|         w1 = list(self.wq.queue)[0] | ||||
|         l1 = w1.get_limit(['friends', 'list']) | ||||
|         self.wq.wait = False | ||||
|         for i in range(l1['remaining']): | ||||
|             print(i) | ||||
|             resp = self.wq.friends.list(screen_name='balkian') | ||||
|         # l2 = w1.get_limit(['users', 'lookup']) | ||||
|         # assert l2['remaining'] == 0 | ||||
|         # self.wq.users.lookup(screen_name='balkian') | ||||
|          | ||||
|         failed = False | ||||
|         try: | ||||
|             # resp = self.wq.friends.list(screen_name='balkian') | ||||
|             self.wq.next(['friends', 'list']) | ||||
|         except QueueException: | ||||
|             failed = True | ||||
|         assert failed | ||||
|         l2 = w1.get_limit(['friends', 'list']) | ||||
|         assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time()) | ||||
|         assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2) | ||||
|   | ||||
| @@ -58,4 +58,6 @@ class TestUtils(TestCase): | ||||
|         assert list(resp) == [1,2,3] | ||||
|         toc = time.time() | ||||
|         assert (tic-toc) < 6000 | ||||
|         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||
|         assert list(resp2) == [1,2,3,4] | ||||
|          | ||||
|   | ||||
		Reference in New Issue
	
	Block a user