From cf766a6bf3852e42fbc8bcbed240dce42cd906a0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?=
Date: Thu, 30 Nov 2017 16:49:42 +0100
Subject: [PATCH] API command

* Added API command
* Fixed bug in chunk

---
 README.md          |   7 +
 bitter/VERSION     |   2 +-
 bitter/cli.py      |  33 ++++-
 bitter/crawlers.py |  10 ++
 bitter/models.py   |  11 +-
 bitter/utils.py    | 338 ++++++++++++++++++++++++++++++---------------
 6 files changed, 281 insertions(+), 120 deletions(-)

diff --git a/README.md b/README.md
index db26060..a9f3d88 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,13 @@ wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
 
+
+You can also make custom calls to the API through the command line.
+For example, to get the latest 500 tweets by the Python Software Foundation:
+
+```
+bitter api statuses/user_timeline --id thepsf --count 500
+```
 
 # Credentials format
 
 ```
diff --git a/bitter/VERSION b/bitter/VERSION
index 7486fdb..0a1ffad 100644
--- a/bitter/VERSION
+++ b/bitter/VERSION
@@ -1 +1 @@
-0.7.2
+0.7.4
diff --git a/bitter/cli.py b/bitter/cli.py
index 1d7f228..e00407c 100644
--- a/bitter/cli.py
+++ b/bitter/cli.py
@@ -240,11 +240,6 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
     logger.info('Done!')
 
 
-@main.group('api')
-def api():
-    pass
-
-
 @main.group('extractor')
 @click.pass_context
 @click.option('--db', required=True, help='Database of users.')
@@ -332,7 +327,7 @@ def reset_extractor(ctx):
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
 
-@api.command('limits')
+@main.command('limits')
 @click.argument('url', required=False)
 @click.pass_context
 def get_limits(ctx, url):
@@ -353,6 +348,32 @@ def get_limits(ctx, url):
     else:
         print(json.dumps(resp, indent=2))
 
+
+@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False))
+@click.argument('cmd', nargs=1)
+@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
+@click.pass_context
+def api(ctx, cmd, api_args):
+    opts = {}
+    args = iter(api_args)
+    for k, v in zip(args, args):
+        k = k.lstrip('-')
+        opts[k] = v
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    resp = utils.consume_feed(wq[cmd], **opts)
+    # A hack to stream the responses as a single JSON list
+    print('[')
+    first = True
+    for tweet in resp:
+        if not first:
+            print(',')
+        else:
+            first = False
+
+        print(json.dumps(tweet, indent=2))
+    print(']')
+
+
 @main.command('server')
 @click.argument('CONSUMER_KEY', required=True)
 @click.argument('CONSUMER_SECRET', required=True)
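
The new `api` command receives everything after the subcommand unprocessed and pairs the extra arguments up as flag/value options by zipping an iterator with itself. A minimal sketch of that pairing idiom, with a hypothetical argument tuple:

```
# Hypothetical arguments, as click would hand them to `api`
api_args = ('--id', 'thepsf', '--count', '500')

args = iter(api_args)
# zip(args, args) pulls two items per step from the *same* iterator,
# producing ('--id', 'thepsf') and ('--count', '500')
opts = {k.lstrip('-'): v for k, v in zip(args, args)}
assert opts == {'id': 'thepsf', 'count': '500'}
```
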
diff --git a/bitter/crawlers.py b/bitter/crawlers.py
index e69b223..3eaf37b 100644
--- a/bitter/crawlers.py
+++ b/bitter/crawlers.py
@@ -10,6 +10,7 @@ from twitter import *
 from collections import OrderedDict
 from threading import Lock
 from itertools import islice
+from functools import partial
 try:
     import itertools.ifilter as filter
 except ImportError:
@@ -38,6 +39,9 @@ class AttrToFunc(object):
         else:
             return extend_call(k)
 
+    def __getitem__(self, k):
+        return partial(self.handler, self.__uriparts + k.split('/'))
+
     def __call__(self, *args, **kwargs):
         # for i, a in enumerate(args)e
         # kwargs[i] = a
@@ -75,6 +79,12 @@ class TwitterWorker(object):
             self._client = self.api_class(auth=auth)
         return self._client
 
+    def __repr__(self):
+        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
+        if self.busy:
+            msg += ' [busy]'
+        return msg
+
 
 class RestWorker(TwitterWorker):
     api_class = Twitter
diff --git a/bitter/models.py b/bitter/models.py
index 59698e9..c2216d4 100644
--- a/bitter/models.py
+++ b/bitter/models.py
@@ -3,6 +3,7 @@ import json
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.types import BigInteger, Integer, Text, Boolean
+from sqlalchemy.pool import SingletonThreadPool
 from sqlalchemy import Column, Index
 from sqlalchemy import create_engine
@@ -85,15 +86,19 @@ class ExtractorEntry(Base):
     user = Column(BigInteger, index=True)
     cursor = Column(BigInteger, default=-1)
     pending = Column(Boolean, default=False)
+    errors = Column(Text, default="")
+    busy = Column(Boolean, default=False)
+
 
 def make_session(url):
-    engine = create_engine(url)#, echo=True)
+    if not isinstance(url, str):
+        msg = 'Expected a database URI string, got %r' % (url,)
+        raise ValueError(msg)
+    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
     session = Session()
     return session
-
-
 def test(db='sqlite:///users.db'):
diff --git a/bitter/utils.py b/bitter/utils.py
index db984b9..4f9bfd2 100644
--- a/bitter/utils.py
+++ b/bitter/utils.py
@@ -11,16 +11,13 @@ import os
 import multiprocessing
 from multiprocessing.pool import ThreadPool
 
+from functools import partial
+
 from tqdm import tqdm
 
 from itertools import islice, chain
 
 from contextlib import contextmanager
 
-try:
-    from itertools import izip_longest
-except ImportError:
-    from itertools import zip_longest
-
 from collections import Counter
 
 from builtins import map, filter
@@ -38,16 +35,20 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)
 
+
 def chunk(iterable, n):
     it = iter(iterable)
     return iter(lambda: tuple(islice(it, n)), ())
 
+
 def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
     source = chunk(source, chunksize)
     p = ThreadPool(numcpus*2)
-    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
+    results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
+    for i in chain.from_iterable(results):
         yield i
 
+
 def get_credentials_path(credfile=None):
     if not credfile:
         if config.CREDENTIALS:
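
For context, `chunk` groups any iterable into n-sized tuples and `parallel` feeds those groups to a thread pool, chaining the results back into a single stream. A self-contained sketch of what `chunk` does, with a toy input:

```
from itertools import islice

def chunk(iterable, n):
    # iter(callable, sentinel) keeps calling the lambda until it
    # returns the empty tuple, i.e. until the iterator is exhausted
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())

assert list(chunk(range(7), 3)) == [(0, 1, 2), (3, 4, 5), (6,)]
```
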
@@ -56,17 +57,20 @@ def get_credentials_path(credfile=None):
             raise Exception('No valid credentials file')
     return os.path.expanduser(credfile)
 
+
 @contextmanager
 def credentials_file(credfile, *args, **kwargs):
     p = get_credentials_path(credfile)
     with open(p, *args, **kwargs) as f:
         yield f
 
+
 def iter_credentials(credfile=None):
     with credentials_file(credfile) as f:
         for l in f:
             yield json.loads(l.strip())
 
+
 def get_credentials(credfile=None, inverse=False, **kwargs):
     creds = []
     for i in iter_credentials(credfile):
@@ -77,11 +81,13 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
             creds.append(i)
     return creds
 
+
 def create_credentials(credfile=None):
     credfile = get_credentials_path(credfile)
     with credentials_file(credfile, 'a'):
         pass
 
+
 def delete_credentials(credfile=None, **creds):
     tokeep = get_credentials(credfile, inverse=True, **creds)
     with credentials_file(credfile, 'w') as f:
@@ -89,6 +95,7 @@ def delete_credentials(credfile=None, **creds):
             f.write(json.dumps(i))
             f.write('\n')
 
+
 def add_credentials(credfile=None, **creds):
     exist = get_credentials(credfile, **creds)
     if not exist:
@@ -103,6 +110,7 @@ def get_hashtags(iter_tweets, best=None):
         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
     return c
 
+
 def read_file(filename, tail=False):
     with open(filename) as f:
         while True:
@@ -115,7 +123,7 @@ def read_file(filename, tail=False):
                 time.sleep(1)
             else:
                 return
-    
+
 
 def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
     t = 'name' if by_name else 'uid'
@@ -144,6 +152,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
         else:
             yield user
 
+
 def trim_user(user):
     if 'status' in user:
         del user['status']
@@ -157,14 +166,22 @@ def trim_user(user):
     return user
 
 
-def add_user(session, user, enqueue=False):
+def add_user(user, dburi=None, session=None, update=False):
+    if not session:
+        session = make_session(dburi)
+
     user = trim_user(user)
-    olduser = session.query(User).filter(User.id==user['id'])
-    if olduser:
-        olduser.delete()
-    user = User(**user)
-    session.add(user)
-    if extract:
+    olduser = session.query(User).filter(User.id == user['id'])
+    if olduser.count():
+        if not update:
+            return
+        olduser.delete()
+    nuser = User()
+    for key, value in user.items():
+        setattr(nuser, key, value)
+    user = nuser
+    session.add(user)
+    if update:
         logger.debug('Adding entry')
         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
         if not entry:
@@ -174,126 +191,194 @@ def add_user(session, user, enqueue=False):
             entry.pending = True
             entry.cursor = -1
             session.commit()
+    session.close()
 
 
-# TODO: adapt to the crawler
-def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
-    signal.signal(signal.SIGINT, signal_handler)
+def download_entry(wq, entry_id, dburi=None, recursive=False):
+    if not dburi:
+        raise ValueError('Provide a valid database URI')
+    session = make_session(dburi)
+    logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
+    user = session.query(User).filter(User.id == entry.user).first()
+    download_user(wq, session, user, entry, recursive)
+    session.close()
 
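
With the new signature, `add_user` can be called either with an open session or with just a database URI, so worker threads can each use their own connection. A minimal sketch of both call styles, assuming a local SQLite database and a user dict already fetched from the API:

```
user = {'id': 15397177, 'screen_name': 'thepsf', 'followers_count': 100}

# One-shot call: add_user opens and closes its own session
add_user(user, dburi='sqlite:///users.db')

# Reusing an existing session, as download_user does in recursive mode
session = make_session('sqlite:///users.db')
add_user(user, session=session, update=True)
```
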
-    w = wq.next()
-    if not dburi:
-        dburi = 'sqlite:///%s.db' % extractor_name
 
-    session = make_session(dburi)
+
+def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
 
-    screen_names = []
-    user_ids = []
+    total_followers = user.followers_count
 
-    def classify_user(id_or_name):
-        try:
-            int(user)
-            user_ids.append(user)
-            logger.info("Added user id")
-        except ValueError:
-            logger.info("Added screen_name")
-            screen_names.append(user.split('@')[-1])
-
-    if user:
-        classify_user(user)
-
-    elif initfile:
-        logger.info("No user. I will open %s" % initfile)
-        with open(initfile, 'r') as f:
-            for line in f:
-                user = line.strip().split(',')[0]
-                classify_user(user)
-    else:
-        logger.info('Using pending users from last session')
+    if total_followers > max_followers:
+        entry.pending = False
+        logger.info("Too many followers for user: %s" % user.screen_name)
+        session.add(entry)
+        session.commit()
+        return
 
-    nusers = list(get_users(wq, screen_names, by_name=True))
-    if user_ids:
-        nusers += list(get_users(wq, user_ids, by_name=False))
+    if not entry:
+        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
+        session.add(entry)
+        session.commit()
 
-    for i in nusers:
-        add_user(session, i, enqueue=True)
+    cursor = entry.cursor
+    uid = user.id
+    name = user.name
 
-    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logger.info('Total users: {}'.format(total_users))
-    def pending_entries():
-        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logger.info('Pending: {}'.format(pending))
-        return pending
+    logger.info("#"*20)
+    logger.info("Getting %s - %s" % (uid, name))
+    logger.info("Cursor %s" % cursor)
+    logger.info("Using account: %s" % wq.name)
 
-    while pending_entries() > 0:
-        logger.info("Using account: %s" % w.name)
-        candidate, entry = session.query(User, ExtractorEntry).\
-            filter(ExtractorEntry.user == User.id).\
-            filter(ExtractorEntry.pending == True).\
-            order_by(User.followers_count).first()
-        if not candidate:
-            break
-        pending = True
-        cursor = entry.cursor
-        uid = candidate.id
-        uobject = session.query(User).filter(User.id==uid).first()
-        name = uobject.screen_name if uobject else None
-
-        logger.info("#"*20)
-        logger.info("Getting %s - %s" % (uid, name))
-        logger.info("Cursor %s" % cursor)
-        logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
+    def fetched_followers():
+        return session.query(Following).filter(Following.isfollowed==uid).count()
+
+    attempts = 0
+    while cursor > 0 or fetched_followers() < total_followers:
         try:
             resp = wq.followers.ids(user_id=uid, cursor=cursor)
         except TwitterHTTPError as ex:
-            if ex.e.code in (401, ):
+            attempts += 1
+            if ex.e.code in (401, ) or attempts > 3:
                 logger.info('Not authorized for user: {}'.format(uid))
-                resp = {}
-        if 'ids' in resp:
-            logger.info("New followers: %s" % len(resp['ids']))
-            if recursive:
-                newusers = get_users(wq, resp)
-                for user in newusers:
-                    add_user(session, newuser, enqueue=True)
-            for i in resp['ids']:
-                existing_user = session.query(Following).\
-                    filter(Following.isfollowed==uid).\
-                    filter(Following.follower==i).first()
-                now = int(time.time())
-                if existing_user:
-                    existing_user.created_at_stamp = now
-                else:
-                    f = Following(isfollowed=uid,
-                                  follower=i,
-                                  created_at_stamp=now)
-                    session.add(f)
-
-            total_followers = candidate.followers_count
-            fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
-            logger.info("Fetched: %s/%s followers" % (fetched_followers,
-                                                      total_followers))
-            cursor = resp["next_cursor"]
-            if cursor > 0:
-                pending = True
-                logger.info("Getting more followers for %s" % uid)
-            else:
-                logger.info("Done getting followers for %s" % uid)
-                cursor = -1
-                pending = False
-        else:
+                entry.errors = str(ex)
+                break
+            continue
+        if 'ids' not in resp:
             logger.info("Error with id %s %s" % (uid, resp))
-            pending = False
+            entry.pending = False
+            entry.errors = "No ids in response: %s" % resp
+            break
+        if not resp['ids']:
+            logger.info('No more followers in the response')
+            break
+
+        logger.info("New followers: %s" % len(resp['ids']))
+        if recursive:
+            newusers = get_users(wq, resp['ids'])
+            for newuser in newusers:
+                add_user(session=session, user=newuser)
+
+        for i in resp['ids']:
+            existing_user = session.query(Following).\
+                filter(Following.isfollowed == uid).\
+                filter(Following.follower == i).first()
+            now = int(time.time())
+            if existing_user:
+                existing_user.created_at_stamp = now
+            else:
+                f = Following(isfollowed=uid,
+                              follower=i,
+                              created_at_stamp=now)
+                session.add(f)
 
-        entry.pending = pending
-        entry.cursor = cursor
-        logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
-        session.add(candidate)
+        logger.info("Fetched: %s/%s followers" % (fetched_followers(),
+                                                  total_followers))
+        cursor = resp["next_cursor"]
+        entry.cursor = cursor
+        session.add(entry)
         session.commit()
-        sys.stdout.flush()
+
+    logger.info("Done getting followers for %s" % uid)
+
+    entry.pending = False
+    entry.busy = False
+    session.add(entry)
+    session.commit()
+
+    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+    sys.stdout.flush()
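
The download loop above follows Twitter's cursor protocol for `followers/ids`: each page carries a `next_cursor`, with `-1` as the conventional starting value and `0` meaning the listing is exhausted. A minimal sketch of that protocol against a hypothetical stubbed endpoint (the real code goes through `wq.followers.ids`):

```
def fetch_page(user_id, cursor):
    # Stand-in for wq.followers.ids(user_id=user_id, cursor=cursor)
    pages = {-1: {'ids': [1, 2], 'next_cursor': 1234},
             1234: {'ids': [3], 'next_cursor': 0}}
    return pages[cursor]

cursor = -1
followers = []
while cursor != 0:
    resp = fetch_page(15397177, cursor)
    followers.extend(resp['ids'])
    cursor = resp['next_cursor']

assert followers == [1, 2, 3]
```
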
+
+
+def classify_user(id_or_name, screen_names, user_ids):
+    try:
+        int(id_or_name)
+        user_ids.append(id_or_name)
+        logger.debug("Added user id")
+    except ValueError:
+        logger.debug("Added screen_name")
+        screen_names.append(id_or_name.split('@')[-1])
+
+
+def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
+    signal.signal(signal.SIGINT, signal_handler)
+
+    if not dburi:
+        dburi = 'sqlite:///%s.db' % extractor_name
+
+    session = make_session(dburi)
+    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
+    session.commit()
+
+    if not (user or initfile):
+        logger.info('Using pending users from last session')
+    else:
+        screen_names = []
+        user_ids = []
+        if user:
+            classify_user(user, screen_names, user_ids)
+        elif initfile:
+            logger.info("No user given. Reading users from %s" % initfile)
+            with open(initfile, 'r') as f:
+                for line in f:
+                    user = line.strip().split(',')[0]
+                    classify_user(user, screen_names, user_ids)
+
+        def missing_user(ix, column=User.screen_name):
+            res = session.query(User).filter(column == ix).count() == 0
+            if res:
+                logger.info("Missing user %s" % ix)
+            return res
+
+        screen_names = list(filter(missing_user, screen_names))
+        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
+        nusers = []
+        logger.info("Missing user ids: %s" % user_ids)
+        logger.info("Missing screen names: %s" % screen_names)
+        if screen_names:
+            nusers = list(get_users(wq, screen_names, by_name=True))
+        if user_ids:
+            nusers += list(get_users(wq, user_ids, by_name=False))
+
+        for i in nusers:
+            add_user(dburi=dburi, user=i)
+
+    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
+    logger.info('Total users: {}'.format(total_users))
+
+    def de(entry_ids):
+        # parallel() delivers chunks (tuples) of entry ids and chains
+        # whatever the worker returns back into one stream
+        for eid in entry_ids:
+            download_entry(wq, eid, dburi=dburi)
+        return entry_ids
+
+    pending = pending_entries(dburi)
+    session.close()
+
+    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
+        logger.info("Got %s" % i)
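
`classify_user` routes each input to the right bucket, so numeric ids and screen names can later be resolved through the appropriate lookup parameter. A quick sketch of its behaviour on hypothetical inputs:

```
screen_names, user_ids = [], []
for term in ('12345', '@thepsf', 'balkian'):
    classify_user(term, screen_names, user_ids)

assert user_ids == ['12345']                   # numeric strings count as ids
assert screen_names == ['thepsf', 'balkian']   # a leading @ is stripped
```
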
+
+
+def pending_entries(dburi):
+    session = make_session(dburi)
+    while True:
+        res = session.query(User, ExtractorEntry).\
+            filter(ExtractorEntry.user == User.id).\
+            filter(ExtractorEntry.pending == True).\
+            filter(ExtractorEntry.busy == False).\
+            order_by(User.followers_count).first()
+        if res:
+            candidate, entry = res
+            entry.busy = True
+            session.add(entry)
+            session.commit()
+            yield int(entry.id)
+            continue
+        if session.query(ExtractorEntry).\
+           filter(ExtractorEntry.busy == True).count() > 0:
+            time.sleep(1)
+            continue
+        logger.info("No more pending entries")
+        break
+    session.close()
+
 
 def get_tweet(c, tid):
     return c.statuses.show(id=tid)
@@ -394,3 +479,36 @@ def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ig
     tweets = parallel(download_batch, lines_to_crawl, 100)
     for res in tqdm(parallel(print_result, tweets), desc='Queried'):
         pass
+
+
+def download_timeline(wq, user):
+    return wq.statuses.user_timeline(id=user)
+
+
+def consume_feed(func, *args, **kwargs):
+    '''
+    Get all the tweets from a feed, using pagination and the given method.
+    The number of tweets can be controlled with the `count` parameter:
+
+    If count < 0  => Loop until the whole feed is consumed.
+    If count == 0 => Only call the API once, with the default values.
+    If count > 0  => Get `count` tweets from the feed.
+    '''
+    remaining = int(kwargs.pop('count', 0))
+    consume = remaining < 0
+    limit = False
+
+    # Simulate a do-while loop by updating the condition at the end
+    while not limit:
+        if remaining > 0:
+            kwargs['count'] = remaining
+        resp = func(*args, **kwargs)
+        if not resp:
+            return
+        for t in resp:
+            yield t
+        # Ask for the next (older) page: everything strictly below the
+        # lowest id in the page we just received
+        kwargs['max_id'] = min(s['id'] for s in resp) - 1
+        if consume:
+            continue
+        remaining -= len(resp)
+        limit = remaining <= 0
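
For reference, the new `api` CLI command is a thin wrapper over `consume_feed`; the equivalent Python would look roughly like this (a sketch, assuming valid credentials are already configured in `bconf.CREDENTIALS`):

```
from bitter import utils, crawlers
from bitter import config as bconf

wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
# wq['statuses/user_timeline'] resolves the path via AttrToFunc.__getitem__
for tweet in utils.consume_feed(wq['statuses/user_timeline'],
                                id='thepsf', count=500):
    print(tweet['id'])
```
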