1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

14 Commits
0.5.5 ... 0.7.2

Author SHA1 Message Date
J. Fernando Sánchez
c940709df8 Fixed tweet error 2017-05-21 21:27:46 +02:00
J. Fernando Sánchez
1cb86abbdd Use easy in tests and README 2017-03-08 12:15:48 +01:00
J. Fernando Sánchez
b212a46ab7 Added CI and tests 2016-12-06 01:30:32 +01:00
J. Fernando Sánchez
0a0d8fd5f1 Improved tweet downloader (CLI and API) 2016-12-06 00:03:38 +01:00
J. Fernando Sánchez
e3a78968da Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:31:02 +01:00
J. Fernando Sánchez
67ef307cce Improved tweet extraction @ CLI 2016-11-23 10:50:01 +01:00
J. Fernando Sánchez
cb054ac365 Added (bulk) user download 2016-11-22 20:07:18 +01:00
J. Fernando Sánchez
bdc4690240 Fixed tail argument 2016-11-19 21:42:08 +01:00
J. Fernando Sánchez
c0309a1e52 Fixed python2 compatibility issues (print!) 2016-11-19 20:38:44 +01:00
J. Fernando Sánchez
4afdd6807d Fixed MANIFEST error 2016-11-19 20:24:02 +01:00
J. Fernando Sánchez
38605ba2c8 Added stream to CLI
* Save stream to file
* Parse file and get the most important hashtags
2016-11-19 20:16:56 +01:00
J. Fernando Sánchez
738823c8a2 Added Streaming workers/queues 2016-11-18 16:08:29 +01:00
J. Fernando Sánchez
3f42879751 Bumped to v0.5.6 2016-09-28 06:31:33 +02:00
J. Fernando Sánchez
35f0c6376d Fixed limits bug, added tests 2016-09-28 06:30:49 +02:00
17 changed files with 524 additions and 84 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__
*.egg-info *.egg-info
dist dist
env env

7
Dockerfile-2.7 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile-3.4 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile.template Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +1,10 @@
include requirements.txt include requirements.txt
include requirements-py2.txt
include test-requirements.txt include test-requirements.txt
include README.md include README.md
include bitter/VERSION include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
graft test include tests/test*
global-exclude *.pyc
global-exclude __pycache__

76
Makefile Normal file
View File

@@ -0,0 +1,76 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -17,8 +17,8 @@ or
Programmatically: Programmatically:
```python ```python
from bitter.crawlers import TwitterQueue from bitter import easy
wq = TwitterQueue.from_credentials() wq = easy()
print(wq.users.show(user_name='balkian')) print(wq.users.show(user_name='balkian'))
``` ```

View File

@@ -1 +1 @@
0.5.5 0.7.1

View File

