diff --git a/.gitignore b/.gitignore index de962e4..fc2c0fb 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__ *.egg-info dist env +.env __* .* *.pyc diff --git a/Makefile b/Makefile index 554a573..5bca252 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template dev-%: @docker start $(NAME)-dev$* || (\ $(MAKE) build-$*; \ - docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ + docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ )\ docker exec -ti $(NAME)-dev$* bash @@ -71,6 +71,6 @@ pip_upload: pip_test: $(addprefix pip_test-,$(PYVERSIONS)) run: build - docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' + docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' .PHONY: test test-% build-% build test test_pip run diff --git a/README.md b/README.md index a9f3d88..1f900ec 100644 --- a/README.md +++ b/README.md @@ -29,16 +29,66 @@ e.g. to get the latest 500 tweets by the python software foundation: ``` bitter api statuses/user_timeline --id thepsf --count 500 ``` -# Credentials format + + +# Examples + +The CLI can query the REST API: + +``` +bitter api --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL] +``` + +For instance: + +``` +# Get 100 tweets that mentioned Obama after tweet 942689870501302300 +bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama +``` + +That is equivalent to this call to the API: `api/1.1/search/tweets?since_id=942689870501302300&count=100&q=Obama`. + + +The flags `--tweets` and `--users` are optional. +If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API. + +For example: + +``` +# Download 1000 tweets, 100 tweets per call. +bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets +``` + +``` +# Download all the followers of @balkian +bitter api 'followers/list' --_id balkian --users --max_count -1 +``` + +Note that some reserved words (such as `id`) have to be preceded by an underscore. +This limitation is imposed by the python-twitter library. + +# Configuration format + +``` +credentials: +- user: "balkian" + consumer_secret: "xxx" + consumer_key: "xxx" + token_key: "xxx" + token_secret: "xxx" +- user: .... +``` + +By default, bitter uses '~/.bitter.yaml', but you may choose a different file: ``` -{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"} +python -m bitter --config ... ``` -By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file: +Or use an environment variable: ``` -python -m bitter -c ...
+export BITTER_CONFIG=$(cat myconfig.yaml) ``` # Server diff --git a/bitter/VERSION b/bitter/VERSION index 0a1ffad..8bd6ba8 100644 --- a/bitter/VERSION +++ b/bitter/VERSION @@ -1 +1 @@ -0.7.4 +0.7.5 diff --git a/bitter/cli.py b/bitter/cli.py index e00407c..60b9141 100644 --- a/bitter/cli.py +++ b/bitter/cli.py @@ -29,16 +29,17 @@ logger = logging.getLogger(__name__) @click.group() @click.option("--verbose", is_flag=True) @click.option("--logging_level", required=False, default='WARN') -@click.option("--config", required=False) -@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json') +@click.option('--config', show_default=True, default=bconf.CONFIG_FILE) +@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuration file.", default=bconf.CREDENTIALS) @click.pass_context def main(ctx, verbose, logging_level, config, credentials): logging.basicConfig(level=getattr(logging, logging_level)) ctx.obj = {} ctx.obj['VERBOSE'] = verbose - ctx.obj['CONFIG'] = config + bconf.CONFIG_FILE = config bconf.CREDENTIALS = credentials - utils.create_credentials(credentials) + if os.path.exists(utils.get_config_path(credentials)): + utils.copy_credentials_to_config(credentials, config) @main.group() @click.pass_context def tweet(ctx): @@ -51,7 +52,7 @@ def tweet(ctx): @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) @click.argument('tweetid') def get_tweet(tweetid, write, folder, update): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) utils.download_tweet(wq, tweetid, write, folder, update) @tweet.command('get_all') @@ -59,14 +60,14 @@ def get_tweet(tweetid, write, folder, update): @click.option('-f', '--folder', default="tweets") @click.pass_context def get_tweets(ctx, tweetsfile, folder): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) utils.download_tweets(wq, tweetsfile, folder) @tweet.command('search') @click.argument('query') @click.pass_context def search(ctx, query): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) t = utils.search_tweet(wq, query) print(json.dumps(t, indent=2)) @@ -74,7 +75,7 @@ def search(ctx, query): @click.argument('user') @click.pass_context def timeline(ctx, user): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) t = utils.user_timeline(wq, user) print(json.dumps(t, indent=2)) @@ -100,7 +101,7 @@ def list_users(ctx, db): @click.option('-f', '--folder', default="users") @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) def get_user(user, write, folder, update): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) if not write: u = utils.get_user(wq, user) js = json.dumps(u, indent=2) @@ -146,7 +147,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): return ExitStack() - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, len(wq.queue))) @@ -310,7 +311,7 @@ def users_extractor(ctx): @click.pass_context def extract(ctx,
recursive, user, name, initfile): print(locals()) - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) dburi = ctx.obj['DBURI'] utils.extract(wq, recursive=recursive, @@ -322,7 +323,7 @@ def extract(ctx, recursive, user, name, initfile): @extractor.command('reset') @click.pass_context def reset_extractor(ctx): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) db = ctx.obj['DBURI'] session = make_session(db) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) @@ -331,7 +332,7 @@ def reset_extractor(ctx): @click.argument('url', required=False) @click.pass_context def get_limits(ctx, url): - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) for worker in wq.queue: resp = worker.client.application.rate_limit_status() print('#'*20) @@ -351,27 +352,27 @@ def get_limits(ctx, url): @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False)) @click.argument('cmd', nargs=1) +@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False) +@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False) @click.argument('api_args', nargs=-1, type=click.UNPROCESSED) @click.pass_context -def api(ctx, cmd, api_args): +def api(ctx, cmd, tweets, users, api_args): opts = {} i = iter(api_args) for k, v in zip(i, i): k = k.replace('--', '') opts[k] = v - wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) - resp = utils.consume_feed(wq[cmd], **opts) - # A hack to stream jsons - print('[') - first = True + wq = crawlers.TwitterQueue.from_config(bconf.CONFIG_FILE) + if tweets: + resp = utils.consume_tweets(wq[cmd], **opts) + elif users: + resp = utils.consume_users(wq[cmd], **opts) + else: + resp = wq[cmd](**opts) + print(json.dumps(resp)) + return for i in resp: - if not first: - print(',') - else: - first = False - - print(json.dumps(i, indent=2)) - print(']') + print(json.dumps(i)) @main.command('server') @@ -383,7 +384,7 @@ def run_server(ctx, consumer_key, consumer_secret): bconf.CONSUMER_SECRET = consumer_secret from .webserver import app app.run(host='0.0.0.0') - + @main.group() @click.pass_context def stream(ctx): @@ -396,7 +397,7 @@ def stream(ctx): @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) @click.pass_context def get_stream(ctx, locations, track, file, politelyretry): - wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) + wq = crawlers.StreamQueue.from_config(bconf.CONFIG_FILE, 1) query_args = {} if locations: diff --git a/bitter/config.py b/bitter/config.py index c35066e..1a8a9e7 100644 --- a/bitter/config.py +++ b/bitter/config.py @@ -11,3 +11,4 @@ E.g.: app.run() ''' CREDENTIALS = '~/.bitter-credentials.json' +CONFIG_FILE = '~/.bitter.yaml' diff --git a/bitter/crawlers.py b/bitter/crawlers.py index 3eaf37b..da6c7b9 100644 --- a/bitter/crawlers.py +++ b/bitter/crawlers.py @@ -58,6 +58,16 @@ class FromCredentialsMixin(object): 
wq.ready(cls.worker_class(cred["user"], cred)) return wq +class FromConfigMixin(object): + + @classmethod + def from_config(cls, conffile=None, max_workers=None): + wq = cls() + + with utils.config(conffile) as c: + for cred in islice(c['credentials'], max_workers): + wq.ready(cls.worker_class(cred["user"], cred)) + return wq class TwitterWorker(object): api_class = None @@ -110,6 +120,7 @@ class RestWorker(TwitterWorker): return max(0, (reset-now)) def get_limit(self, uriparts): + uriparts = list(u for u in uriparts if u) uri = '/'+'/'.join(uriparts) for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): if ix.startswith(uri): @@ -142,7 +153,7 @@ class RestWorker(TwitterWorker): class QueueException(BaseException): pass -class QueueMixin(AttrToFunc, FromCredentialsMixin): +class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin): def __init__(self, wait=True): logger.debug('Creating worker queue') self.queue = set() diff --git a/bitter/utils.py b/bitter/utils.py index 4f9bfd2..046a212 100644 --- a/bitter/utils.py +++ b/bitter/utils.py @@ -3,6 +3,8 @@ from __future__ import print_function import logging import time import json +import yaml +import io import signal import sys @@ -49,31 +51,74 @@ def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): yield i -def get_credentials_path(credfile=None): - if not credfile: - if config.CREDENTIALS: - credfile = config.CREDENTIALS +def get_config_path(conf=None): + if not conf: + if config.CONFIG_FILE: + conf = config.CONFIG_FILE else: - raise Exception('No valid credentials file') - return os.path.expanduser(credfile) + raise Exception('No valid config file') + return os.path.expanduser(conf) + + +def copy_credentials_to_config(credfile, conffile=None): + p = get_config_path(credfile) + with open(p) as old: + for line in old: + cred = json.loads(line.strip()) + add_credentials(conffile, **cred) + + +def save_config(conf, conffile=None): + with config(conffile) as c: + c.clear() + c.update(conf) @contextmanager -def credentials_file(credfile, *args, **kwargs): - p = get_credentials_path(credfile) - with open(p, *args, **kwargs) as f: - yield f +def config(conffile=None): + d = read_config(conffile) + try: + yield d + finally: + write_config(d, conffile) + + +def read_config(conffile): + p = conffile and get_config_path(conffile) + if p and os.path.exists(p): + f = open(p, 'r') + elif 'BITTER_CONFIG' not in os.environ: + raise Exception('No config file or BITTER_CONFIG env variable.') + else: + f = io.StringIO(os.environ.get('BITTER_CONFIG', "").strip().replace('\\n', '\n')) + return yaml.load(f) or {'credentials': []} + + +def write_config(conf, conffile=None): + if conffile: + p = get_config_path(conffile) + with open(p, 'w') as f: + yaml.dump(conf, f) + else: + os.environ['BITTER_CONFIG'] = yaml.dump(conf) +def iter_credentials(conffile=None): + with config(conffile) as c: + for i in c['credentials']: + yield i -def iter_credentials(credfile=None): - with credentials_file(credfile) as f: - for l in f: - yield json.loads(l.strip()) +def create_config_file(conffile=None): + if not conffile: + return + conffile = get_config_path(conffile) + with open(conffile, 'a'): + pass -def get_credentials(credfile=None, inverse=False, **kwargs): + +def get_credentials(conffile=None, inverse=False, **kwargs): creds = [] - for i in iter_credentials(credfile): + for i in iter_credentials(conffile): matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) if matches and not inverse: creds.append(i) @@ 
-82,26 +127,18 @@ def get_credentials(credfile=None, inverse=False, **kwargs): return creds -def create_credentials(credfile=None): - credfile = get_credentials_path(credfile) - with credentials_file(credfile, 'a'): - pass - - -def delete_credentials(credfile=None, **creds): - tokeep = get_credentials(credfile, inverse=True, **creds) - with credentials_file(credfile, 'w') as f: - for i in tokeep: - f.write(json.dumps(i)) - f.write('\n') +def delete_credentials(conffile=None, **creds): + tokeep = get_credentials(conffile, inverse=True, **creds) + with config(conffile) as c: + c['credentials'] = list(tokeep) -def add_credentials(credfile=None, **creds): - exist = get_credentials(credfile, **creds) - if not exist: - with credentials_file(credfile, 'a') as f: - f.write(json.dumps(creds)) - f.write('\n') +def add_credentials(conffile=None, **creds): + exist = get_credentials(conffile, **creds) + if exist: + return + with config(conffile) as c: + c['credentials'].append(creds) def get_hashtags(iter_tweets, best=None): @@ -116,7 +153,7 @@ def read_file(filename, tail=False): while True: line = f.readline() if line not in (None, '', '\n'): - tweet = json.loads(line.strip()) + tweet = json.loads(line.strip()) yield tweet else: if tail: @@ -403,7 +440,7 @@ def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=F tweet = None if update or not cached: tweet = get_tweet(wq, tweetid) - js = json.dumps(tweet, indent=2) + js = json.dumps(tweet, indent=2) if write: if tweet: write_tweet_json(js, folder) @@ -484,31 +521,72 @@ def download_timeline(wq, user): return wq.statuses.user_timeline(id=user) -def consume_feed(func, *args, **kwargs): +def _consume_feed(func, feed_control=None, **kwargs): ''' Get all the tweets using pagination and a given method. It can be controlled with the `count` parameter. - If count < 0 => Loop until the whole feed is consumed. - If count == 0 => Only call the API once, with the default values. - If count > 0 => Get count tweets from the feed. + If max_count < 0 => Loop until the whole feed is consumed. + If max_count == 0 => Only call the API once, with the default values. + If max_count > 0 => Get max_count tweets from the feed. ''' - remaining = int(kwargs.pop('count', 0)) - consume = remaining < 0 + remaining = int(kwargs.pop('max_count', 0)) + count = int(kwargs.get('count', -1)) limit = False - # Simulate a do-while by updating the condition at the end - while not limit: - if remaining > 0: - kwargs['count'] = remaining - resp = func(*args, **kwargs) - if not resp: - return - for t in resp: - yield t - if consume: - continue - remaining -= len(resp) - max_id = min(s['id'] for s in func(*args, **kwargs)) - 1 - kwargs['max_id'] = max_id - limit = remaining <= 0 + # We need to at least perform a query, so we simulate a do-while + # by running once with no limit and updating the condition at the end + with tqdm(total=remaining) as pbar: + while not limit: + if remaining > 0 and ((count < 0) or (count > remaining)): + kwargs['count'] = remaining + resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count) + if not resp: + return + for entry in resp: + yield entry + pbar.update(len(resp)) + limit = stop + if remaining < 0: + # If the loop was run with a negative remaining, it will only stop + # when the control function tells it to.
+ continue + # Otherwise, check if we have already downloaded all the required items + remaining -= len(resp) + limit = limit or remaining <= 0 + + +def consume_tweets(*args, **kwargs): + return _consume_feed(*args, feed_control=_tweets_control, **kwargs) + + +def consume_users(*args, **kwargs): + return _consume_feed(*args, feed_control=_users_control, **kwargs) + + +def _tweets_control(func, apiargs, remaining=0, **kwargs): + ''' Return a list of entries, the remaining ''' + + resp = func(**apiargs) + if not resp: + return None, True + # Update the arguments for the next call + # Two options: either resp is a list, or a dict like: + # {'statuses': ... 'search_metadata': ...} + if isinstance(resp, dict) and 'search_metadata' in resp: + resp = resp['statuses'] + max_id = min(s['id'] for s in resp) - 1 + apiargs['max_id'] = max_id + return resp, False + + +def _users_control(func, apiargs, remaining=0, **kwargs): + resp = func(**apiargs) + stop = True + # Update the arguments for the next call + if 'next_cursor' in resp: + cursor = resp['next_cursor'] + apiargs['cursor'] = cursor + if int(cursor) != -1: + stop = False + return resp['users'], stop diff --git a/requirements.txt b/requirements.txt index 2bed540..6d88887 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ sqlalchemy twitter click tqdm +pyyaml diff --git a/tests/test_crawlers.py b/tests/test_crawlers.py index 9871dcc..e0db32c 100644 --- a/tests/test_crawlers.py +++ b/tests/test_crawlers.py @@ -12,7 +12,11 @@ from bitter import config as c class TestCrawlers(TestCase): def setUp(self): - self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) + CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml') + if os.path.exists(CONF_PATH): + self.wq = easy(CONF_PATH) + else: + self.wq = easy() def test_create_worker(self): assert len(self.wq.queue)==1 diff --git a/tests/test_utils.py b/tests/test_utils.py index 4494f55..e052ba0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -8,56 +8,65 @@ from bitter import config as c class TestUtils(TestCase): + configfile = '/tmp/bitter.yaml' + def setUp(self): - self.credfile = '/tmp/credentials.txt' - c.CREDENTIALS = self.credfile - if os.path.exists(self.credfile): - os.remove(self.credfile) - utils.create_credentials(self.credfile) + c.CONFIG_FILE = self.configfile + if os.path.exists(self.configfile): + os.remove(self.configfile) + assert not os.path.exists(self.configfile) + utils.create_config_file(self.configfile) + assert os.path.exists(self.configfile) - - def test_create_credentials(self): - assert os.path.exists(self.credfile) - os.remove(self.credfile) - utils.create_credentials() # From config - assert os.path.exists(self.credfile) - def test_add_credentials(self): - utils.add_credentials(self.credfile, user="test") - assert utils.get_credentials(self.credfile) - assert utils.get_credentials(user="test") - assert list(utils.get_credentials(user="test"))[0]["user"] == "test" + utils.add_credentials(self.configfile, user="test") + assert utils.get_credentials(self.configfile) + assert utils.get_credentials(self.configfile, user="test") + assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test" def test_get_credentials(self): - utils.add_credentials(self.credfile, user="test") - assert utils.get_credentials(user="test") - assert not utils.get_credentials(user="test", inverse=True) + utils.add_credentials(self.configfile, user="test") + assert utils.get_credentials(self.configfile, user="test") + 
assert not utils.get_credentials(self.configfile, user="test", inverse=True) def test_add_two_credentials(self): - utils.add_credentials(self.credfile, user="test") - utils.add_credentials(self.credfile, user="test2") - assert utils.get_credentials(user="test") - assert utils.get_credentials(user="test2") + utils.add_credentials(self.configfile, user="test") + utils.add_credentials(self.configfile, user="test2") + assert utils.get_credentials(self.configfile, user="test") + assert utils.get_credentials(self.configfile, user="test2") def test_delete_credentials(self): - utils.add_credentials(self.credfile, user="test") - assert utils.get_credentials(user="test") - utils.delete_credentials(user="test") - print(utils.get_credentials()) - assert not utils.get_credentials(user="test") + utils.add_credentials(self.configfile, user="test") + assert utils.get_credentials(self.configfile, user="test") + utils.delete_credentials(self.configfile, user="test") + assert not utils.get_credentials(self.configfile, user="test") def test_parallel(self): import time def echo(i): - time.sleep(2) + time.sleep(0.5) return i tic = time.time() resp = utils.parallel(echo, [1,2,3]) assert isinstance(resp, types.GeneratorType) assert list(resp) == [1,2,3] toc = time.time() - assert (tic-toc) < 6000 + assert (tic-toc) < 600 resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) assert list(resp2) == [1,2,3,4] + + +class TestUtilsEnv(TestUtils): + configfile = None + + def setUp(self): + if 'BITTER_CONFIG' in os.environ: + self.oldenv = os.environ['BITTER_CONFIG'] + os.environ['BITTER_CONFIG'] = '' + + def tearDown(self): + if hasattr(self, 'oldenv'): + os.environ['BITTER_CONFIG'] = self.oldenv +
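For reference, the configuration lookup that this patch introduces in `bitter/utils.py` (an existing YAML config file wins, otherwise the `BITTER_CONFIG` environment variable is parsed as YAML) can be summarised with a short standalone sketch. This is only an illustration of the logic above, not code from the package: the helper name `load_bitter_config` is hypothetical, and it uses `yaml.safe_load` where the patch calls `yaml.load`.

```python
import io
import os

import yaml  # pulled in by the new pyyaml requirement


def load_bitter_config(path='~/.bitter.yaml'):
    """Sketch of the lookup order used by bitter.utils.read_config:
    a config file on disk takes precedence, otherwise fall back to
    the BITTER_CONFIG environment variable."""
    expanded = os.path.expanduser(path)
    if os.path.exists(expanded):
        with open(expanded) as f:
            return yaml.safe_load(f) or {'credentials': []}
    if 'BITTER_CONFIG' not in os.environ:
        raise Exception('No config file or BITTER_CONFIG env variable.')
    raw = os.environ['BITTER_CONFIG'].strip().replace('\\n', '\n')
    return yaml.safe_load(io.StringIO(raw)) or {'credentials': []}


if __name__ == '__main__':
    # Roughly what `export BITTER_CONFIG=$(cat myconfig.yaml)` achieves in the README
    os.environ.setdefault('BITTER_CONFIG', 'credentials:\n- user: balkian')
    print(load_bitter_config('/nonexistent/.bitter.yaml'))
```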