1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

8 Commits
0.5.6 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
J. Fernando Sánchez
67ef307cce Improved tweet extraction @ CLI 2016-11-23 10:50:01 +01:00
J. Fernando Sánchez
cb054ac365 Added (bulk) user download 2016-11-22 20:07:18 +01:00
J. Fernando Sánchez
bdc4690240 Fixed tail argument 2016-11-19 21:42:08 +01:00
J. Fernando Sánchez
c0309a1e52 Fixed python2 compatibility issues (print!) 2016-11-19 20:38:44 +01:00
J. Fernando Sánchez
4afdd6807d Fixed MANIFEST error 2016-11-19 20:24:02 +01:00
J. Fernando Sánchez
38605ba2c8 Added stream to CLI
* Save stream to file
* Parse file and get the most important hashtags
2016-11-19 20:16:56 +01:00
J. Fernando Sánchez
738823c8a2 Added Streaming workers/queues 2016-11-18 16:08:29 +01:00
9 changed files with 279 additions and 58 deletions

View File

@@ -1,4 +1,5 @@
include requirements.txt include requirements.txt
include requirements-py2.txt
include test-requirements.txt include test-requirements.txt
include README.md include README.md
include bitter/VERSION include bitter/VERSION

View File

@@ -1 +1 @@
0.5.6 0.7.0

View File

@@ -3,13 +3,6 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter http://github.com/balkian/bitter
""" """
try:
from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os import os
from .version import __version__ from .version import __version__

View File

@@ -1,3 +1,5 @@
from __future__ import print_function
import click import click
import json import json
import os import os
@@ -6,6 +8,7 @@ import time
import sqlalchemy.types import sqlalchemy.types
import threading import threading
import sqlite3 import sqlite3
from tqdm import tqdm
from sqlalchemy import exists from sqlalchemy import exists
@@ -43,30 +46,55 @@ def tweet(ctx):
pass pass
@tweet.command('get') @tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context def get_tweet(tweetid, write, folder, update):
def get_tweet(ctx, tweetid):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.get_tweet(wq, tweetid) if not write:
print(json.dumps(t, indent=2)) t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
with open(tweetsfile) as f:
for line in f:
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@click.pass_context @click.pass_context
def get_tweet(ctx, query): def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() t = utils.search_tweet(wq, query)
t = utils.search_tweet(c.client, query)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next() t = utils.user_timeline(wq, user)
t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@main.group() @main.group()
@@ -85,23 +113,47 @@ def list_users(ctx, db):
for j in i.__dict__: for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j))) print('\t{}: {}'.format(j, getattr(i,j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get') @users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.') @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up') @click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context @click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db): def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
if '://' not in db: if '://' not in db:
@@ -330,6 +382,71 @@ def run_server(ctx, consumer_key, consumer_secret):
from .webserver import app from .webserver import app
app.run(host='0.0.0.0') app.run(host='0.0.0.0')
@main.group()
@click.pass_context
def stream(ctx):
pass
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
query_args = {}
if locations:
query_args['locations'] = locations
if track:
query_args['track'] = track
if not file:
file = sys.stdout
else:
file = open(file, 'a')
def insist():
lasthangup = time.time()
while True:
if not query_args:
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
for i in iterator:
yield i
if not politelyretry:
return
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
time.sleep(3)
for tweet in tqdm(insist()):
print(json.dumps(tweet), file=file)
if file != sys.stdout:
file.close()
@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))
@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -9,6 +9,12 @@ logger = logging.getLogger(__name__)
from twitter import * from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock from threading import Lock
from itertools import islice
try:
import itertools.ifilter as filter
except ImportError:
pass
from . import utils from . import utils
from . import config from . import config
@@ -37,13 +43,50 @@ class AttrToFunc(object):
# kwargs[i] = a # kwargs[i] = a
return self.handler(self.__uriparts, *args, **kwargs) return self.handler(self.__uriparts, *args, **kwargs)
class FromCredentialsMixin(object):
@classmethod
def from_credentials(cls, cred_file=None, max_workers=None):
wq = cls()
for cred in islice(utils.get_credentials(cred_file), max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object): class TwitterWorker(object):
def __init__(self, name, client): api_class = None
def __init__(self, name, creds):
self.name = name self.name = name
self.client = client self._client = None
self.cred = creds
self._lock = Lock() self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property
def client(self):
if not self._client:
auth=OAuth(self.cred['token_key'],
self.cred['token_secret'],
self.cred['consumer_key'],
self.cred['consumer_secret'])
self._client = self.api_class(auth=auth)
return self._client
class RestWorker(TwitterWorker):
api_class = Twitter
def __init__(self, *args, **kwargs):
super(RestWorker, self).__init__(*args, **kwargs)
self._limits = None
@property
def limits(self):
if not self._limits:
self._limits = self.client.application.rate_limit_status()
return self._limits
def is_limited(self, uriparts): def is_limited(self, uriparts):
return self.get_wait(uriparts)>0 return self.get_wait(uriparts)>0
@@ -58,7 +101,10 @@ class TwitterWorker(object):
def get_limit(self, uriparts): def get_limit(self, uriparts):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
return i
return {}
def set_limit(self, uriparts, value): def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
@@ -83,10 +129,10 @@ class TwitterWorker(object):
class TwitterQueueException(BaseException): class QueueException(BaseException):
pass pass
class TwitterQueue(AttrToFunc): class QueueMixin(AttrToFunc, FromCredentialsMixin):
def __init__(self, wait=True): def __init__(self, wait=True):
logger.debug('Creating worker queue') logger.debug('Creating worker queue')
self.queue = set() self.queue = set()
@@ -97,6 +143,10 @@ class TwitterQueue(AttrToFunc):
def ready(self, worker): def ready(self, worker):
self.queue.add(worker) self.queue.add(worker)
class TwitterQueue(QueueMixin):
worker_class = RestWorker
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
@@ -132,23 +182,14 @@ class TwitterQueue(AttrToFunc):
if not self.wait: if not self.wait:
patience -= 1 patience -= 1
@classmethod
def from_credentials(self, cred_file=None):
wq = TwitterQueue()
for cred in utils.get_credentials(cred_file):
c = Twitter(auth=OAuth(cred['token_key'],
cred['token_secret'],
cred['consumer_key'],
cred['consumer_secret']))
wq.ready(TwitterWorker(cred["user"], c))
return wq
def get_wait(self, uriparts): def get_wait(self, uriparts):
# Stop as soon as one is available to avoid initiating the rest
for i in self.queue:
if not i.busy and i.get_wait(uriparts) == 0:
return 0
# If None is available, let's see how much we have to wait
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
first_worker = min(available, key=lambda x: x.get_wait(uriparts)) diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
diff = first_worker.get_wait(uriparts)
return diff return diff
@@ -159,7 +200,7 @@ class TwitterQueue(AttrToFunc):
for worker in s: for worker in s:
if not worker.is_limited(uriparts) and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise TwitterQueueException('No worker is available') raise QueueException('No worker is available')
def next(self, uriparts): def next(self, uriparts):
if not self.wait: if not self.wait:
@@ -167,7 +208,7 @@ class TwitterQueue(AttrToFunc):
while True: while True:
try: try:
return self._next(uriparts) return self._next(uriparts)
except TwitterQueueException: except QueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
diff = self.get_wait(uriparts) diff = self.get_wait(uriparts)
@@ -177,3 +218,44 @@ class TwitterQueue(AttrToFunc):
logger.info("All workers are busy. Waiting %s seconds" % diff) logger.info("All workers are busy. Waiting %s seconds" % diff)
time.sleep(diff) time.sleep(diff)
class StreamWorker(TwitterWorker):
api_class = TwitterStream
def __init__(self, *args, **kwargs):
super(StreamWorker, self).__init__(*args, **kwargs)
class StreamQueue(QueueMixin):
worker_class = StreamWorker
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
self.index = 0
self.wait = wait
AttrToFunc.__init__(self, handler=self.handle_call)
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
c = None
c = self.next(uriparts)
c._lock.acquire()
c.busy = True
logger.debug('Next: {}'.format(c.name))
ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
for i in resp:
yield i
pong = time.time()
logger.debug('Listening for: {}'.format(pong-ping))
c.busy = False
c._lock.release()
def next(self, uriparts):
logger.debug('Getting next available')
s = list(self.queue)
random.shuffle(s)
for worker in s:
if not worker.busy:
return worker
raise QueueException('No worker is available')