@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter http://github.com/balkian/bitter
""" """
try:
from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os import os
from .version import __version__ from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import click import click
import json import json
import os import os
@@ -6,6 +8,7 @@ import time
import sqlalchemy.types import sqlalchemy.types
import threading import threading
import sqlite3 import sqlite3
from tqdm import tqdm
from sqlalchemy import exists from sqlalchemy import exists
@@ -43,30 +46,36 @@ def tweet(ctx):
pass pass
@tweet.command('get') @tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context def get_tweet(tweetid, write, folder, update):
def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) utils.download_tweet(wq, tweetid, write, folder, update)
print(json.dumps(t, indent=2))
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweets(wq, tweetsfile, folder)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() t = utils.search_tweet(wq, query)
t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() t = utils.user_timeline(wq, user)
t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@main.group() @main.group()
@@ -85,23 +94,47 @@ def list_users(ctx, db):
for j in i.__dict__: for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j))) print('\t{}: {}'.format(j, getattr(i,j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get') @users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.') @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up') @click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context @click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db): def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
if '://' not in db: if '://' not in db:
@@ -330,6 +363,71 @@ def run_server(ctx, consumer_key, consumer_secret):
from .webserver import app from .webserver import app
app.run(host='0.0.0.0') app.run(host='0.0.0.0')
@main.group()
@click.pass_context
def stream(ctx):
pass
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
query_args = {}
if locations:
query_args['locations'] = locations
if track:
query_args['track'] = track
if not file:
file = sys.stdout
else:
file = open(file, 'a')
def insist():
lasthangup = time.time()
while True:
if not query_args:
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
for i in iterator:
yield i
if not politelyretry:
return
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
time.sleep(3)
for tweet in tqdm(insist()):
print(json.dumps(tweet), file=file)
if file != sys.stdout:
file.close()
@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))
@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,5 +1,4 @@
import time import time
import datetime
import urllib import urllib
import random import random
import json import json
@@ -10,6 +9,12 @@ logger = logging.getLogger(__name__)
from twitter import * from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock from threading import Lock
from itertools import islice
try:
import itertools.ifilter as filter
except ImportError:
pass
from . import utils from . import utils
from . import config from . import config
@@ -38,34 +43,68 @@ class AttrToFunc(object):
# kwargs[i] = a # kwargs[i] = a
return self.handler(self.__uriparts, *args, **kwargs) return self.handler(self.__uriparts, *args, **kwargs)
class FromCredentialsMixin(object):
@classmethod
def from_credentials(cls, cred_file=None, max_workers=None):
wq = cls()
for cred in islice(utils.get_credentials(cred_file), max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object): class TwitterWorker(object):
def __init__(self, name, client): api_class = None
def __init__(self, name, creds):
self.name = name self.name = name
self.client = client self._client = None
self.cred = creds
self._lock = Lock() self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property
def client(self):
if not self._client:
auth=OAuth(self.cred['token_key'],
self.cred['token_secret'],
self.cred['consumer_key'],
self.cred['consumer_secret'])
self._client = self.api_class(auth=auth)
return self._client
class RestWorker(TwitterWorker):
api_class = Twitter
def __init__(self, *args, **kwargs):
super(RestWorker, self).__init__(*args, **kwargs)
self._limits = None
@property
def limits(self):
if not self._limits:
self._limits = self.client.application.rate_limit_status()
return self._limits
def is_limited(self, uriparts): def is_limited(self, uriparts):
limit = self.get_limit(uriparts) return self.get_wait(uriparts)>0
if limit and limit['remaining'] <=0:
t = datime.datetime.now()
delta = limit['reset'] - t
if delta < datetime.timedelta(seconds=1):
return True
return False
def get_wait(self, uriparts): def get_wait(self, uriparts):
limits = self.get_limit(uriparts) limits = self.get_limit(uriparts)
if limits['remaining'] > 0: if limits['remaining'] > 0:
return 0 return 0
reset = datetime.datetime.fromtimestamp(limits.get('reset', 0)) reset = limits.get('reset', 0)
now = datetime.datetime.now() now = time.time()
return max(0, (reset-now).total_seconds()) return max(0, (reset-now))
def get_limit(self, uriparts): def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
return i
return {}
def set_limit(self, uriparts, value): def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
@@ -90,7 +129,10 @@ class TwitterWorker(object):
class TwitterQueue(AttrToFunc): class QueueException(BaseException):
pass
class QueueMixin(AttrToFunc, FromCredentialsMixin):
def __init__(self, wait=True): def __init__(self, wait=True):
logger.debug('Creating worker queue') logger.debug('Creating worker queue')
self.queue = set() self.queue = set()
@@ -101,10 +143,15 @@ class TwitterQueue(AttrToFunc):
def ready(self, worker): def ready(self, worker):
self.queue.add(worker) self.queue.add(worker)
class TwitterQueue(QueueMixin):
worker_class = RestWorker
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
while True: patience = 1
while patience:
c = None c = None
try: try:
c = self.next(uriparts) c = self.next(uriparts)
@@ -127,22 +174,24 @@ class TwitterQueue(AttrToFunc):
except urllib.error.URLError as ex: except urllib.error.URLError as ex:
time.sleep(5) time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex)) logger.info('Something fishy happened: {}'.format(ex))
raise
finally: finally:
if c: if c:
c.busy = False c.busy = False
c._lock.release() c._lock.release()
if not self.wait:
patience -= 1
@classmethod def get_wait(self, uriparts):
def from_credentials(self, cred_file=None): # Stop as soon as one is available to avoid initiating the rest
wq = TwitterQueue() for i in self.queue:
if not i.busy and i.get_wait(uriparts) == 0:
return 0
# If None is available, let's see how much we have to wait
available = filter(lambda x: not x.busy, self.queue)
diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
return diff
for cred in utils.get_credentials(cred_file):
c = Twitter(auth=OAuth(cred['token_key'],
cred['token_secret'],
cred['consumer_key'],
cred['consumer_secret']))
wq.ready(TwitterWorker(cred["user"], c))
return wq
def _next(self, uriparts): def _next(self, uriparts):
logger.debug('Getting next available') logger.debug('Getting next available')
@@ -151,7 +200,7 @@ class TwitterQueue(AttrToFunc):
for worker in s: for worker in s:
if not worker.is_limited(uriparts) and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise Exception('No worker is available') raise QueueException('No worker is available')
def next(self, uriparts): def next(self, uriparts):
if not self.wait: if not self.wait:
@@ -159,14 +208,54 @@ class TwitterQueue(AttrToFunc):
while True: while True:
try: try:
return self._next(uriparts) return self._next(uriparts)
except Exception: except QueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
first_worker = min(available, key=lambda x: x.get_wait(uriparts)) diff = self.get_wait(uriparts)
diff = first_worker.get_wait(uriparts)
logger.info("All workers are throttled. Waiting %s seconds" % diff) logger.info("All workers are throttled. Waiting %s seconds" % diff)
else: else:
diff = 5 diff = 5
logger.info("All workers are busy. Waiting %s seconds" % diff) logger.info("All workers are busy. Waiting %s seconds" % diff)
time.sleep(diff) time.sleep(diff)
class StreamWorker(TwitterWorker):
api_class = TwitterStream
def __init__(self, *args, **kwargs):
super(StreamWorker, self).__init__(*args, **kwargs)
class StreamQueue(QueueMixin):
worker_class = StreamWorker
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
self.index = 0
self.wait = wait
AttrToFunc.__init__(self, handler=self.handle_call)
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
c = None
c = self.next(uriparts)
c._lock.acquire()
c.busy = True
logger.debug('Next: {}'.format(c.name))
ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
for i in resp:
yield i
pong = time.time()
logger.debug('Listening for: {}'.format(pong-ping))
c.busy = False
c._lock.release()
def next(self, uriparts):
logger.debug('Getting next available')
s = list(self.queue)
random.shuffle(s)
for worker in s:
if not worker.busy:
return worker
raise QueueException('No worker is available')

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import logging import logging
import time import time
import json import json
@@ -9,10 +11,20 @@ import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from itertools import islice from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest from itertools import zip_longest
from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session from bitter.models import Following, User, ExtractorEntry, make_session
@@ -26,15 +38,14 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n, fillvalue=None): def chunk(iterable, n):
args = [iter(iterable)] * n it = iter(iterable)
return zip_longest(*args, fillvalue=fillvalue) return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
if chunksize:
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus) p = ThreadPool(numcpus*2)
for i in p.imap(func, source): for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
yield i yield i
def get_credentials_path(credfile=None): def get_credentials_path(credfile=None):
@@ -86,6 +97,26 @@ def add_credentials(credfile=None, **creds):
f.write('\n') f.write('\n')
def get_hashtags(iter_tweets, best=None):
c = Counter()
for tweet in iter_tweets:
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
with open(filename) as f:
while True:
line = f.readline()
if line not in (None, '', '\n'):
tweet = json.loads(line.strip())
yield tweet
else:
if tail:
time.sleep(1)
else:
return
def get_users(wq, ulist, by_name=False, queue=None, max_users=100): def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
t = 'name' if by_name else 'uid' t = 'name' if by_name else 'uid'
logger.debug('Getting users by {}: {}'.format(t, ulist)) logger.debug('Getting users by {}: {}'.format(t, ulist))
@@ -134,12 +165,12 @@ def add_user(session, user, enqueue=False):
user = User(**user) user = User(**user)
session.add(user) session.add(user)
if extract: if extract:
logging.debug('Adding entry') logger.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry: if not entry:
entry = ExtractorEntry(user=user.id) entry = ExtractorEntry(user=user.id)
session.add(entry) session.add(entry)
logging.debug(entry.pending) logger.debug(entry.pending)
entry.pending = True entry.pending = True
entry.cursor = -1 entry.cursor = -1
session.commit() session.commit()
@@ -188,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
add_user(session, i, enqueue=True) add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar() total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logging.info('Total users: {}'.format(total_users)) logger.info('Total users: {}'.format(total_users))
def pending_entries(): def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logging.info('Pending: {}'.format(pending)) logger.info('Pending: {}'.format(pending))
return pending return pending
while pending_entries() > 0: while pending_entries() > 0:
@@ -255,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
entry.pending = pending entry.pending = pending
entry.cursor = cursor entry.cursor = cursor
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending)) logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate) session.add(candidate)
session.commit() session.commit()
@@ -281,3 +312,85 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0] return c.users.lookup(user_id=user)[0]
except ValueError: except ValueError:
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass

