1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-24 20:28:24 +00:00

14 Commits
0.5.5 ... 0.7.2

Author SHA1 Message Date
J. Fernando Sánchez
c940709df8 Fixed tweet error 2017-05-21 21:27:46 +02:00
J. Fernando Sánchez
1cb86abbdd Use easy in tests and README 2017-03-08 12:15:48 +01:00
J. Fernando Sánchez
b212a46ab7 Added CI and tests 2016-12-06 01:30:32 +01:00
J. Fernando Sánchez
0a0d8fd5f1 Improved tweet downloader (CLI and API) 2016-12-06 00:03:38 +01:00
J. Fernando Sánchez
e3a78968da Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:31:02 +01:00
J. Fernando Sánchez
67ef307cce Improved tweet extraction @ CLI 2016-11-23 10:50:01 +01:00
J. Fernando Sánchez
cb054ac365 Added (bulk) user download 2016-11-22 20:07:18 +01:00
J. Fernando Sánchez
bdc4690240 Fixed tail argument 2016-11-19 21:42:08 +01:00
J. Fernando Sánchez
c0309a1e52 Fixed python2 compatibility issues (print!) 2016-11-19 20:38:44 +01:00
J. Fernando Sánchez
4afdd6807d Fixed MANIFEST error 2016-11-19 20:24:02 +01:00
J. Fernando Sánchez
38605ba2c8 Added stream to CLI
* Save stream to file
* Parse file and get the most important hashtags
2016-11-19 20:16:56 +01:00
J. Fernando Sánchez
738823c8a2 Added Streaming workers/queues 2016-11-18 16:08:29 +01:00
J. Fernando Sánchez
3f42879751 Bumped to v0.5.6 2016-09-28 06:31:33 +02:00
J. Fernando Sánchez
35f0c6376d Fixed limits bug, added tests 2016-09-28 06:30:49 +02:00
17 changed files with 524 additions and 84 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__
*.egg-info
dist
env

7
Dockerfile-2.7 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile-3.4 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile.template Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +1,10 @@
include requirements.txt
include requirements-py2.txt
include test-requirements.txt
include README.md
include bitter/VERSION
graft bitter/templates
graft bitter/static
graft test
include tests/test*
global-exclude *.pyc
global-exclude __pycache__

76
Makefile Normal file
View File

@@ -0,0 +1,76 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -17,8 +17,8 @@ or
Programmatically:
```python
from bitter.crawlers import TwitterQueue
wq = TwitterQueue.from_credentials()
from bitter import easy
wq = easy()
print(wq.users.show(user_name='balkian'))
```

View File

@@ -1 +1 @@
0.5.5
0.7.1

View File

