mirror of
https://github.com/balkian/bitter.git
synced 2025-10-25 04:38:25 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c940709df8 | ||
|
|
1cb86abbdd | ||
|
|
b212a46ab7 | ||
|
|
0a0d8fd5f1 | ||
|
|
e3a78968da |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
__pycache__
|
||||||
*.egg-info
|
*.egg-info
|
||||||
dist
|
dist
|
||||||
env
|
env
|
||||||
|
|||||||
7
Dockerfile-2.7
Normal file
7
Dockerfile-2.7
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# The onbuild base image copies the build context (.) to /usr/src/app/
FROM python:2.7-onbuild
MAINTAINER J. Fernando Sánchez @balkian

# Editable install of the copied sources, including the "server" extra
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
|
||||||
7
Dockerfile-3.4
Normal file
7
Dockerfile-3.4
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# The onbuild base image copies the build context (.) to /usr/src/app/
FROM python:3.4-onbuild
MAINTAINER J. Fernando Sánchez @balkian

# Editable install of the copied sources, including the "server" extra
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
|
||||||
7
Dockerfile.template
Normal file
7
Dockerfile.template
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# The onbuild base image copies the build context (.) to /usr/src/app/
# {{PYVERSION}} is a placeholder replaced with a concrete Python version.
FROM python:{{PYVERSION}}-onbuild
MAINTAINER J. Fernando Sánchez @balkian

# Editable install of the copied sources, including the "server" extra
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
|
||||||
@@ -5,4 +5,6 @@ include README.md
|
|||||||
include bitter/VERSION
|
include bitter/VERSION
|
||||||
graft bitter/templates
|
graft bitter/templates
|
||||||
graft bitter/static
|
graft bitter/static
|
||||||
graft test
|
include tests/test*
|
||||||
|
global-exclude *.pyc
|
||||||
|
global-exclude __pycache__
|
||||||
76
Makefile
Normal file
76
Makefile
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
# Python versions to build images for; the first one is the "main" version
# used by the un-suffixed targets (build, test, dev, run).
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)


all: build run

dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))

# Generate one Dockerfile per Python version from the template.
Dockerfile-%: Dockerfile.template
	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*


# Start (or create, building the image first) a development container with
# the working directory mounted, then open a shell in it.
dev-%:
	@docker start $(NAME)-dev$* || (\
		$(MAKE) build-$*; \
		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
	)
	docker exec -ti $(NAME)-dev$* bash

dev: dev-$(PYMAIN)

build: $(addprefix build-, $(PYMAIN))

buildall: $(addprefix build-, $(PYVERSIONS))

build-%: Dockerfile-%
	docker build -t '$(IMAGEWTAG)-python$*' -f Dockerfile-$* .;

test: $(addprefix test-,$(PYMAIN))

testall: $(addprefix test-,$(PYVERSIONS))

test-%: build-%
	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(IMAGEWTAG)-python$*' setup.py test --addopts "-vvv -s --pdb" ;

# Installing the tarball requires it to exist, hence the sdist prerequisite.
pip_test-%: sdist
	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(TARNAME) ;

dist/$(TARNAME):
	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;

sdist: dist/$(TARNAME)


# Push the image that matches the pattern stem ($*), not always the main one.
upload-%: test-%
	docker push '$(IMAGEWTAG)-python$*'

upload: testall $(addprefix upload-,$(PYVERSIONS))
	docker tag '$(IMAGEWTAG)-python$(PYMAIN)' '$(IMAGEWTAG)'
	docker tag '$(IMAGEWTAG)-python$(PYMAIN)' '$(IMAGENAME)'

# Remove containers and images of this project whose version tag differs
# from the current VERSION. Errors are ignored on purpose.
clean:
	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true

upload_git:
	git commit -a
	git tag ${VERSION}
	git push --tags origin master

pip_upload:
	python setup.py sdist upload ;

pip_test: $(addprefix pip_test-,$(PYVERSIONS))

run: build
	docker run --rm -p 5000:5000 -ti '$(IMAGEWTAG)-python$(PYMAIN)'