View File

@@ -1,3 +1,4 @@
sqlalchemy sqlalchemy
twitter twitter
click click
tqdm

4
setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,10 +38,19 @@ setup(
extras_require = { extras_require = {
'server': ['flask', 'flask-oauthlib'] 'server': ['flask', 'flask-oauthlib']
}, },
test_suite="tests", setup_requires=['pytest-runner',],
include_package_data=True, include_package_data=True,
entry_points=""" entry_points="""
[console_scripts] [console_scripts]
bitter=bitter.cli:main bitter=bitter.cli:main
""" """,
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
]
) )

View File

@@ -5,14 +5,14 @@ import types
import datetime import datetime
import time import time
from bitter import utils from bitter import utils, easy
from bitter.crawlers import TwitterQueue, TwitterWorker from bitter.crawlers import QueueException
from bitter import config as c from bitter import config as c
class TestUtils(TestCase): class TestCrawlers(TestCase):
def setUp(self): def setUp(self):
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self): def test_create_worker(self):
assert len(self.wq.queue)==1 assert len(self.wq.queue)==1
@@ -39,6 +39,8 @@ class TestUtils(TestCase):
def test_is_limited(self): def test_is_limited(self):
w1 = list(self.wq.queue)[0] w1 = list(self.wq.queue)[0]
assert not w1.is_limited(['statuses', 'lookup']) assert not w1.is_limited(['statuses', 'lookup'])
w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
assert w1.is_limited(['test', 'limited'])
def test_call(self): def test_call(self):
w1 = list(self.wq.queue)[0] w1 = list(self.wq.queue)[0]
@@ -46,3 +48,25 @@ class TestUtils(TestCase):
resp = self.wq.users.lookup(screen_name='balkian') resp = self.wq.users.lookup(screen_name='balkian')
l2 = w1.get_limit(['users', 'lookup']) l2 = w1.get_limit(['users', 'lookup'])
assert l1['remaining']-l2['remaining'] == 1 assert l1['remaining']-l2['remaining'] == 1
def test_consume(self):
w1 = list(self.wq.queue)[0]
l1 = w1.get_limit(['friends', 'list'])
self.wq.wait = False
for i in range(l1['remaining']):
print(i)
resp = self.wq.friends.list(screen_name='balkian')
# l2 = w1.get_limit(['users', 'lookup'])
# assert l2['remaining'] == 0
# self.wq.users.lookup(screen_name='balkian')
failed = False
try:
# resp = self.wq.friends.list(screen_name='balkian')
self.wq.next(['friends', 'list'])
except QueueException:
failed = True
assert failed
l2 = w1.get_limit(['friends', 'list'])
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)

View File

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3] assert list(resp) == [1,2,3]
toc = time.time() toc = time.time()
assert (tic-toc) < 6000 assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]