Mirror of https://github.com/balkian/bitter.git, synced 2025-10-25 20:58:24 +00:00
Compare commits
6 Commits
| SHA1 |
|---|
| cf766a6bf3 |
| e65f6836b3 |
| 1cb86abbdd |
| b212a46ab7 |
| 0a0d8fd5f1 |
| e3a78968da |

.gitignore (vendored) | 1
@@ -1,3 +1,4 @@
+__pycache__
 *.egg-info
 dist
 env

Dockerfile-2.7 (new file) | 7
@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:2.7-onbuild
+Maintainer J. Fernando Sánchez @balkian
+
+RUN pip install -e "/usr/src/app/[server]"
+
+ENTRYPOINT ["bitter"]

Dockerfile-3.4 (new file) | 7
@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:3.4-onbuild
+Maintainer J. Fernando Sánchez @balkian
+
+RUN pip install -e "/usr/src/app/[server]"
+
+ENTRYPOINT ["bitter"]

Dockerfile.template (new file) | 7
@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:{{PYVERSION}}-onbuild
+Maintainer J. Fernando Sánchez @balkian
+
+RUN pip install -e "/usr/src/app/[server]"
+
+ENTRYPOINT ["bitter"]

MANIFEST.in
@@ -5,4 +5,6 @@ include README.md
 include bitter/VERSION
 graft bitter/templates
 graft bitter/static
-graft test
+include tests/test*
+global-exclude *.pyc
+global-exclude __pycache__

Makefile (new file) | 76
@@ -0,0 +1,76 @@
+PYVERSIONS=3.4 2.7
+PYMAIN=$(firstword $(PYVERSIONS))
+NAME=bitter
+REPO=balkian
+VERSION=$(shell cat $(NAME)/VERSION)
+TARNAME=$(NAME)-$(VERSION).tar.gz
+IMAGENAME=$(REPO)/$(NAME)
+IMAGEWTAG=$(IMAGENAME):$(VERSION)
+
+
+all: build run
+
+dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
+
+Dockerfile-%: Dockerfile.template
+	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
+
+
+dev-%:
+	@docker start $(NAME)-dev$* || (\
+	$(MAKE) build-$*; \
+	docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
+	)\
+
+	docker exec -ti $(NAME)-dev$* bash
+
+dev: dev-$(PYMAIN)
+
+build: $(addprefix build-, $(PYMAIN))
+
+buildall: $(addprefix build-, $(PYVERSIONS))
+
+build-%: Dockerfile-%
+	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
+
+test: $(addprefix test-,$(PYMAIN))
+
+testall: $(addprefix test-,$(PYVERSIONS))
+
+test-%: build-%
+	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
+
+pip_test-%:
+	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
+
+dist/$(NAME)-$(VERSION).tar.gz:
+	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
+
+sdist: dist/$(NAME)-$(VERSION).tar.gz
+
+
+upload-%: test-%
+	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
+
+upload: testall $(addprefix upload-,$(PYVERSIONS))
+	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
+	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
+
+clean:
+	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
+	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
+
+upload_git:
+	git commit -a
+	git tag ${VERSION}
+	git push --tags origin master
+
+pip_upload:
+	python setup.py sdist upload ;
+
+pip_test: $(addprefix pip_test-,$(PYVERSIONS))
+
+run: build
+	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
+
+.PHONY: test test-% build-% build test test_pip run
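
For orientation, all of the new targets go through Docker. A typical session might look like this (a sketch: it assumes Docker is available and, for `test-%`, a `tests/credentials.json` file):

```
make buildall   # build the image for every version in PYVERSIONS
make test-3.4   # run the test suite inside the python3.4 image
make sdist      # build the source tarball with the main Python version
```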

README.md | 11
@@ -17,11 +17,18 @@ or
 Programmatically:
 
 ```python
-from bitter.crawlers import TwitterQueue
-wq = TwitterQueue.from_credentials()
+from bitter import easy
+wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
 
+
+You can also make custom calls to the API through the command line.
+e.g. to get the latest 500 tweets by the python software foundation:
+
+```
+bitter api statuses/user_timeline --id thepsf --count 500
+```
 # Credentials format
 
 ```

bitter/VERSION
@@ -1 +1 @@
-0.6.6
+0.7.4

bitter/__init__.py
@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
 http://github.com/balkian/bitter
 """
 
-try:
-    from future.standard_library import install_aliases
-    install_aliases()
-except ImportError:
-    # Avoid problems at setup.py and py3.x
-    pass
-
 import os
 
 from .version import __version__
 
+def easy(*args, **kwargs):
+    from .crawlers import TwitterQueue
+    return TwitterQueue.from_credentials(*args, **kwargs)
+
 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
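
The new `easy()` helper wraps `TwitterQueue.from_credentials`, so callers no longer import from `bitter.crawlers` directly. A minimal usage sketch (assuming a credentials file is already configured, as in the README):

```python
from bitter import easy

wq = easy()  # or easy('/path/to/credentials.json')
print(wq.users.show(user_name='balkian'))
```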

bitter/cli.py
@@ -52,34 +52,15 @@ def tweet(ctx):
 @click.argument('tweetid')
 def get_tweet(tweetid, write, folder, update):
     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
-    if not write:
-        t = utils.get_tweet(wq, tweetid)
-        js = json.dumps(t, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % tweetid)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('%s: Tweet exists' % tweetid)
-        return
-    try:
-        t = utils.get_tweet(wq, tweetid)
-        with open(file, 'w') as f:
-            js = json.dumps(t, indent=2)
-            print(js, file=f)
-    except Exception as ex:
-        print('%s: %s' % (tweetid, ex), file=sys.stderr)
+    utils.download_tweet(wq, tweetid, write, folder, update)
 
 @tweet.command('get_all')
 @click.argument('tweetsfile', 'File with a list of tweets to look up')
 @click.option('-f', '--folder', default="tweets")
 @click.pass_context
 def get_tweets(ctx, tweetsfile, folder):
-    with open(tweetsfile) as f:
-        for line in f:
-            tid = line.strip()
-            ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    utils.download_tweets(wq, tweetsfile, folder)
 
 @tweet.command('search')
 @click.argument('query')
@@ -259,11 +240,6 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
 
     logger.info('Done!')
 
-@main.group('api')
-def api():
-    pass
-
-
 @main.group('extractor')
 @click.pass_context
 @click.option('--db', required=True, help='Database of users.')
@@ -351,7 +327,7 @@ def reset_extractor(ctx):
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
 
-@api.command('limits')
+@main.command('limits')
 @click.argument('url', required=False)
 @click.pass_context
 def get_limits(ctx, url):
@@ -372,6 +348,32 @@ def get_limits(ctx, url):
     else:
         print(json.dumps(resp, indent=2))
 
+
+@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False))
+@click.argument('cmd', nargs=1)
+@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
+@click.pass_context
+def api(ctx, cmd, api_args):
+    opts = {}
+    i = iter(api_args)
+    for k, v in zip(i, i):
+        k = k.replace('--', '')
+        opts[k] = v
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    resp = utils.consume_feed(wq[cmd], **opts)
+    # A hack to stream jsons
+    print('[')
+    first = True
+    for i in resp:
+        if not first:
+            print(',')
+        else:
+            first = False
+
+        print(json.dumps(i, indent=2))
+    print(']')
+
+
 @main.command('server')
 @click.argument('CONSUMER_KEY', required=True)
 @click.argument('CONSUMER_SECRET', required=True)
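
The new `api` passthrough command converts free-form `--key value` pairs into keyword arguments by zipping an iterator with itself, so consecutive tokens pair up. The trick in isolation (a standalone sketch, not part of the diff):

```python
api_args = ['--id', 'thepsf', '--count', '500']
i = iter(api_args)
# zip(i, i) draws two items per step from the same iterator: (arg0, arg1), (arg2, arg3), ...
opts = {k.replace('--', ''): v for k, v in zip(i, i)}
assert opts == {'id': 'thepsf', 'count': '500'}
```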
@@ -391,8 +393,9 @@ def stream(ctx):
 @click.option('-l', '--locations', default=None)
 @click.option('-t', '--track', default=None)
 @click.option('-f', '--file', help='File to store the stream of tweets')
+@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
 @click.pass_context
-def get_stream(ctx, locations, track, file):
+def get_stream(ctx, locations, track, file, politelyretry):
     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
 
     query_args = {}
@@ -400,17 +403,28 @@ def get_stream(ctx, locations, track, file):
         query_args['locations'] = locations
     if track:
         query_args['track'] = track
-    if not query_args:
-        iterator = wq.statuses.sample()
-    else:
-        iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
-
     if not file:
         file = sys.stdout
     else:
         file = open(file, 'a')
 
-    for tweet in tqdm(iterator):
+    def insist():
+        lasthangup = time.time()
+        while True:
+            if not query_args:
+                iterator = wq.statuses.sample()
+            else:
+                iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
+            for i in iterator:
+                yield i
+            if not politelyretry:
+                return
+            thishangup = time.time()
+            if thishangup - lasthangup < 60:
+                raise Exception('Too many hangups in a row.')
+            time.sleep(3)
+
+    for tweet in tqdm(insist()):
         print(json.dumps(tweet), file=file)
     if file != sys.stdout:
         file.close()
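
The `insist` generator above keeps the stream alive: it re-creates the iterator after a hangup and only gives up when hangups arrive less than a minute apart. A distilled standalone variant (hypothetical names; unlike the committed code it also resets the timestamp after each hangup, which the 60-second check appears to intend):

```python
import time

def retry_stream(make_iterator, politelyretry=True, min_gap=60, pause=3):
    lasthangup = time.time()
    while True:
        for item in make_iterator():  # runs until the connection drops
            yield item
        if not politelyretry:
            return
        if time.time() - lasthangup < min_gap:
            raise Exception('Too many hangups in a row.')
        lasthangup = time.time()  # assumption: reset the window on every retry
        time.sleep(pause)
```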

bitter/crawlers.py
@@ -10,6 +10,12 @@ from twitter import *
 from collections import OrderedDict
 from threading import Lock
 from itertools import islice
+from functools import partial
+try:
+    import itertools.ifilter as filter
+except ImportError:
+    pass
+
 from . import utils
 from . import config
 
@@ -33,6 +39,9 @@ class AttrToFunc(object):
         else:
             return extend_call(k)
 
+    def __getitem__(self, k):
+        return partial(self.handler, self.__uriparts+k.split('/'))
+
     def __call__(self, *args, **kwargs):
         # for i, a in enumerate(args)e
         #     kwargs[i] = a
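
This `__getitem__` is what lets the CLI accept an endpoint as a plain string: `wq[cmd]` splits the string on `/` and resolves the same call chain as attribute access, which the new `bitter api` command relies on. A usage sketch (an assumption that the queue forwards item access the same way it forwards attribute access):

```python
from bitter import easy

wq = easy()
timeline = wq['statuses/user_timeline']  # same endpoint as wq.statuses.user_timeline
for tweet in timeline(screen_name='thepsf', count=5):
    print(tweet['text'])
```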
@@ -70,6 +79,12 @@ class TwitterWorker(object):
             self._client = self.api_class(auth=auth)
         return self._client
 
+    def __repr__(self):
+        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
+        if self.busy:
+            msg += ' [busy]'
+        return msg
+
 class RestWorker(TwitterWorker):
     api_class = Twitter
 
@@ -178,9 +193,13 @@ class TwitterQueue(QueueMixin):
             patience -= 1
 
     def get_wait(self, uriparts):
-        available = next(lambda x: not x.busy, self.queue)
-        first_worker = min(available, key=lambda x: x.get_wait(uriparts))
-        diff = first_worker.get_wait(uriparts)
+        # Stop as soon as one is available to avoid initiating the rest
+        for i in self.queue:
+            if not i.busy and i.get_wait(uriparts) == 0:
+                return 0
+        # If None is available, let's see how much we have to wait
+        available = filter(lambda x: not x.busy, self.queue)
+        diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
         return diff
 
 

bitter/models.py
@@ -3,6 +3,7 @@ import json
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.types import BigInteger, Integer, Text, Boolean
+from sqlalchemy.pool import SingletonThreadPool
 from sqlalchemy import Column, Index
 
 from sqlalchemy import create_engine
@@ -85,16 +86,20 @@ class ExtractorEntry(Base):
     user = Column(BigInteger, index=True)
     cursor = Column(BigInteger, default=-1)
     pending = Column(Boolean, default=False)
+    errors = Column(Text, default="")
+    busy = Column(Boolean, default=False)
 
 
 def make_session(url):
-    engine = create_engine(url)#, echo=True)
+    if not isinstance(url, str):
+        print(url)
+        raise Exception("FUCK")
+    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
     session = Session()
     return session
 
 
 
 def test(db='sqlite:///users.db'):
 
     from sqlalchemy import exists
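
`SingletonThreadPool` maintains one connection per thread and keeps it alive for the lifetime of the engine, which suits the SQLite databases bitter uses now that several threads share one `dburi`. The equivalent standalone setup (the URI here is an assumption):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import SingletonThreadPool

# Same engine configuration as make_session above.
engine = create_engine('sqlite:///users.db', poolclass=SingletonThreadPool)
```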

bitter/utils.py | 444
@@ -1,3 +1,5 @@
+from __future__ import print_function
+
 import logging
 import time
 import json
@@ -9,11 +11,17 @@ import os
 import multiprocessing
 from multiprocessing.pool import ThreadPool
 
-from itertools import islice
+from functools import partial
+
+from tqdm import tqdm
+
+from itertools import islice, chain
 from contextlib import contextmanager
-from itertools import zip_longest
 from collections import Counter
+
+from builtins import map, filter
+
 from twitter import TwitterHTTPError
 
 from bitter.models import Following, User, ExtractorEntry, make_session
@@ -27,17 +35,20 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)
 
-def chunk(iterable, n, fillvalue=None):
-    args = [iter(iterable)] * n
-    return zip_longest(*args, fillvalue=fillvalue)
+def chunk(iterable, n):
+    it = iter(iterable)
+    return iter(lambda: tuple(islice(it, n)), ())
 
-def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
-    if chunksize:
-        source = chunk(source, chunksize)
-    p = ThreadPool(numcpus)
-    for i in p.imap(func, source):
+
+def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
+    source = chunk(source, chunksize)
+    p = ThreadPool(numcpus*2)
+    results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
+    for i in chain.from_iterable(results):
        yield i
 
 
 def get_credentials_path(credfile=None):
     if not credfile:
         if config.CREDENTIALS:
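
The rewritten `chunk` uses the two-argument form of `iter`, which calls the lambda until it returns the sentinel `()`; unlike the old `zip_longest` version, the last chunk is simply shorter instead of being padded with fillvalues. `parallel` then fans the chunks out to a thread pool, so each worker receives a whole chunk and must return an iterable, which `chain.from_iterable` flattens back out. The chunking behaviour in isolation (a standalone sketch):

```python
from itertools import islice

def chunk(iterable, n):
    it = iter(iterable)
    # iter(callable, sentinel) keeps calling until the empty tuple appears.
    return iter(lambda: tuple(islice(it, n)), ())

print(list(chunk(range(5), 2)))  # [(0, 1), (2, 3), (4,)]
```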
@@ -46,17 +57,20 @@ def get_credentials_path(credfile=None):
     raise Exception('No valid credentials file')
     return os.path.expanduser(credfile)
 
+
 @contextmanager
 def credentials_file(credfile, *args, **kwargs):
     p = get_credentials_path(credfile)
     with open(p, *args, **kwargs) as f:
         yield f
 
+
 def iter_credentials(credfile=None):
     with credentials_file(credfile) as f:
         for l in f:
             yield json.loads(l.strip())
 
+
 def get_credentials(credfile=None, inverse=False, **kwargs):
     creds = []
     for i in iter_credentials(credfile):
@@ -67,11 +81,13 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
         creds.append(i)
     return creds
 
+
 def create_credentials(credfile=None):
     credfile = get_credentials_path(credfile)
     with credentials_file(credfile, 'a'):
         pass
 
+
 def delete_credentials(credfile=None, **creds):
     tokeep = get_credentials(credfile, inverse=True, **creds)
     with credentials_file(credfile, 'w') as f:
@@ -79,6 +95,7 @@ def delete_credentials(credfile=None, **creds):
         f.write(json.dumps(i))
         f.write('\n')
 
+
 def add_credentials(credfile=None, **creds):
     exist = get_credentials(credfile, **creds)
     if not exist:
@@ -93,6 +110,7 @@ def get_hashtags(iter_tweets, best=None):
         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
     return c
 
+
 def read_file(filename, tail=False):
     with open(filename) as f:
         while True:
@@ -134,6 +152,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
         else:
             yield user
 
+
 def trim_user(user):
     if 'status' in user:
         del user['status']
@@ -147,142 +166,218 @@
     return user
 
 
-def add_user(session, user, enqueue=False):
+def add_user(user, dburi=None, session=None, update=False):
+    if not session:
+        session = make_session(dburi)
+
     user = trim_user(user)
-    olduser = session.query(User).filter(User.id==user['id'])
+    olduser = session.query(User).filter(User.id == user['id'])
     if olduser:
+        if not update:
+            return
         olduser.delete()
-    user = User(**user)
-    session.add(user)
-    if extract:
-        logging.debug('Adding entry')
+    nuser = User()
+    for key, value in user.items():
+        setattr(nuser, key, value)
+    user = nuser
+    if update:
+        session.add(user)
+    logger.debug('Adding entry')
     entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
     if not entry:
         entry = ExtractorEntry(user=user.id)
         session.add(entry)
-    logging.debug(entry.pending)
+    logger.debug(entry.pending)
     entry.pending = True
     entry.cursor = -1
     session.commit()
+    session.close()
+
+
+def download_entry(wq, entry_id, dburi=None, recursive=False):
+    session = make_session(dburi)
+    if not session:
+        raise Exception("Provide dburi or session")
+    logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
+    user = session.query(User).filter(User.id == entry.user).first()
+    download_user(wq, session, user, entry, recursive)
+    session.close()
+
+
+def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+
+    total_followers = user.followers_count
+
+    if total_followers > max_followers:
+        entry.pending = False
+        logger.info("Too many followers for user: %s" % user.screen_name)
+        session.add(entry)
+        session.commit()
+        return
+
+    if not entry:
+        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
+        session.add(entry)
+        session.commit()
+
+    pending = True
+    cursor = entry.cursor
+    uid = user.id
+    name = user.name
+
+    logger.info("#"*20)
+    logger.info("Getting %s - %s" % (uid, name))
+    logger.info("Cursor %s" % cursor)
+    logger.info("Using account: %s" % wq.name)
+
+    _fetched_followers = 0
+
+    def fetched_followers():
+        return session.query(Following).filter(Following.isfollowed==uid).count()
+
+    attempts = 0
+    while cursor > 0 or fetched_followers() < total_followers:
+        try:
+            resp = wq.followers.ids(user_id=uid, cursor=cursor)
+        except TwitterHTTPError as ex:
+            attempts += 1
+            if ex.e.code in (401, ) or attempts > 3:
+                logger.info('Not authorized for user: {}'.format(uid))
+                entry.errors = ex.message
+                break
+        if 'ids' not in resp:
+            logger.info("Error with id %s %s" % (uid, resp))
+            entry.pending = False
+            entry.errors = "No ids in response: %s" % resp
+            break
+
+        logger.info("New followers: %s" % len(resp['ids']))
+        if recursive:
+            newusers = get_users(wq, resp)
+            for newuser in newusers:
+                add_user(session=session, user=newuser)
+
+        if 'ids' not in resp or not resp['ids']:
+            logger.info('NO IDS in response')
+            break
+        for i in resp['ids']:
+            existing_user = session.query(Following).\
+                filter(Following.isfollowed == uid).\
+                filter(Following.follower == i).first()
+            now = int(time.time())
+            if existing_user:
+                existing_user.created_at_stamp = now
+            else:
+                f = Following(isfollowed=uid,
+                              follower=i,
+                              created_at_stamp=now)
+                session.add(f)
+
+        logger.info("Fetched: %s/%s followers" % (fetched_followers(),
+                                                  total_followers))
+        entry.cursor = resp["next_cursor"]
+
+        session.add(entry)
+        session.commit()
+
+    logger.info("Done getting followers for %s" % uid)
+
+    entry.pending = False
+    entry.busy = False
+    session.add(entry)
+    session.commit()
+
+    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+    sys.stdout.flush()
+
+
+def classify_user(id_or_name, screen_names, user_ids):
+    try:
+        int(id_or_name)
+        user_ids.append(id_or_name)
+        logger.debug("Added user id")
+    except ValueError:
+        logger.debug("Added screen_name")
+        screen_names.append(id_or_name.split('@')[-1])
+
+
-# TODO: adapt to the crawler
 def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
     signal.signal(signal.SIGINT, signal_handler)
 
-    w = wq.next()
     if not dburi:
         dburi = 'sqlite:///%s.db' % extractor_name
 
     session = make_session(dburi)
+    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
+    session.commit()
 
-    screen_names = []
-    user_ids = []
-
-    def classify_user(id_or_name):
-        try:
-            int(user)
-            user_ids.append(user)
-            logger.info("Added user id")
-        except ValueError:
-            logger.info("Added screen_name")
-            screen_names.append(user.split('@')[-1])
-
-    if user:
-        classify_user(user)
-    elif initfile:
-        logger.info("No user. I will open %s" % initfile)
-        with open(initfile, 'r') as f:
-            for line in f:
-                user = line.strip().split(',')[0]
-                classify_user(user)
-    else:
+    if not (user or initfile):
         logger.info('Using pending users from last session')
+    else:
+        screen_names = []
+        user_ids = []
+        if user:
+            classify_user(user, screen_names, user_ids)
+        elif initfile:
+            logger.info("No user. I will open %s" % initfile)
+            with open(initfile, 'r') as f:
+                for line in f:
+                    user = line.strip().split(',')[0]
+                    classify_user(user, screen_names, user_ids)
 
-    nusers = list(get_users(wq, screen_names, by_name=True))
-    if user_ids:
-        nusers += list(get_users(wq, user_ids, by_name=False))
+        def missing_user(ix, column=User.screen_name):
+            res = session.query(User).filter(column == ix).count() == 0
+            if res:
+                logger.info("Missing user %s. Count: %s" % (ix, res))
+            return res
+
+        screen_names = list(filter(missing_user, screen_names))
+        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
+        nusers = []
+        logger.info("Missing user ids: %s" % user_ids)
+        logger.info("Missing screen names: %s" % screen_names)
+        if screen_names:
+            nusers = list(get_users(wq, screen_names, by_name=True))
+        if user_ids:
+            nusers += list(get_users(wq, user_ids, by_name=False))
+
     for i in nusers:
-        add_user(session, i, enqueue=True)
+        add_user(dburi=dburi, user=i)
 
     total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logging.info('Total users: {}'.format(total_users))
+    logger.info('Total users: {}'.format(total_users))
 
-    def pending_entries():
-        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logging.info('Pending: {}'.format(pending))
-        return pending
-
-    while pending_entries() > 0:
-        logger.info("Using account: %s" % w.name)
+    de = partial(download_entry, wq, dburi=dburi)
+    pending = pending_entries(dburi)
+    session.close()
+
+    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
+        logger.info("Got %s" % i)
+
+
+def pending_entries(dburi):
+    session = make_session(dburi)
+    while True:
         candidate, entry = session.query(User, ExtractorEntry).\
             filter(ExtractorEntry.user == User.id).\
             filter(ExtractorEntry.pending == True).\
-            order_by(User.followers_count).first()
-        if not candidate:
-            break
-        pending = True
-        cursor = entry.cursor
-        uid = candidate.id
-        uobject = session.query(User).filter(User.id==uid).first()
-        name = uobject.screen_name if uobject else None
-
-        logger.info("#"*20)
-        logger.info("Getting %s - %s" % (uid, name))
-        logger.info("Cursor %s" % cursor)
-        logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
-        try:
-            resp = wq.followers.ids(user_id=uid, cursor=cursor)
-        except TwitterHTTPError as ex:
-            if ex.e.code in (401, ):
-                logger.info('Not authorized for user: {}'.format(uid))
-                resp = {}
-        if 'ids' in resp:
-            logger.info("New followers: %s" % len(resp['ids']))
-            if recursive:
-                newusers = get_users(wq, resp)
-                for user in newusers:
-                    add_user(session, newuser, enqueue=True)
-            for i in resp['ids']:
-                existing_user = session.query(Following).\
-                    filter(Following.isfollowed==uid).\
-                    filter(Following.follower==i).first()
-                now = int(time.time())
-                if existing_user:
-                    existing_user.created_at_stamp = now
-                else:
-                    f = Following(isfollowed=uid,
-                                  follower=i,
-                                  created_at_stamp=now)
-                    session.add(f)
-
-            total_followers = candidate.followers_count
-            fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
-            logger.info("Fetched: %s/%s followers" % (fetched_followers,
-                                                      total_followers))
-            cursor = resp["next_cursor"]
-            if cursor > 0:
-                pending = True
-                logger.info("Getting more followers for %s" % uid)
-            else:
-                logger.info("Done getting followers for %s" % uid)
-                cursor = -1
-                pending = False
-        else:
-            logger.info("Error with id %s %s" % (uid, resp))
-            pending = False
-
-        entry.pending = pending
-        entry.cursor = cursor
-        logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
-
-        session.add(candidate)
-        session.commit()
-
-        sys.stdout.flush()
+            filter(ExtractorEntry.busy == False).\
+            order_by(User.followers_count).first()
+        if candidate:
+            entry.busy = True
+            session.add(entry)
+            session.commit()
+            yield int(entry.id)
+            continue
+        if session.query(ExtractorEntry).\
+                filter(ExtractorEntry.busy == True).count() > 0:
+            time.sleep(1)
+            continue
+        logger.info("No more pending entries")
+        break
+    session.close()
 
 
 def get_tweet(c, tid):
     return c.statuses.show(id=tid)
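
Taken together, these changes replace the old single-threaded loop in `extract` with a small work queue: `pending_entries` claims one entry at a time by flipping its new `busy` flag, and `parallel` hands the claimed ids to `download_entry` workers. The claim step, reduced to its essentials (a sketch against the models above; note SQLite offers no hard locking guarantee here):

```python
def claim_next(session):
    # Take the cheapest pending, unclaimed entry and mark it busy.
    entry = (session.query(ExtractorEntry)
             .filter(ExtractorEntry.pending == True)
             .filter(ExtractorEntry.busy == False)
             .first())
    if entry:
        entry.busy = True
        session.commit()
    return entry
```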
@@ -302,3 +397,118 @@ def get_user(c, user):
         return c.users.lookup(user_id=user)[0]
     except ValueError:
         return c.users.lookup(screen_name=user)[0]
+
+
+def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
+    cached = cached_tweet(tweetid, folder)
+    tweet = None
+    if update or not cached:
+        tweet = get_tweet(wq, tweetid)
+    js = json.dumps(tweet, indent=2)
+    if write:
+        if tweet:
+            write_tweet_json(js, folder)
+    else:
+        print(js)
+
+
+def cached_tweet(tweetid, folder):
+    tweet = None
+    file = os.path.join(folder, '%s.json' % tweetid)
+    if os.path.exists(file) and os.path.isfile(file):
+        try:
+            # print('%s: Tweet exists' % tweetid)
+            with open(file) as f:
+                tweet = json.load(f)
+        except Exception as ex:
+            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
+    return tweet
+
+
+def write_tweet_json(js, folder):
+    tweetid = js['id']
+    file = tweet_file(tweetid, folder)
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    with open(file, 'w') as f:
+        json.dump(js, f, indent=2)
+        logger.info('Written {} to file {}'.format(tweetid, file))
+
+
+def tweet_file(tweetid, folder):
+    return os.path.join(folder, '%s.json' % tweetid)
+
+
+def tweet_fail_file(tweetid, folder):
+    failsfolder = os.path.join(folder, 'failed')
+    if not os.path.exists(failsfolder):
+        os.makedirs(failsfolder)
+    return os.path.join(failsfolder, '%s.failed' % tweetid)
+
+
+def tweet_failed(tweetid, folder):
+    return os.path.isfile(tweet_fail_file(tweetid, folder))
+
+
+def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
+    def filter_line(line):
+        tweetid = int(line)
+        # print('Checking {}'.format(tweetid))
+        if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
+            yield None
+        else:
+            yield line
+
+    def print_result(res):
+        tid, tweet = res
+        if tweet:
+            try:
+                write_tweet_json(tweet, folder=folder)
+                yield 1
+            except Exception as ex:
+                logger.error('%s: %s' % (tid, ex))
+                if not ignore_fails:
+                    raise
+        else:
+            logger.info('Tweet not recovered: {}'.format(tid))
+            with open(tweet_fail_file(tid, folder), 'w') as f:
+                print('Tweet not found', file=f)
+            yield -1
+
+    def download_batch(batch):
+        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
+        return tweets.items()
+
+    with open(tweetsfile) as f:
+        lines = map(lambda x: x.strip(), f)
+        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
+        tweets = parallel(download_batch, lines_to_crawl, 100)
+        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
+            pass
+
+
+def download_timeline(wq, user):
+    return wq.statuses.user_timeline(id=user)
+
+
+def consume_feed(func, *args, **kwargs):
+    '''
+    Get all the tweets using pagination and a given method.
+    It can be controlled with the `count` parameter.
+
+    If count < 0 => Loop until the whole feed is consumed.
+    If count == 0 => Only call the API once, with the default values.
+    If count > 0 => Get count tweets from the feed.
+    '''
+    remaining = int(kwargs.pop('count', 0))
+    consume = remaining < 0
+    limit = False
+
+    # Simulate a do-while by updating the condition at the end
+    while not limit:
+        if remaining > 0:
+            kwargs['count'] = remaining
+        resp = func(*args, **kwargs)
+        if not resp:
+            return
+        for t in resp:
+            yield t
+        if consume:
+            continue
+        remaining -= len(resp)
+        max_id = min(s['id'] for s in func(*args, **kwargs)) - 1
+        kwargs['max_id'] = max_id
+        limit = remaining <= 0
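
`consume_feed` is the pagination engine behind the new `bitter api` command: it pops `count` from the keyword arguments, then walks the feed backwards by lowering `max_id` until enough tweets have been yielded. A usage sketch (assumes configured credentials; the endpoint follows the README example):

```python
from bitter import easy
from bitter.utils import consume_feed

wq = easy()
# count=500 stops after roughly 500 tweets; count=-1 would drain the whole feed.
for tweet in consume_feed(wq.statuses.user_timeline, screen_name='thepsf', count=500):
    print(tweet['id'])
```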

setup.cfg (new file) | 4
@@ -0,0 +1,4 @@
+[metadata]
+description-file = README.md
+[aliases]
+test=pytest

setup.py | 13
@@ -38,10 +38,19 @@ setup(
     extras_require = {
         'server': ['flask', 'flask-oauthlib']
     },
-    test_suite="tests",
+    setup_requires=['pytest-runner',],
     include_package_data=True,
     entry_points="""
         [console_scripts]
         bitter=bitter.cli:main
-    """
+    """,
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'Intended Audience :: Science/Research',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 2',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+    ]
 )

tests (crawler tests)
@@ -5,14 +5,14 @@ import types
 import datetime
 import time
 
-from bitter import utils
-from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
+from bitter import utils, easy
+from bitter.crawlers import QueueException
 from bitter import config as c
 
-class TestUtils(TestCase):
+class TestCrawlers(TestCase):
 
     def setUp(self):
-        self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
+        self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
 
     def test_create_worker(self):
         assert len(self.wq.queue)==1
@@ -64,12 +64,9 @@ class TestUtils(TestCase):
         try:
             # resp = self.wq.friends.list(screen_name='balkian')
             self.wq.next(['friends', 'list'])
-        except TwitterQueueException:
+        except QueueException:
             failed = True
         assert failed
         l2 = w1.get_limit(['friends', 'list'])
         assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
         assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
-        time.sleep(w1.get_wait(['friends', 'list']))
-
-

tests (utils tests)
@@ -58,4 +58,6 @@ class TestUtils(TestCase):
         assert list(resp) == [1,2,3]
         toc = time.time()
         assert (tic-toc) < 6000
+        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
+        assert list(resp2) == [1,2,3,4]