# Only explicit target names are listed: a name containing % in .PHONY is
# taken literally and does not mark the pattern-rule targets as phony.
.PHONY: all dockerfiles dev build buildall test testall sdist upload clean upload_git pip_upload pip_test run
|
||||||
@@ -17,8 +17,8 @@ or
|
|||||||
Programmatically:
|
Programmatically:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from bitter.crawlers import TwitterQueue
|
from bitter import easy
|
||||||
wq = TwitterQueue.from_credentials()
|
wq = easy()
|
||||||
print(wq.users.show(user_name='balkian'))
|
print(wq.users.show(user_name='balkian'))
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
0.7.0
|
0.7.1
|
||||||
|
|||||||
@@ -7,4 +7,10 @@ import os
|
|||||||
|
|
||||||
from .version import __version__
|
from .version import __version__
|
||||||
|
|
||||||
|
def easy(*args, **kwargs):
    """Shortcut for ``TwitterQueue.from_credentials``.

    Every positional and keyword argument is forwarded unchanged, and the
    value returned by ``from_credentials`` is returned as is.
    """
    # The crawlers module is imported at call time, not at package import.
    from . import crawlers

    from_credentials = crawlers.TwitterQueue.from_credentials
    return from_credentials(*args, **kwargs)
|
||||||
|
|
||||||
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
|
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -52,34 +52,15 @@ def tweet(ctx):
|
|||||||
@click.argument('tweetid')
|
@click.argument('tweetid')
|
||||||
def get_tweet(tweetid, write, folder, update):
|
def get_tweet(tweetid, write, folder, update):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
if not write:
|
utils.download_tweet(wq, tweetid, write, folder, update)
|
||||||
t = utils.get_tweet(wq, tweetid)
|
|
||||||
js = json.dumps(t, indent=2)
|
|
||||||
print(js)
|
|
||||||
return
|
|
||||||
if not os.path.exists(folder):
|
|
||||||
os.makedirs(folder)
|
|
||||||
file = os.path.join(folder, '%s.json' % tweetid)
|
|
||||||
if not update and os.path.exists(file) and os.path.isfile(file):
|
|
||||||
print('%s: Tweet exists' % tweetid)
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
t = utils.get_tweet(wq, tweetid)
|
|
||||||
with open(file, 'w') as f:
|
|
||||||
js = json.dumps(t, indent=2)
|
|
||||||
print(js, file=f)
|
|
||||||
except Exception as ex:
|
|
||||||
print('%s: %s' % (tweetid, ex), file=sys.stderr)
|
|
||||||
|
|
||||||
@tweet.command('get_all')
|
@tweet.command('get_all')
|
||||||
@click.argument('tweetsfile', 'File with a list of tweets to look up')
|
@click.argument('tweetsfile', 'File with a list of tweets to look up')
|
||||||
@click.option('-f', '--folder', default="tweets")
|
@click.option('-f', '--folder', default="tweets")
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweets(ctx, tweetsfile, folder):
|
def get_tweets(ctx, tweetsfile, folder):
|
||||||
with open(tweetsfile) as f:
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
for line in f:
|
utils.download_tweets(wq, tweetsfile, folder)
|
||||||
tid = line.strip()
|
|
||||||
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
|
|
||||||
|
|
||||||
@tweet.command('search')
|
@tweet.command('search')
|
||||||
@click.argument('query')
|
@click.argument('query')
|
||||||
|
|||||||
122
bitter/utils.py
122
bitter/utils.py
@@ -1,3 +1,5 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
@@ -9,11 +11,20 @@ import os
|
|||||||
import multiprocessing
|
import multiprocessing
|
||||||
from multiprocessing.pool import ThreadPool
|
from multiprocessing.pool import ThreadPool
|
||||||
|
|
||||||
from itertools import islice
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
from itertools import islice, chain
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from future.moves.itertools import zip_longest
|
|
||||||
|
try:
|
||||||
|
from itertools import izip_longest
|
||||||
|
except ImportError:
|
||||||
|
from itertools import zip_longest
|
||||||
|
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
|
|
||||||
|
from builtins import map, filter
|
||||||
|
|
||||||
from twitter import TwitterHTTPError
|
from twitter import TwitterHTTPError
|
||||||
|
|
||||||
from bitter.models import Following, User, ExtractorEntry, make_session
|
from bitter.models import Following, User, ExtractorEntry, make_session
|
||||||
@@ -27,15 +38,14 @@ def signal_handler(signal, frame):
|
|||||||
logger.info('You pressed Ctrl+C!')
|
logger.info('You pressed Ctrl+C!')
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
def chunk(iterable, n):
    """Split *iterable* into consecutive tuples of at most *n* items.

    The result is a lazy iterator. The last tuple may hold fewer than *n*
    items; no padding is added. With ``n == 0`` nothing is produced, because
    the very first slice is already the empty tuple that marks the end.
    """
    it = iter(iterable)
    # iter(callable, sentinel): keep slicing until an empty tuple shows up,
    # i.e. until the underlying iterator is exhausted.
    return iter(lambda: tuple(islice(it, n)), ())