@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter
"""
try:
from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os
from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import click
import json
import os
@@ -6,6 +8,7 @@ import time
import sqlalchemy.types
import threading
import sqlite3
from tqdm import tqdm
from sqlalchemy import exists
@@ -43,30 +46,36 @@ def tweet(ctx):
pass
@tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
@click.pass_context
def get_tweet(ctx, tweetid):
def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid)
print(json.dumps(t, indent=2))
utils.download_tweet(wq, tweetid, write, folder, update)
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweets(wq, tweetsfile, folder)
@tweet.command('search')
@click.argument('query')
@click.pass_context
def get_tweet(ctx, query):
def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
t = utils.search_tweet(c.client, query)
t = utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
@tweet.command('timeline')
@click.argument('user')
@click.pass_context
def get_tweet(ctx, user):
def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
t = utils.user_timeline(c.client, user)
t = utils.user_timeline(wq, user)
print(json.dumps(t, indent=2))
@main.group()
@@ -85,23 +94,47 @@ def list_users(ctx, db):
for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db):
def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
if '://' not in db:
@@ -330,6 +363,71 @@ def run_server(ctx, consumer_key, consumer_secret):
from .webserver import app
app.run(host='0.0.0.0')
@main.group()
@click.pass_context
def stream(ctx):
pass
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
query_args = {}
if locations:
query_args['locations'] = locations
if track:
query_args['track'] = track
if not file:
file = sys.stdout
else:
file = open(file, 'a')
def insist():
lasthangup = time.time()
while True:
if not query_args:
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
for i in iterator:
yield i
if not politelyretry:
return
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
time.sleep(3)
for tweet in tqdm(insist()):
print(json.dumps(tweet), file=file)
if file != sys.stdout:
file.close()
@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))
@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag))
if __name__ == '__main__':
main()

View File

@@ -1,5 +1,4 @@
import time
import datetime
import urllib
import random
import json
@@ -10,6 +9,12 @@ logger = logging.getLogger(__name__)
from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
try:
import itertools.ifilter as filter
except ImportError:
pass
from . import utils
from . import config
@@ -38,34 +43,68 @@ class AttrToFunc(object):
# kwargs[i] = a
return self.handler(self.__uriparts, *args, **kwargs)
class FromCredentialsMixin(object):
@classmethod
def from_credentials(cls, cred_file=None, max_workers=None):
wq = cls()
for cred in islice(utils.get_credentials(cred_file), max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object):
def __init__(self, name, client):
api_class = None
def __init__(self, name, creds):
self.name = name
self.client = client
self._client = None
self.cred = creds
self._lock = Lock()
self.busy = False
self.limits = self.client.application.rate_limit_status()
@property
def client(self):
if not self._client:
auth=OAuth(self.cred['token_key'],
self.cred['token_secret'],
self.cred['consumer_key'],
self.cred['consumer_secret'])
self._client = self.api_class(auth=auth)
return self._client
class RestWorker(TwitterWorker):
api_class = Twitter
def __init__(self, *args, **kwargs):
super(RestWorker, self).__init__(*args, **kwargs)
self._limits = None
@property
def limits(self):
if not self._limits:
self._limits = self.client.application.rate_limit_status()
return self._limits
def is_limited(self, uriparts):
limit = self.get_limit(uriparts)
if limit and limit['remaining'] <=0:
t = datime.datetime.now()
delta = limit['reset'] - t
if delta < datetime.timedelta(seconds=1):
return True
return False
return self.get_wait(uriparts)>0
def get_wait(self, uriparts):
limits = self.get_limit(uriparts)
if limits['remaining'] > 0:
return 0
reset = datetime.datetime.fromtimestamp(limits.get('reset', 0))
now = datetime.datetime.now()
return max(0, (reset-now).total_seconds())
reset = limits.get('reset', 0)
now = time.time()
return max(0, (reset-now))
def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
return i
return {}
def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts)
@@ -90,7 +129,10 @@ class TwitterWorker(object):
class TwitterQueue(AttrToFunc):
class QueueException(BaseException):
pass
class QueueMixin(AttrToFunc, FromCredentialsMixin):
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
@@ -101,10 +143,15 @@ class TwitterQueue(AttrToFunc):
def ready(self, worker):
self.queue.add(worker)
class TwitterQueue(QueueMixin):
worker_class = RestWorker
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
while True:
patience = 1
while patience:
c = None
try:
c = self.next(uriparts)
@@ -127,22 +174,24 @@ class TwitterQueue(AttrToFunc):
except urllib.error.URLError as ex:
time.sleep(5)
logger.info('Something fishy happened: {}'.format(ex))
raise
finally:
if c:
c.busy = False
c._lock.release()
if not self.wait:
patience -= 1
@classmethod
def from_credentials(self, cred_file=None):
wq = TwitterQueue()
for cred in utils.get_credentials(cred_file):
c = Twitter(auth=OAuth(cred['token_key'],
cred['token_secret'],
cred['consumer_key'],
cred['consumer_secret']))
wq.ready(TwitterWorker(cred["user"], c))
return wq
def get_wait(self, uriparts):
# Stop as soon as one is available to avoid initiating the rest
for i in self.queue:
if not i.busy and i.get_wait(uriparts) == 0:
return 0
# If None is available, let's see how much we have to wait
available = filter(lambda x: not x.busy, self.queue)
diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
return diff
def _next(self, uriparts):
logger.debug('Getting next available')
@@ -151,7 +200,7 @@ class TwitterQueue(AttrToFunc):
for worker in s:
if not worker.is_limited(uriparts) and not worker.busy:
return worker
raise Exception('No worker is available')
raise QueueException('No worker is available')
def next(self, uriparts):
if not self.wait:
@@ -159,14 +208,54 @@ class TwitterQueue(AttrToFunc):
while True:
try:
return self._next(uriparts)
except Exception:
except QueueException:
available = filter(lambda x: not x.busy, self.queue)
if available:
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
diff = first_worker.get_wait(uriparts)
diff = self.get_wait(uriparts)
logger.info("All workers are throttled. Waiting %s seconds" % diff)
else:
diff = 5
logger.info("All workers are busy. Waiting %s seconds" % diff)
time.sleep(diff)
class StreamWorker(TwitterWorker):
api_class = TwitterStream
def __init__(self, *args, **kwargs):
super(StreamWorker, self).__init__(*args, **kwargs)
class StreamQueue(QueueMixin):
worker_class = StreamWorker
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
self.index = 0
self.wait = wait
AttrToFunc.__init__(self, handler=self.handle_call)
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
c = None
c = self.next(uriparts)
c._lock.acquire()
c.busy = True
logger.debug('Next: {}'.format(c.name))
ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
for i in resp:
yield i
pong = time.time()
logger.debug('Listening for: {}'.format(pong-ping))
c.busy = False
c._lock.release()
def next(self, uriparts):
logger.debug('Getting next available')
s = list(self.queue)
random.shuffle(s)
for worker in s:
if not worker.busy:
return worker
raise QueueException('No worker is available')

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import logging
import time
import json
@@ -9,9 +11,19 @@ import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from itertools import islice
from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager
from itertools import zip_longest
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError
@@ -26,15 +38,14 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!')
sys.exit(0)
def chunk(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def chunk(iterable, n):
it = iter(iterable)
return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
if chunksize:
source = chunk(source, chunksize)
p = ThreadPool(numcpus)
for i in p.imap(func, source):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
yield i
def get_credentials_path(credfile=None):
@@ -86,6 +97,26 @@ def add_credentials(credfile=None, **creds):
f.write('\n')
def get_hashtags(iter_tweets, best=None):
c = Counter()
for tweet in iter_tweets:
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
with open(filename) as f:
while True:
line = f.readline()
if line not in (None, '', '\n'):
tweet = json.loads(line.strip())
yield tweet
else:
if tail:
time.sleep(1)
else:
return
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
t = 'name' if by_name else 'uid'
logger.debug('Getting users by {}: {}'.format(t, ulist))
@@ -134,12 +165,12 @@ def add_user(session, user, enqueue=False):
user = User(**user)
session.add(user)
if extract:
logging.debug('Adding entry')
logger.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry:
entry = ExtractorEntry(user=user.id)
session.add(entry)
logging.debug(entry.pending)
logger.debug(entry.pending)
entry.pending = True
entry.cursor = -1
session.commit()
@@ -188,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logging.info('Total users: {}'.format(total_users))
logger.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logging.info('Pending: {}'.format(pending))
logger.info('Pending: {}'.format(pending))
return pending
while pending_entries() > 0:
@@ -255,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
entry.pending = pending
entry.cursor = cursor
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit()
@@ -281,3 +312,85 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0]
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass

View File

@@ -1,3 +1,4 @@
sqlalchemy
twitter
click
tqdm

4
setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,10 +38,19 @@ setup(
extras_require = {
'server': ['flask', 'flask-oauthlib']
},
test_suite="tests",
setup_requires=['pytest-runner',],
include_package_data=True,
entry_points="""
[console_scripts]
bitter=bitter.cli:main
"""
""",
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
]
)

