1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 20:58:24 +00:00

1 Commits
0.7.2 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
15 changed files with 47 additions and 232 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
__pycache__
*.egg-info *.egg-info
dist dist
env env

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -5,6 +5,4 @@ include README.md
include bitter/VERSION include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
include tests/test* graft test
global-exclude *.pyc
global-exclude __pycache__

View File

@@ -1,76 +0,0 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -17,8 +17,8 @@ or
Programmatically: Programmatically:
```python ```python
from bitter import easy from bitter.crawlers import TwitterQueue
wq = easy() wq = TwitterQueue.from_credentials()
print(wq.users.show(user_name='balkian')) print(wq.users.show(user_name='balkian'))
``` ```

View File

@@ -1 +1 @@
0.7.1 0.7.0

View File

@@ -7,10 +7,4 @@ import os
from .version import __version__ from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -52,15 +52,34 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
def get_tweet(tweetid, write, folder, update): def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweet(wq, tweetid, write, folder, update) if not write:
t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all') @tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up') @click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder): def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) with open(tweetsfile) as f:
utils.download_tweets(wq, tweetsfile, folder) for line in f:
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')

View File

@@ -1,5 +1,3 @@
from __future__ import print_function
import logging import logging
import time import time
import json import json
@@ -11,20 +9,11 @@ import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from tqdm import tqdm from itertools import islice
from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from future.moves.itertools import zip_longest
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from collections import Counter from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session from bitter.models import Following, User, ExtractorEntry, make_session
@@ -38,14 +27,15 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n): def chunk(iterable, n, fillvalue=None):
it = iter(iterable) args = [iter(iterable)] * n
return iter(lambda: tuple(islice(it, n)), ()) return zip_longest(*args, fillvalue=fillvalue)
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) if chunksize:
p = ThreadPool(numcpus*2) source = chunk(source, chunksize)
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))): p = ThreadPool(numcpus)
for i in p.imap(func, source):
yield i yield i
def get_credentials_path(credfile=None): def get_credentials_path(credfile=None):
@@ -165,12 +155,12 @@ def add_user(session, user, enqueue=False):
user = User(**user) user = User(**user)
session.add(user) session.add(user)
if extract: if extract:
logger.debug('Adding entry') logging.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry: if not entry:
entry = ExtractorEntry(user=user.id) entry = ExtractorEntry(user=user.id)
session.add(entry) session.add(entry)
logger.debug(entry.pending) logging.debug(entry.pending)
entry.pending = True entry.pending = True
entry.cursor = -1 entry.cursor = -1
session.commit() session.commit()
@@ -219,10 +209,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
add_user(session, i, enqueue=True) add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar() total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users)) logging.info('Total users: {}'.format(total_users))
def pending_entries(): def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logger.info('Pending: {}'.format(pending)) logging.info('Pending: {}'.format(pending))
return pending return pending
while pending_entries() > 0: while pending_entries() > 0:
@@ -286,7 +276,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
entry.pending = pending entry.pending = pending
entry.cursor = cursor entry.cursor = cursor
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate) session.add(candidate)
session.commit() session.commit()
@@ -312,85 +302,3 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0] return c.users.lookup(user_id=user)[0]
except ValueError: except ValueError:
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass

View File

@@ -1,4 +0,0 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,7 +38,7 @@ setup(
extras_require = { extras_require = {
'server': ['flask', 'flask-oauthlib'] 'server': ['flask', 'flask-oauthlib']
}, },
setup_requires=['pytest-runner',], test_suite="tests",
include_package_data=True, include_package_data=True,
entry_points=""" entry_points="""
[console_scripts] [console_scripts]
@@ -48,7 +48,7 @@ setup(
'Development Status :: 4 - Beta', 'Development Status :: 4 - Beta',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Intended Audience :: Science/Research', 'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License', 'License :: OSI Approved :: Apache 2 License',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',

View File

@@ -5,14 +5,14 @@ import types
import datetime import datetime
import time import time
from bitter import utils, easy from bitter import utils
from bitter.crawlers import QueueException from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
from bitter import config as c from bitter import config as c
class TestCrawlers(TestCase): class TestUtils(TestCase):
def setUp(self): def setUp(self):
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self): def test_create_worker(self):
assert len(self.wq.queue)==1 assert len(self.wq.queue)==1

View File

@@ -58,6 +58,4 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3] assert list(resp) == [1,2,3]
toc = time.time() toc = time.time()
assert (tic-toc) < 6000 assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]