Mirror of https://github.com/balkian/bitter.git

6 commits: 0.7.0 ... 0.7.4

* cf766a6bf3 (2017-11-30 16:49:42 +01:00) J. Fernando Sánchez: API command
    - Added API command
    - Fixed bug in chunk
* e65f6836b3 (2017-05-21 21:28:35 +02:00) J. Fernando Sánchez: Fixed tweet error
* 1cb86abbdd (2017-03-08 12:15:48 +01:00) J. Fernando Sánchez: Use easy in tests and README
* b212a46ab7 (2016-12-06 01:30:32 +01:00) J. Fernando Sánchez: Added CI and tests
* 0a0d8fd5f1 (2016-12-06 00:03:38 +01:00) J. Fernando Sánchez: Improved tweet downloader (CLI and API)
* e3a78968da (2016-11-23 12:31:02 +01:00) J. Fernando Sánchez: Py2 compatibility and queue handling
    - Removed install_aliases(), which caused problems with urllib2
    - Better waiting time calculation (used in queue handling)
17 changed files with 546 additions and 180 deletions

.gitignore (vendored, 1 addition)

@@ -1,3 +1,4 @@
+__pycache__
 *.egg-info
 dist
 env

Dockerfile-2.7 (new file, 7 additions)

@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:2.7-onbuild
+Maintainer J. Fernando Sánchez @balkian
+RUN pip install -e "/usr/src/app/[server]"
+ENTRYPOINT ["bitter"]

Dockerfile-3.4 (new file, 7 additions)

@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:3.4-onbuild
+Maintainer J. Fernando Sánchez @balkian
+RUN pip install -e "/usr/src/app/[server]"
+ENTRYPOINT ["bitter"]

Dockerfile.template (new file, 7 additions)

@@ -0,0 +1,7 @@
+# onbuild copies . to /usr/src/app/
+From python:{{PYVERSION}}-onbuild
+Maintainer J. Fernando Sánchez @balkian
+RUN pip install -e "/usr/src/app/[server]"
+ENTRYPOINT ["bitter"]

MANIFEST.in

@@ -5,4 +5,6 @@ include README.md
 include bitter/VERSION
 graft bitter/templates
 graft bitter/static
-graft test
+include tests/test*
+global-exclude *.pyc
+global-exclude __pycache__

Makefile (new file, 76 additions)

@@ -0,0 +1,76 @@
+PYVERSIONS=3.4 2.7
+PYMAIN=$(firstword $(PYVERSIONS))
+NAME=bitter
+REPO=balkian
+VERSION=$(shell cat $(NAME)/VERSION)
+TARNAME=$(NAME)-$(VERSION).tar.gz
+IMAGENAME=$(REPO)/$(NAME)
+IMAGEWTAG=$(IMAGENAME):$(VERSION)
+
+all: build run
+
+dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
+
+Dockerfile-%: Dockerfile.template
+	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
+
+dev-%:
+	@docker start $(NAME)-dev$* || (\
+		$(MAKE) build-$*; \
+		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
+	)
+	docker exec -ti $(NAME)-dev$* bash
+
+dev: dev-$(PYMAIN)
+
+build: $(addprefix build-, $(PYMAIN))
+
+buildall: $(addprefix build-, $(PYVERSIONS))
+
+build-%: Dockerfile-%
+	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
+
+test: $(addprefix test-,$(PYMAIN))
+
+testall: $(addprefix test-,$(PYVERSIONS))
+
+test-%: build-%
+	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
+
+pip_test-%:
+	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
+
+dist/$(NAME)-$(VERSION).tar.gz:
+	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
+
+sdist: dist/$(NAME)-$(VERSION).tar.gz
+
+upload-%: test-%
+	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
+
+upload: testall $(addprefix upload-,$(PYVERSIONS))
+	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
+	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
+
+clean:
+	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
+	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
+
+upload_git:
+	git commit -a
+	git tag ${VERSION}
+	git push --tags origin master
+
+pip_upload:
+	python setup.py sdist upload ;
+
+pip_test: $(addprefix pip_test-,$(PYVERSIONS))
+
+run: build
+	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
+
+.PHONY: test test-% build-% build test test_pip run

README.md

@@ -17,11 +17,18 @@ or
 Programmatically:

 ```python
-from bitter.crawlers import TwitterQueue
-wq = TwitterQueue.from_credentials()
+from bitter import easy
+wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
+
+You can also make custom calls to the API through the command line.
+e.g. to get the latest 500 tweets by the python software foundation:
+
+```
+bitter api statuses/user_timeline --id thepsf --count 500
+```
+
 # Credentials format

 ```
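
For comparison, the equivalent programmatic call uses the new `easy()` helper introduced in this release. A minimal sketch, assuming a credentials file is already configured (the account name and count are just the README's example values):

```python
from bitter import easy

wq = easy()  # builds a TwitterQueue from the configured credentials

# Same endpoint the CLI example hits, via attribute chaining:
tweets = wq.statuses.user_timeline(id='thepsf', count=200)
print(len(tweets))
```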

bitter/VERSION

@@ -1 +1 @@
-0.6.6
+0.7.4

bitter/__init__.py

@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
 http://github.com/balkian/bitter
 """
-try:
-    from future.standard_library import install_aliases
-    install_aliases()
-except ImportError:
-    # Avoid problems at setup.py and py3.x
-    pass
 import os

 from .version import __version__

+
+def easy(*args, **kwargs):
+    from .crawlers import TwitterQueue
+    return TwitterQueue.from_credentials(*args, **kwargs)
+
 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

bitter/cli.py

@@ -52,34 +52,15 @@ def tweet(ctx):
 @click.argument('tweetid')
 def get_tweet(tweetid, write, folder, update):
     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
-    if not write:
-        t = utils.get_tweet(wq, tweetid)
-        js = json.dumps(t, indent=2)
-        print(js)
-        return
-    if not os.path.exists(folder):
-        os.makedirs(folder)
-    file = os.path.join(folder, '%s.json' % tweetid)
-    if not update and os.path.exists(file) and os.path.isfile(file):
-        print('%s: Tweet exists' % tweetid)
-        return
-    try:
-        t = utils.get_tweet(wq, tweetid)
-        with open(file, 'w') as f:
-            js = json.dumps(t, indent=2)
-            print(js, file=f)
-    except Exception as ex:
-        print('%s: %s' % (tweetid, ex), file=sys.stderr)
+    utils.download_tweet(wq, tweetid, write, folder, update)

 @tweet.command('get_all')
 @click.argument('tweetsfile', 'File with a list of tweets to look up')
 @click.option('-f', '--folder', default="tweets")
 @click.pass_context
 def get_tweets(ctx, tweetsfile, folder):
-    with open(tweetsfile) as f:
-        for line in f:
-            tid = line.strip()
-            ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    utils.download_tweets(wq, tweetsfile, folder)

 @tweet.command('search')
 @click.argument('query')

@@ -259,11 +240,6 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
     logger.info('Done!')

-@main.group('api')
-def api():
-    pass
-
 @main.group('extractor')
 @click.pass_context
 @click.option('--db', required=True, help='Database of users.')

@@ -351,7 +327,7 @@ def reset_extractor(ctx):
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})

-@api.command('limits')
+@main.command('limits')
 @click.argument('url', required=False)
 @click.pass_context
 def get_limits(ctx, url):

@@ -372,6 +348,32 @@ def get_limits(ctx, url):
     else:
         print(json.dumps(resp, indent=2))

+@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False))
+@click.argument('cmd', nargs=1)
+@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
+@click.pass_context
+def api(ctx, cmd, api_args):
+    opts = {}
+    i = iter(api_args)
+    for k, v in zip(i, i):
+        k = k.replace('--', '')
+        opts[k] = v
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    resp = utils.consume_feed(wq[cmd], **opts)
+    # A hack to stream jsons
+    print('[')
+    first = True
+    for i in resp:
+        if not first:
+            print(',')
+        else:
+            first = False
+        print(json.dumps(i, indent=2))
+    print(']')
+
 @main.command('server')
 @click.argument('CONSUMER_KEY', required=True)
 @click.argument('CONSUMER_SECRET', required=True)

@@ -391,8 +393,9 @@ def stream(ctx):
 @click.option('-l', '--locations', default=None)
 @click.option('-t', '--track', default=None)
 @click.option('-f', '--file', help='File to store the stream of tweets')
+@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
 @click.pass_context
-def get_stream(ctx, locations, track, file):
+def get_stream(ctx, locations, track, file, politelyretry):
     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)

     query_args = {}

@@ -400,17 +403,28 @@ def get_stream(ctx, locations, track, file):
         query_args['locations'] = locations
     if track:
         query_args['track'] = track
-    if not query_args:
-        iterator = wq.statuses.sample()
-    else:
-        iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
     if not file:
         file = sys.stdout
     else:
         file = open(file, 'a')
-    for tweet in tqdm(iterator):
+
+    def insist():
+        lasthangup = time.time()
+        while True:
+            if not query_args:
+                iterator = wq.statuses.sample()
+            else:
+                iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
+            for i in iterator:
+                yield i
+            if not politelyretry:
+                return
+            thishangup = time.time()
+            if thishangup - lasthangup < 60:
+                raise Exception('Too many hangups in a row.')
+            time.sleep(3)
+
+    for tweet in tqdm(insist()):
         print(json.dumps(tweet), file=file)
     if file != sys.stdout:
         file.close()
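
A note on the argument parsing in the new `api` command: `zip(i, i)` over a single iterator consumes two items per step, pairing each `--flag` with its value. A standalone illustration of the idiom:

```python
api_args = ('--id', 'thepsf', '--count', '500')
i = iter(api_args)
# Two items per step from the same iterator: flag, then value
opts = {k.replace('--', ''): v for k, v in zip(i, i)}
print(opts)  # {'id': 'thepsf', 'count': '500'}
```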

bitter/crawlers.py

@@ -10,6 +10,12 @@ from twitter import *
 from collections import OrderedDict
 from threading import Lock
 from itertools import islice
+from functools import partial
+
+try:
+    import itertools.ifilter as filter
+except ImportError:
+    pass

 from . import utils
 from . import config

@@ -33,6 +39,9 @@ class AttrToFunc(object):
         else:
             return extend_call(k)

+    def __getitem__(self, k):
+        return partial(self.handler, self.__uriparts+k.split('/'))
+
     def __call__(self, *args, **kwargs):
         # for i, a in enumerate(args)e
         #     kwargs[i] = a

@@ -70,6 +79,12 @@ class TwitterWorker(object):
             self._client = self.api_class(auth=auth)
         return self._client

+    def __repr__(self):
+        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
+        if self.busy:
+            msg += ' [busy]'
+        return msg
+
 class RestWorker(TwitterWorker):
     api_class = Twitter

@@ -178,9 +193,13 @@ class TwitterQueue(QueueMixin):
             patience -= 1

     def get_wait(self, uriparts):
-        available = next(lambda x: not x.busy, self.queue)
-        first_worker = min(available, key=lambda x: x.get_wait(uriparts))
-        diff = first_worker.get_wait(uriparts)
+        # Stop as soon as one is available to avoid initiating the rest
+        for i in self.queue:
+            if not i.busy and i.get_wait(uriparts) == 0:
+                return 0
+        # If None is available, let's see how much we have to wait
+        available = filter(lambda x: not x.busy, self.queue)
+        diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
         return diff
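
The `__getitem__` hook added to `AttrToFunc` is what lets the new `api` command resolve a path string to an endpoint call (`utils.consume_feed(wq[cmd], **opts)`). A minimal, self-contained sketch of the idea, with hypothetical names independent of the library:

```python
from functools import partial

class PathCaller(object):
    # Mimics AttrToFunc.__getitem__: resolve 'a/b/c' into a bound handler call
    def __init__(self, handler, uriparts=()):
        self.handler = handler
        self.uriparts = tuple(uriparts)

    def __getitem__(self, k):
        return partial(self.handler, self.uriparts + tuple(k.split('/')))

caller = PathCaller(lambda uriparts, **kwargs: (uriparts, kwargs))
print(caller['statuses/user_timeline'](count=5))
# (('statuses', 'user_timeline'), {'count': 5})
```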

bitter/models.py

@@ -3,6 +3,7 @@ import json
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.types import BigInteger, Integer, Text, Boolean
+from sqlalchemy.pool import SingletonThreadPool
 from sqlalchemy import Column, Index
 from sqlalchemy import create_engine

@@ -85,16 +86,20 @@ class ExtractorEntry(Base):
     user = Column(BigInteger, index=True)
     cursor = Column(BigInteger, default=-1)
     pending = Column(Boolean, default=False)
+    errors = Column(Text, default="")
+    busy = Column(Boolean, default=False)

 def make_session(url):
-    engine = create_engine(url)#, echo=True)
+    if not isinstance(url, str):
+        print(url)
+        raise Exception("FUCK")
+    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
     session = Session()
     return session

 def test(db='sqlite:///users.db'):
     from sqlalchemy import exists

bitter/utils.py

@@ -1,3 +1,5 @@
+from __future__ import print_function
+
 import logging
 import time
 import json

@@ -9,11 +11,17 @@ import os
 import multiprocessing
 from multiprocessing.pool import ThreadPool

-from itertools import islice
+from functools import partial
+from tqdm import tqdm
+
+from itertools import islice, chain
 from contextlib import contextmanager
-from itertools import zip_longest
 from collections import Counter
+from builtins import map, filter

 from twitter import TwitterHTTPError

 from bitter.models import Following, User, ExtractorEntry, make_session

@@ -27,17 +35,20 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)

-def chunk(iterable, n, fillvalue=None):
-    args = [iter(iterable)] * n
-    return zip_longest(*args, fillvalue=fillvalue)
+def chunk(iterable, n):
+    it = iter(iterable)
+    return iter(lambda: tuple(islice(it, n)), ())

-def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
-    if chunksize:
-        source = chunk(source, chunksize)
-    p = ThreadPool(numcpus)
-    for i in p.imap(func, source):
+def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
+    source = chunk(source, chunksize)
+    p = ThreadPool(numcpus*2)
+    results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
+    for i in chain.from_iterable(results):
         yield i

 def get_credentials_path(credfile=None):
     if not credfile:
         if config.CREDENTIALS:
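
The rewritten `chunk` (the "Fixed bug in chunk" commit) leans on the two-argument form `iter(callable, sentinel)`: the lambda is called repeatedly and iteration stops when it returns the sentinel `()`, i.e. when the source is exhausted. Unlike the old `zip_longest` version, the last chunk is simply shorter instead of being padded with fill values. A standalone illustration:

```python
from itertools import islice

def chunk(iterable, n):
    it = iter(iterable)
    # Call the lambda until it returns the sentinel () 
    return iter(lambda: tuple(islice(it, n)), ())

print(list(chunk(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]
```
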
@@ -46,17 +57,20 @@ def get_credentials_path(credfile=None):
             raise Exception('No valid credentials file')
     return os.path.expanduser(credfile)

+
 @contextmanager
 def credentials_file(credfile, *args, **kwargs):
     p = get_credentials_path(credfile)
     with open(p, *args, **kwargs) as f:
         yield f

+
 def iter_credentials(credfile=None):
     with credentials_file(credfile) as f:
         for l in f:
             yield json.loads(l.strip())

+
 def get_credentials(credfile=None, inverse=False, **kwargs):
     creds = []
     for i in iter_credentials(credfile):

@@ -67,11 +81,13 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
         creds.append(i)
     return creds

+
 def create_credentials(credfile=None):
     credfile = get_credentials_path(credfile)
     with credentials_file(credfile, 'a'):
         pass

+
 def delete_credentials(credfile=None, **creds):
     tokeep = get_credentials(credfile, inverse=True, **creds)
     with credentials_file(credfile, 'w') as f:

@@ -79,6 +95,7 @@ def delete_credentials(credfile=None, **creds):
         f.write(json.dumps(i))
         f.write('\n')

+
 def add_credentials(credfile=None, **creds):
     exist = get_credentials(credfile, **creds)
     if not exist:

@@ -93,6 +110,7 @@ def get_hashtags(iter_tweets, best=None):
         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
     return c

+
 def read_file(filename, tail=False):
     with open(filename) as f:
         while True:

@@ -134,6 +152,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
     else:
         yield user

+
 def trim_user(user):
     if 'status' in user:
         del user['status']

@@ -147,104 +166,101 @@ def trim_user(user):
     return user

-def add_user(session, user, enqueue=False):
+def add_user(user, dburi=None, session=None, update=False):
+    if not session:
+        session = make_session(dburi)
     user = trim_user(user)
     olduser = session.query(User).filter(User.id == user['id'])
     if olduser:
+        if not update:
+            return
         olduser.delete()
-    user = User(**user)
-    session.add(user)
-    if extract:
-        logging.debug('Adding entry')
-        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
-        if not entry:
-            entry = ExtractorEntry(user=user.id)
-            session.add(entry)
-        logging.debug(entry.pending)
-        entry.pending = True
-        entry.cursor = -1
-        session.commit()
+    nuser = User()
+    for key, value in user.items():
+        setattr(nuser, key, value)
+    user = nuser
+    if update:
+        session.add(user)
+    logger.debug('Adding entry')
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
+    if not entry:
+        entry = ExtractorEntry(user=user.id)
+        session.add(entry)
+    logger.debug(entry.pending)
+    entry.pending = True
+    entry.cursor = -1
+    session.commit()
+    session.close()

-# TODO: adapt to the crawler
-def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
-    signal.signal(signal.SIGINT, signal_handler)
-    w = wq.next()
-    if not dburi:
-        dburi = 'sqlite:///%s.db' % extractor_name
+def download_entry(wq, entry_id, dburi=None, recursive=False):
     session = make_session(dburi)
-
-    screen_names = []
-    user_ids = []
-
-    def classify_user(id_or_name):
-        try:
-            int(user)
-            user_ids.append(user)
-            logger.info("Added user id")
-        except ValueError:
-            logger.info("Added screen_name")
-            screen_names.append(user.split('@')[-1])
-
-    if user:
-        classify_user(user)
-    elif initfile:
-        logger.info("No user. I will open %s" % initfile)
-        with open(initfile, 'r') as f:
-            for line in f:
-                user = line.strip().split(',')[0]
-                classify_user(user)
-    else:
-        logger.info('Using pending users from last session')
-
-    nusers = list(get_users(wq, screen_names, by_name=True))
-    if user_ids:
-        nusers += list(get_users(wq, user_ids, by_name=False))
-    for i in nusers:
-        add_user(session, i, enqueue=True)
-
-    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logging.info('Total users: {}'.format(total_users))
-    def pending_entries():
-        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logging.info('Pending: {}'.format(pending))
-        return pending
-
-    while pending_entries() > 0:
-        logger.info("Using account: %s" % w.name)
-        candidate, entry = session.query(User, ExtractorEntry).\
-            filter(ExtractorEntry.user == User.id).\
-            filter(ExtractorEntry.pending == True).\
-            order_by(User.followers_count).first()
-        if not candidate:
-            break
-        pending = True
-        cursor = entry.cursor
-        uid = candidate.id
-        uobject = session.query(User).filter(User.id==uid).first()
-        name = uobject.screen_name if uobject else None
+    if not session:
+        raise Exception("Provide dburi or session")
+    logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
+    user = session.query(User).filter(User.id == entry.user).first()
+    download_user(wq, session, user, entry, recursive)
+    session.close()
+
+def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+
+    total_followers = user.followers_count
+
+    if total_followers > max_followers:
+        entry.pending = False
+        logger.info("Too many followers for user: %s" % user.screen_name)
+        session.add(entry)
+        session.commit()
+        return
+
+    if not entry:
+        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
+        session.add(entry)
+        session.commit()
+
+    pending = True
+    cursor = entry.cursor
+    uid = user.id
+    name = user.name

     logger.info("#"*20)
     logger.info("Getting %s - %s" % (uid, name))
     logger.info("Cursor %s" % cursor)
-    logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
+    logger.info("Using account: %s" % wq.name)
+
+    _fetched_followers = 0
+    def fetched_followers():
+        return session.query(Following).filter(Following.isfollowed==uid).count()
+
+    attempts = 0
+    while cursor > 0 or fetched_followers() < total_followers:
         try:
             resp = wq.followers.ids(user_id=uid, cursor=cursor)
         except TwitterHTTPError as ex:
-            if ex.e.code in (401, ):
+            attempts += 1
+            if ex.e.code in (401, ) or attempts > 3:
                 logger.info('Not authorized for user: {}'.format(uid))
-                resp = {}
-        if 'ids' in resp:
-            logger.info("New followers: %s" % len(resp['ids']))
-            if recursive:
-                newusers = get_users(wq, resp)
-                for user in newusers:
-                    add_user(session, newuser, enqueue=True)
-            if 'ids' not in resp or not resp['ids']:
-                logger.info('NO IDS in response')
-                break
-            for i in resp['ids']:
-                existing_user = session.query(Following).\
-                    filter(Following.isfollowed == uid).\
+                entry.errors = ex.message
+                break
+        if 'ids' not in resp:
+            logger.info("Error with id %s %s" % (uid, resp))
+            entry.pending = False
+            entry.errors = "No ids in response: %s" % resp
+            break
+        logger.info("New followers: %s" % len(resp['ids']))
+        if recursive:
+            newusers = get_users(wq, resp)
+            for newuser in newusers:
+                add_user(session=session, user=newuser)
+        for i in resp['ids']:
+            existing_user = session.query(Following).\
+                filter(Following.isfollowed == uid).\

@@ -258,32 +274,111 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
                 created_at_stamp=now)
             session.add(f)
-            total_followers = candidate.followers_count
-            fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
-            logger.info("Fetched: %s/%s followers" % (fetched_followers,
-                                                      total_followers))
-            cursor = resp["next_cursor"]
-            if cursor > 0:
-                pending = True
-                logger.info("Getting more followers for %s" % uid)
-            else:
-                logger.info("Done getting followers for %s" % uid)
-                cursor = -1
-                pending = False
-        else:
-            logger.info("Error with id %s %s" % (uid, resp))
-            pending = False
-
-        entry.pending = pending
-        entry.cursor = cursor
-        logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
-
-        session.add(candidate)
-        session.commit()
+        logger.info("Fetched: %s/%s followers" % (fetched_followers(),
+                                                  total_followers))
+        entry.cursor = resp["next_cursor"]
+
+        session.add(entry)
+        session.commit()
+
+    logger.info("Done getting followers for %s" % uid)
+    entry.pending = False
+    entry.busy = False
+    session.add(entry)
+    session.commit()
+
+    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
     sys.stdout.flush()
+
+
+def classify_user(id_or_name, screen_names, user_ids):
+    try:
+        int(id_or_name)
+        user_ids.append(id_or_name)
+        logger.debug("Added user id")
+    except ValueError:
+        logger.debug("Added screen_name")
+        screen_names.append(id_or_name.split('@')[-1])
+
+
+def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
+    signal.signal(signal.SIGINT, signal_handler)
+    if not dburi:
+        dburi = 'sqlite:///%s.db' % extractor_name
+    session = make_session(dburi)
+    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
+    session.commit()
+
+    if not (user or initfile):
+        logger.info('Using pending users from last session')
+    else:
+        screen_names = []
+        user_ids = []
+        if user:
+            classify_user(user, screen_names, user_ids)
+        elif initfile:
+            logger.info("No user. I will open %s" % initfile)
+            with open(initfile, 'r') as f:
+                for line in f:
+                    user = line.strip().split(',')[0]
+                    classify_user(user, screen_names, user_ids)
+
+        def missing_user(ix, column=User.screen_name):
+            res = session.query(User).filter(column == ix).count() == 0
+            if res:
+                logger.info("Missing user %s. Count: %s" % (ix, res))
+            return res
+
+        screen_names = list(filter(missing_user, screen_names))
+        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
+        nusers = []
+        logger.info("Missing user ids: %s" % user_ids)
+        logger.info("Missing screen names: %s" % screen_names)
+        if screen_names:
+            nusers = list(get_users(wq, screen_names, by_name=True))
+        if user_ids:
+            nusers += list(get_users(wq, user_ids, by_name=False))
+        for i in nusers:
+            add_user(dburi=dburi, user=i)
+
+    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
+    logger.info('Total users: {}'.format(total_users))
+
+    de = partial(download_entry, wq, dburi=dburi)
+    pending = pending_entries(dburi)
+    session.close()
+
+    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
+        logger.info("Got %s" % i)
+
+
+def pending_entries(dburi):
+    session = make_session(dburi)
+    while True:
+        candidate, entry = session.query(User, ExtractorEntry).\
+            filter(ExtractorEntry.user == User.id).\
+            filter(ExtractorEntry.pending == True).\
+            filter(ExtractorEntry.busy == False).\
+            order_by(User.followers_count).first()
+        if candidate:
+            entry.busy = True
+            session.add(entry)
+            session.commit()
+            yield int(entry.id)
+            continue
+        if session.query(ExtractorEntry).\
+                filter(ExtractorEntry.busy == True).count() > 0:
+            time.sleep(1)
+            continue
+        logger.info("No more pending entries")
+        break
+    session.close()

 def get_tweet(c, tid):
     return c.statuses.show(id=tid)
@@ -302,3 +397,118 @@ def get_user(c, user):
         return c.users.lookup(user_id=user)[0]
     except ValueError:
         return c.users.lookup(screen_name=user)[0]
+
+
+def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
+    cached = cached_tweet(tweetid, folder)
+    tweet = None
+    if update or not cached:
+        tweet = get_tweet(wq, tweetid)
+    js = json.dumps(tweet, indent=2)
+    if write:
+        if tweet:
+            write_tweet_json(js, folder)
+    else:
+        print(js)
+
+
+def cached_tweet(tweetid, folder):
+    tweet = None
+    file = os.path.join(folder, '%s.json' % tweetid)
+    if os.path.exists(file) and os.path.isfile(file):
+        try:
+            # print('%s: Tweet exists' % tweetid)
+            with open(file) as f:
+                tweet = json.load(f)
+        except Exception as ex:
+            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
+    return tweet
+
+
+def write_tweet_json(js, folder):
+    tweetid = js['id']
+    file = tweet_file(tweetid, folder)
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    with open(file, 'w') as f:
+        json.dump(js, f, indent=2)
+        logger.info('Written {} to file {}'.format(tweetid, file))
+
+
+def tweet_file(tweetid, folder):
+    return os.path.join(folder, '%s.json' % tweetid)
+
+
+def tweet_fail_file(tweetid, folder):
+    failsfolder = os.path.join(folder, 'failed')
+    if not os.path.exists(failsfolder):
+        os.makedirs(failsfolder)
+    return os.path.join(failsfolder, '%s.failed' % tweetid)
+
+
+def tweet_failed(tweetid, folder):
+    return os.path.isfile(tweet_fail_file(tweetid, folder))
+
+
+def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
+    def filter_line(line):
+        tweetid = int(line)
+        # print('Checking {}'.format(tweetid))
+        if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
+            yield None
+        else:
+            yield line
+
+    def print_result(res):
+        tid, tweet = res
+        if tweet:
+            try:
+                write_tweet_json(tweet, folder=folder)
+                yield 1
+            except Exception as ex:
+                logger.error('%s: %s' % (tid, ex))
+                if not ignore_fails:
+                    raise
+        else:
+            logger.info('Tweet not recovered: {}'.format(tid))
+            with open(tweet_fail_file(tid, folder), 'w') as f:
+                print('Tweet not found', file=f)
+            yield -1
+
+    def download_batch(batch):
+        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
+        return tweets.items()
+
+    with open(tweetsfile) as f:
+        lines = map(lambda x: x.strip(), f)
+        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
+        tweets = parallel(download_batch, lines_to_crawl, 100)
+        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
+            pass
+
+
+def download_timeline(wq, user):
+    return wq.statuses.user_timeline(id=user)
+
+
+def consume_feed(func, *args, **kwargs):
+    '''
+    Get all the tweets using pagination and a given method.
+    It can be controlled with the `count` parameter.
+
+    If count < 0 => Loop until the whole feed is consumed.
+    If count == 0 => Only call the API once, with the default values.
+    If count > 0 => Get count tweets from the feed.
+    '''
+    remaining = int(kwargs.pop('count', 0))
+    consume = remaining < 0
+    limit = False
+    # Simulate a do-while by updating the condition at the end
+    while not limit:
+        if remaining > 0:
+            kwargs['count'] = remaining
+        resp = func(*args, **kwargs)
+        if not resp:
+            return
+        for t in resp:
+            yield t
+        if consume:
+            continue
+        remaining -= len(resp)
+        max_id = min(s['id'] for s in func(*args, **kwargs)) - 1
+        kwargs['max_id'] = max_id
+        limit = remaining <= 0
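
The new `consume_feed` is what backs the `bitter api` command: `count < 0` drains the feed, `count == 0` issues a single call with the endpoint's defaults, and `count > 0` pages backwards through `max_id` until enough items have been yielded. A hedged usage sketch, assuming a configured credentials file (this mirrors what `bitter api statuses/user_timeline --id thepsf --count 500` runs internally):

```python
from bitter import easy
from bitter.utils import consume_feed

wq = easy()
# wq[...] resolves an endpoint path via AttrToFunc.__getitem__ (see crawlers.py above)
for tweet in consume_feed(wq['statuses/user_timeline'], id='thepsf', count=500):
    print(tweet['id'])
```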

setup.cfg (new file, 4 additions)

@@ -0,0 +1,4 @@
+[metadata]
+description-file = README.md
+[aliases]
+test=pytest

setup.py

@@ -38,10 +38,19 @@ setup(
     extras_require = {
         'server': ['flask', 'flask-oauthlib']
     },
-    test_suite="tests",
+    setup_requires=['pytest-runner',],
     include_package_data=True,
     entry_points="""
         [console_scripts]
         bitter=bitter.cli:main
-    """
+    """,
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'Intended Audience :: Science/Research',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 2',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+    ]
 )

tests/test_crawlers.py

@@ -5,14 +5,14 @@ import types
 import datetime
 import time

-from bitter import utils
-from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
+from bitter import utils, easy
+from bitter.crawlers import QueueException
 from bitter import config as c

-class TestUtils(TestCase):
+class TestCrawlers(TestCase):

     def setUp(self):
-        self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
+        self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))

     def test_create_worker(self):
         assert len(self.wq.queue)==1

@@ -64,12 +64,9 @@ class TestUtils(TestCase):
         try:
             # resp = self.wq.friends.list(screen_name='balkian')
             self.wq.next(['friends', 'list'])
-        except TwitterQueueException:
+        except QueueException:
             failed = True
         assert failed
         l2 = w1.get_limit(['friends', 'list'])
         assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
         assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
-        time.sleep(w1.get_wait(['friends', 'list']))

tests/test_utils.py

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
         assert list(resp) == [1,2,3]
         toc = time.time()
         assert (tic-toc) < 6000
+        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
+        assert list(resp2) == [1,2,3,4]