View File

@@ -5,14 +5,14 @@ import types
import datetime
import time
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker
from bitter import utils, easy
from bitter.crawlers import QueueException
from bitter import config as c
class TestUtils(TestCase):
class TestCrawlers(TestCase):
def setUp(self):
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self):
assert len(self.wq.queue)==1
@@ -39,6 +39,8 @@ class TestUtils(TestCase):
def test_is_limited(self):
w1 = list(self.wq.queue)[0]
assert not w1.is_limited(['statuses', 'lookup'])
w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
assert w1.is_limited(['test', 'limited'])
def test_call(self):
w1 = list(self.wq.queue)[0]
@@ -46,3 +48,25 @@ class TestUtils(TestCase):
resp = self.wq.users.lookup(screen_name='balkian')
l2 = w1.get_limit(['users', 'lookup'])
assert l1['remaining']-l2['remaining'] == 1
def test_consume(self):
w1 = list(self.wq.queue)[0]
l1 = w1.get_limit(['friends', 'list'])
self.wq.wait = False
for i in range(l1['remaining']):
print(i)
resp = self.wq.friends.list(screen_name='balkian')
# l2 = w1.get_limit(['users', 'lookup'])
# assert l2['remaining'] == 0
# self.wq.users.lookup(screen_name='balkian')
failed = False
try:
# resp = self.wq.friends.list(screen_name='balkian')
self.wq.next(['friends', 'list'])
except QueueException:
failed = True
assert failed
l2 = w1.get_limit(['friends', 'list'])
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)

View File

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3]
toc = time.time()
assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]