View File

@@ -11,7 +11,8 @@ from multiprocessing.pool import ThreadPool
from itertools import islice from itertools import islice
from contextlib import contextmanager from contextlib import contextmanager
from itertools import zip_longest from future.moves.itertools import zip_longest
from collections import Counter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
@@ -86,6 +87,26 @@ def add_credentials(credfile=None, **creds):
f.write('\n') f.write('\n')
def get_hashtags(iter_tweets, best=None):
c = Counter()
for tweet in iter_tweets:
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
with open(filename) as f:
while True:
line = f.readline()
if line not in (None, '', '\n'):
tweet = json.loads(line.strip())
yield tweet
else:
if tail:
time.sleep(1)
else:
return
def get_users(wq, ulist, by_name=False, queue=None, max_users=100): def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
t = 'name' if by_name else 'uid' t = 'name' if by_name else 'uid'
logger.debug('Getting users by {}: {}'.format(t, ulist)) logger.debug('Getting users by {}: {}'.format(t, ulist))

View File

@@ -1,3 +1,4 @@
sqlalchemy sqlalchemy
twitter twitter
click click
tqdm

View File

@@ -43,5 +43,14 @@ setup(
entry_points=""" entry_points="""
[console_scripts] [console_scripts]
bitter=bitter.cli:main bitter=bitter.cli:main
""" """,
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache 2 License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
]
) )

View File

@@ -6,7 +6,7 @@ import datetime
import time import time
from bitter import utils from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
from bitter import config as c from bitter import config as c
class TestUtils(TestCase): class TestUtils(TestCase):
@@ -64,12 +64,9 @@ class TestUtils(TestCase):
try: try:
# resp = self.wq.friends.list(screen_name='balkian') # resp = self.wq.friends.list(screen_name='balkian')
self.wq.next(['friends', 'list']) self.wq.next(['friends', 'list'])
except TwitterQueueException: except QueueException:
failed = True failed = True
assert failed assert failed
l2 = w1.get_limit(['friends', 'list']) l2 = w1.get_limit(['friends', 'list'])
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time()) assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2) assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
time.sleep(w1.get_wait(['friends', 'list']))