1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-26 05:08:22 +00:00

1 Commits
0.7.4 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
17 changed files with 159 additions and 505 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
__pycache__
*.egg-info *.egg-info
dist dist
env env

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -5,6 +5,4 @@ include README.md
include bitter/VERSION include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
include tests/test* graft test
global-exclude *.pyc
global-exclude __pycache__

View File

@@ -1,76 +0,0 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -17,18 +17,11 @@ or
Programmatically: Programmatically:
```python ```python
from bitter import easy from bitter.crawlers import TwitterQueue
wq = easy() wq = TwitterQueue.from_credentials()
print(wq.users.show(user_name='balkian')) print(wq.users.show(user_name='balkian'))
``` ```
You can also make custom calls to the API through the command line.
e.g. to get the latest 500 tweets by the python software foundation:
```
bitter api statuses/user_timeline --id thepsf --count 500
```
# Credentials format # Credentials format
``` ```

View File

@@ -1 +1 @@
0.7.4 0.7.0

View File

@@ -7,10 +7,4 @@ import os
from .version import __version__ from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -52,15 +52,34 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
def get_tweet(tweetid, write, folder, update): def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweet(wq, tweetid, write, folder, update) if not write:
t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all') @tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up') @click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder): def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) with open(tweetsfile) as f:
utils.download_tweets(wq, tweetsfile, folder) for line in f:
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@@ -240,6 +259,11 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
logger.info('Done!') logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor') @main.group('extractor')
@click.pass_context @click.pass_context
@click.option('--db', required=True, help='Database of users.') @click.option('--db', required=True, help='Database of users.')
@@ -327,7 +351,7 @@ def reset_extractor(ctx):
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@main.command('limits') @api.command('limits')
@click.argument('url', required=False) @click.argument('url', required=False)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def get_limits(ctx, url):
@@ -348,32 +372,6 @@ def get_limits(ctx, url):
else: else:
print(json.dumps(resp, indent=2)) print(json.dumps(resp, indent=2))
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False))
@click.argument('cmd', nargs=1)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def api(ctx, cmd, api_args):
opts = {}
i = iter(api_args)
for k, v in zip(i, i):
k = k.replace('--', '')
opts[k] = v
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
resp = utils.consume_feed(wq[cmd], **opts)
# A hack to stream jsons
print('[')
first = True
for i in resp:
if not first:
print(',')
else:
first = False
print(json.dumps(i, indent=2))
print(']')
@main.command('server') @main.command('server')
@click.argument('CONSUMER_KEY', required=True) @click.argument('CONSUMER_KEY', required=True)
@click.argument('CONSUMER_SECRET', required=True) @click.argument('CONSUMER_SECRET', required=True)

View File

@@ -10,7 +10,6 @@ from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock from threading import Lock
from itertools import islice from itertools import islice
from functools import partial
try: try:
import itertools.ifilter as filter import itertools.ifilter as filter
except ImportError: except ImportError:
@@ -39,9 +38,6 @@ class AttrToFunc(object):
else: else:
return extend_call(k) return extend_call(k)
def __getitem__(self, k):
return partial(self.handler, self.__uriparts+k.split('/'))
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
# for i, a in enumerate(args)e # for i, a in enumerate(args)e
# kwargs[i] = a # kwargs[i] = a
@@ -79,12 +75,6 @@ class TwitterWorker(object):
self._client = self.api_class(auth=auth) self._client = self.api_class(auth=auth)
return self._client return self._client
def __repr__(self):
msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
if self.busy:
msg += ' [busy]'
return msg
class RestWorker(TwitterWorker): class RestWorker(TwitterWorker):
api_class = Twitter api_class = Twitter

View File

@@ -3,7 +3,6 @@ import json
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index from sqlalchemy import Column, Index
from sqlalchemy import create_engine from sqlalchemy import create_engine
@@ -86,20 +85,16 @@ class ExtractorEntry(Base):
user = Column(BigInteger, index=True) user = Column(BigInteger, index=True)
cursor = Column(BigInteger, default=-1) cursor = Column(BigInteger, default=-1)
pending = Column(Boolean, default=False) pending = Column(Boolean, default=False)
errors = Column(Text, default="")
busy = Column(Boolean, default=False)
def make_session(url): def make_session(url):
if not isinstance(url, str): engine = create_engine(url)#, echo=True)
print(url)
raise Exception("FUCK")
engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
session = Session() session = Session()
return session return session
def test(db='sqlite:///users.db'): def test(db='sqlite:///users.db'):
from sqlalchemy import exists from sqlalchemy import exists