def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
    """Run *func* over chunks of *source* in a thread pool, flattening results.

    *source* is cut into tuples of up to *chunksize* items. *func* is called
    with one such tuple at a time and must return an iterable; the items of
    those iterables are yielded one by one. Results come back in completion
    order, which is not guaranteed to match the input order.

    :param func: callable taking a tuple of items and returning an iterable.
    :param source: any iterable of input items.
    :param chunksize: number of input items handed to each *func* call.
    :param numcpus: the pool is created with twice this many threads.
    """
    batches = chunk(source, chunksize)
    pool = ThreadPool(numcpus * 2)
    # The pool's own task batch size must be at least 1; the plain division
    # would give 0 for numcpus > 1000.
    tasks_per_dispatch = max(1, 1000 // numcpus)
    try:
        results = pool.imap_unordered(func, batches, tasks_per_dispatch)
        for item in chain.from_iterable(results):
            yield item
    finally:
        # Release the worker threads, also when the consumer stops early or
        # a worker raised.
        pool.terminate()
|
||||||
|
|
||||||
def get_credentials_path(credfile=None):
|
def get_credentials_path(credfile=None):
|
||||||
@@ -155,12 +165,12 @@ def add_user(session, user, enqueue=False):
|
|||||||
user = User(**user)
|
user = User(**user)
|
||||||
session.add(user)
|
session.add(user)
|
||||||
if extract:
|
if extract:
|
||||||
logging.debug('Adding entry')
|
logger.debug('Adding entry')
|
||||||
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
|
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
|
||||||
if not entry:
|
if not entry:
|
||||||
entry = ExtractorEntry(user=user.id)
|
entry = ExtractorEntry(user=user.id)
|
||||||
session.add(entry)
|
session.add(entry)
|
||||||
logging.debug(entry.pending)
|
logger.debug(entry.pending)
|
||||||
entry.pending = True
|
entry.pending = True
|
||||||
entry.cursor = -1
|
entry.cursor = -1
|
||||||
session.commit()
|
session.commit()
|
||||||
@@ -209,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
|
|||||||
add_user(session, i, enqueue=True)
|
add_user(session, i, enqueue=True)
|
||||||
|
|
||||||
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
|
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
|
||||||
logging.info('Total users: {}'.format(total_users))
|
logger.info('Total users: {}'.format(total_users))
|
||||||
def pending_entries():
|
def pending_entries():
|
||||||
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
|
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
|
||||||
logging.info('Pending: {}'.format(pending))
|
logger.info('Pending: {}'.format(pending))
|
||||||
return pending
|
return pending
|
||||||
|
|
||||||
while pending_entries() > 0:
|
while pending_entries() > 0:
|
||||||
@@ -276,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
|
|||||||
|
|
||||||
entry.pending = pending
|
entry.pending = pending
|
||||||
entry.cursor = cursor
|
entry.cursor = cursor
|
||||||
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
|
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
|
||||||
|
|
||||||
session.add(candidate)
|
session.add(candidate)
|
||||||
session.commit()
|
session.commit()
|
||||||
@@ -302,3 +312,85 @@ def get_user(c, user):
|
|||||||
return c.users.lookup(user_id=user)[0]
|
return c.users.lookup(user_id=user)[0]
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return c.users.lookup(screen_name=user)[0]
|
return c.users.lookup(screen_name=user)[0]
|
||||||
|
|
||||||
|
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
    """Fetch one tweet and either store it in *folder* or print it.

    A copy already stored in *folder* is reused unless *update* is set.

    :param wq: client object passed on to ``get_tweet``.
    :param tweetid: id of the tweet.
    :param write: when true, a freshly downloaded tweet is written to
        *folder*; when false, the tweet is printed as indented JSON.
    :param folder: directory used both for the cache lookup and for writing.
    :param update: download again even if a cached copy exists.
    """
    cached = cached_tweet(tweetid, folder)
    downloaded = update or not cached
    tweet = get_tweet(wq, tweetid) if downloaded else cached

    if not write:
        print(json.dumps(tweet, indent=2))
        return

    # A cached copy is already on disk, so only new downloads are written.
    # write_tweet_json needs the tweet mapping itself (it reads its 'id'),
    # not a serialized string.
    if downloaded and tweet:
        write_tweet_json(tweet, folder)
|
||||||
|
|
||||||
|
|
||||||
|
def cached_tweet(tweetid, folder):
    """Return the tweet stored in ``<folder>/<tweetid>.json``, or None.

    None is returned when that path is not an existing regular file, and
    also when opening or parsing it fails (the failure is logged as an
    error instead of being raised).
    """
    path = os.path.join(folder, '%s.json' % tweetid)
    if not (os.path.exists(path) and os.path.isfile(path)):
        return None
    try:
        with open(path) as handle:
            return json.load(handle)
    except Exception as ex:
        logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
    return None
|
||||||
|
|
||||||
|
def write_tweet_json(js, folder):
    """Write the tweet *js* to its JSON file inside *folder*.

    :param js: the tweet as a mapping; its ``'id'`` entry names the file.
    :param folder: target directory, created if it does not exist.
    """
    tweetid = js['id']
    path = tweet_file(tweetid, folder)
    # Create first and inspect the failure afterwards: checking for the
    # folder before creating it can fail if it appears in between.
    try:
        os.makedirs(folder)
    except OSError:
        if not os.path.isdir(folder):
            raise
    with open(path, 'w') as f:
        json.dump(js, f, indent=2)
    logger.info('Written {} to file {}'.format(tweetid, path))
|
||||||
|
|
||||||
|
def tweet_file(tweetid, folder):
    """Return the path of the JSON file for *tweetid* inside *folder*."""
    filename = '%s.json' % tweetid
    return os.path.join(folder, filename)
|
||||||
|
|
||||||
|
def tweet_fail_file(tweetid, folder):
    """Return the path of the failure marker for *tweetid*.

    Markers live in the ``failed`` subfolder of *folder*; that subfolder is
    created here if it does not exist yet. The marker file itself is not
    created.
    """
    failsfolder = os.path.join(folder, 'failed')
    # Create first and inspect the failure afterwards: checking for the
    # folder before creating it can fail if it appears in between.
    try:
        os.makedirs(failsfolder)
    except OSError:
        if not os.path.isdir(failsfolder):
            raise
    return os.path.join(failsfolder, '%s.failed' % tweetid)
|
||||||
|
|
||||||
|
def tweet_failed(tweetid, folder):
    """Tell whether a failure marker file exists for *tweetid* in *folder*.

    The marker path is obtained from ``tweet_fail_file``, which creates the
    ``failed`` subfolder when it is missing.
    """
    marker = tweet_fail_file(tweetid, folder)
    return os.path.isfile(marker)
|
||||||
|
|
||||||
|
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
    """Download the tweets listed in *tweetsfile* into *folder*.

    *tweetsfile* holds one tweet id per line; blank lines are ignored.
    Ids are looked up in batches of 100 and each tweet found is written with
    ``write_tweet_json``. Ids for which the lookup returns nothing get a
    failure marker file.

    :param wq: client exposing ``statuses.lookup``.
    :param tweetsfile: path of the text file with the tweet ids.
    :param folder: directory for the tweet files and failure markers.
    :param update: download again tweets that are already stored.
    :param retry_failed: try again ids that already have a failure marker.
    :param ignore_fails: when false, an error while writing a tweet is
        re-raised after being logged; when true it is only logged.
    """
    # parallel() calls each stage with a *tuple* of items and flattens the
    # iterable the stage returns, so every stage below iterates its batch.

    def filter_lines(batch):
        # Yields the line when it must be downloaded and None otherwise, so
        # the progress bar still advances once per input line.
        for line in batch:
            if not line:
                yield None
                continue
            tweetid = int(line)
            if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
                yield None
            else:
                yield line

    def store_results(batch):
        # Each item is a (tweet id, tweet) pair; yields 1 per tweet written
        # and -1 per tweet that could not be recovered.
        for tid, tweet in batch:
            if tweet:
                try:
                    write_tweet_json(tweet, folder=folder)
                    yield 1
                except Exception as ex:
                    logger.error('%s: %s' % (tid, ex))
                    if not ignore_fails:
                        raise
            else:
                logger.info('Tweet not recovered: {}'.format(tid))
                with open(tweet_fail_file(tid, folder), 'w') as f:
                    f.write('Tweet not found\n')
                yield -1

    def download_batch(batch):
        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
        return tweets.items()

    with open(tweetsfile) as f:
        lines = map(lambda x: x.strip(), f)
        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_lines, lines), desc='Total lines'))
        tweets = parallel(download_batch, lines_to_crawl, 100)
        # The pipeline is lazy: draining the last stage drives all of it,
        # and it must happen while the ids file is still open.
        for res in tqdm(parallel(store_results, tweets), desc='Queried'):
            pass
