mirror of
https://github.com/balkian/bitter.git
synced 2025-10-25 20:58:24 +00:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b212a46ab7 | ||
|
|
0a0d8fd5f1 | ||
|
|
e3a78968da | ||
|
|
67ef307cce | ||
|
|
cb054ac365 | ||
|
|
bdc4690240 | ||
|
|
c0309a1e52 | ||
|
|
4afdd6807d | ||
|
|
38605ba2c8 | ||
|
|
738823c8a2 | ||
|
|
3f42879751 | ||
|
|
35f0c6376d | ||
|
|
2036d51d96 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
__pycache__
|
||||||
*.egg-info
|
*.egg-info
|
||||||
dist
|
dist
|
||||||
env
|
env
|
||||||
|
|||||||
7
Dockerfile-2.7
Normal file
7
Dockerfile-2.7
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# onbuild copies . to /usr/src/app/
|
||||||
|
From python:2.7-onbuild
|
||||||
|
Maintainer J. Fernando Sánchez @balkian
|
||||||
|
|
||||||
|
RUN pip install -e "/usr/src/app/[server]"
|
||||||
|
|
||||||
|
ENTRYPOINT ["bitter"]
|
||||||
7
Dockerfile-3.4
Normal file
7
Dockerfile-3.4
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# onbuild copies . to /usr/src/app/
|
||||||
|
From python:3.4-onbuild
|
||||||
|
Maintainer J. Fernando Sánchez @balkian
|
||||||
|
|
||||||
|
RUN pip install -e "/usr/src/app/[server]"
|
||||||
|
|
||||||
|
ENTRYPOINT ["bitter"]
|
||||||
7
Dockerfile.template
Normal file
7
Dockerfile.template
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# onbuild copies . to /usr/src/app/
|
||||||
|
From python:{{PYVERSION}}-onbuild
|
||||||
|
Maintainer J. Fernando Sánchez @balkian
|
||||||
|
|
||||||
|
RUN pip install -e "/usr/src/app/[server]"
|
||||||
|
|
||||||
|
ENTRYPOINT ["bitter"]
|
||||||
@@ -1,7 +1,10 @@
|
|||||||
include requirements.txt
|
include requirements.txt
|
||||||
|
include requirements-py2.txt
|
||||||
include test-requirements.txt
|
include test-requirements.txt
|
||||||
include README.md
|
include README.md
|
||||||
include bitter/VERSION
|
include bitter/VERSION
|
||||||
graft bitter/templates
|
graft bitter/templates
|
||||||
graft bitter/static
|
graft bitter/static
|
||||||
graft test
|
include tests/test*
|
||||||
|
global-exclude *.pyc
|
||||||
|
global-exclude __pycache__
|
||||||
62
Makefile
Normal file
62
Makefile
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
PYVERSIONS=3.4 2.7
|
||||||
|
PYMAIN=$(firstword $(PYVERSIONS))
|
||||||
|
NAME=bitter
|
||||||
|
REPO=balkian
|
||||||
|
VERSION=$(shell cat $(NAME)/VERSION)
|
||||||
|
|
||||||
|
|
||||||
|
all: build run
|
||||||
|
|
||||||
|
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
|
||||||
|
|
||||||
|
Dockerfile-%: Dockerfile.template
|
||||||
|
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
|
||||||
|
|
||||||
|
build: $(addprefix build-, $(PYMAIN))
|
||||||
|
|
||||||
|
buildall: $(addprefix build-, $(PYVERSIONS))
|
||||||
|
|
||||||
|
build-%: Dockerfile-%
|
||||||
|
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
|
||||||
|
|
||||||
|
test: $(addprefix test-,$(PYMAIN))
|
||||||
|
|
||||||
|
testall: $(addprefix test-,$(PYVERSIONS))
|
||||||
|
|
||||||
|
test-%: build-%
|
||||||
|
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
|
||||||
|
|
||||||
|
pip_test-%:
|
||||||
|
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
|
||||||
|
|
||||||
|
dist/$(NAME)-$(VERSION).tar.gz:
|
||||||
|
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
|
||||||
|
|
||||||
|
sdist: dist/$(NAME)-$(VERSION).tar.gz
|
||||||
|
|
||||||
|
|
||||||
|
upload-%: test-%
|
||||||
|
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
|
||||||
|
|
||||||
|
upload: testall $(addprefix upload-,$(PYVERSIONS))
|
||||||
|
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
|
||||||
|
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
|
||||||
|
|
||||||
|
clean:
|
||||||
|
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
|
||||||
|
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
|
||||||
|
|
||||||
|
upload_git:
|
||||||
|
git commit -a
|
||||||
|
git tag ${VERSION}
|
||||||
|
git push --tags origin master
|
||||||
|
|
||||||
|
pip_upload:
|
||||||
|
python setup.py sdist upload ;
|
||||||
|
|
||||||
|
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
|
||||||
|
|
||||||
|
run: build
|
||||||
|
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
|
||||||
|
|
||||||
|
.PHONY: test test-% build-% build test test_pip run
|
||||||
@@ -1 +1 @@
|
|||||||
0.5.4
|
0.7.1
|
||||||
|
|||||||
@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
|
|||||||
http://github.com/balkian/bitter
|
http://github.com/balkian/bitter
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
|
||||||
from future.standard_library import install_aliases
|
|
||||||
install_aliases()
|
|
||||||
except ImportError:
|
|
||||||
# Avoid problems at setup.py and py3.x
|
|
||||||
pass
|
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from .version import __version__
|
from .version import __version__
|
||||||
|
|
||||||
|
def easy(*args, **kwargs):
|
||||||
|
from .crawlers import TwitterQueue
|
||||||
|
return TwitterQueue.from_credentials(*args, **kwargs)
|
||||||
|
|
||||||
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
|
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
162
bitter/cli.py
162
bitter/cli.py
@@ -1,3 +1,5 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
import click
|
import click
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
@@ -6,10 +8,12 @@ import time
|
|||||||
import sqlalchemy.types
|
import sqlalchemy.types
|
||||||
import threading
|
import threading
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
from sqlalchemy import exists
|
from sqlalchemy import exists
|
||||||
|
|
||||||
from bitter import utils, models, crawlers
|
from bitter import utils, models, crawlers
|
||||||
|
from bitter import config as bconf
|
||||||
from bitter.models import make_session, User, ExtractorEntry, Following
|
from bitter.models import make_session, User, ExtractorEntry, Following
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
@@ -33,7 +37,7 @@ def main(ctx, verbose, logging_level, config, credentials):
|
|||||||
ctx.obj = {}
|
ctx.obj = {}
|
||||||
ctx.obj['VERBOSE'] = verbose
|
ctx.obj['VERBOSE'] = verbose
|
||||||
ctx.obj['CONFIG'] = config
|
ctx.obj['CONFIG'] = config
|
||||||
ctx.obj['CREDENTIALS'] = credentials
|
bconf.CREDENTIALS = credentials
|
||||||
utils.create_credentials(credentials)
|
utils.create_credentials(credentials)
|
||||||
|
|
||||||
@main.group()
|
@main.group()
|
||||||
@@ -42,30 +46,36 @@ def tweet(ctx):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@tweet.command('get')
|
@tweet.command('get')
|
||||||
|
@click.option('-w', '--write', is_flag=True, default=False)
|
||||||
|
@click.option('-f', '--folder', default="tweets")
|
||||||
|
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
|
||||||
@click.argument('tweetid')
|
@click.argument('tweetid')
|
||||||
@click.pass_context
|
def get_tweet(tweetid, write, folder, update):
|
||||||
def get_tweet(ctx, tweetid):
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
utils.download_tweet(wq, tweetid, write, folder, update)
|
||||||
t = utils.get_tweet(wq, tweetid)
|
|
||||||
print(json.dumps(t, indent=2))
|
|
||||||
|
|
||||||
|
@tweet.command('get_all')
|
||||||
|
@click.argument('tweetsfile', 'File with a list of tweets to look up')
|
||||||
|
@click.option('-f', '--folder', default="tweets")
|
||||||
|
@click.pass_context
|
||||||
|
def get_tweets(ctx, tweetsfile, folder):
|
||||||
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
|
utils.download_tweets(wq, tweetsfile, folder)
|
||||||
|
|
||||||
@tweet.command('search')
|
@tweet.command('search')
|
||||||
@click.argument('query')
|
@click.argument('query')
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweet(ctx, query):
|
def search(ctx, query):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
c = wq.next()
|
t = utils.search_tweet(wq, query)
|
||||||
t = utils.search_tweet(c.client, query)
|
|
||||||
print(json.dumps(t, indent=2))
|
print(json.dumps(t, indent=2))
|
||||||
|
|
||||||
@tweet.command('timeline')
|
@tweet.command('timeline')
|
||||||
@click.argument('user')
|
@click.argument('user')
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_tweet(ctx, user):
|
def timeline(ctx, user):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
c = wq.next()
|
t = utils.user_timeline(wq, user)
|
||||||
t = utils.user_timeline(c.client, user)
|
|
||||||
print(json.dumps(t, indent=2))
|
print(json.dumps(t, indent=2))
|
||||||
|
|
||||||
@main.group()
|
@main.group()
|
||||||
@@ -84,23 +94,47 @@ def list_users(ctx, db):
|
|||||||
for j in i.__dict__:
|
for j in i.__dict__:
|
||||||
print('\t{}: {}'.format(j, getattr(i,j)))
|
print('\t{}: {}'.format(j, getattr(i,j)))
|
||||||
|
|
||||||
@users.command('get_one')
|
|
||||||
@click.argument('user')
|
|
||||||
@click.pass_context
|
|
||||||
def get_user(ctx, user):
|
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
|
||||||
c = wq.next()
|
|
||||||
u = utils.get_user(c.client, user)
|
|
||||||
print(json.dumps(u, indent=2))
|
|
||||||
|
|
||||||
@users.command('get')
|
@users.command('get')
|
||||||
|
@click.argument('user')
|
||||||
|
@click.option('-w', '--write', is_flag=True, default=False)
|
||||||
|
@click.option('-f', '--folder', default="users")
|
||||||
|
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
|
||||||
|
def get_user(user, write, folder, update):
|
||||||
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
|
if not write:
|
||||||
|
u = utils.get_user(wq, user)
|
||||||
|
js = json.dumps(u, indent=2)
|
||||||
|
print(js)
|
||||||
|
return
|
||||||
|
if not os.path.exists(folder):
|
||||||
|
os.makedirs(folder)
|
||||||
|
file = os.path.join(folder, '%s.json' % user)
|
||||||
|
if not update and os.path.exists(file) and os.path.isfile(file):
|
||||||
|
print('User exists: %s' % user)
|
||||||
|
return
|
||||||
|
with open(file, 'w') as f:
|
||||||
|
u = utils.get_user(wq, user)
|
||||||
|
js = json.dumps(u, indent=2)
|
||||||
|
print(js, file=f)
|
||||||
|
|
||||||
|
@users.command('get_all')
|
||||||
|
@click.argument('usersfile', 'File with a list of users to look up')
|
||||||
|
@click.option('-f', '--folder', default="users")
|
||||||
|
@click.pass_context
|
||||||
|
def get_users(ctx, usersfile, folder):
|
||||||
|
with open(usersfile) as f:
|
||||||
|
for line in f:
|
||||||
|
uid = line.strip()
|
||||||
|
ctx.invoke(get_user, folder=folder, user=uid, write=True)
|
||||||
|
|
||||||
|
@users.command('crawl')
|
||||||
@click.option('--db', required=True, help='Database to save all users.')
|
@click.option('--db', required=True, help='Database to save all users.')
|
||||||
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
|
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
|
||||||
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
|
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
|
||||||
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
|
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
|
||||||
@click.argument('usersfile', 'File with a list of users to look up')
|
@click.argument('usersfile', 'File with a list of users to look up')
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_users(ctx, usersfile, skip, until, threads, db):
|
def crawl_users(ctx, usersfile, skip, until, threads, db):
|
||||||
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
|
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
|
||||||
|
|
||||||
if '://' not in db:
|
if '://' not in db:
|
||||||
@@ -112,7 +146,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
|
|||||||
return ExitStack()
|
return ExitStack()
|
||||||
|
|
||||||
|
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
|
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
|
||||||
len(wq.queue)))
|
len(wq.queue)))
|
||||||
|
|
||||||
@@ -281,7 +315,7 @@ def users_extractor(ctx):
|
|||||||
@click.pass_context
|
@click.pass_context
|
||||||
def extract(ctx, recursive, user, name, initfile):
|
def extract(ctx, recursive, user, name, initfile):
|
||||||
print(locals())
|
print(locals())
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
dburi = ctx.obj['DBURI']
|
dburi = ctx.obj['DBURI']
|
||||||
utils.extract(wq,
|
utils.extract(wq,
|
||||||
recursive=recursive,
|
recursive=recursive,
|
||||||
@@ -293,7 +327,7 @@ def extract(ctx, recursive, user, name, initfile):
|
|||||||
@extractor.command('reset')
|
@extractor.command('reset')
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def reset_extractor(ctx):
|
def reset_extractor(ctx):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
db = ctx.obj['DBURI']
|
db = ctx.obj['DBURI']
|
||||||
session = make_session(db)
|
session = make_session(db)
|
||||||
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
|
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
|
||||||
@@ -302,7 +336,7 @@ def reset_extractor(ctx):
|
|||||||
@click.argument('url', required=False)
|
@click.argument('url', required=False)
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_limits(ctx, url):
|
def get_limits(ctx, url):
|
||||||
wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
|
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||||
for worker in wq.queue:
|
for worker in wq.queue:
|
||||||
resp = worker.client.application.rate_limit_status()
|
resp = worker.client.application.rate_limit_status()
|
||||||
print('#'*20)
|
print('#'*20)
|
||||||
@@ -324,11 +358,75 @@ def get_limits(ctx, url):
|
|||||||
@click.argument('CONSUMER_SECRET', required=True)
|
@click.argument('CONSUMER_SECRET', required=True)
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def run_server(ctx, consumer_key, consumer_secret):
|
def run_server(ctx, consumer_key, consumer_secret):
|
||||||
from . import config
|
bconf.CONSUMER_KEY = consumer_key
|
||||||
config.CONSUMER_KEY = consumer_key
|
bconf.CONSUMER_SECRET = consumer_secret
|
||||||
config.CONSUMER_SECRET = consumer_secret
|
|
||||||
from .webserver import app
|
from .webserver import app
|
||||||
app.run()
|
app.run(host='0.0.0.0')
|
||||||
|
|
||||||
|
@main.group()
|
||||||
|
@click.pass_context
|
||||||
|
def stream(ctx):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@stream.command('get')
|
||||||
|
@click.option('-l', '--locations', default=None)
|
||||||
|
@click.option('-t', '--track', default=None)
|
||||||
|
@click.option('-f', '--file', help='File to store the stream of tweets')
|
||||||
|
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
|
||||||
|
@click.pass_context
|
||||||
|
def get_stream(ctx, locations, track, file, politelyretry):
|
||||||
|
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
|
||||||
|
|
||||||
|
query_args = {}
|
||||||
|
if locations:
|
||||||
|
query_args['locations'] = locations
|
||||||
|
if track:
|
||||||
|
query_args['track'] = track
|
||||||
|
if not file:
|
||||||
|
file = sys.stdout
|
||||||
|
else:
|
||||||
|
file = open(file, 'a')
|
||||||
|
|
||||||
|
def insist():
|
||||||
|
lasthangup = time.time()
|
||||||
|
while True:
|
||||||
|
if not query_args:
|
||||||
|
iterator = wq.statuses.sample()
|
||||||
|
else:
|
||||||
|
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
|
||||||
|
for i in iterator:
|
||||||
|
yield i
|
||||||
|
if not politelyretry:
|
||||||
|
return
|
||||||
|
thishangup = time.time()
|
||||||
|
if thishangup - lasthangup < 60:
|
||||||
|
raise Exception('Too many hangups in a row.')
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
for tweet in tqdm(insist()):
|
||||||
|
print(json.dumps(tweet), file=file)
|
||||||
|
if file != sys.stdout:
|
||||||
|
file.close()
|
||||||
|
|
||||||
|
@stream.command('read')
|
||||||
|
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
|
||||||
|
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
|
||||||
|
@click.pass_context
|
||||||
|
def read_stream(ctx, file, tail):
|
||||||
|
for tweet in utils.read_file(file, tail=tail):
|
||||||
|
try:
|
||||||
|
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
|
||||||
|
except (KeyError, TypeError):
|
||||||
|
print('Raw tweet: {}'.format(tweet))
|
||||||
|
|
||||||
|
@stream.command('tags')
|
||||||
|
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
|
||||||
|
@click.argument('limit', required=False, default=None, type=int)
|
||||||
|
@click.pass_context
|
||||||
|
def tags_stream(ctx, file, limit):
|
||||||
|
c = utils.get_hashtags(utils.read_file(file))
|
||||||
|
for count, tag in c.most_common(limit):
|
||||||
|
print(u'{} - {}'.format(count, tag))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
@@ -9,6 +9,12 @@ logger = logging.getLogger(__name__)
|
|||||||
from twitter import *
|
from twitter import *
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
from itertools import islice
|
||||||
|
try:
|
||||||
|
import itertools.ifilter as filter
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
from . import utils
|
from . import utils
|
||||||
from . import config
|
from . import config
|
||||||
|
|
||||||
@@ -37,30 +43,96 @@ class AttrToFunc(object):
|
|||||||
# kwargs[i] = a
|
# kwargs[i] = a
|
||||||
return self.handler(self.__uriparts, *args, **kwargs)
|
return self.handler(self.__uriparts, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class FromCredentialsMixin(object):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_credentials(cls, cred_file=None, max_workers=None):
|
||||||
|
wq = cls()
|
||||||
|
|
||||||
|
for cred in islice(utils.get_credentials(cred_file), max_workers):
|
||||||
|
wq.ready(cls.worker_class(cred["user"], cred))
|
||||||
|
return wq
|
||||||
|
|
||||||
|
|
||||||
class TwitterWorker(object):
|
class TwitterWorker(object):
|
||||||
def __init__(self, name, client):
|
api_class = None
|
||||||
|
|
||||||
|
def __init__(self, name, creds):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.client = client
|
self._client = None
|
||||||
self.throttled_time = False
|
self.cred = creds
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
self.busy = False
|
self.busy = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def throttled(self):
|
def client(self):
|
||||||
if not self.throttled_time:
|
if not self._client:
|
||||||
return False
|
auth=OAuth(self.cred['token_key'],
|
||||||
t = time.time()
|
self.cred['token_secret'],
|
||||||
delta = self.throttled_time - t
|
self.cred['consumer_key'],
|
||||||
if delta > 0:
|
self.cred['consumer_secret'])
|
||||||
return True
|
self._client = self.api_class(auth=auth)
|
||||||
return False
|
return self._client
|
||||||
|
|
||||||
def throttle_until(self, epoch=None):
|
class RestWorker(TwitterWorker):
|
||||||
self.throttled_time = int(epoch)
|
api_class = Twitter
|
||||||
logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch-time.time())))
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(RestWorker, self).__init__(*args, **kwargs)
|
||||||
|
self._limits = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def limits(self):
|
||||||
|
if not self._limits:
|
||||||
|
self._limits = self.client.application.rate_limit_status()
|
||||||
|
return self._limits
|
||||||
|
|
||||||
|
def is_limited(self, uriparts):
|
||||||
|
return self.get_wait(uriparts)>0
|
||||||
|
|
||||||
|
def get_wait(self, uriparts):
|
||||||
|
limits = self.get_limit(uriparts)
|
||||||
|
if limits['remaining'] > 0:
|
||||||
|
return 0
|
||||||
|
reset = limits.get('reset', 0)
|
||||||
|
now = time.time()
|
||||||
|
return max(0, (reset-now))
|
||||||
|
|
||||||
|
def get_limit(self, uriparts):
|
||||||
|
uri = '/'+'/'.join(uriparts)
|
||||||
|
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
|
||||||
|
if ix.startswith(uri):
|
||||||
|
return i
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def set_limit(self, uriparts, value):
|
||||||
|
uri = '/'+'/'.join(uriparts)
|
||||||
|
if 'resources' not in self.limits:
|
||||||
|
self.limits['resources'] = {}
|
||||||
|
resources = self.limits['resources']
|
||||||
|
if uriparts[0] not in resources:
|
||||||
|
resources[uriparts[0]] = {}
|
||||||
|
resource = resources[uriparts[0]]
|
||||||
|
resource[uri] = value
|
||||||
|
|
||||||
|
def update_limits(self, uriparts, remaining, reset, limit):
|
||||||
|
self.set_limit(uriparts, {'remaining': remaining,
|
||||||
|
'reset': reset,
|
||||||
|
'limit': limit})
|
||||||
|
|
||||||
|
def update_limits_from_headers(self, uriparts, headers):
|
||||||
|
reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
|
||||||
|
remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
|
||||||
|
limit = int(headers.get('X-Rate-Limit-Limit', -1))
|
||||||
|
self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)
|
||||||
|
|
||||||
|
|
||||||
class TwitterQueue(AttrToFunc):
|
|
||||||
|
class QueueException(BaseException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class QueueMixin(AttrToFunc, FromCredentialsMixin):
|
||||||
def __init__(self, wait=True):
|
def __init__(self, wait=True):
|
||||||
logger.debug('Creating worker queue')
|
logger.debug('Creating worker queue')
|
||||||
self.queue = set()
|
self.queue = set()
|
||||||
@@ -71,77 +143,119 @@ class TwitterQueue(AttrToFunc):
|
|||||||
def ready(self, worker):
|
def ready(self, worker):
|
||||||
self.queue.add(worker)
|
self.queue.add(worker)
|
||||||
|
|
||||||
|
class TwitterQueue(QueueMixin):
|
||||||
|
|
||||||
|
worker_class = RestWorker
|
||||||
|
|
||||||
def handle_call(self, uriparts, *args, **kwargs):
|
def handle_call(self, uriparts, *args, **kwargs):
|
||||||
logger.debug('Called: {}'.format(uriparts))
|
logger.debug('Called: {}'.format(uriparts))
|
||||||
logger.debug('With: {} {}'.format(args, kwargs))
|
logger.debug('With: {} {}'.format(args, kwargs))
|
||||||
while True:
|
patience = 1
|
||||||
|
while patience:
|
||||||
c = None
|
c = None
|
||||||
try:
|
try:
|
||||||
c = self.next()
|
c = self.next(uriparts)
|
||||||
c._lock.acquire()
|
c._lock.acquire()
|
||||||
c.busy = True
|
c.busy = True
|
||||||
logger.debug('Next: {}'.format(c.name))
|
logger.debug('Next: {}'.format(c.name))
|
||||||
ping = time.time()
|
ping = time.time()
|
||||||
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
|
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
|
||||||
pong = time.time()
|
pong = time.time()
|
||||||
|
c.update_limits_from_headers(uriparts, resp.headers)
|
||||||
logger.debug('Took: {}'.format(pong-ping))
|
logger.debug('Took: {}'.format(pong-ping))
|
||||||
return resp
|
return resp
|
||||||
except TwitterHTTPError as ex:
|
except TwitterHTTPError as ex:
|
||||||
if ex.e.code in (429, 502, 503, 504):
|
if ex.e.code in (429, 502, 503, 504):
|
||||||
limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
|
|
||||||
logger.info('{} limited'.format(c.name))
|
logger.info('{} limited'.format(c.name))
|
||||||
c.throttle_until(limit)
|
c.update_limits_from_headers(uriparts, ex.e.headers)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
except urllib.error.URLError as ex:
|
except urllib.error.URLError as ex:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
logger.info('Something fishy happened: {}'.format(ex))
|
logger.info('Something fishy happened: {}'.format(ex))
|
||||||
|
raise
|
||||||
finally:
|
finally:
|
||||||
if c:
|
if c:
|
||||||
c.busy = False
|
c.busy = False
|
||||||
c._lock.release()
|
c._lock.release()
|
||||||
|
if not self.wait:
|
||||||
|
patience -= 1
|
||||||
|
|
||||||
|
def get_wait(self, uriparts):
|
||||||
|
# Stop as soon as one is available to avoid initiating the rest
|
||||||
|
for i in self.queue:
|
||||||
|
if not i.busy and i.get_wait(uriparts) == 0:
|
||||||
|
return 0
|
||||||
|
# If None is available, let's see how much we have to wait
|
||||||
|
available = filter(lambda x: not x.busy, self.queue)
|
||||||
|
diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
|
||||||
|
return diff
|
||||||
|
|
||||||
|
|
||||||
@property
|
def _next(self, uriparts):
|
||||||
def client(self):
|
|
||||||
return self.next().client
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_credentials(self, cred_file=None):
|
|
||||||
wq = TwitterQueue()
|
|
||||||
|
|
||||||
for cred in utils.get_credentials(cred_file):
|
|
||||||
c = Twitter(auth=OAuth(cred['token_key'],
|
|
||||||
cred['token_secret'],
|
|
||||||
cred['consumer_key'],
|
|
||||||
cred['consumer_secret']))
|
|
||||||
wq.ready(TwitterWorker(cred["user"], c))
|
|
||||||
return wq
|
|
||||||
|
|
||||||
def _next(self):
|
|
||||||
logger.debug('Getting next available')
|
logger.debug('Getting next available')
|
||||||
s = list(self.queue)
|
s = list(self.queue)
|
||||||
random.shuffle(s)
|
random.shuffle(s)
|
||||||
for worker in s:
|
for worker in s:
|
||||||
if not worker.throttled and not worker.busy:
|
if not worker.is_limited(uriparts) and not worker.busy:
|
||||||
return worker
|
return worker
|
||||||
raise Exception('No worker is available')
|
raise QueueException('No worker is available')
|
||||||
|
|
||||||
def next(self):
|
def next(self, uriparts):
|
||||||
if not self.wait:
|
if not self.wait:
|
||||||
return self._next()
|
return self._next(uriparts)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return self._next()
|
return self._next(uriparts)
|
||||||
except Exception:
|
except QueueException:
|
||||||
available = filter(lambda x: not x.busy, self.queue)
|
available = filter(lambda x: not x.busy, self.queue)
|
||||||
if available:
|
if available:
|
||||||
first_worker = min(available, key=lambda x: x.throttled_time)
|
diff = self.get_wait(uriparts)
|
||||||
diff = first_worker.throttled_time - time.time()
|
|
||||||
logger.info("All workers are throttled. Waiting %s seconds" % diff)
|
logger.info("All workers are throttled. Waiting %s seconds" % diff)
|
||||||
else:
|
else:
|
||||||
diff = 5
|
diff = 5
|
||||||
logger.info("All workers are busy. Waiting %s seconds" % diff)
|
logger.info("All workers are busy. Waiting %s seconds" % diff)
|
||||||
time.sleep(diff)
|
time.sleep(diff)
|
||||||
|
|
||||||
|
class StreamWorker(TwitterWorker):
|
||||||
|
api_class = TwitterStream
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(StreamWorker, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
class StreamQueue(QueueMixin):
|
||||||
|
worker_class = StreamWorker
|
||||||
|
|
||||||
|
def __init__(self, wait=True):
|
||||||
|
logger.debug('Creating worker queue')
|
||||||
|
self.queue = set()
|
||||||
|
self.index = 0
|
||||||
|
self.wait = wait
|
||||||
|
AttrToFunc.__init__(self, handler=self.handle_call)
|
||||||
|
|
||||||
|
def handle_call(self, uriparts, *args, **kwargs):
|
||||||
|
logger.debug('Called: {}'.format(uriparts))
|
||||||
|
logger.debug('With: {} {}'.format(args, kwargs))
|
||||||
|
c = None
|
||||||
|
c = self.next(uriparts)
|
||||||
|
c._lock.acquire()
|
||||||
|
c.busy = True
|
||||||
|
logger.debug('Next: {}'.format(c.name))
|
||||||
|
ping = time.time()
|
||||||
|
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
|
||||||
|
for i in resp:
|
||||||
|
yield i
|
||||||
|
pong = time.time()
|
||||||
|
logger.debug('Listening for: {}'.format(pong-ping))
|
||||||
|
c.busy = False
|
||||||
|
c._lock.release()
|
||||||
|
|
||||||
|
def next(self, uriparts):
|
||||||
|
logger.debug('Getting next available')
|
||||||
|
s = list(self.queue)
|
||||||
|
random.shuffle(s)
|
||||||
|
for worker in s:
|
||||||
|
if not worker.busy:
|
||||||
|
return worker
|
||||||
|
raise QueueException('No worker is available')
|
||||||
|
|||||||
139
bitter/utils.py
139
bitter/utils.py
@@ -1,3 +1,5 @@
|
|||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
@@ -9,10 +11,20 @@ import os
|
|||||||
import multiprocessing
|
import multiprocessing
|
||||||
from multiprocessing.pool import ThreadPool
|
from multiprocessing.pool import ThreadPool
|
||||||
|
|
||||||
from itertools import islice
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
from itertools import islice, chain
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
try:
|
||||||
|
from itertools import izip_longest
|
||||||
|
except ImportError:
|
||||||
from itertools import zip_longest
|
from itertools import zip_longest
|
||||||
|
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
|
from builtins import map, filter
|
||||||
|
|
||||||
from twitter import TwitterHTTPError
|
from twitter import TwitterHTTPError
|
||||||
|
|
||||||
from bitter.models import Following, User, ExtractorEntry, make_session
|
from bitter.models import Following, User, ExtractorEntry, make_session
|
||||||
@@ -26,15 +38,14 @@ def signal_handler(signal, frame):
|
|||||||
logger.info('You pressed Ctrl+C!')
|
logger.info('You pressed Ctrl+C!')
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
def chunk(iterable, n, fillvalue=None):
|
def chunk(iterable, n):
|
||||||
args = [iter(iterable)] * n
|
it = iter(iterable)
|
||||||
return zip_longest(*args, fillvalue=fillvalue)
|
return iter(lambda: tuple(islice(it, n)), ())
|
||||||
|
|
||||||
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
|
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
|
||||||
if chunksize:
|
|
||||||
source = chunk(source, chunksize)
|
source = chunk(source, chunksize)
|
||||||
p = ThreadPool(numcpus)
|
p = ThreadPool(numcpus*2)
|
||||||
for i in p.imap(func, source):
|
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
|
||||||
yield i
|
yield i
|
||||||
|
|
||||||
def get_credentials_path(credfile=None):
|
def get_credentials_path(credfile=None):
|
||||||
@@ -86,6 +97,26 @@ def add_credentials(credfile=None, **creds):
|
|||||||
f.write('\n')
|
f.write('\n')
|
||||||
|
|
||||||
|
|
||||||
|
def get_hashtags(iter_tweets, best=None):
    """Tally how often each hashtag text appears in a stream of tweets.

    :param iter_tweets: iterable of tweet dicts. Hashtags are read from
        ``tweet['entities']['hashtags']``; a tweet missing either key
        contributes nothing.
    :param best: accepted for callers that pass it, but not used here.
    :returns: a ``Counter`` mapping hashtag text to number of occurrences.
    """
    counts = Counter()
    for tweet in iter_tweets:
        entities = tweet.get('entities', {})
        for hashtag in entities.get('hashtags', {}):
            counts[hashtag['text']] += 1
    return counts
|
||||||
|
|
||||||
|
def read_file(filename, tail=False):
    """Yield the object decoded from every non-blank JSON line of a file.

    :param filename: path of a file holding one JSON document per line.
    :param tail: when false, stop at end of file. When true, keep the file
        open and poll once per second for newly appended lines, so the
        generator never finishes on its own.
    :returns: generator of decoded JSON values, in file order.
    :raises ValueError: if a non-blank line is not valid JSON.
    """
    with open(filename) as f:
        while True:
            line = f.readline()
            if not line:
                # readline() returns '' only at end of file; a blank line
                # inside the file comes back as '\n' and must not be
                # mistaken for the end of the data.
                if not tail:
                    return
                time.sleep(1)
                continue
            line = line.strip()
            if line:
                yield json.loads(line)
|
||||||
|
|
||||||
|
|
||||||
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
|
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
|
||||||
t = 'name' if by_name else 'uid'
|
t = 'name' if by_name else 'uid'
|
||||||
logger.debug('Getting users by {}: {}'.format(t, ulist))
|
logger.debug('Getting users by {}: {}'.format(t, ulist))
|
||||||
@@ -134,12 +165,12 @@ def add_user(session, user, enqueue=False):
|
|||||||
user = User(**user)
|
user = User(**user)
|
||||||
session.add(user)
|
session.add(user)
|
||||||
if extract:
|
if extract:
|
||||||
logging.debug('Adding entry')
|
logger.debug('Adding entry')
|
||||||
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
|
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
|
||||||
if not entry:
|
if not entry:
|
||||||
entry = ExtractorEntry(user=user.id)
|
entry = ExtractorEntry(user=user.id)
|
||||||
session.add(entry)
|
session.add(entry)
|
||||||
logging.debug(entry.pending)
|
logger.debug(entry.pending)
|
||||||
entry.pending = True
|
entry.pending = True
|
||||||
entry.cursor = -1
|
entry.cursor = -1
|
||||||
session.commit()
|
session.commit()
|
||||||
@@ -188,10 +219,10 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
|
|||||||
add_user(session, i, enqueue=True)
|
add_user(session, i, enqueue=True)
|
||||||
|
|
||||||
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
|
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
|
||||||
logging.info('Total users: {}'.format(total_users))
|
logger.info('Total users: {}'.format(total_users))
|
||||||
def pending_entries():
|
def pending_entries():
|
||||||
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
|
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
|
||||||
logging.info('Pending: {}'.format(pending))
|
logger.info('Pending: {}'.format(pending))
|
||||||
return pending
|
return pending
|
||||||
|
|
||||||
while pending_entries() > 0:
|
while pending_entries() > 0:
|
||||||
@@ -255,7 +286,7 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
|
|||||||
|
|
||||||
entry.pending = pending
|
entry.pending = pending
|
||||||
entry.cursor = cursor
|
entry.cursor = cursor
|
||||||
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
|
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
|
||||||
|
|
||||||
session.add(candidate)
|
session.add(candidate)
|
||||||
session.commit()
|
session.commit()
|
||||||
@@ -281,3 +312,85 @@ def get_user(c, user):
|
|||||||
return c.users.lookup(user_id=user)[0]
|
return c.users.lookup(user_id=user)[0]
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return c.users.lookup(screen_name=user)[0]
|
return c.users.lookup(screen_name=user)[0]
|
||||||
|
|
||||||
|
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
    """Fetch one tweet, reusing the copy cached in `folder` when there is one.

    :param wq: client/queue object handed to ``get_tweet``.
    :param tweetid: id of the tweet to retrieve.
    :param write: when true, a freshly fetched tweet is stored through
        ``write_tweet_json``; when false, the tweet (fresh or cached) is
        printed as indented JSON instead.
    :param folder: directory used both as cache and as output location.
    :param update: when true, fetch the tweet even if a cached copy exists.
    """
    cached = cached_tweet(tweetid, folder)
    newtweet = None
    if update or not cached:
        newtweet = get_tweet(wq, tweetid)

    if write:
        if newtweet:
            # write_tweet_json reads js['id'], so it must receive the tweet
            # dict itself rather than its serialised form.
            write_tweet_json(newtweet, folder)
    else:
        # Serialise only here, and fall back to the cached copy when nothing
        # new was fetched, so there is always something defined to print.
        print(json.dumps(newtweet or cached, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
def cached_tweet(tweetid, folder):
    """Load the copy of a tweet previously stored in `folder`, if any.

    :param tweetid: id of the tweet; the cache entry is ``<tweetid>.json``.
    :param folder: directory holding the cached tweets.
    :returns: the decoded JSON content, or ``None`` when there is no regular
        file for that id or it cannot be read/decoded (the error is logged).
    """
    path = os.path.join(folder, '%s.json' % tweetid)
    if not os.path.isfile(path):
        return None
    try:
        with open(path) as handle:
            return json.load(handle)
    except Exception as ex:
        # Best effort: an unreadable cache entry is reported, not raised.
        logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
    return None
|
||||||
|
|
||||||
|
def write_tweet_json(js, folder):
    """Store a tweet as indented JSON in ``<folder>/<id>.json``.

    :param js: the tweet as a mapping; its ``'id'`` entry names the file.
    :param folder: destination directory, created if it does not exist.
    :raises KeyError: if `js` has no ``'id'`` entry.
    :raises OSError: if `folder` cannot be created or the file not written.
    """
    tweetid = js['id']
    path = tweet_file(tweetid, folder)
    # Create-then-check instead of check-then-create: this function is run
    # from several pool threads at once, and another thread may create the
    # folder between an existence test and the makedirs() call.
    try:
        os.makedirs(folder)
    except OSError:
        if not os.path.isdir(folder):
            raise
    with open(path, 'w') as f:
        json.dump(js, f, indent=2)
    logger.info('Written {} to file {}'.format(tweetid, path))
|
||||||
|
|
||||||
|
def tweet_file(tweetid, folder):
    """Return the path where the tweet `tweetid` is stored inside `folder`.

    The file is named ``<tweetid>.json``; nothing is created on disk.
    """
    filename = '%s.json' % tweetid
    return os.path.join(folder, filename)
|
||||||
|
|
||||||
|
def tweet_fail_file(tweetid, folder):
    """Return the path of the failure marker for `tweetid` under `folder`.

    Markers live in ``<folder>/failed/<tweetid>.failed``. The ``failed``
    directory is created as a side effect; the marker file itself is not.

    :raises OSError: if the ``failed`` directory cannot be created.
    """
    failsfolder = os.path.join(folder, 'failed')
    # Create-then-check rather than check-then-create, so that concurrent
    # callers racing to create the directory do not fail on "already exists".
    try:
        os.makedirs(failsfolder)
    except OSError:
        if not os.path.isdir(failsfolder):
            raise
    return os.path.join(failsfolder, '%s.failed' % tweetid)
|
||||||
|
|
||||||
|
def tweet_failed(tweetid, folder):
    """Tell whether a failure marker exists for `tweetid` in `folder`.

    The marker path comes from ``tweet_fail_file``; the result is true only
    when that path is an existing regular file.
    """
    marker = tweet_fail_file(tweetid, folder)
    return os.path.isfile(marker)
|
||||||
|
|
||||||
|
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
    """Download every tweet whose id is listed in `tweetsfile` into `folder`.

    The work is a three-stage pipeline, each stage driven by ``parallel``:
    decide which ids need downloading, look them up in batches of 100, and
    write each result (or a failure marker) to disk.

    :param wq: client/queue object exposing ``statuses.lookup``.
    :param tweetsfile: path of a text file with one tweet id per line; blank
        lines are ignored.
    :param folder: output directory, also used as cache of earlier runs.
    :param update: when true, download ids that are already cached.
    :param retry_failed: when true, retry ids that have a failure marker.
    :param ignore_fails: when true, an error while writing a tweet is only
        logged; when false it is re-raised.
    :raises ValueError: if a non-blank line is not an integer id.
    """

    def filter_line(lines):
        # ``parallel`` hands every worker a tuple of items and flattens what
        # the worker returns, so each stage walks the chunk it is given.
        for line in lines:
            tweetid = int(line)
            # print('Checking {}'.format(tweetid))
            if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
                # Placeholder so the progress bar still counts this line;
                # it is dropped before the download stage.
                yield None
            else:
                yield line

    def print_result(results):
        for tid, tweet in results:
            if tweet:
                try:
                    write_tweet_json(tweet, folder=folder)
                except Exception as ex:
                    logger.error('%s: %s' % (tid, ex))
                    if not ignore_fails:
                        raise
                else:
                    yield 1
            else:
                logger.info('Tweet not recovered: {}'.format(tid))
                # Leave a marker so later runs can skip this id.
                with open(tweet_fail_file(tid, folder), 'w') as f:
                    print('Tweet not found', file=f)
                yield -1

    def download_batch(batch):
        # One lookup per chunk of ids; with map=True the response's 'id'
        # entry is iterated as (id, tweet) pairs, where a falsy tweet is
        # treated downstream as "not recovered".
        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
        return tweets.items()

    with open(tweetsfile) as f:
        stripped = (raw.strip() for raw in f)
        # int('') raises ValueError, so blank lines must never reach filter_line.
        lines = (line for line in stripped if line)
        checked = tqdm(parallel(filter_line, lines), desc='Total lines')
        lines_to_crawl = (line for line in checked if line is not None)
        tweets = parallel(download_batch, lines_to_crawl, 100)
        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
            pass
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
sqlalchemy
|
sqlalchemy
|
||||||
twitter
|
twitter
|
||||||
click
|
click
|
||||||
|
tqdm
|
||||||
|
|||||||
4
setup.cfg
Normal file
4
setup.cfg
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
[metadata]
|
||||||
|
description-file = README.md
|
||||||
|
[aliases]
|
||||||
|
test=pytest
|
||||||
13
setup.py
13
setup.py
@@ -38,10 +38,19 @@ setup(
|
|||||||
extras_require = {
|
extras_require = {
|
||||||
'server': ['flask', 'flask-oauthlib']
|
'server': ['flask', 'flask-oauthlib']
|
||||||
},
|
},
|
||||||
test_suite="tests",
|
setup_requires=['pytest-runner',],
|
||||||
include_package_data=True,
|
include_package_data=True,
|
||||||
entry_points="""
|
entry_points="""
|
||||||
[console_scripts]
|
[console_scripts]
|
||||||
bitter=bitter.cli:main
|
bitter=bitter.cli:main
|
||||||
"""
|
""",
|
||||||
|
classifiers=[
|
||||||
|
'Development Status :: 4 - Beta',
|
||||||
|
'Intended Audience :: Developers',
|
||||||
|
'Intended Audience :: Science/Research',
|
||||||
|
'License :: OSI Approved :: Apache Software License',
|
||||||
|
'Programming Language :: Python :: 2',
|
||||||
|
'Programming Language :: Python :: 2.7',
|
||||||
|
'Programming Language :: Python :: 3',
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|||||||
72
tests/test_crawlers.py
Normal file
72
tests/test_crawlers.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
import os
|
||||||
|
import types
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
|
from bitter import utils
|
||||||
|
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
|
||||||
|
from bitter import config as c
|
||||||
|
|
||||||
|
class TestUtils(TestCase):
    """Tests for the rate-limit bookkeeping of TwitterQueue and its workers.

    NOTE(review): several tests call API methods through the queue and so
    presumably need valid credentials and network access -- confirm.
    NOTE(review): the class is named TestUtils although it lives in
    test_crawlers.py and exercises bitter.crawlers.
    """

    def setUp(self):
        # Build the queue under test from the credentials.json file located
        # next to this test module.
        self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))

    def test_create_worker(self):
        """The queue built in setUp holds exactly one worker."""
        assert len(self.wq.queue)==1

    def test_get_limits(self):
        """The statuses/lookup limit of the first worker is still unused."""
        w1 = list(self.wq.queue)[0]
        print(w1.limits)
        limitslook = w1.get_limit(['statuses', 'lookup'])
        assert limitslook['remaining'] == limitslook['limit']

    def test_set_limits(self):
        """A limit stored with set_limit is returned unchanged by get_limit."""
        w1 = list(self.wq.queue)[0]
        w1.set_limit(['test', 'test2'], {'remaining': 0})
        assert w1.get_limit(['test', 'test2']) == {'remaining': 0}

    def test_await(self):
        """get_wait reports a wait until the reset time and 0 afterwards."""
        w1 = list(self.wq.queue)[0]
        # Exhausted limit that resets two seconds from now.
        w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
        assert w1.get_wait(['test', 'wait']) > 1
        time.sleep(2)
        assert w1.get_wait(['test', 'wait']) == 0
        assert w1.get_wait(['statuses', 'lookup']) == 0

    def test_is_limited(self):
        """is_limited is true only for an endpoint with no remaining calls."""
        w1 = list(self.wq.queue)[0]
        assert not w1.is_limited(['statuses', 'lookup'])
        w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
        assert w1.is_limited(['test', 'limited'])

    def test_call(self):
        """One users.lookup call lowers the worker's remaining count by one."""
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['users', 'lookup'])
        resp = self.wq.users.lookup(screen_name='balkian')
        l2 = w1.get_limit(['users', 'lookup'])
        assert l1['remaining']-l2['remaining'] == 1

    def test_consume(self):
        """After using every remaining friends/list call, next() must fail."""
        w1 = list(self.wq.queue)[0]
        l1 = w1.get_limit(['friends', 'list'])
        self.wq.wait = False
        # Issue as many calls as the limit reports remaining.
        for i in range(l1['remaining']):
            print(i)
            resp = self.wq.friends.list(screen_name='balkian')
        # l2 = w1.get_limit(['users', 'lookup'])
        # assert l2['remaining'] == 0
        # self.wq.users.lookup(screen_name='balkian')

        failed = False
        try:
            # resp = self.wq.friends.list(screen_name='balkian')
            self.wq.next(['friends', 'list'])
        except QueueException:
            failed = True
        assert failed
        l2 = w1.get_limit(['friends', 'list'])
        # The queue-level wait must fall within two seconds of the time left
        # until the worker's limit resets.
        assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
        assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
|
||||||
@@ -58,4 +58,6 @@ class TestUtils(TestCase):
|
|||||||
assert list(resp) == [1,2,3]
|
assert list(resp) == [1,2,3]
|
||||||
toc = time.time()
|
toc = time.time()
|
||||||
assert (tic-toc) < 6000
|
assert (tic-toc) < 6000
|
||||||
|
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
|
||||||
|
assert list(resp2) == [1,2,3,4]
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user