mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-26 05:08:22 +00:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 4b2f107b8a | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,4 +1,3 @@ | ||||
| __pycache__ | ||||
| *.egg-info | ||||
| dist | ||||
| env | ||||
|   | ||||
| @@ -1,7 +0,0 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:2.7-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
| @@ -1,7 +0,0 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:3.4-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
| @@ -1,7 +0,0 @@ | ||||
| # onbuild copies . to /usr/src/app/ | ||||
| From python:{{PYVERSION}}-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
| @@ -5,6 +5,4 @@ include README.md | ||||
| include bitter/VERSION | ||||
| graft bitter/templates | ||||
| graft bitter/static | ||||
| include tests/test* | ||||
| global-exclude *.pyc | ||||
| global-exclude __pycache__ | ||||
| graft test | ||||
|   | ||||
							
								
								
									
										76
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										76
									
								
								Makefile
									
									
									
									
									
								
							| @@ -1,76 +0,0 @@ | ||||
| PYVERSIONS=3.4 2.7 | ||||
| PYMAIN=$(firstword $(PYVERSIONS)) | ||||
| NAME=bitter | ||||
| REPO=balkian | ||||
| VERSION=$(shell cat $(NAME)/VERSION) | ||||
| TARNAME=$(NAME)-$(VERSION).tar.gz  | ||||
| IMAGENAME=$(REPO)/$(NAME) | ||||
| IMAGEWTAG=$(IMAGENAME):$(VERSION) | ||||
|  | ||||
|  | ||||
| all: build run | ||||
|  | ||||
| dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS)) | ||||
|  | ||||
| Dockerfile-%: Dockerfile.template | ||||
| 	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$* | ||||
|  | ||||
|  | ||||
| dev-%: | ||||
| 	@docker start $(NAME)-dev$* || (\ | ||||
| 		$(MAKE) build-$*; \ | ||||
| 		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||
| 	)\ | ||||
|  | ||||
| 	docker exec -ti $(NAME)-dev$* bash | ||||
|  | ||||
| dev: dev-$(PYMAIN) | ||||
|  | ||||
| build: $(addprefix build-, $(PYMAIN)) | ||||
|  | ||||
| buildall: $(addprefix build-, $(PYVERSIONS)) | ||||
|  | ||||
| build-%: Dockerfile-% | ||||
| 	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .; | ||||
|  | ||||
| test: $(addprefix test-,$(PYMAIN)) | ||||
|  | ||||
| testall: $(addprefix test-,$(PYVERSIONS)) | ||||
|  | ||||
| test-%: build-% | ||||
| 	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||
|  | ||||
| pip_test-%: | ||||
| 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | ||||
|  | ||||
| dist/$(NAME)-$(VERSION).tar.gz: | ||||
| 	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist; | ||||
|  | ||||
| sdist: dist/$(NAME)-$(VERSION).tar.gz | ||||
|  | ||||
|  | ||||
| upload-%: test-% | ||||
| 	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
|  | ||||
| upload: testall $(addprefix upload-,$(PYVERSIONS)) | ||||
| 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)' | ||||
| 	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)' | ||||
|  | ||||
| clean: | ||||
| 	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true | ||||
| 	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true | ||||
|  | ||||
| upload_git: | ||||
| 	git commit -a | ||||
| 	git tag ${VERSION} | ||||
| 	git push --tags origin master | ||||
|  | ||||
| pip_upload: | ||||
| 	python setup.py sdist upload ; | ||||
|  | ||||
| pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | ||||
|  | ||||
| run: build | ||||
| 	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
|  | ||||
| .PHONY: test test-% build-% build test test_pip run | ||||
							
								
								
									
										11
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								README.md
									
									
									
									
									
								
							| @@ -17,18 +17,11 @@ or | ||||