|
||||||
|
|||||||
4
setup.cfg
Normal file
4
setup.cfg
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
[metadata]
|
||||||
|
description-file = README.md
|
||||||
|
[aliases]
|
||||||
|
test=pytest
|
||||||
4
setup.py
4
setup.py
@@ -38,7 +38,7 @@ setup(
|
|||||||
extras_require = {
|
extras_require = {
|
||||||
'server': ['flask', 'flask-oauthlib']
|
'server': ['flask', 'flask-oauthlib']
|
||||||
},
|
},
|
||||||
test_suite="tests",
|
setup_requires=['pytest-runner',],
|
||||||
include_package_data=True,
|
include_package_data=True,
|
||||||
entry_points="""
|
entry_points="""
|
||||||
[console_scripts]
|
[console_scripts]
|
||||||
@@ -48,7 +48,7 @@ setup(
|
|||||||
'Development Status :: 4 - Beta',
|
'Development Status :: 4 - Beta',
|
||||||
'Intended Audience :: Developers',
|
'Intended Audience :: Developers',
|
||||||
'Intended Audience :: Science/Research',
|
'Intended Audience :: Science/Research',
|
||||||
'License :: OSI Approved :: Apache 2 License',
|
'License :: OSI Approved :: Apache Software License',
|
||||||
'Programming Language :: Python :: 2',
|
'Programming Language :: Python :: 2',
|
||||||
'Programming Language :: Python :: 2.7',
|
'Programming Language :: Python :: 2.7',
|
||||||
'Programming Language :: Python :: 3',
|
'Programming Language :: Python :: 3',
|
||||||
|
|||||||
@@ -5,14 +5,14 @@ import types
|
|||||||
import datetime
|
import datetime
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from bitter import utils
|
from bitter import utils, easy
|
||||||
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
|
from bitter.crawlers import QueueException
|
||||||
from bitter import config as c
|
from bitter import config as c
|
||||||
|
|
||||||
class TestUtils(TestCase):
|
class TestCrawlers(TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
|
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
|
||||||
|
|
||||||
def test_create_worker(self):
|
def test_create_worker(self):
|
||||||
assert len(self.wq.queue)==1
|
assert len(self.wq.queue)==1
|
||||||
|
|||||||
@@ -58,4 +58,6 @@ class TestUtils(TestCase):
|
|||||||
assert list(resp) == [1,2,3]
|
assert list(resp) == [1,2,3]
|
||||||
toc = time.time()
|
toc = time.time()
|
||||||
assert (tic-toc) < 6000
|
assert (tic-toc) < 6000
|
||||||
|
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
|
||||||
|
assert list(resp2) == [1,2,3,4]
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user