mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-25 20:58:24 +00:00 
			
		
		
		
	Compare commits
	
		
			16 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | cf766a6bf3 | ||
|  | e65f6836b3 | ||
|  | 1cb86abbdd | ||
|  | b212a46ab7 | ||
|  | 0a0d8fd5f1 | ||
|  | e3a78968da | ||
|  | 67ef307cce | ||
|  | cb054ac365 | ||
|  | bdc4690240 | ||
|  | c0309a1e52 | ||
|  | 4afdd6807d | ||
|  | 38605ba2c8 | ||
|  | 738823c8a2 | ||
|  | 3f42879751 | ||
|  | 35f0c6376d | ||
|  | 2036d51d96 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,3 +1,4 @@ | |||||||
|  | __pycache__ | ||||||
| *.egg-info | *.egg-info | ||||||
| dist | dist | ||||||
| env | env | ||||||
|   | |||||||
							
								
								
									
										7
									
								
								Dockerfile-2.7
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile-2.7
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | |||||||
|  | # onbuild copies . to /usr/src/app/ | ||||||
|  | From python:2.7-onbuild | ||||||
|  | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
|  | RUN pip install -e "/usr/src/app/[server]" | ||||||
|  |  | ||||||
|  | ENTRYPOINT ["bitter"] | ||||||
							
								
								
									
										7
									
								
								Dockerfile-3.4
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile-3.4
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | |||||||
|  | # onbuild copies . to /usr/src/app/ | ||||||
|  | From python:3.4-onbuild | ||||||
|  | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
|  | RUN pip install -e "/usr/src/app/[server]" | ||||||
|  |  | ||||||
|  | ENTRYPOINT ["bitter"] | ||||||
							
								
								
									
										7
									
								
								Dockerfile.template
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								Dockerfile.template
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | |||||||
|  | # onbuild copies . to /usr/src/app/ | ||||||
|  | From python:{{PYVERSION}}-onbuild | ||||||
|  | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
|  | RUN pip install -e "/usr/src/app/[server]" | ||||||
|  |  | ||||||
|  | ENTRYPOINT ["bitter"] | ||||||
| @@ -1,7 +1,10 @@ | |||||||
| include requirements.txt | include requirements.txt | ||||||
|  | include requirements-py2.txt | ||||||
| include test-requirements.txt | include test-requirements.txt | ||||||
| include README.md | include README.md | ||||||
| include bitter/VERSION | include bitter/VERSION | ||||||
| graft bitter/templates | graft bitter/templates | ||||||
| graft bitter/static | graft bitter/static | ||||||
| graft test | include tests/test* | ||||||
|  | global-exclude *.pyc | ||||||
|  | global-exclude __pycache__ | ||||||
							
								
								
									
										76
									
								
								Makefile
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								Makefile
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,76 @@ | |||||||
|  | PYVERSIONS=3.4 2.7 | ||||||
|  | PYMAIN=$(firstword $(PYVERSIONS)) | ||||||
|  | NAME=bitter | ||||||
|  | REPO=balkian | ||||||
|  | VERSION=$(shell cat $(NAME)/VERSION) | ||||||
|  | TARNAME=$(NAME)-$(VERSION).tar.gz  | ||||||
|  | IMAGENAME=$(REPO)/$(NAME) | ||||||
|  | IMAGEWTAG=$(IMAGENAME):$(VERSION) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | all: build run | ||||||
|  |  | ||||||
|  | dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS)) | ||||||
|  |  | ||||||
|  | Dockerfile-%: Dockerfile.template | ||||||
|  | 	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$* | ||||||
|  |  | ||||||
|  |  | ||||||
|  | dev-%: | ||||||
|  | 	@docker start $(NAME)-dev$* || (\ | ||||||
|  | 		$(MAKE) build-$*; \ | ||||||
|  | 		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||||
|  | 	)\ | ||||||
|  |  | ||||||
|  | 	docker exec -ti $(NAME)-dev$* bash | ||||||
|  |  | ||||||
|  | dev: dev-$(PYMAIN) | ||||||
|  |  | ||||||
|  | build: $(addprefix build-, $(PYMAIN)) | ||||||
|  |  | ||||||
|  | buildall: $(addprefix build-, $(PYVERSIONS)) | ||||||
|  |  | ||||||
|  | build-%: Dockerfile-% | ||||||
|  | 	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .; | ||||||
|  |  | ||||||
|  | test: $(addprefix test-,$(PYMAIN)) | ||||||
|  |  | ||||||
|  | testall: $(addprefix test-,$(PYVERSIONS)) | ||||||
|  |  | ||||||
|  | test-%: build-% | ||||||
|  | 	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||||
|  |  | ||||||
|  | pip_test-%: | ||||||
|  | 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | ||||||
|  |  | ||||||
|  | dist/$(NAME)-$(VERSION).tar.gz: | ||||||
|  | 	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist; | ||||||
|  |  | ||||||
|  | sdist: dist/$(NAME)-$(VERSION).tar.gz | ||||||
|  |  | ||||||
|  |  | ||||||
|  | upload-%: test-% | ||||||
|  | 	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||||
|  |  | ||||||
|  | upload: testall $(addprefix upload-,$(PYVERSIONS)) | ||||||
|  | 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)' | ||||||
|  | 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)' | ||||||
|  |  | ||||||
|  | clean: | ||||||
|  | 	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true | ||||||
|  | 	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true | ||||||
|  |  | ||||||
|  | upload_git: | ||||||
|  | 	git commit -a | ||||||
|  | 	git tag ${VERSION} | ||||||
|  | 	git push --tags origin master | ||||||
|  |  | ||||||
|  | pip_upload: | ||||||
|  | 	python setup.py sdist upload ; | ||||||
|  |  | ||||||
|  | pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | ||||||
|  |  | ||||||
|  | run: build | ||||||
|  | 	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||||
|  |  | ||||||
|  | .PHONY: test test-% build-% build test test_pip run | ||||||
							
								
								
									
										11
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								README.md
									
									
									
									
									
								
							| @@ -17,11 +17,18 @@ or | |||||||
| Programmatically: | Programmatically: | ||||||
|  |  | ||||||
| ```python | ```python | ||||||
| from bitter.crawlers import TwitterQueue | from bitter import easy | ||||||
| wq = TwitterQueue.from_credentials() | wq = easy() | ||||||
| print(wq.users.show(user_name='balkian')) | print(wq.users.show(user_name='balkian')) | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
|  |  | ||||||
|  | You can also make custom calls to the API through the command line. | ||||||
|  | e.g. to get the latest 500 tweets by the python software foundation: | ||||||
|  |  | ||||||
|  | ``` | ||||||
|  | bitter api statuses/user_timeline --id thepsf --count 500 | ||||||
|  | ``` | ||||||
| # Credentials format | # Credentials format | ||||||
|  |  | ||||||
| ``` | ``` | ||||||
|   | |||||||
| @@ -1 +1 @@ | |||||||
| 0.5.4 | 0.7.4 | ||||||
|   | |||||||
| @@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter. | |||||||
| http://github.com/balkian/bitter | http://github.com/balkian/bitter | ||||||
| """ | """ | ||||||
|  |  | ||||||
| try: |  | ||||||
|     from future.standard_library import install_aliases |  | ||||||
|     install_aliases() |  | ||||||
| except ImportError: |  | ||||||
|     # Avoid problems at setup.py and py3.x |  | ||||||
|     pass |  | ||||||
|  |  | ||||||
| import os | import os | ||||||
|  |  | ||||||
| from .version import __version__ | from .version import __version__ | ||||||
|  |  | ||||||
|  | def easy(*args, **kwargs): | ||||||
|  |     from .crawlers import TwitterQueue | ||||||
|  |     return TwitterQueue.from_credentials(*args, **kwargs) | ||||||
|  |  | ||||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										195
									
								
								bitter/cli.py
									
									
									
									
									
								
							
							
						
						
									
										195
									
								
								bitter/cli.py
									
									
									
									
									
								
							| @@ -1,3 +1,5 @@ | |||||||