| Programmatically: | ||||
|  | ||||
| ```python | ||||
| from bitter import easy | ||||
| wq = easy() | ||||
| from bitter.crawlers import TwitterQueue | ||||
| wq = TwitterQueue.from_credentials() | ||||
| print(wq.users.show(user_name='balkian')) | ||||
| ``` | ||||
|  | ||||
|  | ||||
| You can also make custom calls to the API through the command line. | ||||
| e.g. to get the latest 500 tweets by the python software foundation: | ||||
|  | ||||
| ``` | ||||
| bitter api statuses/user_timeline --id thepsf --count 500 | ||||
| ``` | ||||
| # Credentials format | ||||
|  | ||||
| ``` | ||||
|   | ||||
| @@ -1 +1 @@ | ||||
| 0.7.4 | ||||
| 0.7.0 | ||||
|   | ||||
| @@ -7,10 +7,4 @@ import os | ||||
|  | ||||
| from .version import __version__ | ||||
|  | ||||
| def easy(*args, **kwargs): | ||||
|     from .crawlers import TwitterQueue | ||||
|     return TwitterQueue.from_credentials(*args, **kwargs) | ||||
|  | ||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -52,15 +52,34 @@ def tweet(ctx): | ||||
| @click.argument('tweetid') | ||||
| def get_tweet(tweetid, write, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     utils.download_tweet(wq, tweetid, write, folder, update) | ||||
|     if not write: | ||||
|         t = utils.get_tweet(wq, tweetid) | ||||
|         js = json.dumps(t, indent=2) | ||||
|         print(js) | ||||
|         return | ||||
|     if not os.path.exists(folder): | ||||
|         os.makedirs(folder) | ||||
|     file = os.path.join(folder, '%s.json' % tweetid) | ||||
|     if not update and os.path.exists(file) and os.path.isfile(file): | ||||
|         print('%s: Tweet exists' % tweetid) | ||||
|         return | ||||
|     try: | ||||
|         t = utils.get_tweet(wq, tweetid) | ||||
|         with open(file, 'w') as f: | ||||
|             js = json.dumps(t, indent=2) | ||||
|             print(js, file=f) | ||||
|     except Exception as ex: | ||||
|         print('%s: %s' % (tweetid, ex), file=sys.stderr) | ||||
|          | ||||
| @tweet.command('get_all') | ||||
| @click.argument('tweetsfile', 'File with a list of tweets to look up') | ||||
| @click.option('-f', '--folder', default="tweets") | ||||
| @click.pass_context | ||||
| def get_tweets(ctx, tweetsfile, folder): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     utils.download_tweets(wq, tweetsfile, folder) | ||||
|     with open(tweetsfile) as f: | ||||
|         for line in f: | ||||
|             tid = line.strip() | ||||
|             ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True) | ||||
|  | ||||
| @tweet.command('search') | ||||
| @click.argument('query') | ||||
| @@ -240,6 +259,11 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||
|              | ||||
|     logger.info('Done!') | ||||
|  | ||||
| @main.group('api') | ||||
| def api(): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| @main.group('extractor') | ||||
| @click.pass_context | ||||
| @click.option('--db', required=True, help='Database of users.') | ||||
| @@ -327,7 +351,7 @@ def reset_extractor(ctx): | ||||
|     session = make_session(db) | ||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||
|  | ||||
| @main.command('limits') | ||||
| @api.command('limits') | ||||
| @click.argument('url', required=False) | ||||
| @click.pass_context | ||||
| def get_limits(ctx, url): | ||||
| @@ -348,32 +372,6 @@ def get_limits(ctx, url): | ||||
|         else: | ||||
|             print(json.dumps(resp, indent=2)) | ||||
|  | ||||
|  | ||||
| @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False)) | ||||
| @click.argument('cmd', nargs=1) | ||||
| @click.argument('api_args', nargs=-1, type=click.UNPROCESSED) | ||||
| @click.pass_context | ||||
| def api(ctx, cmd, api_args): | ||||
|     opts = {} | ||||
|     i = iter(api_args) | ||||
|     for k, v in zip(i, i): | ||||
|         k = k.replace('--', '') | ||||
|         opts[k] = v | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     resp = utils.consume_feed(wq[cmd], **opts) | ||||
|     # A hack to stream jsons | ||||
|     print('[') | ||||
|     first = True | ||||
|     for i in resp: | ||||
|         if not first: | ||||
|             print(',') | ||||
|         else: | ||||
|             first = False | ||||
|          | ||||
|         print(json.dumps(i, indent=2)) | ||||
|     print(']') | ||||
|  | ||||
|  | ||||
| @main.command('server') | ||||
| @click.argument('CONSUMER_KEY', required=True) | ||||
| @click.argument('CONSUMER_SECRET', required=True) | ||||
|   | ||||
| @@ -10,7 +10,6 @@ from twitter import * | ||||
| from collections import OrderedDict | ||||
| from threading import Lock | ||||
| from itertools import islice | ||||
| from functools import partial | ||||
| try: | ||||
|     import itertools.ifilter as filter | ||||
| except ImportError: | ||||
| @@ -39,9 +38,6 @@ class AttrToFunc(object): | ||||
|         else: | ||||
|             return extend_call(k) | ||||
|  | ||||
|     def __getitem__(self, k): | ||||
|         return partial(self.handler, self.__uriparts+k.split('/')) | ||||
|  | ||||
|     def __call__(self, *args, **kwargs): | ||||
|         # for i, a in enumerate(args)e | ||||
|         #     kwargs[i] = a | ||||
| @@ -79,12 +75,6 @@ class TwitterWorker(object): | ||||
|             self._client = self.api_class(auth=auth) | ||||
|         return self._client | ||||
|  | ||||
|     def __repr__(self): | ||||
|         msg = '<{} for {}>'.format(self.__class__.__name__, self.name) | ||||
|         if self.busy: | ||||
|             msg += ' [busy]' | ||||
|         return msg | ||||
|  | ||||
| class RestWorker(TwitterWorker): | ||||
|     api_class = Twitter | ||||
|  | ||||
|   | ||||
| @@ -3,7 +3,6 @@ import json | ||||
|  | ||||
| from sqlalchemy.ext.declarative import declarative_base | ||||
| from sqlalchemy.types import BigInteger, Integer, Text, Boolean | ||||
| from sqlalchemy.pool import SingletonThreadPool | ||||
| from sqlalchemy import Column, Index | ||||
|  | ||||
| from sqlalchemy import create_engine | ||||
| @@ -86,20 +85,16 @@ class ExtractorEntry(Base): | ||||
|     user = Column(BigInteger, index=True) | ||||
|     cursor = Column(BigInteger, default=-1) | ||||
|     pending = Column(Boolean, default=False) | ||||
|     errors = Column(Text, default="") | ||||
|     busy = Column(Boolean, default=False) | ||||
|  | ||||
|  | ||||
| def make_session(url): | ||||
|     if not isinstance(url, str): | ||||
|         print(url) | ||||
|         raise Exception("FUCK") | ||||
|     engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True) | ||||
|     engine = create_engine(url)#, echo=True) | ||||
|     Base.metadata.create_all(engine) | ||||
|     Session = sessionmaker(bind=engine) | ||||
|     session = Session() | ||||
|     return session | ||||
|      | ||||
|  | ||||
|  | ||||
| def test(db='sqlite:///users.db'): | ||||
|  | ||||
|     from sqlalchemy import exists | ||||
|   | ||||
							
								
								
									
										444
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										444
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -1,5 +1,3 @@ | ||||
| from __future__ import print_function | ||||
|  | ||||
| import logging | ||||
| import time | ||||
| import json | ||||
| @@ -11,17 +9,11 @@ import os | ||||
| import multiprocessing | ||||
| from multiprocessing.pool import ThreadPool | ||||
|  | ||||
| from functools import partial | ||||
|  | ||||
| from tqdm import tqdm | ||||
|  | ||||
| from itertools import islice, chain | ||||
| from itertools import islice | ||||
| from contextlib import contextmanager | ||||
|  | ||||
| from future.moves.itertools import zip_longest | ||||
| from collections import Counter | ||||
|  | ||||
| from builtins import map, filter | ||||
|  | ||||
| from twitter import TwitterHTTPError | ||||
|  | ||||
| from bitter.models import Following, User, ExtractorEntry, make_session | ||||
| @@ -35,20 +27,17 @@ def signal_handler(signal, frame): | ||||
|     logger.info('You pressed Ctrl+C!') | ||||
|     sys.exit(0) | ||||
|  | ||||
| def chunk(iterable, n, fillvalue=None): | ||||
|     args = [iter(iterable)] * n | ||||
|     return zip_longest(*args, fillvalue=fillvalue) | ||||
|  | ||||
| def chunk(iterable, n): | ||||
|     it = iter(iterable) | ||||
|     return iter(lambda: tuple(islice(it, n)), ()) | ||||
|  | ||||
|  | ||||
| def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||
|     source = chunk(source, chunksize) | ||||
|     p = ThreadPool(numcpus*2) | ||||
|     results = p.imap_unordered(func, source, chunksize=int(1000/numcpus)) | ||||
|     for i in chain.from_iterable(results): | ||||
| def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): | ||||
|     if chunksize: | ||||
|         source = chunk(source, chunksize) | ||||
|     p = ThreadPool(numcpus) | ||||
|     for i in p.imap(func, source): | ||||
|         yield i | ||||
|  | ||||
|  | ||||
| def get_credentials_path(credfile=None): | ||||
|     if not credfile: | ||||
|         if config.CREDENTIALS: | ||||
| @@ -57,20 +46,17 @@ def get_credentials_path(credfile=None): | ||||
|             raise Exception('No valid credentials file') | ||||
|     return os.path.expanduser(credfile) | ||||
|  | ||||
|  | ||||
| @contextmanager | ||||
| def credentials_file(credfile, *args, **kwargs): | ||||
|     p = get_credentials_path(credfile) | ||||
|     with open(p, *args, **kwargs) as f: | ||||
|         yield f | ||||
|  | ||||
|  | ||||
| def iter_credentials(credfile=None): | ||||
|     with credentials_file(credfile) as f: | ||||
|         for l in f: | ||||
|             yield json.loads(l.strip()) | ||||
|  | ||||
|  | ||||
| def get_credentials(credfile=None, inverse=False, **kwargs): | ||||
|     creds = [] | ||||
|     for i in iter_credentials(credfile): | ||||
| @@ -81,13 +67,11 @@ def get_credentials(credfile=None, inverse=False, **kwargs): | ||||
|             creds.append(i) | ||||
|     return creds | ||||
|  | ||||
|  | ||||
| def create_credentials(credfile=None): | ||||
|     credfile = get_credentials_path(credfile) | ||||
|     with credentials_file(credfile, 'a'): | ||||
|         pass | ||||
|  | ||||
|      | ||||
| def delete_credentials(credfile=None, **creds): | ||||
|     tokeep = get_credentials(credfile, inverse=True, **creds) | ||||
|     with credentials_file(credfile, 'w') as f: | ||||
| @@ -95,7 +79,6 @@ def delete_credentials(credfile=None, **creds): | ||||
|             f.write(json.dumps(i)) | ||||
|             f.write('\n') | ||||
|  | ||||
|  | ||||
| def add_credentials(credfile=None, **creds): | ||||
|     exist = get_credentials(credfile, **creds) | ||||
|     if not exist: | ||||
| @@ -110,7 +93,6 @@ def get_hashtags(iter_tweets, best=None): | ||||
|         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) | ||||
|     return c | ||||
|  | ||||
|  | ||||
| def read_file(filename, tail=False): | ||||
|     with open(filename) as f: | ||||
|         while True: | ||||
| @@ -152,7 +134,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | ||||
|             else: | ||||
|                 yield user | ||||
|  | ||||
|  | ||||
| def trim_user(user): | ||||
|     if 'status' in user: | ||||
|         del user['status'] | ||||
| @@ -166,218 +147,142 @@ def trim_user(user): | ||||
|     return user | ||||
|  | ||||
|  | ||||
| def add_user(user, dburi=None, session=None, update=False): | ||||
|     if not session: | ||||
|         session = make_session(dburi) | ||||
|  | ||||
| def add_user(session, user, enqueue=False): | ||||
|     user = trim_user(user) | ||||
|     olduser = session.query(User).filter(User.id == user['id']) | ||||
|     olduser = session.query(User).filter(User.id==user['id']) | ||||
|     if olduser: | ||||
|         if not update: | ||||
|             return | ||||
|         olduser.delete() | ||||
|     nuser = User() | ||||
|     for key, value in user.items(): | ||||
|         setattr(nuser, key, value) | ||||
|     user = nuser | ||||
|     if update: | ||||
|         session.add(user) | ||||
|         logger.debug('Adding entry') | ||||
|     user = User(**user) | ||||
|     session.add(user) | ||||
|     if extract: | ||||
|         logging.debug('Adding entry') | ||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() | ||||
|         if not entry: | ||||
|             entry = ExtractorEntry(user=user.id) | ||||
|             session.add(entry) | ||||
|         logger.debug(entry.pending) | ||||
|         logging.debug(entry.pending) | ||||
|         entry.pending = True | ||||
|         entry.cursor = -1 | ||||
|         session.commit() | ||||
|     session.close() | ||||
|  | ||||
|  | ||||
| def download_entry(wq, entry_id, dburi=None, recursive=False): | ||||
|     session = make_session(dburi) | ||||
|     if not session: | ||||
|         raise Exception("Provide dburi or session") | ||||
|     logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id))) | ||||
|     entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first() | ||||
|     user = session.query(User).filter(User.id == entry.user).first() | ||||
|     download_user(wq, session, user, entry, recursive) | ||||
|     session.close() | ||||
|  | ||||
|  | ||||
| def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): | ||||
|  | ||||
|     total_followers = user.followers_count | ||||
|  | ||||
|     if total_followers > max_followers: | ||||
|         entry.pending = False | ||||
|         logger.info("Too many followers for user: %s" % user.screen_name) | ||||
|         session.add(entry) | ||||
|         session.commit() | ||||
|         return | ||||
|  | ||||
|     if not entry: | ||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id) | ||||
|     session.add(entry) | ||||
|     session.commit() | ||||
|  | ||||
|     pending = True | ||||
|     cursor = entry.cursor | ||||
|     uid = user.id | ||||
|     name = user.name | ||||
|  | ||||
|     logger.info("#"*20) | ||||
|     logger.info("Getting %s - %s" % (uid, name)) | ||||
|     logger.info("Cursor %s" % cursor) | ||||
|     logger.info("Using account: %s" % wq.name) | ||||
|  | ||||
|     _fetched_followers = 0 | ||||
|  | ||||
|     def fetched_followers(): | ||||
|         return session.query(Following).filter(Following.isfollowed==uid).count() | ||||
|  | ||||
|     attempts = 0 | ||||
|     while cursor > 0 or fetched_followers() < total_followers: | ||||
|         try: | ||||
|             resp = wq.followers.ids(user_id=uid, cursor=cursor) | ||||
|         except TwitterHTTPError as ex: | ||||
|             attempts += 1 | ||||
|             if ex.e.code in (401, ) or attempts > 3: | ||||
|                 logger.info('Not authorized for user: {}'.format(uid)) | ||||
|                 entry.errors = ex.message | ||||
|                 break | ||||
|         if 'ids' not in resp: | ||||
|             logger.info("Error with id %s %s" % (uid, resp)) | ||||
|             entry.pending = False | ||||
|             entry.errors = "No ids in response: %s" % resp | ||||
|             break | ||||
|  | ||||
|         logger.info("New followers: %s" % len(resp['ids'])) | ||||
|         if recursive: | ||||
|             newusers = get_users(wq, resp) | ||||
|             for newuser in newusers: | ||||
|                 add_user(session=session, user=newuser) | ||||
|  | ||||
|         if 'ids' not in resp or not resp['ids']: | ||||
|             logger.info('NO IDS in response') | ||||
|             break | ||||
|         for i in resp['ids']: | ||||
|             existing_user = session.query(Following).\ | ||||
|                             filter(Following.isfollowed == uid).\ | ||||
|                             filter(Following.follower == i).first() | ||||
|             now = int(time.time()) | ||||
|             if existing_user: | ||||
|                 existing_user.created_at_stamp = now | ||||
|             else: | ||||
|                 f = Following(isfollowed=uid, | ||||
|                               follower=i, | ||||
|                               created_at_stamp=now) | ||||
|                 session.add(f) | ||||
|  | ||||
|         logger.info("Fetched: %s/%s followers" % (fetched_followers(), | ||||
|                                                   total_followers)) | ||||
|         entry.cursor = resp["next_cursor"] | ||||
|  | ||||
|         session.add(entry) | ||||
|         session.commit() | ||||
|  | ||||
|     logger.info("Done getting followers for %s" % uid) | ||||
|  | ||||
|     entry.pending = False | ||||
|     entry.busy = False | ||||
|     session.add(entry) | ||||
|     session.commit() | ||||
|  | ||||
|     logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||
|     sys.stdout.flush() | ||||
|  | ||||
|  | ||||
| def classify_user(id_or_name, screen_names, user_ids): | ||||
|     try: | ||||
|         int(id_or_name) | ||||
|         user_ids.append(id_or_name) | ||||
|         logger.debug("Added user id") | ||||
|     except ValueError: | ||||
|         logger.debug("Added screen_name") | ||||
|         screen_names.append(id_or_name.split('@')[-1]) | ||||
|  | ||||
|  | ||||
| # TODO: adapt to the crawler | ||||
| def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): | ||||
|     signal.signal(signal.SIGINT, signal_handler) | ||||
|  | ||||
|     w = wq.next() | ||||
|     if not dburi: | ||||
|         dburi = 'sqlite:///%s.db' % extractor_name | ||||
|  | ||||
|     session = make_session(dburi) | ||||
|     session.query(ExtractorEntry).update({ExtractorEntry.busy: False}) | ||||
|     session.commit() | ||||
|  | ||||
|     screen_names = [] | ||||
|     user_ids = [] | ||||
|  | ||||
|     if not (user or initfile): | ||||
|         logger.info('Using pending users from last session') | ||||
|     def classify_user(id_or_name): | ||||
|         try: | ||||
|             int(user) | ||||
|             user_ids.append(user) | ||||
|             logger.info("Added user id") | ||||
|         except ValueError: | ||||
|             logger.info("Added screen_name") | ||||
|             screen_names.append(user.split('@')[-1]) | ||||
|  | ||||
|     if user: | ||||
|         classify_user(user) | ||||
|  | ||||
|     elif initfile: | ||||
|         logger.info("No user. I will open %s" % initfile) | ||||
|         with open(initfile, 'r') as f: | ||||
|             for line in f: | ||||
|                 user = line.strip().split(',')[0] | ||||
|                 classify_user(user) | ||||
|     else: | ||||
|         screen_names = [] | ||||
|         user_ids = [] | ||||
|         if user: | ||||
|             classify_user(user, screen_names, user_ids) | ||||
|         elif initfile: | ||||
|             logger.info("No user. I will open %s" % initfile) | ||||
|             with open(initfile, 'r') as f: | ||||
|                 for line in f: | ||||
|                     user = line.strip().split(',')[0] | ||||
|                     classify_user(user, screen_names, user_ids) | ||||
|         logger.info('Using pending users from last session') | ||||
|  | ||||
|         def missing_user(ix, column=User.screen_name): | ||||
|             res = session.query(User).filter(column == ix).count() == 0 | ||||
|             if res: | ||||
|                 logger.info("Missing user %s. Count: %s" % (ix, res)) | ||||
|             return res | ||||
|  | ||||
|         screen_names = list(filter(missing_user, screen_names)) | ||||
|         user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids)) | ||||
|         nusers = [] | ||||
|         logger.info("Missing user ids: %s" % user_ids) | ||||
|         logger.info("Missing screen names: %s" % screen_names) | ||||
|         if screen_names: | ||||
|             nusers = list(get_users(wq, screen_names, by_name=True)) | ||||
|         if user_ids: | ||||
|             nusers += list(get_users(wq, user_ids, by_name=False)) | ||||
|     nusers = list(get_users(wq, screen_names, by_name=True)) | ||||
|     if user_ids: | ||||
|         nusers += list(get_users(wq, user_ids, by_name=False)) | ||||
|  | ||||
|         for i in nusers: | ||||
|             add_user(dburi=dburi, user=i) | ||||
|     for i in nusers: | ||||
|         add_user(session, i, enqueue=True) | ||||
|  | ||||
|     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() | ||||
|     logger.info('Total users: {}'.format(total_users)) | ||||
|     logging.info('Total users: {}'.format(total_users)) | ||||
|     def pending_entries(): | ||||
|         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() | ||||
|         logging.info('Pending: {}'.format(pending)) | ||||
|         return pending | ||||
|  | ||||
|     de = partial(download_entry, wq, dburi=dburi) | ||||
|     pending = pending_entries(dburi) | ||||
|     session.close() | ||||
|  | ||||
|     for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): | ||||
|         logger.info("Got %s" % i) | ||||
|  | ||||
|  | ||||
| def pending_entries(dburi): | ||||
|     session = make_session(dburi) | ||||
|     while True: | ||||
|     while pending_entries() > 0: | ||||
|         logger.info("Using account: %s" % w.name) | ||||
|         candidate, entry = session.query(User, ExtractorEntry).\ | ||||
|                         filter(ExtractorEntry.user == User.id).\ | ||||
|                         filter(ExtractorEntry.pending == True).\ | ||||
|                         filter(ExtractorEntry.busy == False).\ | ||||
|                         order_by(User.followers_count).first() | ||||
|         if candidate: | ||||
|             entry.busy = True | ||||
|             session.add(entry) | ||||
|             session.commit() | ||||
|             yield int(entry.id) | ||||
|             continue | ||||
|         if session.query(ExtractorEntry).\ | ||||
|             filter(ExtractorEntry.busy == True).count() > 0: | ||||
|             time.sleep(1) | ||||
|             continue | ||||
|         logger.info("No more pending entries") | ||||
|         break | ||||
|     session.close() | ||||
|                            filter(ExtractorEntry.user == User.id).\ | ||||
|                            filter(ExtractorEntry.pending == True).\ | ||||
|                            order_by(User.followers_count).first() | ||||
|         if not candidate: | ||||
|             break | ||||
|         pending = True | ||||
|         cursor = entry.cursor | ||||
|         uid = candidate.id | ||||
|         uobject = session.query(User).filter(User.id==uid).first() | ||||
|         name = uobject.screen_name if uobject else None | ||||
|  | ||||
|         logger.info("#"*20) | ||||
|         logger.info("Getting %s - %s" % (uid, name)) | ||||
|         logger.info("Cursor %s" % cursor) | ||||
|         logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users)) | ||||
|         try: | ||||
|             resp = wq.followers.ids(user_id=uid, cursor=cursor) | ||||
|         except TwitterHTTPError as ex: | ||||
|             if ex.e.code in (401, ): | ||||
|                 logger.info('Not authorized for user: {}'.format(uid)) | ||||
|                 resp = {} | ||||
|         if 'ids' in resp: | ||||
|             logger.info("New followers: %s" % len(resp['ids'])) | ||||
|             if recursive: | ||||
|                 newusers = get_users(wq, resp) | ||||
|                 for user in newusers: | ||||
|                     add_user(session, newuser, enqueue=True) | ||||
|             for i in resp['ids']: | ||||
|                 existing_user = session.query(Following).\ | ||||
|                                 filter(Following.isfollowed==uid).\ | ||||
|                                 filter(Following.follower==i).first() | ||||
|                 now = int(time.time()) | ||||
|                 if existing_user: | ||||
|                     existing_user.created_at_stamp = now | ||||
|                 else: | ||||
|                     f = Following(isfollowed=uid, | ||||
|                                   follower=i, | ||||
|                                   created_at_stamp=now) | ||||
|                     session.add(f) | ||||
|  | ||||
|             total_followers = candidate.followers_count | ||||
|             fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count() | ||||
|             logger.info("Fetched: %s/%s followers" % (fetched_followers, | ||||
|                                                       total_followers)) | ||||
|             cursor = resp["next_cursor"] | ||||
|             if cursor > 0: | ||||
|                 pending = True | ||||
|                 logger.info("Getting more followers for %s" % uid) | ||||
|             else: | ||||
|                 logger.info("Done getting followers for %s" % uid) | ||||
|                 cursor = -1 | ||||
|                 pending = False | ||||
|         else: | ||||
|             logger.info("Error with id %s %s" % (uid, resp)) | ||||
|             pending = False | ||||
|  | ||||
|         entry.pending = pending | ||||
|         entry.cursor = cursor | ||||
|         logging.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||
|  | ||||
|         session.add(candidate) | ||||
|         session.commit() | ||||
|  | ||||
|         sys.stdout.flush() | ||||
|  | ||||
|  | ||||
| def get_tweet(c, tid): | ||||
|     return c.statuses.show(id=tid) | ||||
| @@ -397,118 +302,3 @@ def get_user(c, user): | ||||
|         return c.users.lookup(user_id=user)[0] | ||||
|     except ValueError: | ||||
|         return c.users.lookup(screen_name=user)[0] | ||||
|  | ||||
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
    """Fetch a tweet, preferring the local cache, and write it to disk or stdout.

    :param wq: twitter client/queue passed through to ``get_tweet``
    :param tweetid: id of the tweet to fetch
    :param write: when True, persist a freshly fetched tweet to *folder*;
                  when False, print the tweet as JSON to stdout instead
    :param folder: cache directory used by ``cached_tweet``/``write_tweet_json``
    :param update: when True, re-fetch from the API even if a cached copy exists

    Fixes over the previous version: ``write_tweet_json`` receives the tweet
    dict (it indexes ``js['id']``), not a pre-serialized JSON string, and the
    print path no longer raises NameError when the tweet came from the cache.
    """
    cached = cached_tweet(tweetid, folder)
    tweet = cached
    if update or not cached:
        tweet = get_tweet(wq, tweetid)
    if write:
        # Only write when we actually fetched something new; a cached copy is
        # already on disk.
        if tweet and (update or not cached):
            write_tweet_json(tweet, folder)
    else:
        print(json.dumps(tweet, indent=2))
|  | ||||
|  | ||||
def cached_tweet(tweetid, folder):
    """Return the locally cached JSON dict for *tweetid*, or None if it is
    missing or cannot be parsed."""
    path = os.path.join(folder, '%s.json' % tweetid)
    if not (os.path.exists(path) and os.path.isfile(path)):
        return None
    try:
        with open(path) as fh:
            return json.load(fh)
    except Exception as ex:
        # Corrupt/unreadable cache entry: log and behave as a cache miss.
        logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
    return None
|  | ||||
def write_tweet_json(js, folder):
    """Persist the tweet dict *js* as ``<folder>/<id>.json``, creating the
    folder on first use. *js* must carry an ``'id'`` key."""
    tweetid = js['id']
    target = tweet_file(tweetid, folder)
    if not os.path.exists(folder):
        os.makedirs(folder)
    with open(target, 'w') as out:
        json.dump(js, out, indent=2)
        logger.info('Written {} to file {}'.format(tweetid, target))
|  | ||||
def tweet_file(tweetid, folder):
    """Path of the cached JSON file for *tweetid* inside *folder*."""
    filename = '%s.json' % tweetid
    return os.path.join(folder, filename)
|  | ||||
def tweet_fail_file(tweetid, folder):
    """Path of the marker file recording that *tweetid* could not be fetched.

    Side effect: ensures ``<folder>/failed`` exists.
    """
    fails_dir = os.path.join(folder, 'failed')
    if not os.path.exists(fails_dir):
        os.makedirs(fails_dir)
    return os.path.join(fails_dir, '%s.failed' % tweetid)
|  | ||||
def tweet_failed(tweetid, folder):
    """True if a previous attempt to fetch *tweetid* left a failure marker."""
    marker = tweet_fail_file(tweetid, folder)
    return os.path.isfile(marker)
|  | ||||
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
    """Bulk-download the tweet ids listed (one per line) in *tweetsfile* into *folder*.

    Ids already cached (unless *update*) or previously marked as failed
    (unless *retry_failed*) are skipped. Lookups are batched 100 ids per
    statuses/lookup call. Tweets that the API does not return get a failure
    marker file so later runs skip them.

    NOTE(review): the pipeline relies on `utils.parallel` flattening the
    generators yielded by these helpers — confirm its semantics before
    restructuring.
    """
    # Generator: yields the line when the id still needs downloading, None otherwise.
    def filter_line(line):
        tweetid = int(line)
        # print('Checking {}'.format(tweetid))
        if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
            yield None
        else:
            yield line

    # Generator: persists one (id, tweet) result; yields 1 on success, -1 on a
    # miss (and records the miss via a fail-marker file).
    def print_result(res):
        tid, tweet = res
        if tweet:
            try:
                write_tweet_json(tweet, folder=folder)
                yield 1
            except Exception as ex:
                logger.error('%s: %s' % (tid, ex))
                if not ignore_fails:
                    raise
        else:
            logger.info('Tweet not recovered: {}'.format(tid))
            with open(tweet_fail_file(tid, folder), 'w') as f:
                print('Tweet not found', file=f)
            yield -1

    # One statuses/lookup call per batch of ids; map=True keeps missing ids in
    # the response (as None values), so misses can be recorded above.
    def download_batch(batch):
        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
        return tweets.items()

    with open(tweetsfile) as f:
        lines = map(lambda x: x.strip(), f)
        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
        tweets = parallel(download_batch, lines_to_crawl, 100)
        # Drain the pipeline; tqdm reports progress, results themselves are discarded.
        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
            pass
|  | ||||
def download_timeline(wq, user):
    """Return one statuses/user_timeline page for *user* via client *wq*."""
    timeline = wq.statuses.user_timeline(id=user)
    return timeline
|  | ||||
|  | ||||
def consume_feed(func, *args, **kwargs):
    '''
    Get all the tweets using pagination and a given method.
    It can be controlled with the `count` parameter.

    If count < 0 => Loop until the whole feed is consumed.
    If count == 0 => Only call the API once, with the default values.
    If count > 0 => Get count tweets from the feed.

    Pagination uses Twitter's `max_id` convention: the next request asks for
    statuses with ids strictly below the smallest id already received.

    Fixes over the previous version: `max_id` is now derived from the page
    just received (`resp`) instead of issuing a second, redundant API call,
    and it is updated *before* the consume-mode `continue`, so count < 0 no
    longer re-fetches the same page forever.
    '''
    remaining = int(kwargs.pop('count', 0))
    consume = remaining < 0
    limit = False

    # Simulate a do-while by updating the condition at the end
    while not limit:
        if remaining > 0:
            kwargs['count'] = remaining
        resp = func(*args, **kwargs)
        if not resp:
            # Empty page: the feed is exhausted.
            return
        for t in resp:
            yield t
        # Advance the cursor from the page we already have.
        kwargs['max_id'] = min(s['id'] for s in resp) - 1
        if consume:
            continue
        remaining -= len(resp)
        limit = remaining <= 0
|   | ||||
							
								
								
									
										4
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								setup.py
									
									
									
									
									
								
							| @@ -38,7 +38,7 @@ setup( | ||||
|     extras_require = { | ||||
|         'server': ['flask', 'flask-oauthlib'] | ||||
|         }, | ||||
|     setup_requires=['pytest-runner',], | ||||
|     test_suite="tests", | ||||
|     include_package_data=True, | ||||
|     entry_points=""" | ||||
|         [console_scripts] | ||||
| @@ -48,7 +48,7 @@ setup( | ||||
|         'Development Status :: 4 - Beta', | ||||
|         'Intended Audience :: Developers', | ||||
|         'Intended Audience :: Science/Research', | ||||
|         'License :: OSI Approved :: Apache Software License', | ||||
|         'License :: OSI Approved :: Apache Software License', | ||||
|         'Programming Language :: Python :: 2', | ||||
|         'Programming Language :: Python :: 2.7', | ||||
|         'Programming Language :: Python :: 3', | ||||
|   | ||||
| @@ -5,14 +5,14 @@ import types | ||||
| import datetime | ||||
| import time | ||||
|  | ||||
| from bitter import utils, easy | ||||
| from bitter.crawlers import QueueException | ||||
| from bitter import utils | ||||
| from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException | ||||
| from bitter import config as c | ||||
|  | ||||
| class TestCrawlers(TestCase): | ||||
| class TestUtils(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|         self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|  | ||||
|     def test_create_worker(self): | ||||
|         assert len(self.wq.queue)==1 | ||||
|   | ||||
| @@ -58,6 +58,4 @@ class TestUtils(TestCase): | ||||
|         assert list(resp) == [1,2,3] | ||||
|         toc = time.time() | ||||
|         assert (tic-toc) < 6000 | ||||
|         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||
|         assert list(resp2) == [1,2,3,4] | ||||
|          | ||||
|   | ||||
		Reference in New Issue
	
	Block a user