1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 20:58:24 +00:00

5 Commits
0.7.0 ... 0.7.2

Author SHA1 Message Date
J. Fernando Sánchez
c940709df8 Fixed tweet error 2017-05-21 21:27:46 +02:00
J. Fernando Sánchez
1cb86abbdd Use easy in tests and README 2017-03-08 12:15:48 +01:00
J. Fernando Sánchez
b212a46ab7 Added CI and tests 2016-12-06 01:30:32 +01:00
J. Fernando Sánchez
0a0d8fd5f1 Improved tweet downloader (CLI and API) 2016-12-06 00:03:38 +01:00
J. Fernando Sánchez
e3a78968da Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:31:02 +01:00
15 changed files with 232 additions and 47 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__
*.egg-info *.egg-info
dist dist
env env

7
Dockerfile-2.7 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile-3.4 Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile.template Normal file
View File

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -5,4 +5,6 @@ include README.md
include bitter/VERSION include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
graft test include tests/test*
global-exclude *.pyc
global-exclude __pycache__

76
Makefile Normal file
View File

@@ -0,0 +1,76 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -17,8 +17,8 @@ or
Programmatically: Programmatically:
```python ```python
from bitter.crawlers import TwitterQueue from bitter import easy
wq = TwitterQueue.from_credentials() wq = easy()
print(wq.users.show(user_name='balkian')) print(wq.users.show(user_name='balkian'))
``` ```

View File

@@ -1 +1 @@
0.7.0 0.7.1

View File

@@ -7,4 +7,10 @@ import os
from .version import __version__ from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -52,34 +52,15 @@ def tweet(ctx):
@click.argument('tweetid') @click.argument('tweetid')
def get_tweet(tweetid, write, folder, update): def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write: utils.download_tweet(wq, tweetid, write, folder, update)
t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all') @tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up') @click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder): def get_tweets(ctx, tweetsfile, folder):
with open(tweetsfile) as f: wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for line in f: utils.download_tweets(wq, tweetsfile, folder)
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import logging import logging
import time import time
import json import json
@@ -9,11 +11,20 @@ import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from itertools import islice from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from future.moves.itertools import zip_longest
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from collections import Counter from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session from bitter.models import Following, User, ExtractorEntry, make_session
@@ -27,15 +38,14 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n, fillvalue=None): def chunk(iterable, n):
args = [iter(iterable)] * n it = iter(iterable)
return zip_longest(*args, fillvalue=fillvalue) return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
if chunksize:
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus) p = ThreadPool(numcpus*2)
for i in p.imap(func, source): for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
yield i yield i
def get_credentials_path(credfile=None): def get_credentials_path(credfile=None):
@@ -155,12 +165,12 @@ def add_user(session, user, enqueue=False):
user = User(**user) user = User(**user)
session.add(user) session.add(user)
if extract: if extract:
logging.debug('Adding entry') logger.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry: if not entry:
entry = ExtractorEntry(user=user.id) entry = ExtractorEntry(user=user.id)
session.add(entry) session.add(entry)
logging.debug(entry.pending) logger.debug(entry.pending)
entry.pending = True entry.pending = True
entry.cursor = -1 entry.cursor = -1
session.commit() session.commit()
@@ -209,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
add_user(session, i, enqueue=True) add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar() total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logging.info('Total users: {}'.format(total_users)) logger.info('Total users: {}'.format(total_users))
def pending_entries(): def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logging.info('Pending: {}'.format(pending)) logger.info('Pending: {}'.format(pending))
return pending return pending
while pending_entries() > 0: while pending_entries() > 0:
@@ -276,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
entry.pending = pending entry.pending = pending
entry.cursor = cursor entry.cursor = cursor
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending)) logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate) session.add(candidate)
session.commit() session.commit()
@@ -302,3 +312,85 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0] return c.users.lookup(user_id=user)[0]
except ValueError: except ValueError:
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass

4
setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,7 +38,7 @@ setup(
extras_require = { extras_require = {
'server': ['flask', 'flask-oauthlib'] 'server': ['flask', 'flask-oauthlib']
}, },
test_suite="tests", setup_requires=['pytest-runner',],
include_package_data=True, include_package_data=True,
entry_points=""" entry_points="""
[console_scripts] [console_scripts]
@@ -48,7 +48,7 @@ setup(
'Development Status :: 4 - Beta', 'Development Status :: 4 - Beta',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Intended Audience :: Science/Research', 'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache 2 License', 'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',

View File

@@ -5,14 +5,14 @@ import types
import datetime import datetime
import time import time
from bitter import utils from bitter import utils, easy
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException from bitter.crawlers import QueueException
from bitter import config as c from bitter import config as c
class TestUtils(TestCase): class TestCrawlers(TestCase):
def setUp(self): def setUp(self):
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json')) self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self): def test_create_worker(self):
assert len(self.wq.queue)==1 assert len(self.wq.queue)==1

View File

@@ -58,4 +58,6 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3] assert list(resp) == [1,2,3]
toc = time.time() toc = time.time()
assert (tic-toc) < 6000 assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]