Mirror of https://github.com/balkian/bitter.git

API command

* Added API command
* Fixed bug in chunk
J. Fernando Sánchez 2017-11-30 16:49:42 +01:00
parent e65f6836b3
commit cf766a6bf3
6 changed files with 282 additions and 121 deletions

View File: README.md

@@ -22,6 +22,13 @@ wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
 
+You can also make custom calls to the API through the command line,
+e.g. to get the latest 500 tweets by the Python Software Foundation:
+
+```
+bitter api statuses/user_timeline --id thepsf --count 500
+```
+
 # Credentials format
 ```
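For comparison, the same request can be made from Python. A minimal sketch, assuming `easy()` returns the same indexable queue wrapper the CLI uses (the `wq['statuses/user_timeline']` form relies on the `__getitem__` this commit adds to `AttrToFunc`):

```python
from bitter import easy, utils  # assumed import paths

wq = easy()
# Roughly what `bitter api statuses/user_timeline --id thepsf --count 500` does:
for tweet in utils.consume_feed(wq['statuses/user_timeline'], id='thepsf', count=500):
    print(tweet['id'])
```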

View File: bitter/VERSION

@@ -1 +1 @@
-0.7.2
+0.7.4

View File: bitter/cli.py

@@ -240,11 +240,6 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
     logger.info('Done!')
 
-
-@main.group('api')
-def api():
-    pass
-
 
 @main.group('extractor')
 @click.pass_context
 @click.option('--db', required=True, help='Database of users.')
@@ -332,7 +327,7 @@ def reset_extractor(ctx):
     session = make_session(db)
     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
 
-@api.command('limits')
+@main.command('limits')
 @click.argument('url', required=False)
 @click.pass_context
 def get_limits(ctx, url):
@@ -353,6 +348,32 @@ def get_limits(ctx, url):
     else:
         print(json.dumps(resp, indent=2))
 
+
+@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False))
+@click.argument('cmd', nargs=1)
+@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
+@click.pass_context
+def api(ctx, cmd, api_args):
+    opts = {}
+    i = iter(api_args)
+    for k, v in zip(i, i):
+        k = k.replace('--', '')
+        opts[k] = v
+    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
+    resp = utils.consume_feed(wq[cmd], **opts)
+    # A hack to stream jsons
+    print('[')
+    first = True
+    for i in resp:
+        if not first:
+            print(',')
+        else:
+            first = False
+        print(json.dumps(i, indent=2))
+    print(']')
+
+
 @main.command('server')
 @click.argument('CONSUMER_KEY', required=True)
 @click.argument('CONSUMER_SECRET', required=True)
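A note on the argument parsing in the new `api` command: because `zip(i, i)` pulls from a single shared iterator, each loop step consumes two consecutive tokens, turning the flat `--flag value` stream into key/value pairs. A standalone sketch of the idiom (the helper name is illustrative, not part of bitter):

```python
def parse_flags(api_args):
    # One iterator feeds both zip slots, so items pair up:
    # ('--id', 'thepsf', '--count', '500') -> {'id': 'thepsf', 'count': '500'}
    i = iter(api_args)
    return {k.replace('--', ''): v for k, v in zip(i, i)}

print(parse_flags(('--id', 'thepsf', '--count', '500')))
# {'id': 'thepsf', 'count': '500'}
```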

View File: bitter/crawlers.py

@@ -10,6 +10,7 @@ from twitter import *
 from collections import OrderedDict
 from threading import Lock
 from itertools import islice
+from functools import partial
 try:
     import itertools.ifilter as filter
 except ImportError:
@@ -38,6 +39,9 @@ class AttrToFunc(object):
         else:
             return extend_call(k)
 
+    def __getitem__(self, k):
+        return partial(self.handler, self.__uriparts+k.split('/'))
+
     def __call__(self, *args, **kwargs):
         # for i, a in enumerate(args)e
         #     kwargs[i] = a
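The new `__getitem__` makes endpoints addressable by URL path as well as by attribute chaining, which is what lets the CLI resolve `wq[cmd]` from a plain string. Roughly, for a queue wrapper `wq`:

```python
# Both forms now address the same endpoint:
wq.statuses.user_timeline(id='thepsf')     # attribute chaining
wq['statuses/user_timeline'](id='thepsf')  # path indexing, as used by `bitter api`
```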
@@ -75,6 +79,12 @@ class TwitterWorker(object):
             self._client = self.api_class(auth=auth)
         return self._client
 
+    def __repr__(self):
+        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
+        if self.busy:
+            msg += ' [busy]'
+        return msg
+
 
 class RestWorker(TwitterWorker):
     api_class = Twitter

View File: bitter/models.py

@@ -3,6 +3,7 @@ import json
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.types import BigInteger, Integer, Text, Boolean
+from sqlalchemy.pool import SingletonThreadPool
 from sqlalchemy import Column, Index
 from sqlalchemy import create_engine
@@ -85,16 +86,20 @@ class ExtractorEntry(Base):
     user = Column(BigInteger, index=True)
     cursor = Column(BigInteger, default=-1)
     pending = Column(Boolean, default=False)
+    errors = Column(Text, default="")
+    busy = Column(Boolean, default=False)
 
 
 def make_session(url):
-    engine = create_engine(url)#, echo=True)
+    if not isinstance(url, str):
+        print(url)
+        raise Exception("FUCK")
+    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
     session = Session()
     return session
 
 
 def test(db='sqlite:///users.db'):
     from sqlalchemy import exists
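The switch to `SingletonThreadPool` goes hand in hand with the extractor now touching the same SQLite database from several threads, each through its own short-lived session. A minimal sketch of that pattern (the query is illustrative):

```python
session = make_session('sqlite:///users.db')
try:
    # e.g. count the entries still queued for extraction
    pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
finally:
    session.close()
```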

View File: bitter/utils.py

@@ -11,16 +11,13 @@ import os
 
 import multiprocessing
 from multiprocessing.pool import ThreadPool
+from functools import partial
 
 from tqdm import tqdm
 from itertools import islice, chain
 from contextlib import contextmanager
-try:
-    from itertools import izip_longest
-except ImportError:
-    from itertools import zip_longest
 from collections import Counter
 from builtins import map, filter
@@ -38,16 +35,20 @@ def signal_handler(signal, frame):
     logger.info('You pressed Ctrl+C!')
     sys.exit(0)
 
+
 def chunk(iterable, n):
     it = iter(iterable)
     return iter(lambda: tuple(islice(it, n)), ())
 
+
 def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
     source = chunk(source, chunksize)
     p = ThreadPool(numcpus*2)
-    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
+    results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
+    for i in chain.from_iterable(results):
         yield i
 
+
 def get_credentials_path(credfile=None):
     if not credfile:
         if config.CREDENTIALS:
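Both helpers are easiest to see with a small example. `chunk` uses the two-argument form of `iter`, which keeps calling the lambda until it returns the sentinel `()`; `parallel` maps a function over those tuples in a thread pool and flattens the results. A sketch, assuming the two helpers above are in scope (the worker function is illustrative):

```python
list(chunk(range(7), 3))
# -> [(0, 1, 2), (3, 4, 5), (6,)]

def double_batch(batch):   # receives a whole tuple, must return an iterable
    return [x * 2 for x in batch]

sorted(parallel(double_batch, range(7), chunksize=3))
# -> [0, 2, 4, 6, 8, 10, 12]  (imap_unordered gives no ordering guarantee)
```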
@@ -56,17 +57,20 @@ def get_credentials_path(credfile=None):
             raise Exception('No valid credentials file')
     return os.path.expanduser(credfile)
 
+
 @contextmanager
 def credentials_file(credfile, *args, **kwargs):
     p = get_credentials_path(credfile)
     with open(p, *args, **kwargs) as f:
         yield f
 
+
 def iter_credentials(credfile=None):
     with credentials_file(credfile) as f:
         for l in f:
             yield json.loads(l.strip())
 
+
 def get_credentials(credfile=None, inverse=False, **kwargs):
     creds = []
     for i in iter_credentials(credfile):
@@ -77,11 +81,13 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
         creds.append(i)
     return creds
 
+
 def create_credentials(credfile=None):
     credfile = get_credentials_path(credfile)
     with credentials_file(credfile, 'a'):
         pass
 
+
 def delete_credentials(credfile=None, **creds):
     tokeep = get_credentials(credfile, inverse=True, **creds)
     with credentials_file(credfile, 'w') as f:
@@ -89,6 +95,7 @@ def delete_credentials(credfile=None, **creds):
         f.write(json.dumps(i))
         f.write('\n')
 
+
 def add_credentials(credfile=None, **creds):
     exist = get_credentials(credfile, **creds)
     if not exist:
@@ -103,6 +110,7 @@ def get_hashtags(iter_tweets, best=None):
         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
     return c
 
+
 def read_file(filename, tail=False):
     with open(filename) as f:
         while True:
@@ -144,6 +152,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
     else:
         yield user
 
+
 def trim_user(user):
     if 'status' in user:
         del user['status']
@@ -157,14 +166,22 @@ def trim_user(user):
     return user
 
-def add_user(session, user, enqueue=False):
+
+def add_user(user, dburi=None, session=None, update=False):
+    if not session:
+        session = make_session(dburi)
     user = trim_user(user)
-    olduser = session.query(User).filter(User.id==user['id'])
+    olduser = session.query(User).filter(User.id == user['id'])
     if olduser:
+        if not update:
+            return
         olduser.delete()
-    user = User(**user)
-    session.add(user)
-    if extract:
+    nuser = User()
+    for key, value in user.items():
+        setattr(nuser, key, value)
+    user = nuser
+    if update:
+        session.add(user)
         logger.debug('Adding entry')
         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
         if not entry:
@@ -174,125 +191,193 @@ def add_user(session, user, enqueue=False):
             entry.pending = True
             entry.cursor = -1
     session.commit()
+    session.close()
+
+
+def download_entry(wq, entry_id, dburi=None, recursive=False):
+    session = make_session(dburi)
+    if not session:
+        raise Exception("Provide dburi or session")
+    logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
+    user = session.query(User).filter(User.id == entry.user).first()
+    download_user(wq, session, user, entry, recursive)
+    session.close()
+
+
+def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+    total_followers = user.followers_count
+    if total_followers > max_followers:
+        entry.pending = False
+        logger.info("Too many followers for user: %s" % user.screen_name)
+        session.add(entry)
+        session.commit()
+        return
+    if not entry:
+        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
+        session.add(entry)
+        session.commit()
+
+    pending = True
+    cursor = entry.cursor
+    uid = user.id
+    name = user.name
+
+    logger.info("#"*20)
+    logger.info("Getting %s - %s" % (uid, name))
+    logger.info("Cursor %s" % cursor)
+    logger.info("Using account: %s" % wq.name)
+
+    _fetched_followers = 0
+
+    def fetched_followers():
+        return session.query(Following).filter(Following.isfollowed==uid).count()
+
+    attempts = 0
+    while cursor > 0 or fetched_followers() < total_followers:
+        try:
+            resp = wq.followers.ids(user_id=uid, cursor=cursor)
+        except TwitterHTTPError as ex:
+            attempts += 1
+            if ex.e.code in (401, ) or attempts > 3:
+                logger.info('Not authorized for user: {}'.format(uid))
+                entry.errors = ex.message
+                break
+        if 'ids' not in resp:
+            logger.info("Error with id %s %s" % (uid, resp))
+            entry.pending = False
+            entry.errors = "No ids in response: %s" % resp
+            break
+        logger.info("New followers: %s" % len(resp['ids']))
+        if recursive:
+            newusers = get_users(wq, resp)
+            for newuser in newusers:
+                add_user(session=session, user=newuser)
+        if 'ids' not in resp or not resp['ids']:
+            logger.info('NO IDS in response')
+            break
+        for i in resp['ids']:
+            existing_user = session.query(Following).\
+                filter(Following.isfollowed == uid).\
+                filter(Following.follower == i).first()
+            now = int(time.time())
+            if existing_user:
+                existing_user.created_at_stamp = now
+            else:
+                f = Following(isfollowed=uid,
+                              follower=i,
+                              created_at_stamp=now)
+                session.add(f)
+        logger.info("Fetched: %s/%s followers" % (fetched_followers(),
+                                                  total_followers))
+        entry.cursor = resp["next_cursor"]
+        session.add(entry)
+        session.commit()
+
+    logger.info("Done getting followers for %s" % uid)
+    entry.pending = False
+    entry.busy = False
+    session.add(entry)
+    session.commit()
+    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+    sys.stdout.flush()
+
+
+def classify_user(id_or_name, screen_names, user_ids):
+    try:
+        int(id_or_name)
+        user_ids.append(id_or_name)
+        logger.debug("Added user id")
+    except ValueError:
+        logger.debug("Added screen_name")
+        screen_names.append(id_or_name.split('@')[-1])
+
+
+# TODO: adapt to the crawler
 def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
     signal.signal(signal.SIGINT, signal_handler)
-    w = wq.next()
     if not dburi:
         dburi = 'sqlite:///%s.db' % extractor_name
     session = make_session(dburi)
+    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
+    session.commit()
 
-    screen_names = []
-    user_ids = []
-
-    def classify_user(id_or_name):
-        try:
-            int(user)
-            user_ids.append(user)
-            logger.info("Added user id")
-        except ValueError:
-            logger.info("Added screen_name")
-            screen_names.append(user.split('@')[-1])
-
-    if user:
-        classify_user(user)
-    elif initfile:
-        logger.info("No user. I will open %s" % initfile)
-        with open(initfile, 'r') as f:
-            for line in f:
-                user = line.strip().split(',')[0]
-                classify_user(user)
-    else:
+    if not (user or initfile):
         logger.info('Using pending users from last session')
-
-    nusers = list(get_users(wq, screen_names, by_name=True))
-    if user_ids:
-        nusers += list(get_users(wq, user_ids, by_name=False))
-
-    for i in nusers:
-        add_user(session, i, enqueue=True)
+    else:
+        screen_names = []
+        user_ids = []
+        if user:
+            classify_user(user, screen_names, user_ids)
+        elif initfile:
+            logger.info("No user. I will open %s" % initfile)
+            with open(initfile, 'r') as f:
+                for line in f:
+                    user = line.strip().split(',')[0]
+                    classify_user(user, screen_names, user_ids)
+
+        def missing_user(ix, column=User.screen_name):
+            res = session.query(User).filter(column == ix).count() == 0
+            if res:
+                logger.info("Missing user %s. Count: %s" % (ix, res))
+            return res
+
+        screen_names = list(filter(missing_user, screen_names))
+        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
+        nusers = []
+        logger.info("Missing user ids: %s" % user_ids)
+        logger.info("Missing screen names: %s" % screen_names)
+        if screen_names:
+            nusers = list(get_users(wq, screen_names, by_name=True))
+        if user_ids:
+            nusers += list(get_users(wq, user_ids, by_name=False))
+
+        for i in nusers:
+            add_user(dburi=dburi, user=i)
 
     total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
     logger.info('Total users: {}'.format(total_users))
 
-    def pending_entries():
-        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logger.info('Pending: {}'.format(pending))
-        return pending
-
-    while pending_entries() > 0:
-        logger.info("Using account: %s" % w.name)
-        candidate, entry = session.query(User, ExtractorEntry).\
-            filter(ExtractorEntry.user == User.id).\
-            filter(ExtractorEntry.pending == True).\
-            order_by(User.followers_count).first()
-        if not candidate:
-            break
-        pending = True
-        cursor = entry.cursor
-        uid = candidate.id
-        uobject = session.query(User).filter(User.id==uid).first()
-        name = uobject.screen_name if uobject else None
-
-        logger.info("#"*20)
-        logger.info("Getting %s - %s" % (uid, name))
-        logger.info("Cursor %s" % cursor)
-        logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
-        try:
-            resp = wq.followers.ids(user_id=uid, cursor=cursor)
-        except TwitterHTTPError as ex:
-            if ex.e.code in (401, ):
-                logger.info('Not authorized for user: {}'.format(uid))
-                resp = {}
-        if 'ids' in resp:
-            logger.info("New followers: %s" % len(resp['ids']))
-            if recursive:
-                newusers = get_users(wq, resp)
-                for user in newusers:
-                    add_user(session, newuser, enqueue=True)
-            for i in resp['ids']:
-                existing_user = session.query(Following).\
-                    filter(Following.isfollowed==uid).\
-                    filter(Following.follower==i).first()
-                now = int(time.time())
-                if existing_user:
-                    existing_user.created_at_stamp = now
-                else:
-                    f = Following(isfollowed=uid,
-                                  follower=i,
-                                  created_at_stamp=now)
-                    session.add(f)
-            total_followers = candidate.followers_count
-            fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
-            logger.info("Fetched: %s/%s followers" % (fetched_followers,
-                                                      total_followers))
-            cursor = resp["next_cursor"]
-            if cursor > 0:
-                pending = True
-                logger.info("Getting more followers for %s" % uid)
-            else:
-                logger.info("Done getting followers for %s" % uid)
-                cursor = -1
-                pending = False
-        else:
-            logger.info("Error with id %s %s" % (uid, resp))
-            pending = False
-
-        entry.pending = pending
-        entry.cursor = cursor
-        logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
-
-        session.add(candidate)
-        session.commit()
-        sys.stdout.flush()
+    de = partial(download_entry, wq, dburi=dburi)
+    pending = pending_entries(dburi)
+    session.close()
+
+    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
+        logger.info("Got %s" % i)
+
+
+def pending_entries(dburi):
+    session = make_session(dburi)
+    while True:
+        candidate, entry = session.query(User, ExtractorEntry).\
+            filter(ExtractorEntry.user == User.id).\
+            filter(ExtractorEntry.pending == True).\
+            filter(ExtractorEntry.busy == False).\
+            order_by(User.followers_count).first()
+        if candidate:
+            entry.busy = True
+            session.add(entry)
+            session.commit()
+            yield int(entry.id)
+            continue
+        if session.query(ExtractorEntry).\
+           filter(ExtractorEntry.busy == True).count() > 0:
+            time.sleep(1)
+            continue
+        logger.info("No more pending entries")
+        break
+    session.close()
 
 
 def get_tweet(c, tid):
     return c.statuses.show(id=tid)
@@ -394,3 +479,36 @@ def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ig
     tweets = parallel(download_batch, lines_to_crawl, 100)
     for res in tqdm(parallel(print_result, tweets), desc='Queried'):
         pass
+
+
+def download_timeline(wq, user):
+    return wq.statuses.user_timeline(id=user)
+
+
+def consume_feed(func, *args, **kwargs):
+    '''
+    Get all the tweets using pagination and a given method.
+    It can be controlled with the `count` parameter.
+    If count < 0 => Loop until the whole feed is consumed.
+    If count == 0 => Only call the API once, with the default values.
+    If count > 0 => Get count tweets from the feed.
+    '''
+    remaining = int(kwargs.pop('count', 0))
+    consume = remaining < 0
+    limit = False
+    # Simulate a do-while by updating the condition at the end
+    while not limit:
+        if remaining > 0:
+            kwargs['count'] = remaining
+        resp = func(*args, **kwargs)
+        if not resp:
+            return
+        for t in resp:
+            yield t
+        if consume:
+            continue
+        remaining -= len(resp)
+        max_id = min(s['id'] for s in func(*args, **kwargs)) - 1
+        kwargs['max_id'] = max_id
+        limit = remaining <= 0
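As a usage sketch, this is roughly how the new `bitter api` command drives `consume_feed` (credentials already configured; with `count=500` it keeps paginating via `max_id` until 500 tweets have been yielded):

```python
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for tweet in consume_feed(wq['statuses/user_timeline'], id='thepsf', count=500):
    print(tweet['id'])
```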