mirror of
https://github.com/balkian/bitter.git
synced 2025-10-25 04:38:25 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b2f107b8a | ||
|
|
67ef307cce | ||
|
|
cb054ac365 | ||
|
|
bdc4690240 | ||
|
|
c0309a1e52 | ||
|
|
4afdd6807d | ||
|
|
38605ba2c8 | ||
|
|
738823c8a2 |
@@ -1,4 +1,5 @@
|
||||
include requirements.txt
|
||||
include requirements-py2.txt
|
||||
include test-requirements.txt
|
||||
include README.md
|
||||
include bitter/VERSION
|
||||
|
||||
@@ -1 +1 @@
|
||||
0.5.6
|
||||
0.7.0
|
||||
|
||||
@@ -3,13 +3,6 @@ Bitter module. A library and cli for Twitter using python-twitter.
|
||||
http://github.com/balkian/bitter
|
||||
"""
|
||||
|
||||
try:
|
||||
from future.standard_library import install_aliases
|
||||
install_aliases()
|
||||
except ImportError:
|
||||
# Avoid problems at setup.py and py3.x
|
||||
pass
|
||||
|
||||
import os
|
||||
|
||||
from .version import __version__
|
||||
|
||||
157
bitter/cli.py
157
bitter/cli.py
@@ -1,3 +1,5 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import click
|
||||
import json
|
||||
import os
|
||||
@@ -6,6 +8,7 @@ import time
|
||||
import sqlalchemy.types
|
||||
import threading
|
||||
import sqlite3
|
||||
from tqdm import tqdm
|
||||
|
||||
from sqlalchemy import exists
|
||||
|
||||
@@ -43,30 +46,55 @@ def tweet(ctx):
|
||||
pass
|
||||
|
||||
@tweet.command('get')
|
||||
@click.option('-w', '--write', is_flag=True, default=False)
|
||||
@click.option('-f', '--folder', default="tweets")
|
||||
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
|
||||
@click.argument('tweetid')
|
||||
@click.pass_context
|
||||
def get_tweet(ctx, tweetid):
|
||||
def get_tweet(tweetid, write, folder, update):
|
||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||
t = utils.get_tweet(wq, tweetid)
|
||||
print(json.dumps(t, indent=2))
|
||||
if not write:
|
||||
t = utils.get_tweet(wq, tweetid)
|
||||
js = json.dumps(t, indent=2)
|
||||
print(js)
|
||||
return
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
file = os.path.join(folder, '%s.json' % tweetid)
|
||||
if not update and os.path.exists(file) and os.path.isfile(file):
|
||||
print('%s: Tweet exists' % tweetid)
|
||||
return
|
||||
try:
|
||||
t = utils.get_tweet(wq, tweetid)
|
||||
with open(file, 'w') as f:
|
||||
js = json.dumps(t, indent=2)
|
||||
print(js, file=f)
|
||||
except Exception as ex:
|
||||
print('%s: %s' % (tweetid, ex), file=sys.stderr)
|
||||
|
||||
@tweet.command('get_all')
|
||||
@click.argument('tweetsfile', 'File with a list of tweets to look up')
|
||||
@click.option('-f', '--folder', default="tweets")
|
||||
@click.pass_context
|
||||
def get_tweets(ctx, tweetsfile, folder):
|
||||
with open(tweetsfile) as f:
|
||||
for line in f:
|
||||
tid = line.strip()
|
||||
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
|
||||
|
||||
@tweet.command('search')
|
||||
@click.argument('query')
|
||||
@click.pass_context
|
||||
def get_tweet(ctx, query):
|
||||
def search(ctx, query):
|
||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||
c = wq.next()
|
||||
t = utils.search_tweet(c.client, query)
|
||||
t = utils.search_tweet(wq, query)
|
||||
print(json.dumps(t, indent=2))
|
||||
|
||||
@tweet.command('timeline')
|
||||
@click.argument('user')
|
||||
@click.pass_context
|
||||
def get_tweet(ctx, user):
|
||||
def timeline(ctx, user):
|
||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||
c = wq.next()
|
||||
t = utils.user_timeline(c.client, user)
|
||||
t = utils.user_timeline(wq, user)
|
||||
print(json.dumps(t, indent=2))
|
||||
|
||||
@main.group()
|
||||
@@ -85,23 +113,47 @@ def list_users(ctx, db):
|
||||
for j in i.__dict__:
|
||||
print('\t{}: {}'.format(j, getattr(i,j)))
|
||||
|
||||
@users.command('get_one')
|
||||
@click.argument('user')
|
||||
@click.pass_context
|
||||
def get_user(ctx, user):
|
||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||
c = wq.next()
|
||||
u = utils.get_user(c.client, user)
|
||||
print(json.dumps(u, indent=2))
|
||||
|
||||
@users.command('get')
|
||||
@click.argument('user')
|
||||
@click.option('-w', '--write', is_flag=True, default=False)
|
||||
@click.option('-f', '--folder', default="users")
|
||||
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
|
||||
def get_user(user, write, folder, update):
|
||||
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
|
||||
if not write:
|
||||
u = utils.get_user(wq, user)
|
||||
js = json.dumps(u, indent=2)
|
||||
print(js)
|
||||
return
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
file = os.path.join(folder, '%s.json' % user)
|
||||
if not update and os.path.exists(file) and os.path.isfile(file):
|
||||
print('User exists: %s' % user)
|
||||
return
|
||||
with open(file, 'w') as f:
|
||||
u = utils.get_user(wq, user)
|
||||
js = json.dumps(u, indent=2)
|
||||
print(js, file=f)
|
||||
|
||||
@users.command('get_all')
|
||||
@click.argument('usersfile', 'File with a list of users to look up')
|
||||
@click.option('-f', '--folder', default="users")
|
||||
@click.pass_context
|
||||
def get_users(ctx, usersfile, folder):
|
||||
with open(usersfile) as f:
|
||||
for line in f:
|
||||
uid = line.strip()
|
||||
ctx.invoke(get_user, folder=folder, user=uid, write=True)
|
||||
|
||||
@users.command('crawl')
|
||||
@click.option('--db', required=True, help='Database to save all users.')
|
||||
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
|
||||
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
|
||||
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
|
||||
@click.argument('usersfile', 'File with a list of users to look up')
|
||||
@click.pass_context
|
||||
def get_users(ctx, usersfile, skip, until, threads, db):
|
||||
def crawl_users(ctx, usersfile, skip, until, threads, db):
|
||||
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
|
||||
|
||||
if '://' not in db:
|
||||
@@ -330,6 +382,71 @@ def run_server(ctx, consumer_key, consumer_secret):
|
||||
from .webserver import app
|
||||
app.run(host='0.0.0.0')
|
||||
|
||||
@main.group()
|
||||
@click.pass_context
|
||||
def stream(ctx):
|
||||
pass
|
||||
|
||||
@stream.command('get')
|
||||
@click.option('-l', '--locations', default=None)
|
||||
@click.option('-t', '--track', default=None)
|
||||
@click.option('-f', '--file', help='File to store the stream of tweets')
|
||||
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
|
||||
@click.pass_context
|
||||
def get_stream(ctx, locations, track, file, politelyretry):
|
||||
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
|
||||
|
||||
query_args = {}
|
||||
if locations:
|
||||
query_args['locations'] = locations
|
||||
if track:
|
||||
query_args['track'] = track
|
||||
if not file:
|
||||
file = sys.stdout
|
||||
else:
|
||||
file = open(file, 'a')
|
||||
|
||||
def insist():
|
||||
lasthangup = time.time()
|
||||
while True:
|
||||
if not query_args:
|
||||
iterator = wq.statuses.sample()
|
||||
else:
|
||||
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
|
||||
for i in iterator:
|
||||
yield i
|
||||
if not politelyretry:
|
||||
return
|
||||
thishangup = time.time()
|
||||
if thishangup - lasthangup < 60:
|
||||
raise Exception('Too many hangups in a row.')
|
||||
time.sleep(3)
|
||||
|
||||
for tweet in tqdm(insist()):
|
||||
print(json.dumps(tweet), file=file)
|
||||
if file != sys.stdout:
|
||||
file.close()
|
||||
|
||||
@stream.command('read')
|
||||
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
|
||||
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
|
||||
@click.pass_context
|
||||
def read_stream(ctx, file, tail):
|
||||
for tweet in utils.read_file(file, tail=tail):
|
||||
try:
|
||||
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
|
||||
except (KeyError, TypeError):
|
||||
print('Raw tweet: {}'.format(tweet))
|
||||
|
||||
@stream.command('tags')
|
||||
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
|
||||
@click.argument('limit', required=False, default=None, type=int)
|
||||
@click.pass_context
|
||||
def tags_stream(ctx, file, limit):
|
||||
c = utils.get_hashtags(utils.read_file(file))
|
||||
for count, tag in c.most_common(limit):
|
||||
print(u'{} - {}'.format(count, tag))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@@ -9,6 +9,12 @@ logger = logging.getLogger(__name__)
|
||||
from twitter import *
|
||||
from collections import OrderedDict
|
||||
from threading import Lock
|
||||
from itertools import islice
|
||||
try:
|
||||
import itertools.ifilter as filter
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from . import utils
|
||||
from . import config
|
||||
|
||||
@@ -37,13 +43,50 @@ class AttrToFunc(object):
|
||||
# kwargs[i] = a
|
||||
return self.handler(self.__uriparts, *args, **kwargs)
|
||||
|
||||
|
||||
class FromCredentialsMixin(object):
|
||||
|
||||
@classmethod
|
||||
def from_credentials(cls, cred_file=None, max_workers=None):
|
||||
wq = cls()
|
||||
|
||||
for cred in islice(utils.get_credentials(cred_file), max_workers):
|
||||
wq.ready(cls.worker_class(cred["user"], cred))
|
||||
return wq
|
||||
|
||||
|
||||
class TwitterWorker(object):
|
||||
def __init__(self, name, client):
|
||||
api_class = None
|
||||
|
||||
def __init__(self, name, creds):
|
||||
self.name = name
|
||||
self.client = client
|
||||
self._client = None
|
||||
self.cred = creds
|
||||
self._lock = Lock()
|
||||
self.busy = False
|
||||
self.limits = self.client.application.rate_limit_status()
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
if not self._client:
|
||||
auth=OAuth(self.cred['token_key'],
|
||||
self.cred['token_secret'],
|
||||
self.cred['consumer_key'],
|
||||
self.cred['consumer_secret'])
|
||||
self._client = self.api_class(auth=auth)
|
||||
return self._client
|
||||
|
||||
class RestWorker(TwitterWorker):
|
||||
api_class = Twitter
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RestWorker, self).__init__(*args, **kwargs)
|
||||
self._limits = None
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
if not self._limits:
|
||||
self._limits = self.client.application.rate_limit_status()
|
||||
return self._limits
|
||||
|
||||
def is_limited(self, uriparts):
|
||||
return self.get_wait(uriparts)>0
|
||||
@@ -58,7 +101,10 @@ class TwitterWorker(object):
|
||||
|
||||
def get_limit(self, uriparts):
|
||||
uri = '/'+'/'.join(uriparts)
|
||||
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {})
|
||||
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
|
||||
if ix.startswith(uri):
|
||||
return i
|
||||
return {}
|
||||
|
||||
def set_limit(self, uriparts, value):
|
||||
uri = '/'+'/'.join(uriparts)
|
||||
@@ -83,10 +129,10 @@ class TwitterWorker(object):
|
||||
|
||||
|
||||
|
||||
class TwitterQueueException(BaseException):
|
||||
class QueueException(BaseException):
|
||||
pass
|
||||
|
||||
class TwitterQueue(AttrToFunc):
|
||||
class QueueMixin(AttrToFunc, FromCredentialsMixin):
|
||||
def __init__(self, wait=True):
|
||||
logger.debug('Creating worker queue')
|
||||
self.queue = set()
|
||||
@@ -97,6 +143,10 @@ class TwitterQueue(AttrToFunc):
|
||||
def ready(self, worker):
|
||||
self.queue.add(worker)
|
||||
|
||||
class TwitterQueue(QueueMixin):
|
||||
|
||||
worker_class = RestWorker
|
||||
|
||||
def handle_call(self, uriparts, *args, **kwargs):
|
||||
logger.debug('Called: {}'.format(uriparts))
|
||||
logger.debug('With: {} {}'.format(args, kwargs))
|
||||
@@ -132,23 +182,14 @@ class TwitterQueue(AttrToFunc):
|
||||
if not self.wait:
|
||||
patience -= 1
|
||||
|
||||
@classmethod
|
||||
def from_credentials(self, cred_file=None):
|
||||
wq = TwitterQueue()
|
||||
|
||||
for cred in utils.get_credentials(cred_file):
|
||||
c = Twitter(auth=OAuth(cred['token_key'],
|
||||
cred['token_secret'],
|
||||
cred['consumer_key'],
|
||||
cred['consumer_secret']))
|
||||
wq.ready(TwitterWorker(cred["user"], c))
|
||||
return wq
|
||||
|
||||
|
||||
def get_wait(self, uriparts):
|
||||
# Stop as soon as one is available to avoid initiating the rest
|
||||
for i in self.queue:
|
||||
if not i.busy and i.get_wait(uriparts) == 0:
|
||||
return 0
|
||||
# If None is available, let's see how much we have to wait
|
||||
available = filter(lambda x: not x.busy, self.queue)
|
||||
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
|
||||
diff = first_worker.get_wait(uriparts)
|
||||
diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
|
||||
return diff
|
||||
|
||||
|
||||
@@ -159,7 +200,7 @@ class TwitterQueue(AttrToFunc):
|
||||
for worker in s:
|
||||
if not worker.is_limited(uriparts) and not worker.busy:
|
||||
return worker
|
||||
raise TwitterQueueException('No worker is available')
|
||||
raise QueueException('No worker is available')
|
||||
|
||||
def next(self, uriparts):
|
||||
if not self.wait:
|
||||
@@ -167,7 +208,7 @@ class TwitterQueue(AttrToFunc):
|
||||
while True:
|
||||
try:
|
||||
return self._next(uriparts)
|
||||
except TwitterQueueException:
|
||||
except QueueException:
|
||||
available = filter(lambda x: not x.busy, self.queue)
|
||||
if available:
|
||||
diff = self.get_wait(uriparts)
|
||||
@@ -177,3 +218,44 @@ class TwitterQueue(AttrToFunc):
|
||||
logger.info("All workers are busy. Waiting %s seconds" % diff)
|
||||
time.sleep(diff)
|
||||
|
||||
class StreamWorker(TwitterWorker):
|
||||
api_class = TwitterStream
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(StreamWorker, self).__init__(*args, **kwargs)
|
||||
|
||||
class StreamQueue(QueueMixin):
|
||||
worker_class = StreamWorker
|
||||
|
||||
def __init__(self, wait=True):
|
||||
logger.debug('Creating worker queue')
|
||||
self.queue = set()
|
||||
self.index = 0
|
||||
self.wait = wait
|
||||
AttrToFunc.__init__(self, handler=self.handle_call)
|
||||
|
||||
def handle_call(self, uriparts, *args, **kwargs):
|
||||
logger.debug('Called: {}'.format(uriparts))
|
||||
logger.debug('With: {} {}'.format(args, kwargs))
|
||||
c = None
|
||||
c = self.next(uriparts)
|
||||
c._lock.acquire()
|
||||
c.busy = True
|
||||
logger.debug('Next: {}'.format(c.name))
|
||||
ping = time.time()
|
||||
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
|
||||
for i in resp:
|
||||
yield i
|
||||
pong = time.time()
|
||||
logger.debug('Listening for: {}'.format(pong-ping))
|
||||
c.busy = False
|
||||
c._lock.release()
|
||||
|
||||
def next(self, uriparts):
|
||||
logger.debug('Getting next available')
|
||||
s = list(self.queue)
|
||||
random.shuffle(s)
|
||||
for worker in s:
|
||||
if not worker.busy:
|
||||
return worker
|
||||
raise QueueException('No worker is available')
|
||||
|
||||
@@ -11,7 +11,8 @@ from multiprocessing.pool import ThreadPool
|
||||
|
||||
from itertools import islice
|
||||
from contextlib import contextmanager
|
||||
from itertools import zip_longest
|
||||
from future.moves.itertools import zip_longest
|
||||
from collections import Counter
|
||||
|
||||
from twitter import TwitterHTTPError
|
||||
|
||||
@@ -86,6 +87,26 @@ def add_credentials(credfile=None, **creds):
|
||||
f.write('\n')
|
||||
|
||||
|
||||
def get_hashtags(iter_tweets, best=None):
|
||||
c = Counter()
|
||||
for tweet in iter_tweets:
|
||||
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
|
||||
return c
|
||||
|
||||
def read_file(filename, tail=False):
|
||||
with open(filename) as f:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if line not in (None, '', '\n'):
|
||||
tweet = json.loads(line.strip())
|
||||
yield tweet
|
||||
else:
|
||||
if tail:
|
||||
time.sleep(1)
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
|
||||
t = 'name' if by_name else 'uid'
|
||||
logger.debug('Getting users by {}: {}'.format(t, ulist))
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
sqlalchemy
|
||||
twitter
|
||||
click
|
||||
tqdm
|
||||
|
||||
11
setup.py
11
setup.py
@@ -43,5 +43,14 @@ setup(
|
||||
entry_points="""
|
||||
[console_scripts]
|
||||
bitter=bitter.cli:main
|
||||
"""
|
||||
""",
|
||||
classifiers=[
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Science/Research',
|
||||
'License :: OSI Approved :: Apache 2 License',
|
||||
'Programming Language :: Python :: 2',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3',
|
||||
]
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ import datetime
|
||||
import time
|
||||
|
||||
from bitter import utils
|
||||
from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
|
||||
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
|
||||
from bitter import config as c
|
||||
|
||||
class TestUtils(TestCase):
|
||||
@@ -64,12 +64,9 @@ class TestUtils(TestCase):
|
||||
try:
|
||||
# resp = self.wq.friends.list(screen_name='balkian')
|
||||
self.wq.next(['friends', 'list'])
|
||||
except TwitterQueueException:
|
||||
except QueueException:
|
||||
failed = True
|
||||
assert failed
|
||||
l2 = w1.get_limit(['friends', 'list'])
|
||||
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
|
||||
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
|
||||
time.sleep(w1.get_wait(['friends', 'list']))
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user