View File

@@ -1,5 +1,3 @@
from __future__ import print_function
import logging import logging
import time import time
import json import json
@@ -11,17 +9,11 @@ import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from functools import partial from itertools import islice
from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from future.moves.itertools import zip_longest
from collections import Counter from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session from bitter.models import Following, User, ExtractorEntry, make_session
@@ -35,20 +27,17 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def chunk(iterable, n): def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
it = iter(iterable) if chunksize:
return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus*2) p = ThreadPool(numcpus)
results = p.imap_unordered(func, source, chunksize=int(1000/numcpus)) for i in p.imap(func, source):
for i in chain.from_iterable(results):
yield i yield i
def get_credentials_path(credfile=None): def get_credentials_path(credfile=None):
if not credfile: if not credfile:
if config.CREDENTIALS: if config.CREDENTIALS:
@@ -57,20 +46,17 @@ def get_credentials_path(credfile=None):
raise Exception('No valid credentials file') raise Exception('No valid credentials file')
return os.path.expanduser(credfile) return os.path.expanduser(credfile)
@contextmanager @contextmanager
def credentials_file(credfile, *args, **kwargs): def credentials_file(credfile, *args, **kwargs):
p = get_credentials_path(credfile) p = get_credentials_path(credfile)
with open(p, *args, **kwargs) as f: with open(p, *args, **kwargs) as f:
yield f yield f
def iter_credentials(credfile=None): def iter_credentials(credfile=None):
with credentials_file(credfile) as f: with credentials_file(credfile) as f:
for l in f: for l in f:
yield json.loads(l.strip()) yield json.loads(l.strip())
def get_credentials(credfile=None, inverse=False, **kwargs): def get_credentials(credfile=None, inverse=False, **kwargs):
creds = [] creds = []
for i in iter_credentials(credfile): for i in iter_credentials(credfile):
@@ -81,13 +67,11 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
creds.append(i) creds.append(i)
return creds return creds
def create_credentials(credfile=None): def create_credentials(credfile=None):
credfile = get_credentials_path(credfile) credfile = get_credentials_path(credfile)
with credentials_file(credfile, 'a'): with credentials_file(credfile, 'a'):
pass pass
def delete_credentials(credfile=None, **creds): def delete_credentials(credfile=None, **creds):
tokeep = get_credentials(credfile, inverse=True, **creds) tokeep = get_credentials(credfile, inverse=True, **creds)
with credentials_file(credfile, 'w') as f: with credentials_file(credfile, 'w') as f:
@@ -95,7 +79,6 @@ def delete_credentials(credfile=None, **creds):
f.write(json.dumps(i)) f.write(json.dumps(i))
f.write('\n') f.write('\n')
def add_credentials(credfile=None, **creds): def add_credentials(credfile=None, **creds):
exist = get_credentials(credfile, **creds) exist = get_credentials(credfile, **creds)
if not exist: if not exist:
@@ -110,7 +93,6 @@ def get_hashtags(iter_tweets, best=None):
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c return c
def read_file(filename, tail=False): def read_file(filename, tail=False):
with open(filename) as f: with open(filename) as f:
while True: while True:
@@ -152,7 +134,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
else: else:
yield user yield user
def trim_user(user): def trim_user(user):
if 'status' in user: if 'status' in user:
del user['status'] del user['status']
@@ -166,105 +147,108 @@ def trim_user(user):
return user return user
def add_user(user, dburi=None, session=None, update=False): def add_user(session, user, enqueue=False):
if not session:
session = make_session(dburi)
user = trim_user(user) user = trim_user(user)
olduser = session.query(User).filter(User.id == user['id']) olduser = session.query(User).filter(User.id==user['id'])
if olduser: if olduser:
if not update:
return
olduser.delete() olduser.delete()
nuser = User() user = User(**user)
for key, value in user.items():
setattr(nuser, key, value)
user = nuser
if update:
session.add(user) session.add(user)
logger.debug('Adding entry') if extract:
logging.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry: if not entry:
entry = ExtractorEntry(user=user.id) entry = ExtractorEntry(user=user.id)
session.add(entry) session.add(entry)
logger.debug(entry.pending) logging.debug(entry.pending)
entry.pending = True entry.pending = True
entry.cursor = -1 entry.cursor = -1
session.commit() session.commit()
session.close()
def download_entry(wq, entry_id, dburi=None, recursive=False): # TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
w = wq.next()
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi) session = make_session(dburi)
if not session:
raise Exception("Provide dburi or session") screen_names = []
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id))) user_ids = []
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
user = session.query(User).filter(User.id == entry.user).first() def classify_user(id_or_name):
download_user(wq, session, user, entry, recursive) try:
session.close() int(user)
user_ids.append(user)
logger.info("Added user id")
except ValueError:
logger.info("Added screen_name")
screen_names.append(user.split('@')[-1])
if user:
classify_user(user)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user)
else:
logger.info('Using pending users from last session')
def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
total_followers = user.followers_count for i in nusers:
add_user(session, i, enqueue=True)
if total_followers > max_followers: total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
entry.pending = False logging.info('Total users: {}'.format(total_users))
logger.info("Too many followers for user: %s" % user.screen_name) def pending_entries():
session.add(entry) pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
session.commit() logging.info('Pending: {}'.format(pending))
return return pending
if not entry:
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
session.add(entry)
session.commit()
while pending_entries() > 0:
logger.info("Using account: %s" % w.name)
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
order_by(User.followers_count).first()
if not candidate:
break
pending = True pending = True
cursor = entry.cursor cursor = entry.cursor
uid = user.id uid = candidate.id
name = user.name uobject = session.query(User).filter(User.id==uid).first()
name = uobject.screen_name if uobject else None
logger.info("#"*20) logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name)) logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor) logger.info("Cursor %s" % cursor)
logger.info("Using account: %s" % wq.name) logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
_fetched_followers = 0
def fetched_followers():
return session.query(Following).filter(Following.isfollowed==uid).count()
attempts = 0
while cursor > 0 or fetched_followers() < total_followers:
try: try:
resp = wq.followers.ids(user_id=uid, cursor=cursor) resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex: except TwitterHTTPError as ex:
attempts += 1 if ex.e.code in (401, ):
if ex.e.code in (401, ) or attempts > 3:
logger.info('Not authorized for user: {}'.format(uid)) logger.info('Not authorized for user: {}'.format(uid))
entry.errors = ex.message resp = {}
break if 'ids' in resp:
if 'ids' not in resp:
logger.info("Error with id %s %s" % (uid, resp))
entry.pending = False
entry.errors = "No ids in response: %s" % resp
break
logger.info("New followers: %s" % len(resp['ids'])) logger.info("New followers: %s" % len(resp['ids']))
if recursive: if recursive:
newusers = get_users(wq, resp) newusers = get_users(wq, resp)
for newuser in newusers: for user in newusers:
add_user(session=session, user=newuser) add_user(session, newuser, enqueue=True)
if 'ids' not in resp or not resp['ids']:
logger.info('NO IDS in response')
break
for i in resp['ids']: for i in resp['ids']:
existing_user = session.query(Following).\ existing_user = session.query(Following).\
filter(Following.isfollowed == uid).\ filter(Following.isfollowed==uid).\
filter(Following.follower == i).first() filter(Following.follower==i).first()
now = int(time.time()) now = int(time.time())
if existing_user: if existing_user:
existing_user.created_at_stamp = now existing_user.created_at_stamp = now
@@ -274,111 +258,32 @@ def download_user(wq, session, user, entry=None, recursive=False, max_followers=
created_at_stamp=now) created_at_stamp=now)
session.add(f) session.add(f)
logger.info("Fetched: %s/%s followers" % (fetched_followers(), total_followers = candidate.followers_count
fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
logger.info("Fetched: %s/%s followers" % (fetched_followers,
total_followers)) total_followers))
entry.cursor = resp["next_cursor"] cursor = resp["next_cursor"]
if cursor > 0:
session.add(entry) pending = True
session.commit() logger.info("Getting more followers for %s" % uid)
else:
logger.info("Done getting followers for %s" % uid) logger.info("Done getting followers for %s" % uid)
cursor = -1
pending = False
else:
logger.info("Error with id %s %s" % (uid, resp))
pending = False
entry.pending = False entry.pending = pending
entry.busy = False entry.cursor = cursor
session.add(entry) logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit() session.commit()
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
sys.stdout.flush() sys.stdout.flush()
def classify_user(id_or_name, screen_names, user_ids):
try:
int(id_or_name)
user_ids.append(id_or_name)
logger.debug("Added user id")
except ValueError:
logger.debug("Added screen_name")
screen_names.append(id_or_name.split('@')[-1])
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi)
session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
session.commit()
if not (user or initfile):
logger.info('Using pending users from last session')
else:
screen_names = []
user_ids = []
if user:
classify_user(user, screen_names, user_ids)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user, screen_names, user_ids)
def missing_user(ix, column=User.screen_name):
res = session.query(User).filter(column == ix).count() == 0
if res:
logger.info("Missing user %s. Count: %s" % (ix, res))
return res
screen_names = list(filter(missing_user, screen_names))
user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
nusers = []
logger.info("Missing user ids: %s" % user_ids)
logger.info("Missing screen names: %s" % screen_names)
if screen_names:
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers:
add_user(dburi=dburi, user=i)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users))
de = partial(download_entry, wq, dburi=dburi)
pending = pending_entries(dburi)
session.close()
for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
logger.info("Got %s" % i)
def pending_entries(dburi):
session = make_session(dburi)
while True:
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
filter(ExtractorEntry.busy == False).\
order_by(User.followers_count).first()
if candidate:
entry.busy = True
session.add(entry)
session.commit()
yield int(entry.id)
continue
if session.query(ExtractorEntry).\
filter(ExtractorEntry.busy == True).count() > 0:
time.sleep(1)
continue
logger.info("No more pending entries")
break
session.close()
def get_tweet(c, tid): def get_tweet(c, tid):
return c.statuses.show(id=tid) return c.statuses.show(id=tid)
@@ -397,118 +302,3 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0] return c.users.lookup(user_id=user)[0]
except ValueError: except ValueError:
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass
def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user)
def consume_feed(func, *args, **kwargs):
'''
Get all the tweets using pagination and a given method.
It can be controlled with the `count` parameter.
If count < 0 => Loop until the whole feed is consumed.
If count == 0 => Only call the API once, with the default values.
If count > 0 => Get count tweets from the feed.
'''
remaining = int(kwargs.pop('count', 0))
consume = remaining < 0
limit = False
# Simulate a do-while by updating the condition at the end
while not limit:
if remaining > 0:
kwargs['count'] = remaining
resp = func(*args, **kwargs)
if not resp:
return
for t in resp:
yield t
if consume:
continue
remaining -= len(resp)
max_id = min(s['id'] for s in func(*args, **kwargs)) - 1
kwargs['max_id'] = max_id
limit = remaining <= 0

View File

@@ -1,4 +0,0 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,7 +38,7 @@ setup(
extras_require = { extras_require = {
'server': ['flask', 'flask-oauthlib'] 'server': ['flask', 'flask-oauthlib']
}, },
setup_requires=['pytest-runner',], test_suite="tests",
include_package_data=True, include_package_data=True,
entry_points=""" entry_points="""
[console_scripts] [console_scripts]
@@ -48,7 +48,7 @@ setup(
'Development Status :: 4 - Beta', 'Development Status :: 4 - Beta',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Intended Audience :: Science/Research', 'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License', 'License :: OSI Approved :: Apache 2 License',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',

View File

@@ -5,14 +5,14 @@ import types
import datetime import datetime
import time import time
from bitter import utils, easy from bitter import utils
from bitter.crawlers import QueueException from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
from bitter import config as c from bitter import config as c
class TestCrawlers(TestCase): class TestUtils(TestCase):
def setUp(self): def setUp(self):
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self): def test_create_worker(self):
assert len(self.wq.queue)==1 assert len(self.wq.queue)==1

View File

@@ -58,6 +58,4 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3] assert list(resp) == [1,2,3]
toc = time.time() toc = time.time()
assert (tic-toc) < 6000 assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]