|  | from __future__ import print_function | ||||||
|  |  | ||||||
| import click | import click | ||||||
| import json | import json | ||||||
| import os | import os | ||||||
| @@ -6,10 +8,12 @@ import time | |||||||
| import sqlalchemy.types | import sqlalchemy.types | ||||||
| import threading | import threading | ||||||
| import sqlite3 | import sqlite3 | ||||||
|  | from tqdm import tqdm | ||||||
|  |  | ||||||
| from sqlalchemy import exists | from sqlalchemy import exists | ||||||
|  |  | ||||||
| from bitter import utils, models, crawlers | from bitter import utils, models, crawlers | ||||||
|  | from bitter import config as bconf | ||||||
| from bitter.models import make_session, User, ExtractorEntry, Following | from bitter.models import make_session, User, ExtractorEntry, Following | ||||||
|  |  | ||||||
| import sys | import sys | ||||||
| @@ -33,7 +37,7 @@ def main(ctx, verbose, logging_level, config, credentials): | |||||||
|     ctx.obj = {} |     ctx.obj = {} | ||||||
|     ctx.obj['VERBOSE'] = verbose |     ctx.obj['VERBOSE'] = verbose | ||||||
|     ctx.obj['CONFIG'] = config |     ctx.obj['CONFIG'] = config | ||||||
|     ctx.obj['CREDENTIALS'] = credentials |     bconf.CREDENTIALS = credentials | ||||||
|     utils.create_credentials(credentials) |     utils.create_credentials(credentials) | ||||||
|  |  | ||||||
| @main.group() | @main.group() | ||||||
| @@ -42,30 +46,36 @@ def tweet(ctx): | |||||||
|     pass |     pass | ||||||
|  |  | ||||||
| @tweet.command('get') | @tweet.command('get') | ||||||
|  | @click.option('-w', '--write', is_flag=True, default=False) | ||||||
|  | @click.option('-f', '--folder', default="tweets") | ||||||
|  | @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | ||||||
| @click.argument('tweetid') | @click.argument('tweetid') | ||||||
| @click.pass_context  | def get_tweet(tweetid, write, folder, update): | ||||||
| def get_tweet(ctx, tweetid): |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     utils.download_tweet(wq, tweetid, write, folder, update) | ||||||
|     t = utils.get_tweet(wq, tweetid) |  | ||||||
|     print(json.dumps(t, indent=2)) |  | ||||||
|          |          | ||||||
|  | @tweet.command('get_all') | ||||||
|  | @click.argument('tweetsfile', 'File with a list of tweets to look up') | ||||||
|  | @click.option('-f', '--folder', default="tweets") | ||||||
|  | @click.pass_context | ||||||
|  | def get_tweets(ctx, tweetsfile, folder): | ||||||
|  |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|  |     utils.download_tweets(wq, tweetsfile, folder) | ||||||
|  |  | ||||||
| @tweet.command('search') | @tweet.command('search') | ||||||
| @click.argument('query') | @click.argument('query') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_tweet(ctx, query): | def search(ctx, query): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     c = wq.next() |     t = utils.search_tweet(wq, query) | ||||||
|     t = utils.search_tweet(c.client, query) |  | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
|  |  | ||||||
| @tweet.command('timeline') | @tweet.command('timeline') | ||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_tweet(ctx, user): | def timeline(ctx, user): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     c = wq.next() |     t = utils.user_timeline(wq, user) | ||||||
|     t = utils.user_timeline(c.client, user) |  | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
|  |  | ||||||
| @main.group() | @main.group() | ||||||
| @@ -84,23 +94,47 @@ def list_users(ctx, db): | |||||||
|         for j in i.__dict__: |         for j in i.__dict__: | ||||||
|             print('\t{}: {}'.format(j, getattr(i,j))) |             print('\t{}: {}'.format(j, getattr(i,j))) | ||||||
|  |  | ||||||
| @users.command('get_one') |  | ||||||
| @click.argument('user') |  | ||||||
| @click.pass_context  |  | ||||||
| def get_user(ctx, user): |  | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |  | ||||||
|     c = wq.next() |  | ||||||
|     u = utils.get_user(c.client, user) |  | ||||||
|     print(json.dumps(u, indent=2)) |  | ||||||
|  |  | ||||||
| @users.command('get') | @users.command('get') | ||||||
|  | @click.argument('user') | ||||||
|  | @click.option('-w', '--write', is_flag=True, default=False) | ||||||
|  | @click.option('-f', '--folder', default="users") | ||||||
|  | @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | ||||||
|  | def get_user(user, write, folder, update): | ||||||
|  |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|  |     if not write: | ||||||
|  |         u = utils.get_user(wq, user) | ||||||
|  |         js = json.dumps(u, indent=2) | ||||||
|  |         print(js) | ||||||
|  |         return | ||||||
|  |     if not os.path.exists(folder): | ||||||
|  |         os.makedirs(folder) | ||||||
|  |     file = os.path.join(folder, '%s.json' % user) | ||||||
|  |     if not update and os.path.exists(file) and os.path.isfile(file): | ||||||
|  |         print('User exists: %s' % user) | ||||||
|  |         return | ||||||
|  |     with open(file, 'w') as f: | ||||||
|  |         u = utils.get_user(wq, user) | ||||||
|  |         js = json.dumps(u, indent=2) | ||||||
|  |         print(js, file=f) | ||||||
|  |  | ||||||
|  | @users.command('get_all') | ||||||
|  | @click.argument('usersfile', 'File with a list of users to look up') | ||||||
|  | @click.option('-f', '--folder', default="users") | ||||||
|  | @click.pass_context | ||||||
|  | def get_users(ctx, usersfile, folder): | ||||||
|  |     with open(usersfile) as f: | ||||||
|  |         for line in f: | ||||||
|  |             uid = line.strip() | ||||||
|  |             ctx.invoke(get_user, folder=folder, user=uid, write=True) | ||||||
|  |  | ||||||
|  | @users.command('crawl') | ||||||
| @click.option('--db', required=True, help='Database to save all users.') | @click.option('--db', required=True, help='Database to save all users.') | ||||||
| @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | ||||||
| @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | ||||||
| @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | ||||||
| @click.argument('usersfile', 'File with a list of users to look up') | @click.argument('usersfile', 'File with a list of users to look up') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_users(ctx, usersfile, skip, until, threads, db): | def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||||
|     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock |     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock | ||||||
|  |  | ||||||
|     if '://' not in db: |     if '://' not in db: | ||||||
| @@ -112,7 +146,7 @@ def get_users(ctx, usersfile, skip, until, threads, db): | |||||||
|             return ExitStack() |             return ExitStack() | ||||||
|  |  | ||||||
|  |  | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, |     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, | ||||||
|                                                                                       len(wq.queue))) |                                                                                       len(wq.queue))) | ||||||
|  |  | ||||||
| @@ -206,11 +240,6 @@ def get_users(ctx, usersfile, skip, until, threads, db): | |||||||
|              |              | ||||||
|     logger.info('Done!') |     logger.info('Done!') | ||||||
|  |  | ||||||
| @main.group('api') |  | ||||||
| def api(): |  | ||||||
|     pass |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.group('extractor') | @main.group('extractor') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| @click.option('--db', required=True, help='Database of users.') | @click.option('--db', required=True, help='Database of users.') | ||||||
| @@ -281,7 +310,7 @@ def users_extractor(ctx): | |||||||
| @click.pass_context | @click.pass_context | ||||||
| def extract(ctx, recursive, user, name, initfile): | def extract(ctx, recursive, user, name, initfile): | ||||||
|     print(locals()) |     print(locals()) | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     dburi = ctx.obj['DBURI'] |     dburi = ctx.obj['DBURI'] | ||||||
|     utils.extract(wq, |     utils.extract(wq, | ||||||
|                   recursive=recursive, |                   recursive=recursive, | ||||||
| @@ -293,16 +322,16 @@ def extract(ctx, recursive, user, name, initfile): | |||||||
| @extractor.command('reset') | @extractor.command('reset') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def reset_extractor(ctx): | def reset_extractor(ctx): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     db = ctx.obj['DBURI'] |     db = ctx.obj['DBURI'] | ||||||
|     session = make_session(db) |     session = make_session(db) | ||||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) |     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||||
|  |  | ||||||
| @api.command('limits') | @main.command('limits') | ||||||
| @click.argument('url', required=False) | @click.argument('url', required=False) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_limits(ctx, url): | def get_limits(ctx, url): | ||||||
|     wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS']) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     for worker in wq.queue: |     for worker in wq.queue: | ||||||
|         resp = worker.client.application.rate_limit_status() |         resp = worker.client.application.rate_limit_status() | ||||||
|         print('#'*20) |         print('#'*20) | ||||||
| @@ -319,16 +348,106 @@ def get_limits(ctx, url): | |||||||
|         else: |         else: | ||||||
|             print(json.dumps(resp, indent=2)) |             print(json.dumps(resp, indent=2)) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False)) | ||||||
|  | @click.argument('cmd', nargs=1) | ||||||
|  | @click.argument('api_args', nargs=-1, type=click.UNPROCESSED) | ||||||
|  | @click.pass_context | ||||||
|  | def api(ctx, cmd, api_args): | ||||||
|  |     opts = {} | ||||||
|  |     i = iter(api_args) | ||||||
|  |     for k, v in zip(i, i): | ||||||
|  |         k = k.replace('--', '') | ||||||
|  |         opts[k] = v | ||||||
|  |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|  |     resp = utils.consume_feed(wq[cmd], **opts) | ||||||
|  |     # A hack to stream jsons | ||||||
|  |     print('[') | ||||||
|  |     first = True | ||||||
|  |     for i in resp: | ||||||
|  |         if not first: | ||||||
|  |             print(',') | ||||||
|  |         else: | ||||||
|  |             first = False | ||||||
|  |          | ||||||
|  |         print(json.dumps(i, indent=2)) | ||||||
|  |     print(']') | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.command('server') | @main.command('server') | ||||||
| @click.argument('CONSUMER_KEY', required=True) | @click.argument('CONSUMER_KEY', required=True) | ||||||
| @click.argument('CONSUMER_SECRET', required=True) | @click.argument('CONSUMER_SECRET', required=True) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def run_server(ctx, consumer_key, consumer_secret): | def run_server(ctx, consumer_key, consumer_secret): | ||||||
|     from . import config |     bconf.CONSUMER_KEY = consumer_key | ||||||
|     config.CONSUMER_KEY = consumer_key |     bconf.CONSUMER_SECRET = consumer_secret | ||||||
|     config.CONSUMER_SECRET = consumer_secret |  | ||||||
|     from .webserver import app |     from .webserver import app | ||||||
|     app.run() |     app.run(host='0.0.0.0') | ||||||
|  |      | ||||||
|  | @main.group() | ||||||
|  | @click.pass_context  | ||||||
|  | def stream(ctx): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  | @stream.command('get') | ||||||
|  | @click.option('-l', '--locations', default=None) | ||||||
|  | @click.option('-t', '--track', default=None) | ||||||
|  | @click.option('-f', '--file', help='File to store the stream of tweets') | ||||||
|  | @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) | ||||||
|  | @click.pass_context  | ||||||
|  | def get_stream(ctx, locations, track, file, politelyretry): | ||||||
|  |     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) | ||||||
|  |  | ||||||
|  |     query_args = {} | ||||||
|  |     if locations: | ||||||
|  |         query_args['locations'] = locations | ||||||
|  |     if track: | ||||||
|  |         query_args['track'] = track | ||||||
|  |     if not file: | ||||||
|  |         file = sys.stdout | ||||||
|  |     else: | ||||||
|  |         file = open(file, 'a') | ||||||
|  |  | ||||||
|  |     def insist(): | ||||||
|  |         lasthangup = time.time() | ||||||
|  |         while True: | ||||||
|  |             if not query_args: | ||||||
|  |                 iterator = wq.statuses.sample() | ||||||
|  |             else: | ||||||
|  |                 iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") | ||||||
|  |             for i in iterator: | ||||||
|  |                 yield i | ||||||
|  |             if not politelyretry: | ||||||
|  |                 return | ||||||
|  |             thishangup = time.time() | ||||||
|  |             if thishangup - lasthangup < 60: | ||||||
|  |                 raise Exception('Too many hangups in a row.') | ||||||
|  |             time.sleep(3) | ||||||
|  |  | ||||||
|  |     for tweet in tqdm(insist()): | ||||||
|  |         print(json.dumps(tweet), file=file) | ||||||
|  |     if file != sys.stdout: | ||||||
|  |         file.close() | ||||||
|  |  | ||||||
|  | @stream.command('read') | ||||||
|  | @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||||
|  | @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False) | ||||||
|  | @click.pass_context  | ||||||
|  | def read_stream(ctx, file, tail): | ||||||
|  |     for tweet in utils.read_file(file, tail=tail): | ||||||
|  |         try: | ||||||
|  |             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) | ||||||
|  |         except (KeyError, TypeError): | ||||||
|  |             print('Raw tweet: {}'.format(tweet)) | ||||||
|  |  | ||||||
|  | @stream.command('tags') | ||||||
|  | @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||||
|  | @click.argument('limit', required=False, default=None, type=int) | ||||||
|  | @click.pass_context  | ||||||
|  | def tags_stream(ctx, file, limit): | ||||||
|  |     c = utils.get_hashtags(utils.read_file(file)) | ||||||
|  |     for count, tag in c.most_common(limit): | ||||||
|  |         print(u'{} - {}'.format(count, tag)) | ||||||
|      |      | ||||||
|  |  | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|   | |||||||
| @@ -9,6 +9,13 @@ logger = logging.getLogger(__name__) | |||||||
| from twitter import * | from twitter import * | ||||||
| from collections import OrderedDict | from collections import OrderedDict | ||||||
| from threading import Lock | from threading import Lock | ||||||
|  | from itertools import islice | ||||||
|  | from functools import partial | ||||||
|  | try: | ||||||
|  |     import itertools.ifilter as filter | ||||||
|  | except ImportError: | ||||||
|  |     pass | ||||||
|  |  | ||||||
| from . import utils | from . import utils | ||||||
| from . import config | from . import config | ||||||
|  |  | ||||||
| @@ -32,35 +39,110 @@ class AttrToFunc(object): | |||||||
|         else: |         else: | ||||||
|             return extend_call(k) |             return extend_call(k) | ||||||
|  |  | ||||||
|  |     def __getitem__(self, k): | ||||||
|  |         return partial(self.handler, self.__uriparts+k.split('/')) | ||||||
|  |  | ||||||
|     def __call__(self, *args, **kwargs): |     def __call__(self, *args, **kwargs): | ||||||
|         # for i, a in enumerate(args)e |         # for i, a in enumerate(args)e | ||||||
|         #     kwargs[i] = a |         #     kwargs[i] = a | ||||||
|         return self.handler(self.__uriparts, *args, **kwargs) |         return self.handler(self.__uriparts, *args, **kwargs) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class FromCredentialsMixin(object): | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def from_credentials(cls, cred_file=None, max_workers=None): | ||||||
|  |         wq = cls() | ||||||
|  |  | ||||||
|  |         for cred in islice(utils.get_credentials(cred_file), max_workers): | ||||||
|  |             wq.ready(cls.worker_class(cred["user"], cred)) | ||||||
|  |         return wq | ||||||
|  |      | ||||||
|  |  | ||||||
| class TwitterWorker(object): | class TwitterWorker(object): | ||||||
|     def __init__(self, name, client): |     api_class = None | ||||||
|  |  | ||||||
|  |     def __init__(self, name, creds): | ||||||
|         self.name = name |         self.name = name | ||||||
|         self.client = client |         self._client = None | ||||||
|         self.throttled_time = False |         self.cred = creds | ||||||
|         self._lock = Lock() |         self._lock = Lock() | ||||||
|         self.busy = False |         self.busy = False | ||||||
|  |  | ||||||
|     @property |     @property | ||||||
|     def throttled(self): |     def client(self): | ||||||
|         if not self.throttled_time: |         if not self._client: | ||||||
|             return False |             auth=OAuth(self.cred['token_key'], | ||||||
|         t = time.time() |                        self.cred['token_secret'], | ||||||
|         delta = self.throttled_time - t |                        self.cred['consumer_key'], | ||||||
|         if delta > 0: |                        self.cred['consumer_secret']) | ||||||
|             return True |             self._client = self.api_class(auth=auth) | ||||||
|         return False |         return self._client | ||||||
|  |  | ||||||
|     def throttle_until(self, epoch=None): |     def __repr__(self): | ||||||
|         self.throttled_time = int(epoch) |         msg = '<{} for {}>'.format(self.__class__.__name__, self.name) | ||||||
|         logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time()))) |         if self.busy: | ||||||
|  |             msg += ' [busy]' | ||||||
|  |         return msg | ||||||
|  |  | ||||||
|  | class RestWorker(TwitterWorker): | ||||||
|  |     api_class = Twitter | ||||||
|  |  | ||||||
|  |     def __init__(self, *args, **kwargs): | ||||||
|  |         super(RestWorker, self).__init__(*args, **kwargs) | ||||||
|  |         self._limits = None | ||||||
|  |  | ||||||
|  |     @property | ||||||
|  |     def limits(self): | ||||||
|  |         if not self._limits: | ||||||
|  |             self._limits = self.client.application.rate_limit_status() | ||||||
|  |         return self._limits | ||||||
|  |  | ||||||
|  |     def is_limited(self, uriparts): | ||||||
|  |         return self.get_wait(uriparts)>0 | ||||||
|  |  | ||||||
|  |     def get_wait(self, uriparts): | ||||||
|  |         limits = self.get_limit(uriparts) | ||||||
|  |         if limits['remaining'] > 0: | ||||||
|  |             return 0 | ||||||
|  |         reset = limits.get('reset', 0) | ||||||
|  |         now = time.time() | ||||||
|  |         return max(0, (reset-now)) | ||||||
|  |  | ||||||
|  |     def get_limit(self, uriparts): | ||||||
|  |         uri = '/'+'/'.join(uriparts) | ||||||
|  |         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): | ||||||
|  |             if ix.startswith(uri): | ||||||
|  |                 return i | ||||||
|  |         return {} | ||||||
|  |  | ||||||
|  |     def set_limit(self, uriparts, value): | ||||||
|  |         uri = '/'+'/'.join(uriparts) | ||||||
|  |         if 'resources' not in self.limits: | ||||||
|  |             self.limits['resources'] = {} | ||||||
|  |         resources = self.limits['resources'] | ||||||
|  |         if uriparts[0] not in resources: | ||||||
|  |             resources[uriparts[0]] = {} | ||||||
|  |         resource = resources[uriparts[0]] | ||||||
|  |         resource[uri] = value | ||||||
|  |  | ||||||
|  |     def update_limits(self, uriparts, remaining, reset, limit): | ||||||
|  |         self.set_limit(uriparts, {'remaining': remaining, | ||||||
|  |                                   'reset': reset, | ||||||
|  |                                   'limit': limit}) | ||||||
|  |          | ||||||
|  |     def update_limits_from_headers(self, uriparts, headers): | ||||||
|  |         reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30)) | ||||||
|  |         remaining = int(headers.get('X-Rate-Limit-Remaining', 0)) | ||||||
|  |         limit = int(headers.get('X-Rate-Limit-Limit', -1)) | ||||||
|  |         self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit) | ||||||
|  |  | ||||||
|  |  | ||||||
| class TwitterQueue(AttrToFunc): |  | ||||||
|  | class QueueException(BaseException): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  | class QueueMixin(AttrToFunc, FromCredentialsMixin): | ||||||
|     def __init__(self, wait=True): |     def __init__(self, wait=True): | ||||||
|         logger.debug('Creating worker queue') |         logger.debug('Creating worker queue') | ||||||
|         self.queue = set() |         self.queue = set() | ||||||
| @@ -71,77 +153,119 @@ class TwitterQueue(AttrToFunc): | |||||||
|     def ready(self, worker): |     def ready(self, worker): | ||||||
|         self.queue.add(worker) |         self.queue.add(worker) | ||||||
|  |  | ||||||
|  | class TwitterQueue(QueueMixin): | ||||||
|  |  | ||||||
|  |     worker_class = RestWorker | ||||||
|  |  | ||||||
|     def handle_call(self, uriparts, *args, **kwargs): |     def handle_call(self, uriparts, *args, **kwargs): | ||||||
|         logger.debug('Called: {}'.format(uriparts)) |         logger.debug('Called: {}'.format(uriparts)) | ||||||
|         logger.debug('With: {} {}'.format(args, kwargs)) |         logger.debug('With: {} {}'.format(args, kwargs)) | ||||||
|         while True: |         patience = 1 | ||||||
|  |         while patience: | ||||||
|             c = None |             c = None | ||||||
|             try: |             try: | ||||||
|                 c = self.next() |                 c = self.next(uriparts) | ||||||
|                 c._lock.acquire() |                 c._lock.acquire() | ||||||
|                 c.busy = True |                 c.busy = True | ||||||
|                 logger.debug('Next: {}'.format(c.name)) |                 logger.debug('Next: {}'.format(c.name)) | ||||||
|                 ping = time.time() |                 ping = time.time() | ||||||
|                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) |                 resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) | ||||||
|                 pong = time.time() |                 pong = time.time() | ||||||
|  |                 c.update_limits_from_headers(uriparts, resp.headers) | ||||||
|                 logger.debug('Took: {}'.format(pong-ping)) |                 logger.debug('Took: {}'.format(pong-ping)) | ||||||
|                 return resp |                 return resp | ||||||
|             except TwitterHTTPError as ex: |             except TwitterHTTPError as ex: | ||||||
|                 if ex.e.code in (429, 502, 503, 504): |                 if ex.e.code in (429, 502, 503, 504): | ||||||
|                     limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30) |  | ||||||
|                     logger.info('{} limited'.format(c.name)) |                     logger.info('{} limited'.format(c.name)) | ||||||
|                     c.throttle_until(limit) |                     c.update_limits_from_headers(uriparts, ex.e.headers) | ||||||
|                     continue |                     continue | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
|             except urllib.error.URLError as ex: |             except urllib.error.URLError as ex: | ||||||
|                 time.sleep(5) |                 time.sleep(5) | ||||||
|                 logger.info('Something fishy happened: {}'.format(ex))                 |                 logger.info('Something fishy happened: {}'.format(ex))                 | ||||||
|  |                 raise | ||||||
|             finally: |             finally: | ||||||
|                 if c: |                 if c: | ||||||
|                     c.busy = False |                     c.busy = False | ||||||
|                     c._lock.release() |                     c._lock.release() | ||||||
|                      |                 if not self.wait: | ||||||
|  |                     patience -= 1 | ||||||
|  |  | ||||||
|     @property |     def get_wait(self, uriparts): | ||||||
|     def client(self): |         # Stop as soon as one is available to avoid initiating the rest | ||||||
|         return self.next().client |         for i in self.queue: | ||||||
|  |             if not i.busy and i.get_wait(uriparts) == 0: | ||||||
|  |                 return 0 | ||||||
|  |         # If None is available, let's see how much we have to wait | ||||||
|  |         available = filter(lambda x: not x.busy, self.queue) | ||||||
|  |         diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy) | ||||||
|  |         return diff | ||||||
|  |          | ||||||
|  |  | ||||||
|     @classmethod |     def _next(self, uriparts): | ||||||
|     def from_credentials(self, cred_file=None): |  | ||||||
|         wq = TwitterQueue() |  | ||||||
|  |  | ||||||
|         for cred in utils.get_credentials(cred_file): |  | ||||||
|             c = Twitter(auth=OAuth(cred['token_key'], |  | ||||||
|                                    cred['token_secret'], |  | ||||||
|                                    cred['consumer_key'], |  | ||||||
|                                    cred['consumer_secret'])) |  | ||||||
|             wq.ready(TwitterWorker(cred["user"], c)) |  | ||||||
|         return wq |  | ||||||
|  |  | ||||||
|     def _next(self): |  | ||||||
|         logger.debug('Getting next available') |         logger.debug('Getting next available') | ||||||
|         s = list(self.queue) |         s = list(self.queue) | ||||||
|         random.shuffle(s) |         random.shuffle(s) | ||||||
|         for worker in s: |         for worker in s: | ||||||
|             if not worker.throttled and not worker.busy: |             if not worker.is_limited(uriparts) and not worker.busy: | ||||||
|                 return worker |                 return worker | ||||||
|         raise Exception('No worker is available') |         raise QueueException('No worker is available') | ||||||
|  |  | ||||||
|     def next(self): |     def next(self, uriparts): | ||||||
|         if not self.wait: |         if not self.wait: | ||||||
|             return self._next() |             return self._next(uriparts) | ||||||
|         while True: |         while True: | ||||||
|             try: |             try: | ||||||
|                 return self._next() |                 return self._next(uriparts) | ||||||
|             except Exception: |             except QueueException: | ||||||
|                 available = filter(lambda x: not x.busy, self.queue) |                 available = filter(lambda x: not x.busy, self.queue) | ||||||
|                 if available: |                 if available: | ||||||
|                     first_worker = min(available, key=lambda x: x.throttled_time) |                     diff = self.get_wait(uriparts) | ||||||
|                     diff = first_worker.throttled_time - time.time() |  | ||||||
|                     logger.info("All workers are throttled. Waiting %s seconds" % diff) |                     logger.info("All workers are throttled. Waiting %s seconds" % diff) | ||||||
|                 else: |                 else: | ||||||
|                     diff = 5 |                     diff = 5 | ||||||
|                     logger.info("All workers are busy. Waiting %s seconds" % diff) |                     logger.info("All workers are busy. Waiting %s seconds" % diff) | ||||||
|                 time.sleep(diff) |                 time.sleep(diff) | ||||||
|  |  | ||||||
|  | class StreamWorker(TwitterWorker): | ||||||
|  |     api_class = TwitterStream | ||||||
|  |  | ||||||
|  |     def __init__(self, *args, **kwargs): | ||||||
|  |         super(StreamWorker, self).__init__(*args, **kwargs) | ||||||
|  |  | ||||||
|  | class StreamQueue(QueueMixin): | ||||||
|  |     worker_class = StreamWorker | ||||||
|  |  | ||||||
|  |     def __init__(self, wait=True): | ||||||
|  |         logger.debug('Creating worker queue') | ||||||
|  |         self.queue = set() | ||||||
|  |         self.index = 0 | ||||||
|  |         self.wait = wait | ||||||
|  |         AttrToFunc.__init__(self, handler=self.handle_call) | ||||||
|  |  | ||||||
|  |     def handle_call(self, uriparts, *args, **kwargs): | ||||||
|  |         logger.debug('Called: {}'.format(uriparts)) | ||||||
|  |         logger.debug('With: {} {}'.format(args, kwargs)) | ||||||
|  |         c = None | ||||||
|  |         c = self.next(uriparts) | ||||||
|  |         c._lock.acquire() | ||||||
|  |         c.busy = True | ||||||
|  |         logger.debug('Next: {}'.format(c.name)) | ||||||
|  |         ping = time.time() | ||||||
|  |         resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs) | ||||||
|  |         for i in resp: | ||||||
|  |             yield i | ||||||
|  |         pong = time.time() | ||||||
|  |         logger.debug('Listening for: {}'.format(pong-ping)) | ||||||
|  |         c.busy = False | ||||||
|  |         c._lock.release() | ||||||
|  |  | ||||||
|  |     def next(self, uriparts): | ||||||
|  |         logger.debug('Getting next available') | ||||||
|  |         s = list(self.queue) | ||||||
|  |         random.shuffle(s) | ||||||
|  |         for worker in s: | ||||||
|  |             if not worker.busy: | ||||||
|  |                 return worker | ||||||
|  |         raise QueueException('No worker is available') | ||||||
|   | |||||||
| @@ -3,6 +3,7 @@ import json | |||||||
|  |  | ||||||
| from sqlalchemy.ext.declarative import declarative_base | from sqlalchemy.ext.declarative import declarative_base | ||||||
| from sqlalchemy.types import BigInteger, Integer, Text, Boolean | from sqlalchemy.types import BigInteger, Integer, Text, Boolean | ||||||
|  | from sqlalchemy.pool import SingletonThreadPool | ||||||
| from sqlalchemy import Column, Index | from sqlalchemy import Column, Index | ||||||
|  |  | ||||||
| from sqlalchemy import create_engine | from sqlalchemy import create_engine | ||||||
| @@ -85,15 +86,19 @@ class ExtractorEntry(Base): | |||||||
|     user = Column(BigInteger, index=True) |     user = Column(BigInteger, index=True) | ||||||
|     cursor = Column(BigInteger, default=-1) |     cursor = Column(BigInteger, default=-1) | ||||||
|     pending = Column(Boolean, default=False) |     pending = Column(Boolean, default=False) | ||||||
|  |     errors = Column(Text, default="") | ||||||
|  |     busy = Column(Boolean, default=False) | ||||||
|  |  | ||||||
|  |  | ||||||
| def make_session(url): | def make_session(url): | ||||||
|     engine = create_engine(url)#, echo=True) |     if not isinstance(url, str): | ||||||
|  |         print(url) | ||||||
|  |         raise Exception("FUCK") | ||||||
|  |     engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True) | ||||||
|     Base.metadata.create_all(engine) |     Base.metadata.create_all(engine) | ||||||
|     Session = sessionmaker(bind=engine) |     Session = sessionmaker(bind=engine) | ||||||
|     session = Session() |     session = Session() | ||||||
|     return session |     return session | ||||||
|      |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test(db='sqlite:///users.db'): | def test(db='sqlite:///users.db'): | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										465
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										465
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -1,3 +1,5 @@ | |||||||
|  | from __future__ import print_function | ||||||
|  |  | ||||||
| import logging | import logging | ||||||
| import time | import time | ||||||
| import json | import json | ||||||
| @@ -9,9 +11,16 @@ import os | |||||||
| import multiprocessing | import multiprocessing | ||||||
| from multiprocessing.pool import ThreadPool | from multiprocessing.pool import ThreadPool | ||||||
|  |  | ||||||
| from itertools import islice | from functools import partial | ||||||
|  |  | ||||||
|  | from tqdm import tqdm | ||||||
|  |  | ||||||
|  | from itertools import islice, chain | ||||||
| from contextlib import contextmanager | from contextlib import contextmanager | ||||||
| from itertools import zip_longest |  | ||||||
|  | from collections import Counter | ||||||
|  |  | ||||||
|  | from builtins import map, filter | ||||||
|  |  | ||||||
| from twitter import TwitterHTTPError | from twitter import TwitterHTTPError | ||||||
|  |  | ||||||
| @@ -26,17 +35,20 @@ def signal_handler(signal, frame): | |||||||
|     logger.info('You pressed Ctrl+C!') |     logger.info('You pressed Ctrl+C!') | ||||||
|     sys.exit(0) |     sys.exit(0) | ||||||
|  |  | ||||||
| def chunk(iterable, n, fillvalue=None): |  | ||||||
|     args = [iter(iterable)] * n |  | ||||||
|     return zip_longest(*args, fillvalue=fillvalue) |  | ||||||
|  |  | ||||||
| def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): | def chunk(iterable, n): | ||||||
|     if chunksize: |     it = iter(iterable) | ||||||
|         source = chunk(source, chunksize) |     return iter(lambda: tuple(islice(it, n)), ()) | ||||||
|     p = ThreadPool(numcpus) |  | ||||||
|     for i in p.imap(func, source): |  | ||||||
|  | def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||||
|  |     source = chunk(source, chunksize) | ||||||
|  |     p = ThreadPool(numcpus*2) | ||||||
|  |     results = p.imap_unordered(func, source, chunksize=int(1000/numcpus)) | ||||||
|  |     for i in chain.from_iterable(results): | ||||||
|         yield i |         yield i | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_credentials_path(credfile=None): | def get_credentials_path(credfile=None): | ||||||
|     if not credfile: |     if not credfile: | ||||||
|         if config.CREDENTIALS: |         if config.CREDENTIALS: | ||||||
| @@ -45,17 +57,20 @@ def get_credentials_path(credfile=None): | |||||||
|             raise Exception('No valid credentials file') |             raise Exception('No valid credentials file') | ||||||
|     return os.path.expanduser(credfile) |     return os.path.expanduser(credfile) | ||||||
|  |  | ||||||
|  |  | ||||||
| @contextmanager | @contextmanager | ||||||
| def credentials_file(credfile, *args, **kwargs): | def credentials_file(credfile, *args, **kwargs): | ||||||
|     p = get_credentials_path(credfile) |     p = get_credentials_path(credfile) | ||||||
|     with open(p, *args, **kwargs) as f: |     with open(p, *args, **kwargs) as f: | ||||||
|         yield f |         yield f | ||||||
|  |  | ||||||
|  |  | ||||||
| def iter_credentials(credfile=None): | def iter_credentials(credfile=None): | ||||||
|     with credentials_file(credfile) as f: |     with credentials_file(credfile) as f: | ||||||
|         for l in f: |         for l in f: | ||||||
|             yield json.loads(l.strip()) |             yield json.loads(l.strip()) | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_credentials(credfile=None, inverse=False, **kwargs): | def get_credentials(credfile=None, inverse=False, **kwargs): | ||||||
|     creds = [] |     creds = [] | ||||||
|     for i in iter_credentials(credfile): |     for i in iter_credentials(credfile): | ||||||
| @@ -66,11 +81,13 @@ def get_credentials(credfile=None, inverse=False, **kwargs): | |||||||
|             creds.append(i) |             creds.append(i) | ||||||
|     return creds |     return creds | ||||||
|  |  | ||||||
|  |  | ||||||
| def create_credentials(credfile=None): | def create_credentials(credfile=None): | ||||||
|     credfile = get_credentials_path(credfile) |     credfile = get_credentials_path(credfile) | ||||||
|     with credentials_file(credfile, 'a'): |     with credentials_file(credfile, 'a'): | ||||||
|         pass |         pass | ||||||
|  |  | ||||||
|  |      | ||||||
| def delete_credentials(credfile=None, **creds): | def delete_credentials(credfile=None, **creds): | ||||||
|     tokeep = get_credentials(credfile, inverse=True, **creds) |     tokeep = get_credentials(credfile, inverse=True, **creds) | ||||||
|     with credentials_file(credfile, 'w') as f: |     with credentials_file(credfile, 'w') as f: | ||||||
| @@ -78,6 +95,7 @@ def delete_credentials(credfile=None, **creds): | |||||||
|             f.write(json.dumps(i)) |             f.write(json.dumps(i)) | ||||||
|             f.write('\n') |             f.write('\n') | ||||||
|  |  | ||||||
|  |  | ||||||
| def add_credentials(credfile=None, **creds): | def add_credentials(credfile=None, **creds): | ||||||
|     exist = get_credentials(credfile, **creds) |     exist = get_credentials(credfile, **creds) | ||||||
|     if not exist: |     if not exist: | ||||||
| @@ -86,6 +104,27 @@ def add_credentials(credfile=None, **creds): | |||||||
|             f.write('\n') |             f.write('\n') | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def get_hashtags(iter_tweets, best=None): | ||||||
|  |     c = Counter() | ||||||
|  |     for tweet in iter_tweets: | ||||||
|  |         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) | ||||||
|  |     return c | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def read_file(filename, tail=False): | ||||||
|  |     with open(filename) as f: | ||||||
|  |         while True: | ||||||
|  |             line = f.readline() | ||||||
|  |             if line not in (None, '', '\n'): | ||||||
|  |                 tweet = json.loads(line.strip()) | ||||||
|  |                 yield tweet | ||||||
|  |             else: | ||||||
|  |                 if tail: | ||||||
|  |                     time.sleep(1) | ||||||
|  |                 else: | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | ||||||
|     t = 'name' if by_name else 'uid' |     t = 'name' if by_name else 'uid' | ||||||
|     logger.debug('Getting users by {}: {}'.format(t, ulist)) |     logger.debug('Getting users by {}: {}'.format(t, ulist)) | ||||||
| @@ -113,6 +152,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | |||||||
|             else: |             else: | ||||||
|                 yield user |                 yield user | ||||||
|  |  | ||||||
|  |  | ||||||
| def trim_user(user): | def trim_user(user): | ||||||
|     if 'status' in user: |     if 'status' in user: | ||||||
|         del user['status'] |         del user['status'] | ||||||
| @@ -126,142 +166,218 @@ def trim_user(user): | |||||||
|     return user |     return user | ||||||
|  |  | ||||||
|  |  | ||||||
| def add_user(session, user, enqueue=False): | def add_user(user, dburi=None, session=None, update=False): | ||||||
|  |     if not session: | ||||||
|  |         session = make_session(dburi) | ||||||
|  |  | ||||||
|     user = trim_user(user) |     user = trim_user(user) | ||||||
|     olduser = session.query(User).filter(User.id==user['id']) |     olduser = session.query(User).filter(User.id == user['id']) | ||||||
|     if olduser: |     if olduser: | ||||||
|  |         if not update: | ||||||
|  |             return | ||||||
|         olduser.delete() |         olduser.delete() | ||||||
|     user = User(**user) |     nuser = User() | ||||||
|     session.add(user) |     for key, value in user.items(): | ||||||
|     if extract: |         setattr(nuser, key, value) | ||||||
|         logging.debug('Adding entry') |     user = nuser | ||||||
|  |     if update: | ||||||
|  |         session.add(user) | ||||||
|  |         logger.debug('Adding entry') | ||||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() |         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() | ||||||
|         if not entry: |         if not entry: | ||||||
|             entry = ExtractorEntry(user=user.id) |             entry = ExtractorEntry(user=user.id) | ||||||
|             session.add(entry) |             session.add(entry) | ||||||
|         logging.debug(entry.pending) |         logger.debug(entry.pending) | ||||||
|         entry.pending = True |         entry.pending = True | ||||||
|         entry.cursor = -1 |         entry.cursor = -1 | ||||||
|         session.commit() |         session.commit() | ||||||
|  |     session.close() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def download_entry(wq, entry_id, dburi=None, recursive=False): | ||||||
|  |     session = make_session(dburi) | ||||||
|  |     if not session: | ||||||
|  |         raise Exception("Provide dburi or session") | ||||||
|  |     logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id))) | ||||||
|  |     entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first() | ||||||
|  |     user = session.query(User).filter(User.id == entry.user).first() | ||||||
|  |     download_user(wq, session, user, entry, recursive) | ||||||
|  |     session.close() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): | ||||||
|  |  | ||||||
|  |     total_followers = user.followers_count | ||||||
|  |  | ||||||
|  |     if total_followers > max_followers: | ||||||
|  |         entry.pending = False | ||||||
|  |         logger.info("Too many followers for user: %s" % user.screen_name) | ||||||
|  |         session.add(entry) | ||||||
|  |         session.commit() | ||||||
|  |         return | ||||||
|  |  | ||||||
|  |     if not entry: | ||||||
|  |         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id) | ||||||
|  |     session.add(entry) | ||||||
|  |     session.commit() | ||||||
|  |  | ||||||
|  |     pending = True | ||||||
|  |     cursor = entry.cursor | ||||||
|  |     uid = user.id | ||||||
|  |     name = user.name | ||||||
|  |  | ||||||
|  |     logger.info("#"*20) | ||||||
|  |     logger.info("Getting %s - %s" % (uid, name)) | ||||||
|  |     logger.info("Cursor %s" % cursor) | ||||||
|  |     logger.info("Using account: %s" % wq.name) | ||||||
|  |  | ||||||
|  |     _fetched_followers = 0 | ||||||
|  |  | ||||||
|  |     def fetched_followers(): | ||||||
|  |         return session.query(Following).filter(Following.isfollowed==uid).count() | ||||||
|  |  | ||||||
|  |     attempts = 0 | ||||||
|  |     while cursor > 0 or fetched_followers() < total_followers: | ||||||
|  |         try: | ||||||
|  |             resp = wq.followers.ids(user_id=uid, cursor=cursor) | ||||||
|  |         except TwitterHTTPError as ex: | ||||||
|  |             attempts += 1 | ||||||
|  |             if ex.e.code in (401, ) or attempts > 3: | ||||||
|  |                 logger.info('Not authorized for user: {}'.format(uid)) | ||||||
|  |                 entry.errors = ex.message | ||||||
|  |                 break | ||||||
|  |         if 'ids' not in resp: | ||||||
|  |             logger.info("Error with id %s %s" % (uid, resp)) | ||||||
|  |             entry.pending = False | ||||||
|  |             entry.errors = "No ids in response: %s" % resp | ||||||
|  |             break | ||||||
|  |  | ||||||
|  |         logger.info("New followers: %s" % len(resp['ids'])) | ||||||
|  |         if recursive: | ||||||
|  |             newusers = get_users(wq, resp) | ||||||
|  |             for newuser in newusers: | ||||||
|  |                 add_user(session=session, user=newuser) | ||||||
|  |  | ||||||
|  |         if 'ids' not in resp or not resp['ids']: | ||||||
|  |             logger.info('NO IDS in response') | ||||||
|  |             break | ||||||
|  |         for i in resp['ids']: | ||||||
|  |             existing_user = session.query(Following).\ | ||||||
|  |                             filter(Following.isfollowed == uid).\ | ||||||
|  |                             filter(Following.follower == i).first() | ||||||
|  |             now = int(time.time()) | ||||||
|  |             if existing_user: | ||||||
|  |                 existing_user.created_at_stamp = now | ||||||
|  |             else: | ||||||
|  |                 f = Following(isfollowed=uid, | ||||||
|  |                               follower=i, | ||||||
|  |                               created_at_stamp=now) | ||||||
|  |                 session.add(f) | ||||||
|  |  | ||||||
|  |         logger.info("Fetched: %s/%s followers" % (fetched_followers(), | ||||||
|  |                                                   total_followers)) | ||||||
|  |         entry.cursor = resp["next_cursor"] | ||||||
|  |  | ||||||
|  |         session.add(entry) | ||||||
|  |         session.commit() | ||||||
|  |  | ||||||
|  |     logger.info("Done getting followers for %s" % uid) | ||||||
|  |  | ||||||
|  |     entry.pending = False | ||||||
|  |     entry.busy = False | ||||||
|  |     session.add(entry) | ||||||
|  |     session.commit() | ||||||
|  |  | ||||||
|  |     logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||||
|  |     sys.stdout.flush() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def classify_user(id_or_name, screen_names, user_ids): | ||||||
|  |     try: | ||||||
|  |         int(id_or_name) | ||||||
|  |         user_ids.append(id_or_name) | ||||||
|  |         logger.debug("Added user id") | ||||||
|  |     except ValueError: | ||||||
|  |         logger.debug("Added screen_name") | ||||||
|  |         screen_names.append(id_or_name.split('@')[-1]) | ||||||
|  |  | ||||||
|  |  | ||||||
| # TODO: adapt to the crawler |  | ||||||
| def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): | def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): | ||||||
|     signal.signal(signal.SIGINT, signal_handler) |     signal.signal(signal.SIGINT, signal_handler) | ||||||
|  |  | ||||||
|     w = wq.next() |  | ||||||
|     if not dburi: |     if not dburi: | ||||||
|         dburi = 'sqlite:///%s.db' % extractor_name |         dburi = 'sqlite:///%s.db' % extractor_name | ||||||
|  |  | ||||||
|     session = make_session(dburi) |     session = make_session(dburi) | ||||||
|  |     session.query(ExtractorEntry).update({ExtractorEntry.busy: False}) | ||||||
|  |     session.commit() | ||||||
|  |  | ||||||
|     screen_names = [] |  | ||||||
|     user_ids = [] |  | ||||||
|  |  | ||||||
|     def classify_user(id_or_name): |     if not (user or initfile): | ||||||
|         try: |  | ||||||
|             int(user) |  | ||||||
|             user_ids.append(user) |  | ||||||
|             logger.info("Added user id") |  | ||||||
|         except ValueError: |  | ||||||
|             logger.info("Added screen_name") |  | ||||||
|             screen_names.append(user.split('@')[-1]) |  | ||||||
|  |  | ||||||
|     if user: |  | ||||||
|         classify_user(user) |  | ||||||
|  |  | ||||||
|     elif initfile: |  | ||||||
|         logger.info("No user. I will open %s" % initfile) |  | ||||||
|         with open(initfile, 'r') as f: |  | ||||||
|             for line in f: |  | ||||||
|                 user = line.strip().split(',')[0] |  | ||||||
|                 classify_user(user) |  | ||||||
|     else: |  | ||||||
|         logger.info('Using pending users from last session') |         logger.info('Using pending users from last session') | ||||||
|  |     else: | ||||||
|  |         screen_names = [] | ||||||
|  |         user_ids = [] | ||||||
|  |         if user: | ||||||
|  |             classify_user(user, screen_names, user_ids) | ||||||
|  |         elif initfile: | ||||||
|  |             logger.info("No user. I will open %s" % initfile) | ||||||
|  |             with open(initfile, 'r') as f: | ||||||
|  |                 for line in f: | ||||||
|  |                     user = line.strip().split(',')[0] | ||||||
|  |                     classify_user(user, screen_names, user_ids) | ||||||
|  |  | ||||||
|  |         def missing_user(ix, column=User.screen_name): | ||||||
|  |             res = session.query(User).filter(column == ix).count() == 0 | ||||||
|  |             if res: | ||||||
|  |                 logger.info("Missing user %s. Count: %s" % (ix, res)) | ||||||
|  |             return res | ||||||
|  |  | ||||||
|     nusers = list(get_users(wq, screen_names, by_name=True)) |         screen_names = list(filter(missing_user, screen_names)) | ||||||
|     if user_ids: |         user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids)) | ||||||
|         nusers += list(get_users(wq, user_ids, by_name=False)) |         nusers = [] | ||||||
|  |         logger.info("Missing user ids: %s" % user_ids) | ||||||
|  |         logger.info("Missing screen names: %s" % screen_names) | ||||||
|  |         if screen_names: | ||||||
|  |             nusers = list(get_users(wq, screen_names, by_name=True)) | ||||||
|  |         if user_ids: | ||||||
|  |             nusers += list(get_users(wq, user_ids, by_name=False)) | ||||||
|  |  | ||||||
|     for i in nusers: |         for i in nusers: | ||||||
|         add_user(session, i, enqueue=True) |             add_user(dburi=dburi, user=i) | ||||||
|  |  | ||||||
|     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() |     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() | ||||||
|     logging.info('Total users: {}'.format(total_users)) |     logger.info('Total users: {}'.format(total_users)) | ||||||
|     def pending_entries(): |  | ||||||
|         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() |  | ||||||
|         logging.info('Pending: {}'.format(pending)) |  | ||||||
|         return pending |  | ||||||
|  |  | ||||||
|     while pending_entries() > 0: |     de = partial(download_entry, wq, dburi=dburi) | ||||||
|         logger.info("Using account: %s" % w.name) |     pending = pending_entries(dburi) | ||||||
|  |     session.close() | ||||||
|  |  | ||||||
|  |     for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): | ||||||
|  |         logger.info("Got %s" % i) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def pending_entries(dburi): | ||||||
|  |     session = make_session(dburi) | ||||||
|  |     while True: | ||||||
|         candidate, entry = session.query(User, ExtractorEntry).\ |         candidate, entry = session.query(User, ExtractorEntry).\ | ||||||
|                            filter(ExtractorEntry.user == User.id).\ |                         filter(ExtractorEntry.user == User.id).\ | ||||||
|                            filter(ExtractorEntry.pending == True).\ |                         filter(ExtractorEntry.pending == True).\ | ||||||
|                            order_by(User.followers_count).first() |                         filter(ExtractorEntry.busy == False).\ | ||||||
|         if not candidate: |                         order_by(User.followers_count).first() | ||||||
|             break |         if candidate: | ||||||
|         pending = True |             entry.busy = True | ||||||
|         cursor = entry.cursor |             session.add(entry) | ||||||
|         uid = candidate.id |             session.commit() | ||||||
|         uobject = session.query(User).filter(User.id==uid).first() |             yield int(entry.id) | ||||||
|         name = uobject.screen_name if uobject else None |             continue | ||||||
|  |         if session.query(ExtractorEntry).\ | ||||||
|         logger.info("#"*20) |             filter(ExtractorEntry.busy == True).count() > 0: | ||||||
|         logger.info("Getting %s - %s" % (uid, name)) |             time.sleep(1) | ||||||
|         logger.info("Cursor %s" % cursor) |             continue | ||||||
|         logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users)) |         logger.info("No more pending entries") | ||||||
|         try: |         break | ||||||
|             resp = wq.followers.ids(user_id=uid, cursor=cursor) |     session.close() | ||||||
|         except TwitterHTTPError as ex: |  | ||||||
|             if ex.e.code in (401, ): |  | ||||||
|                 logger.info('Not authorized for user: {}'.format(uid)) |  | ||||||
|                 resp = {} |  | ||||||
|         if 'ids' in resp: |  | ||||||
|             logger.info("New followers: %s" % len(resp['ids'])) |  | ||||||
|             if recursive: |  | ||||||
|                 newusers = get_users(wq, resp) |  | ||||||
|                 for user in newusers: |  | ||||||
|                     add_user(session, newuser, enqueue=True) |  | ||||||
|             for i in resp['ids']: |  | ||||||
|                 existing_user = session.query(Following).\ |  | ||||||
|                                 filter(Following.isfollowed==uid).\ |  | ||||||
|                                 filter(Following.follower==i).first() |  | ||||||
|                 now = int(time.time()) |  | ||||||
|                 if existing_user: |  | ||||||
|                     existing_user.created_at_stamp = now |  | ||||||
|                 else: |  | ||||||
|                     f = Following(isfollowed=uid, |  | ||||||
|                                   follower=i, |  | ||||||
|                                   created_at_stamp=now) |  | ||||||
|                     session.add(f) |  | ||||||
|  |  | ||||||
|             total_followers = candidate.followers_count |  | ||||||
|             fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count() |  | ||||||
|             logger.info("Fetched: %s/%s followers" % (fetched_followers, |  | ||||||
|                                                       total_followers)) |  | ||||||
|             cursor = resp["next_cursor"] |  | ||||||
|             if cursor > 0: |  | ||||||
|                 pending = True |  | ||||||
|                 logger.info("Getting more followers for %s" % uid) |  | ||||||
|             else: |  | ||||||
|                 logger.info("Done getting followers for %s" % uid) |  | ||||||
|                 cursor = -1 |  | ||||||
|                 pending = False |  | ||||||
|         else: |  | ||||||
|             logger.info("Error with id %s %s" % (uid, resp)) |  | ||||||
|             pending = False |  | ||||||
|  |  | ||||||
|         entry.pending = pending |  | ||||||
|         entry.cursor = cursor |  | ||||||
|         logging.debug('Entry: {} - {}'.format(entry.user, entry.pending)) |  | ||||||
|  |  | ||||||
|         session.add(candidate) |  | ||||||
|         session.commit() |  | ||||||
|  |  | ||||||
|         sys.stdout.flush() |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_tweet(c, tid): | def get_tweet(c, tid): | ||||||
|     return c.statuses.show(id=tid) |     return c.statuses.show(id=tid) | ||||||
| @@ -281,3 +397,118 @@ def get_user(c, user): | |||||||
|         return c.users.lookup(user_id=user)[0] |         return c.users.lookup(user_id=user)[0] | ||||||
|     except ValueError: |     except ValueError: | ||||||
|         return c.users.lookup(screen_name=user)[0] |         return c.users.lookup(screen_name=user)[0] | ||||||
|  |  | ||||||
|  | def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | ||||||
|  |     cached = cached_tweet(tweetid, folder) | ||||||
|  |     tweet = None | ||||||
|  |     if update or not cached: | ||||||
|  |         tweet = get_tweet(wq, tweetid) | ||||||
|  |         js = json.dumps(tweet, indent=2) | ||||||
|  |     if write: | ||||||
|  |         if tweet: | ||||||
|  |             write_tweet_json(js, folder) | ||||||
|  |     else: | ||||||
|  |         print(js) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def cached_tweet(tweetid, folder): | ||||||
|  |     tweet = None | ||||||
|  |     file = os.path.join(folder, '%s.json' % tweetid) | ||||||
|  |     if os.path.exists(file) and os.path.isfile(file): | ||||||
|  |         try: | ||||||
|  |             # print('%s: Tweet exists' % tweetid) | ||||||
|  |             with open(file) as f: | ||||||
|  |                 tweet = json.load(f) | ||||||
|  |         except Exception as ex: | ||||||
|  |             logger.error('Error getting cached version of {}: {}'.format(tweetid, ex)) | ||||||
|  |     return tweet | ||||||
|  |  | ||||||
|  | def write_tweet_json(js, folder): | ||||||
|  |     tweetid = js['id'] | ||||||
|  |     file = tweet_file(tweetid, folder) | ||||||
|  |     if not os.path.exists(folder): | ||||||
|  |         os.makedirs(folder) | ||||||
|  |     with open(file, 'w') as f: | ||||||
|  |         json.dump(js, f, indent=2) | ||||||
|  |         logger.info('Written {} to file {}'.format(tweetid, file)) | ||||||
|  |  | ||||||
|  | def tweet_file(tweetid, folder): | ||||||
|  |     return os.path.join(folder, '%s.json' % tweetid) | ||||||
|  |  | ||||||
|  | def tweet_fail_file(tweetid, folder): | ||||||
|  |     failsfolder = os.path.join(folder, 'failed') | ||||||
|  |     if not os.path.exists(failsfolder): | ||||||
|  |         os.makedirs(failsfolder) | ||||||
|  |     return os.path.join(failsfolder, '%s.failed' % tweetid) | ||||||
|  |  | ||||||
|  | def tweet_failed(tweetid, folder): | ||||||
|  |     return os.path.isfile(tweet_fail_file(tweetid, folder)) | ||||||
|  |  | ||||||
|  | def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True): | ||||||
|  |     def filter_line(line): | ||||||
|  |         tweetid = int(line) | ||||||
|  |         # print('Checking {}'.format(tweetid)) | ||||||
|  |         if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed): | ||||||
|  |             yield None | ||||||
|  |         else: | ||||||
|  |             yield line | ||||||
|  |  | ||||||
|  |     def print_result(res): | ||||||
|  |         tid, tweet = res | ||||||
|  |         if tweet: | ||||||
|  |             try: | ||||||
|  |                 write_tweet_json(tweet, folder=folder) | ||||||
|  |                 yield 1 | ||||||
|  |             except Exception as ex: | ||||||
|  |                 logger.error('%s: %s' % (tid, ex)) | ||||||
|  |                 if not ignore_fails: | ||||||
|  |                     raise | ||||||
|  |         else: | ||||||
|  |             logger.info('Tweet not recovered: {}'.format(tid)) | ||||||
|  |             with open(tweet_fail_file(tid, folder), 'w') as f: | ||||||
|  |                 print('Tweet not found', file=f) | ||||||
|  |             yield -1 | ||||||
|  |  | ||||||
|  |     def download_batch(batch): | ||||||
|  |         tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] | ||||||
|  |         return tweets.items() | ||||||
|  |  | ||||||
|  |     with open(tweetsfile) as f: | ||||||
|  |         lines = map(lambda x: x.strip(), f) | ||||||
|  |         lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines')) | ||||||
|  |         tweets = parallel(download_batch, lines_to_crawl, 100) | ||||||
|  |         for res in tqdm(parallel(print_result, tweets), desc='Queried'): | ||||||
|  |             pass | ||||||
|  |  | ||||||
|  | def download_timeline(wq, user): | ||||||
|  |     return wq.statuses.user_timeline(id=user) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def consume_feed(func, *args, **kwargs): | ||||||
|  |     ''' | ||||||
|  |     Get all the tweets using pagination and a given method. | ||||||
|  |     It can be controlled with the `count` parameter. | ||||||
|  |  | ||||||
|  |     If count < 0 => Loop until the whole feed is consumed. | ||||||
|  |     If count == 0 => Only call the API once, with the default values. | ||||||
|  |     If count > 0 => Get count tweets from the feed. | ||||||
|  |     ''' | ||||||
|  |     remaining = int(kwargs.pop('count', 0)) | ||||||
|  |     consume = remaining < 0 | ||||||
|  |     limit = False | ||||||
|  |  | ||||||
|  |     # Simulate a do-while by updating the condition at the end | ||||||
|  |     while not limit: | ||||||
|  |         if remaining > 0: | ||||||
|  |             kwargs['count'] = remaining | ||||||
|  |         resp = func(*args, **kwargs) | ||||||
|  |         if not resp: | ||||||
|  |             return | ||||||
|  |         for t in resp: | ||||||
|  |             yield t | ||||||
|  |         if consume: | ||||||
|  |             continue | ||||||
|  |         remaining -= len(resp) | ||||||
|  |         max_id = min(s['id'] for s in func(*args, **kwargs)) - 1 | ||||||
|  |         kwargs['max_id'] = max_id | ||||||
|  |         limit = remaining <= 0 | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
| sqlalchemy | sqlalchemy | ||||||
| twitter | twitter | ||||||
| click | click | ||||||
|  | tqdm | ||||||
|   | |||||||
							
								
								
									
										4
									
								
								setup.cfg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								setup.cfg
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | |||||||
|  | [metadata] | ||||||
|  | description-file = README.md | ||||||
|  | [aliases] | ||||||
|  | test=pytest | ||||||
							
								
								
									
										13
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								setup.py
									
									
									
									
									
								
							| @@ -38,10 +38,19 @@ setup( | |||||||
|     extras_require = { |     extras_require = { | ||||||
|         'server': ['flask', 'flask-oauthlib'] |         'server': ['flask', 'flask-oauthlib'] | ||||||
|         }, |         }, | ||||||
|     test_suite="tests", |     setup_requires=['pytest-runner',], | ||||||
|     include_package_data=True, |     include_package_data=True, | ||||||
|     entry_points=""" |     entry_points=""" | ||||||
|         [console_scripts] |         [console_scripts] | ||||||
|         bitter=bitter.cli:main |         bitter=bitter.cli:main | ||||||
|     """ |     """, | ||||||
|  |     classifiers=[ | ||||||
|  |         'Development Status :: 4 - Beta', | ||||||
|  |         'Intended Audience :: Developers', | ||||||
|  |         'Intended Audience :: Science/Research', | ||||||
|  |         'License :: OSI Approved :: Apache Software License', | ||||||
|  |         'Programming Language :: Python :: 2', | ||||||
|  |         'Programming Language :: Python :: 2.7', | ||||||
|  |         'Programming Language :: Python :: 3', | ||||||
|  |     ] | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										72
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								tests/test_crawlers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | |||||||
|  | from unittest import TestCase | ||||||
|  |  | ||||||
|  | import os | ||||||
|  | import types | ||||||
|  | import datetime | ||||||
|  | import time | ||||||
|  |  | ||||||
|  | from bitter import utils, easy | ||||||
|  | from bitter.crawlers import QueueException | ||||||
|  | from bitter import config as c | ||||||
|  |  | ||||||
|  | class TestCrawlers(TestCase): | ||||||
|  |  | ||||||
|  |     def setUp(self): | ||||||
|  |         self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||||
|  |  | ||||||
|  |     def test_create_worker(self): | ||||||
|  |         assert len(self.wq.queue)==1 | ||||||
|  |  | ||||||
|  |     def test_get_limits(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         print(w1.limits) | ||||||
|  |         limitslook = w1.get_limit(['statuses', 'lookup']) | ||||||
|  |         assert limitslook['remaining'] == limitslook['limit'] | ||||||
|  |  | ||||||
|  |     def test_set_limits(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         w1.set_limit(['test', 'test2'], {'remaining': 0}) | ||||||
|  |         assert w1.get_limit(['test', 'test2']) == {'remaining': 0} | ||||||
|  |  | ||||||
|  |     def test_await(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2}) | ||||||
|  |         assert w1.get_wait(['test', 'wait']) > 1 | ||||||
|  |         time.sleep(2) | ||||||
|  |         assert w1.get_wait(['test', 'wait']) == 0 | ||||||
|  |         assert w1.get_wait(['statuses', 'lookup']) == 0 | ||||||
|  |  | ||||||
|  |     def test_is_limited(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         assert not w1.is_limited(['statuses', 'lookup']) | ||||||
|  |         w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100}) | ||||||
|  |         assert  w1.is_limited(['test', 'limited']) | ||||||
|  |  | ||||||
|  |     def test_call(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         l1 = w1.get_limit(['users', 'lookup']) | ||||||
|  |         resp = self.wq.users.lookup(screen_name='balkian') | ||||||
|  |         l2 = w1.get_limit(['users', 'lookup']) | ||||||
|  |         assert l1['remaining']-l2['remaining'] == 1 | ||||||
|  |  | ||||||
|  |     def test_consume(self): | ||||||
|  |         w1 = list(self.wq.queue)[0] | ||||||
|  |         l1 = w1.get_limit(['friends', 'list']) | ||||||
|  |         self.wq.wait = False | ||||||
|  |         for i in range(l1['remaining']): | ||||||
|  |             print(i) | ||||||
|  |             resp = self.wq.friends.list(screen_name='balkian') | ||||||
|  |         # l2 = w1.get_limit(['users', 'lookup']) | ||||||
|  |         # assert l2['remaining'] == 0 | ||||||
|  |         # self.wq.users.lookup(screen_name='balkian') | ||||||
|  |          | ||||||
|  |         failed = False | ||||||
|  |         try: | ||||||
|  |             # resp = self.wq.friends.list(screen_name='balkian') | ||||||
|  |             self.wq.next(['friends', 'list']) | ||||||
|  |         except QueueException: | ||||||
|  |             failed = True | ||||||
|  |         assert failed | ||||||
|  |         l2 = w1.get_limit(['friends', 'list']) | ||||||
|  |         assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time()) | ||||||
|  |         assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2) | ||||||
| @@ -58,4 +58,6 @@ class TestUtils(TestCase): | |||||||
|         assert list(resp) == [1,2,3] |         assert list(resp) == [1,2,3] | ||||||
|         toc = time.time() |         toc = time.time() | ||||||
|         assert (tic-toc) < 6000 |         assert (tic-toc) < 6000 | ||||||
|  |         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||||
|  |         assert list(resp2) == [1,2,3,4] | ||||||
|          |          | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user