Mirror of https://github.com/balkian/bitter.git

16 Commits
0.5.0 ... 0.7.2

Author SHA1 Message Date
J. Fernando Sánchez  c940709df8  Fixed tweet error  2017-05-21 21:27:46 +02:00
J. Fernando Sánchez  1cb86abbdd  Use easy in tests and README  2017-03-08 12:15:48 +01:00
J. Fernando Sánchez  b212a46ab7  Added CI and tests  2016-12-06 01:30:32 +01:00
J. Fernando Sánchez  0a0d8fd5f1  Improved tweet downloader (CLI and API)  2016-12-06 00:03:38 +01:00
J. Fernando Sánchez  e3a78968da  Py2 compatibility and queue handling  2016-11-23 12:31:02 +01:00
    * Removed install_aliases(), which caused problems with urllib2
    * Better waiting time calculation (used in queue handling)
J. Fernando Sánchez  67ef307cce  Improved tweet extraction @ CLI  2016-11-23 10:50:01 +01:00
J. Fernando Sánchez  cb054ac365  Added (bulk) user download  2016-11-22 20:07:18 +01:00
J. Fernando Sánchez  bdc4690240  Fixed tail argument  2016-11-19 21:42:08 +01:00
J. Fernando Sánchez  c0309a1e52  Fixed python2 compatibility issues (print!)  2016-11-19 20:38:44 +01:00
J. Fernando Sánchez  4afdd6807d  Fixed MANIFEST error  2016-11-19 20:24:02 +01:00
J. Fernando Sánchez  38605ba2c8  Added stream to CLI  2016-11-19 20:16:56 +01:00
    * Save stream to file
    * Parse file and get the most important hashtags
J. Fernando Sánchez  738823c8a2  Added Streaming workers/queues  2016-11-18 16:08:29 +01:00
J. Fernando Sánchez  3f42879751  Bumped to v0.5.6  2016-09-28 06:31:33 +02:00
J. Fernando Sánchez  35f0c6376d  Fixed limits bug, added tests  2016-09-28 06:30:49 +02:00
J. Fernando Sánchez  2036d51d96  Added limits to every call  2016-09-28 05:06:12 +02:00
J. Fernando Sánchez  09feb050a6  Changed versioning, added locks  2016-09-28 01:10:10 +02:00
19 changed files with 632 additions and 106 deletions

.gitignore

@@ -1,3 +1,4 @@
 __pycache__
 *.egg-info
 dist
+env

Dockerfile-2.7 (new file)

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

Dockerfile-3.4 (new file)

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

Dockerfile.template (new file)

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

MANIFEST.in

@@ -1,6 +1,10 @@
include requirements.txt
include requirements-py2.txt
include test-requirements.txt
include README.md
include bitter/VERSION
graft bitter/templates
graft bitter/static
graft test
include tests/test*
global-exclude *.pyc
global-exclude __pycache__

Makefile (new file)

@@ -0,0 +1,76 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)

all: build run

dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))

Dockerfile-%: Dockerfile.template
	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*

dev-%:
	@docker start $(NAME)-dev$* || (\
		$(MAKE) build-$*; \
		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
	)
	docker exec -ti $(NAME)-dev$* bash

dev: dev-$(PYMAIN)

build: $(addprefix build-, $(PYMAIN))

buildall: $(addprefix build-, $(PYVERSIONS))

build-%: Dockerfile-%
	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;

test: $(addprefix test-,$(PYMAIN))

testall: $(addprefix test-,$(PYVERSIONS))

test-%: build-%
	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;

pip_test-%:
	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;

dist/$(NAME)-$(VERSION).tar.gz:
	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;

sdist: dist/$(NAME)-$(VERSION).tar.gz

upload-%: test-%
	docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'

upload: testall $(addprefix upload-,$(PYVERSIONS))
	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'

clean:
	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true

upload_git:
	git commit -a
	git tag ${VERSION}
	git push --tags origin master

pip_upload:
	python setup.py sdist upload ;

pip_test: $(addprefix pip_test-,$(PYVERSIONS))

run: build
	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'

.PHONY: test test-% build-% build test test_pip run

README.md

@@ -17,8 +17,8 @@ or
 Programmatically:
 ```python
-from bitter.crawlers import TwitterQueue
-wq = TwitterQueue.from_credentials()
+from bitter import easy
+wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
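For reference, the new helper also accepts an optional credentials path, which is how the test suite calls it; a minimal sketch (the explicit path here is only illustrative):

```python
from bitter import easy

# easy() builds a TwitterQueue from a credentials file; with no argument it
# falls back to the default credentials location. The explicit path mirrors
# tests/test_crawlers.py.
wq = easy('tests/credentials.json')

# Attribute access is translated into REST calls (users.show -> /users/show).
print(wq.users.show(user_name='balkian'))
```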

bitter/VERSION (new file)

@@ -0,0 +1 @@
0.7.1

bitter/__init__.py

@@ -3,8 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
 http://github.com/balkian/bitter
 """
-from future.standard_library import install_aliases
-install_aliases()
+import os
+from .version import __version__
+def easy(*args, **kwargs):
+    from .crawlers import TwitterQueue
+    return TwitterQueue.from_credentials(*args, **kwargs)
-__version__ = '0.5.0'
 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

bitter/cli.py

@@ -1,3 +1,5 @@
from __future__ import print_function
import click
import json
import os
@@ -6,10 +8,12 @@ import time
import sqlalchemy.types
import threading
import sqlite3
from tqdm import tqdm
from sqlalchemy import exists
from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following
import sys
@@ -33,7 +37,7 @@ def main(ctx, verbose, logging_level, config, credentials):
    ctx.obj = {}
    ctx.obj['VERBOSE'] = verbose
    ctx.obj['CONFIG'] = config
    ctx.obj['CREDENTIALS'] = credentials
    bconf.CREDENTIALS = credentials
    utils.create_credentials(credentials)
@main.group()
@@ -42,30 +46,36 @@ def tweet(ctx):
    pass

@tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
@click.pass_context
def get_tweet(ctx, tweetid):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    t = utils.get_tweet(wq, tweetid)
    print(json.dumps(t, indent=2))
def get_tweet(tweetid, write, folder, update):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    utils.download_tweet(wq, tweetid, write, folder, update)

@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    utils.download_tweets(wq, tweetsfile, folder)

@tweet.command('search')
@click.argument('query')
@click.pass_context
def get_tweet(ctx, query):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    t = utils.search_tweet(c.client, query)
def search(ctx, query):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    t = utils.search_tweet(wq, query)
    print(json.dumps(t, indent=2))

@tweet.command('timeline')
@click.argument('user')
@click.pass_context
def get_tweet(ctx, user):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    t = utils.user_timeline(c.client, user)
def timeline(ctx, user):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    t = utils.user_timeline(wq, user)
    print(json.dumps(t, indent=2))
@main.group()
@@ -84,23 +94,47 @@ def list_users(ctx, db):
        for j in i.__dict__:
            print('\t{}: {}'.format(j, getattr(i,j)))

@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    u = utils.get_user(c.client, user)
    print(json.dumps(u, indent=2))
@users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    if not write:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js)
        return
    if not os.path.exists(folder):
        os.makedirs(folder)
    file = os.path.join(folder, '%s.json' % user)
    if not update and os.path.exists(file) and os.path.isfile(file):
        print('User exists: %s' % user)
        return
    with open(file, 'w') as f:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js, file=f)

@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
    with open(usersfile) as f:
        for line in f:
            uid = line.strip()
            ctx.invoke(get_user, folder=folder, user=uid, write=True)

@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db):
def crawl_users(ctx, usersfile, skip, until, threads, db):
    global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock

    if '://' not in db:
@@ -112,7 +146,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
        return ExitStack()

    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
                                                                                      len(wq.queue)))
@@ -281,7 +315,7 @@ def users_extractor(ctx):
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
    print(locals())
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    dburi = ctx.obj['DBURI']
    utils.extract(wq,
                  recursive=recursive,
@@ -293,7 +327,7 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    db = ctx.obj['DBURI']
    session = make_session(db)
    session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@@ -302,7 +336,7 @@ def reset_extractor(ctx):
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, url):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        print('#'*20)
@@ -324,11 +358,75 @@ def get_limits(ctx, url):
@click.argument('CONSUMER_SECRET', required=True)
@click.pass_context
def run_server(ctx, consumer_key, consumer_secret):
    from . import config
    config.CONSUMER_KEY = consumer_key
    config.CONSUMER_SECRET = consumer_secret
    bconf.CONSUMER_KEY = consumer_key
    bconf.CONSUMER_SECRET = consumer_secret
    from .webserver import app
    app.run()
    app.run(host='0.0.0.0')

@main.group()
@click.pass_context
def stream(ctx):
    pass

@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
    wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
    query_args = {}
    if locations:
        query_args['locations'] = locations
    if track:
        query_args['track'] = track
    if not file:
        file = sys.stdout
    else:
        file = open(file, 'a')
    def insist():
        lasthangup = time.time()
        while True:
            if not query_args:
                iterator = wq.statuses.sample()
            else:
                iterator = wq.statuses.filter(**query_args)  # "-4.25,40.16,-3.40,40.75"
            for i in iterator:
                yield i
            if not politelyretry:
                return
            thishangup = time.time()
            if thishangup - lasthangup < 60:
                raise Exception('Too many hangups in a row.')
            time.sleep(3)
    for tweet in tqdm(insist()):
        print(json.dumps(tweet), file=file)
    if file != sys.stdout:
        file.close()

@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
    for tweet in utils.read_file(file, tail=tail):
        try:
            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
        except (KeyError, TypeError):
            print('Raw tweet: {}'.format(tweet))

@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
    c = utils.get_hashtags(utils.read_file(file))
    for count, tag in c.most_common(limit):
        print(u'{} - {}'.format(count, tag))
if __name__ == '__main__':
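The new commands are thin wrappers over bitter.utils, so the same operations are available from Python. A rough sketch of what `tweet search` and `tweet get_all` do under the hood (the file paths are placeholders, not defaults from the source):

```python
import json

from bitter import crawlers, utils

# Build the queue of REST workers the commands share. The credentials file
# path is hypothetical; the CLI takes it from its --credentials option.
wq = crawlers.TwitterQueue.from_credentials('credentials.json')

# Roughly `bitter tweet search <query>`.
print(json.dumps(utils.search_tweet(wq, 'python'), indent=2))

# Roughly `bitter tweet get_all <tweetsfile>`: download every tweet ID listed
# in the file, writing one JSON file per tweet into `folder`.
utils.download_tweets(wq, 'tweet_ids.txt', folder='tweets')
```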

bitter/crawlers.py

@@ -8,6 +8,13 @@ logger = logging.getLogger(__name__)
from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
try:
    import itertools.ifilter as filter
except ImportError:
    pass
from . import utils
from . import config
@@ -36,29 +43,96 @@ class AttrToFunc(object):
        #     kwargs[i] = a
        return self.handler(self.__uriparts, *args, **kwargs)

class FromCredentialsMixin(object):
    @classmethod
    def from_credentials(cls, cred_file=None, max_workers=None):
        wq = cls()
        for cred in islice(utils.get_credentials(cred_file), max_workers):
            wq.ready(cls.worker_class(cred["user"], cred))
        return wq

class TwitterWorker(object):
    def __init__(self, name, client):
    api_class = None
    def __init__(self, name, creds):
        self.name = name
        self.client = client
        self.throttled_time = False
        self._client = None
        self.cred = creds
        self._lock = Lock()
        self.busy = False

    @property
    def throttled(self):
        if not self.throttled_time:
            return False
        t = time.time()
        delta = self.throttled_time - t
        if delta > 0:
            return True
        return False
    def client(self):
        if not self._client:
            auth=OAuth(self.cred['token_key'],
                       self.cred['token_secret'],
                       self.cred['consumer_key'],
                       self.cred['consumer_secret'])
            self._client = self.api_class(auth=auth)
        return self._client

    def throttle_until(self, epoch=None):
        self.throttled_time = int(epoch)
        logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time())))

class RestWorker(TwitterWorker):
    api_class = Twitter
    def __init__(self, *args, **kwargs):
        super(RestWorker, self).__init__(*args, **kwargs)
        self._limits = None

    @property
    def limits(self):
        if not self._limits:
            self._limits = self.client.application.rate_limit_status()
        return self._limits

    def is_limited(self, uriparts):
        return self.get_wait(uriparts)>0

    def get_wait(self, uriparts):
        limits = self.get_limit(uriparts)
        if limits['remaining'] > 0:
            return 0
        reset = limits.get('reset', 0)
        now = time.time()
        return max(0, (reset-now))

    def get_limit(self, uriparts):
        uri = '/'+'/'.join(uriparts)
        for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
            if ix.startswith(uri):
                return i
        return {}

    def set_limit(self, uriparts, value):
        uri = '/'+'/'.join(uriparts)
        if 'resources' not in self.limits:
            self.limits['resources'] = {}
        resources = self.limits['resources']
        if uriparts[0] not in resources:
            resources[uriparts[0]] = {}
        resource = resources[uriparts[0]]
        resource[uri] = value

    def update_limits(self, uriparts, remaining, reset, limit):
        self.set_limit(uriparts, {'remaining': remaining,
                                  'reset': reset,
                                  'limit': limit})

    def update_limits_from_headers(self, uriparts, headers):
        reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
        remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
        limit = int(headers.get('X-Rate-Limit-Limit', -1))
        self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)

class TwitterQueue(AttrToFunc):
class QueueException(BaseException):
    pass

class QueueMixin(AttrToFunc, FromCredentialsMixin):
    def __init__(self, wait=True):
        logger.debug('Creating worker queue')
        self.queue = set()
@@ -69,75 +143,119 @@ class TwitterQueue(AttrToFunc):
    def ready(self, worker):
        self.queue.add(worker)

class TwitterQueue(QueueMixin):
    worker_class = RestWorker

    def handle_call(self, uriparts, *args, **kwargs):
        logger.debug('Called: {}'.format(uriparts))
        logger.debug('With: {} {}'.format(args, kwargs))
        while True:
        patience = 1
        while patience:
            c = None
            try:
                c = self.next()
                c = self.next(uriparts)
                c._lock.acquire()
                c.busy = True
                logger.debug('Next: {}'.format(c.name))
                ping = time.time()
                resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
                pong = time.time()
                c.update_limits_from_headers(uriparts, resp.headers)
                logger.debug('Took: {}'.format(pong-ping))
                return resp
            except TwitterHTTPError as ex:
                if ex.e.code in (429, 502, 503, 504):
                    limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
                    logger.info('{} limited'.format(c.name))
                    c.throttle_until(limit)
                    c.update_limits_from_headers(uriparts, ex.e.headers)
                    continue
                else:
                    raise
            except urllib.error.URLError as ex:
                time.sleep(5)
                logger.info('Something fishy happened: {}'.format(ex))
                raise
            finally:
                if c:
                    c.busy = False
                    c._lock.release()
            if not self.wait:
                patience -= 1

    @property
    def client(self):
        return self.next().client

    def get_wait(self, uriparts):
        # Stop as soon as one is available to avoid initiating the rest
        for i in self.queue:
            if not i.busy and i.get_wait(uriparts) == 0:
                return 0
        # If None is available, let's see how much we have to wait
        available = filter(lambda x: not x.busy, self.queue)
        diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
        return diff

    @classmethod
    def from_credentials(self, cred_file=None):
        wq = TwitterQueue()
        for cred in utils.get_credentials(cred_file):
            c = Twitter(auth=OAuth(cred['token_key'],
                                   cred['token_secret'],
                                   cred['consumer_key'],
                                   cred['consumer_secret']))
            wq.ready(TwitterWorker(cred["user"], c))
        return wq

    def _next(self):
    def _next(self, uriparts):
        logger.debug('Getting next available')
        s = list(self.queue)
        random.shuffle(s)
        for worker in s:
            if not worker.throttled and not worker.busy:
            if not worker.is_limited(uriparts) and not worker.busy:
                return worker
        raise Exception('No worker is available')
        raise QueueException('No worker is available')

    def next(self):
    def next(self, uriparts):
        if not self.wait:
            return self._next()
            return self._next(uriparts)
        while True:
            try:
                return self._next()
                return self._next(uriparts)
            except Exception:
            except QueueException:
                available = filter(lambda x: not x.busy, self.queue)
                if available:
                    first_worker = min(available, key=lambda x: x.throttled_time)
                    diff = first_worker.throttled_time - time.time()
                    diff = self.get_wait(uriparts)
                    logger.info("All workers are throttled. Waiting %s seconds" % diff)
                else:
                    diff = 5
                    logger.info("All workers are busy. Waiting %s seconds" % diff)
                time.sleep(diff)

class StreamWorker(TwitterWorker):
    api_class = TwitterStream
    def __init__(self, *args, **kwargs):
        super(StreamWorker, self).__init__(*args, **kwargs)

class StreamQueue(QueueMixin):
    worker_class = StreamWorker

    def __init__(self, wait=True):
        logger.debug('Creating worker queue')
        self.queue = set()
        self.index = 0
        self.wait = wait
        AttrToFunc.__init__(self, handler=self.handle_call)

    def handle_call(self, uriparts, *args, **kwargs):
        logger.debug('Called: {}'.format(uriparts))
        logger.debug('With: {} {}'.format(args, kwargs))
        c = None
        c = self.next(uriparts)
        c._lock.acquire()
        c.busy = True
        logger.debug('Next: {}'.format(c.name))
        ping = time.time()
        resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
        for i in resp:
            yield i
        pong = time.time()
        logger.debug('Listening for: {}'.format(pong-ping))
        c.busy = False
        c._lock.release()

    def next(self, uriparts):
        logger.debug('Getting next available')
        s = list(self.queue)
        random.shuffle(s)
        for worker in s:
            if not worker.busy:
                return worker
        raise QueueException('No worker is available')
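The per-endpoint limit bookkeeping above can be driven by hand, which is how the new tests exercise it. A short sketch, assuming a single-credential queue as in tests/test_crawlers.py:

```python
import time

from bitter import easy

wq = easy('tests/credentials.json')   # path as used by the test suite
worker = list(wq.queue)[0]

# Seed a fake window with no remaining calls that resets in two seconds.
worker.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time() + 2})

assert worker.is_limited(['test', 'wait'])      # remaining == 0, reset in the future
assert worker.get_wait(['test', 'wait']) > 1    # seconds left until the reset

time.sleep(2)
assert worker.get_wait(['test', 'wait']) == 0   # the window has passed
```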

bitter/utils.py

@@ -1,3 +1,5 @@
from __future__ import print_function
import logging
import time
import json
@@ -9,9 +11,19 @@ import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from itertools import islice
from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager
from itertools import zip_longest
try:
    from itertools import izip_longest
except ImportError:
    from itertools import zip_longest
from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError
@@ -26,15 +38,14 @@ def signal_handler(signal, frame):
    logger.info('You pressed Ctrl+C!')
    sys.exit(0)

def chunk(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
def chunk(iterable, n):
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())

def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
    if chunksize:
        source = chunk(source, chunksize)
    p = ThreadPool(numcpus)
    for i in p.imap(func, source):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
    source = chunk(source, chunksize)
    p = ThreadPool(numcpus*2)
    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
        yield i
def get_credentials_path(credfile=None):
@@ -86,6 +97,26 @@ def add_credentials(credfile=None, **creds):
        f.write('\n')

def get_hashtags(iter_tweets, best=None):
    c = Counter()
    for tweet in iter_tweets:
        c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
    return c

def read_file(filename, tail=False):
    with open(filename) as f:
        while True:
            line = f.readline()
            if line not in (None, '', '\n'):
                tweet = json.loads(line.strip())
                yield tweet
            else:
                if tail:
                    time.sleep(1)
                else:
                    return

def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
    t = 'name' if by_name else 'uid'
    logger.debug('Getting users by {}: {}'.format(t, ulist))
@@ -134,12 +165,12 @@ def add_user(session, user, enqueue=False):
        user = User(**user)
        session.add(user)
    if extract:
        logging.debug('Adding entry')
        logger.debug('Adding entry')
        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
        if not entry:
            entry = ExtractorEntry(user=user.id)
            session.add(entry)
        logging.debug(entry.pending)
        logger.debug(entry.pending)
        entry.pending = True
        entry.cursor = -1
        session.commit()
@@ -188,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
        add_user(session, i, enqueue=True)

    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
    logging.info('Total users: {}'.format(total_users))
    logger.info('Total users: {}'.format(total_users))

    def pending_entries():
        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
        logging.info('Pending: {}'.format(pending))
        logger.info('Pending: {}'.format(pending))
        return pending

    while pending_entries() > 0:

@@ -255,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
        entry.pending = pending
        entry.cursor = cursor
        logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
        logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))

        session.add(candidate)
        session.commit()
@@ -281,3 +312,85 @@ def get_user(c, user):
        return c.users.lookup(user_id=user)[0]
    except ValueError:
        return c.users.lookup(screen_name=user)[0]

def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
    cached = cached_tweet(tweetid, folder)
    tweet = None
    if update or not cached:
        tweet = get_tweet(wq, tweetid)
    js = json.dumps(tweet, indent=2)
    if write:
        if tweet:
            write_tweet_json(js, folder)
    else:
        print(js)

def cached_tweet(tweetid, folder):
    tweet = None
    file = os.path.join(folder, '%s.json' % tweetid)
    if os.path.exists(file) and os.path.isfile(file):
        try:
            # print('%s: Tweet exists' % tweetid)
            with open(file) as f:
                tweet = json.load(f)
        except Exception as ex:
            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
    return tweet

def write_tweet_json(js, folder):
    tweetid = js['id']
    file = tweet_file(tweetid, folder)
    if not os.path.exists(folder):
        os.makedirs(folder)
    with open(file, 'w') as f:
        json.dump(js, f, indent=2)
        logger.info('Written {} to file {}'.format(tweetid, file))

def tweet_file(tweetid, folder):
    return os.path.join(folder, '%s.json' % tweetid)

def tweet_fail_file(tweetid, folder):
    failsfolder = os.path.join(folder, 'failed')
    if not os.path.exists(failsfolder):
        os.makedirs(failsfolder)
    return os.path.join(failsfolder, '%s.failed' % tweetid)

def tweet_failed(tweetid, folder):
    return os.path.isfile(tweet_fail_file(tweetid, folder))

def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
    def filter_line(line):
        tweetid = int(line)
        # print('Checking {}'.format(tweetid))
        if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
            yield None
        else:
            yield line

    def print_result(res):
        tid, tweet = res
        if tweet:
            try:
                write_tweet_json(tweet, folder=folder)
                yield 1
            except Exception as ex:
                logger.error('%s: %s' % (tid, ex))
                if not ignore_fails:
                    raise
        else:
            logger.info('Tweet not recovered: {}'.format(tid))
            with open(tweet_fail_file(tid, folder), 'w') as f:
                print('Tweet not found', file=f)
            yield -1

    def download_batch(batch):
        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
        return tweets.items()

    with open(tweetsfile) as f:
        lines = map(lambda x: x.strip(), f)
        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
        tweets = parallel(download_batch, lines_to_crawl, 100)
        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
            pass
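Taken together, read_file, get_hashtags and parallel cover the new stream-processing path: read_file yields one parsed tweet per JSON line (optionally tailing the file like `tail -f`), and get_hashtags folds them into a Counter. A minimal sketch, assuming a dump produced by `bitter stream get -f tweets.json` (the file name is a placeholder):

```python
from bitter import utils

# Count hashtags across a newline-delimited JSON dump of tweets.
counts = utils.get_hashtags(utils.read_file('tweets.json'))
for tag, count in counts.most_common(10):
    print('{} - {}'.format(count, tag))

# chunk/parallel: fan batches out over a thread pool and re-flatten results;
# order is not guaranteed because imap_unordered is used under the hood.
doubled = utils.parallel(lambda batch: [x * 2 for x in batch], [1, 2, 3, 4], chunksize=2)
print(sorted(doubled))  # [2, 4, 6, 8]
```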

bitter/version.py (new file)

@@ -0,0 +1,4 @@
import os
with open(os.path.join(os.path.dirname(__file__), 'VERSION')) as f:
    __version__ = f.read().strip()
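This single-sources the version: setup.py and client code both read whatever bitter/VERSION contains, e.g.:

```python
from bitter import __version__  # re-exported from bitter.version

print(__version__)  # contents of bitter/VERSION, e.g. '0.7.1'
```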

requirements-py2.txt

@@ -1 +1,2 @@
 contextlib2
+future

requirements.txt

@@ -1,4 +1,4 @@
 sqlalchemy
 twitter
 click
-future
+tqdm

setup.cfg (new file)

@@ -0,0 +1,4 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

setup.py

@@ -23,16 +23,12 @@ if sys.version_info <= (3, 0):
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
with open('bitter/__init__.py') as f:
    exec(f.read())
from bitter import __version__

setup(
    name="bitter",
    packages=['bitter'],
    description='''
    Simplifying how researchers access Data.
    It includes a CLI and a library.
    ''',
    description=" Simplifying how researchers access Data. It includes a CLI and a library.",
    author='J. Fernando Sanchez',
    author_email='balkian@gmail.com',
    url="http://balkian.com",

@@ -42,10 +38,19 @@ setup(
    extras_require = {
        'server': ['flask', 'flask-oauthlib']
    },
    test_suite="tests",
    setup_requires=['pytest-runner',],
    include_package_data=True,
    entry_points="""
        [console_scripts]
        bitter=bitter.cli:main
    """
    """,
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Intended Audience :: Science/Research',
        'License :: OSI Approved :: Apache Software License',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
    ]
)

tests/test_crawlers.py (new file)

@@ -0,0 +1,72 @@
from unittest import TestCase
import os
import types
import datetime
import time
from bitter import utils, easy
from bitter.crawlers import QueueException
from bitter import config as c
class TestCrawlers(TestCase):
    def setUp(self):
        self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))

    def test_create_worker(self):
        assert len(self.wq.queue)==1

    def test_get_limits(self):
        w1 = list(self.wq.queue)[0]
        print(w1.limits)
        limitslook = w1.get_limit(['statuses', 'lookup'])
        assert limitslook['remaining'] == limitslook['limit']

    def test_set_limits(self):
        w1 = list(self.wq.queue)[0]
        w1.set_limit(['test', 'test2'], {'remaining': 0})
        assert w1.get_limit(['test', 'test2']) == {'remaining': 0}

    def test_await(self):
        w1 = list(self.wq.queue)[0]
        w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
        assert w1.get_wait(['test', 'wait']) > 1
        time.sleep(2)
        assert w1.get_wait(['test', 'wait']) == 0
        assert w1.get_wait(['statuses', 'lookup']) == 0

    def test_is_limited(self):
        w1 = list(self.wq.queue)[0]
        assert not w1.is_limited(['statuses', 'lookup'])
        w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
        assert w1.is_limited(['test', 'limited'])

    def test_call(self):
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['users', 'lookup'])
        resp = self.wq.users.lookup(screen_name='balkian')
        l2 = w1.get_limit(['users', 'lookup'])
        assert l1['remaining']-l2['remaining'] == 1

    def test_consume(self):
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['friends', 'list'])
        self.wq.wait = False
        for i in range(l1['remaining']):
            print(i)
            resp = self.wq.friends.list(screen_name='balkian')
        # l2 = w1.get_limit(['users', 'lookup'])
        # assert l2['remaining'] == 0
        # self.wq.users.lookup(screen_name='balkian')
        failed = False
        try:
            # resp = self.wq.friends.list(screen_name='balkian')
            self.wq.next(['friends', 'list'])
        except QueueException:
            failed = True
        assert failed
        l2 = w1.get_limit(['friends', 'list'])
        assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
        assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)

tests/test_utils.py

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
        assert list(resp) == [1,2,3]
        toc = time.time()
        assert (tic-toc) < 6000
        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
        assert list(resp2) == [1,2,3,4]