1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

1 Commits
0.7.1 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
13 changed files with 41 additions and 212 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,3 @@
__pycache__
*.egg-info
dist
env

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -5,6 +5,4 @@ include README.md
include bitter/VERSION
graft bitter/templates
graft bitter/static
include tests/test*
global-exclude *.pyc
global-exclude __pycache__
graft test

View File

@@ -1,62 +0,0 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

View File

@@ -1 +1 @@
0.7.1
0.7.0

View File

@@ -7,10 +7,4 @@ import os
from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -52,15 +52,34 @@ def tweet(ctx):
@click.argument('tweetid')
def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweet(wq, tweetid, write, folder, update)
if not write:
t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweets(wq, tweetsfile, folder)
with open(tweetsfile) as f:
for line in f:
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search')
@click.argument('query')

View File

@@ -1,5 +1,3 @@
from __future__ import print_function
import logging
import time
import json
@@ -11,20 +9,11 @@ import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from tqdm import tqdm
from itertools import islice, chain
from itertools import islice
from contextlib import contextmanager
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from future.moves.itertools import zip_longest
from collections import Counter
from builtins import map, filter
from twitter import TwitterHTTPError
from bitter.models import Following, User, ExtractorEntry, make_session
@@ -38,14 +27,15 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!')
sys.exit(0)
def chunk(iterable, n):
it = iter(iterable)
return iter(lambda: tuple(islice(it, n)), ())
def chunk(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
if chunksize:
source = chunk(source, chunksize)
p = ThreadPool(numcpus)
for i in p.imap(func, source):
yield i
def get_credentials_path(credfile=None):
@@ -165,12 +155,12 @@ def add_user(session, user, enqueue=False):
user = User(**user)
session.add(user)
if extract:
logger.debug('Adding entry')
logging.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry:
entry = ExtractorEntry(user=user.id)
session.add(entry)
logger.debug(entry.pending)
logging.debug(entry.pending)
entry.pending = True
entry.cursor = -1
session.commit()
@@ -219,10 +209,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users))
logging.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logger.info('Pending: {}'.format(pending))
logging.info('Pending: {}'.format(pending))
return pending
while pending_entries() > 0:
@@ -286,7 +276,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
entry.pending = pending
entry.cursor = cursor
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit()
@@ -312,85 +302,3 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0]
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
newtweet = None
if update or not cached:
newtweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if newtweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res):
tid, tweet = res
if tweet:
try:
write_tweet_json(tweet, folder=folder)
yield 1
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass

View File

@@ -1,4 +0,0 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -38,7 +38,7 @@ setup(
extras_require = {
'server': ['flask', 'flask-oauthlib']
},
setup_requires=['pytest-runner',],
test_suite="tests",
include_package_data=True,
entry_points="""
[console_scripts]
@@ -48,7 +48,7 @@ setup(
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License',
'License :: OSI Approved :: Apache 2 License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',

View File

@@ -58,6 +58,4 @@ class TestUtils(TestCase):
assert list(resp) == [1,2,3]
toc = time.time()
assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]