mirror of
https://github.com/balkian/bitter.git
synced 2024-12-22 08:28:12 +00:00
Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2 * Better waiting time calculation (used in queue handling)
This commit is contained in:
parent
67ef307cce
commit
e3a78968da
@ -1 +1 @@
|
|||||||
0.6.6
|
0.7.0
|
||||||
|
@ -3,13 +3,6 @@ Bitter module. A library and cli for Twitter using python-twitter.
|
|||||||
http://github.com/balkian/bitter
|
http://github.com/balkian/bitter
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
|
||||||
from future.standard_library import install_aliases
|
|
||||||
install_aliases()
|
|
||||||
except ImportError:
|
|
||||||
# Avoid problems at setup.py and py3.x
|
|
||||||
pass
|
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from .version import __version__
|
from .version import __version__
|
||||||
|
@ -391,8 +391,9 @@ def stream(ctx):
|
|||||||
@click.option('-l', '--locations', default=None)
|
@click.option('-l', '--locations', default=None)
|
||||||
@click.option('-t', '--track', default=None)
|
@click.option('-t', '--track', default=None)
|
||||||
@click.option('-f', '--file', help='File to store the stream of tweets')
|
@click.option('-f', '--file', help='File to store the stream of tweets')
|
||||||
|
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def get_stream(ctx, locations, track, file):
|
def get_stream(ctx, locations, track, file, politelyretry):
|
||||||
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
|
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
|
||||||
|
|
||||||
query_args = {}
|
query_args = {}
|
||||||
@ -400,17 +401,28 @@ def get_stream(ctx, locations, track, file):
|
|||||||
query_args['locations'] = locations
|
query_args['locations'] = locations
|
||||||
if track:
|
if track:
|
||||||
query_args['track'] = track
|
query_args['track'] = track
|
||||||
if not query_args:
|
|
||||||
iterator = wq.statuses.sample()
|
|
||||||
else:
|
|
||||||
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
|
|
||||||
|
|
||||||
if not file:
|
if not file:
|
||||||
file = sys.stdout
|
file = sys.stdout
|
||||||
else:
|
else:
|
||||||
file = open(file, 'a')
|
file = open(file, 'a')
|
||||||
|
|
||||||
for tweet in tqdm(iterator):
|
def insist():
|
||||||
|
lasthangup = time.time()
|
||||||
|
while True:
|
||||||
|
if not query_args:
|
||||||
|
iterator = wq.statuses.sample()
|
||||||
|
else:
|
||||||
|
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
|
||||||
|
for i in iterator:
|
||||||
|
yield i
|
||||||
|
if not politelyretry:
|
||||||
|
return
|
||||||
|
thishangup = time.time()
|
||||||
|
if thishangup - lasthangup < 60:
|
||||||
|
raise Exception('Too many hangups in a row.')
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
for tweet in tqdm(insist()):
|
||||||
print(json.dumps(tweet), file=file)
|
print(json.dumps(tweet), file=file)
|
||||||
if file != sys.stdout:
|
if file != sys.stdout:
|
||||||
file.close()
|
file.close()
|
||||||
|
@ -10,6 +10,11 @@ from twitter import *
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
from itertools import islice
|
from itertools import islice
|
||||||
|
try:
|
||||||
|
import itertools.ifilter as filter
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
from . import utils
|
from . import utils
|
||||||
from . import config
|
from . import config
|
||||||
|
|
||||||
@ -178,9 +183,13 @@ class TwitterQueue(QueueMixin):
|
|||||||
patience -= 1
|
patience -= 1
|
||||||
|
|
||||||
def get_wait(self, uriparts):
|
def get_wait(self, uriparts):
|
||||||
available = next(lambda x: not x.busy, self.queue)
|
# Stop as soon as one is available to avoid initiating the rest
|
||||||
first_worker = min(available, key=lambda x: x.get_wait(uriparts))
|
for i in self.queue:
|
||||||
diff = first_worker.get_wait(uriparts)
|
if not i.busy and i.get_wait(uriparts) == 0:
|
||||||
|
return 0
|
||||||
|
# If None is available, let's see how much we have to wait
|
||||||
|
available = filter(lambda x: not x.busy, self.queue)
|
||||||
|
diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
|
||||||
return diff
|
return diff
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ from multiprocessing.pool import ThreadPool
|
|||||||
|
|
||||||
from itertools import islice
|
from itertools import islice
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from itertools import zip_longest
|
from future.moves.itertools import zip_longest
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
|
|
||||||
from twitter import TwitterHTTPError
|
from twitter import TwitterHTTPError
|
||||||
|
11
setup.py
11
setup.py
@ -43,5 +43,14 @@ setup(
|
|||||||
entry_points="""
|
entry_points="""
|
||||||
[console_scripts]
|
[console_scripts]
|
||||||
bitter=bitter.cli:main
|
bitter=bitter.cli:main
|
||||||
"""
|
""",
|
||||||
|
classifiers=[
|
||||||
|
'Development Status :: 4 - Beta',
|
||||||
|
'Intended Audience :: Developers',
|
||||||
|
'Intended Audience :: Science/Research',
|
||||||
|
'License :: OSI Approved :: Apache Software License',
|
||||||
|
'Programming Language :: Python :: 2',
|
||||||
|
'Programming Language :: Python :: 2.7',
|
||||||
|
'Programming Language :: Python :: 3',
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
@ -6,7 +6,7 @@ import datetime
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from bitter import utils
|
from bitter import utils
|
||||||
from bitter.crawlers import TwitterQueue, TwitterWorker, TwitterQueueException
|
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
|
||||||
from bitter import config as c
|
from bitter import config as c
|
||||||
|
|
||||||
class TestUtils(TestCase):
|
class TestUtils(TestCase):
|
||||||
@ -64,12 +64,9 @@ class TestUtils(TestCase):
|
|||||||
try:
|
try:
|
||||||
# resp = self.wq.friends.list(screen_name='balkian')
|
# resp = self.wq.friends.list(screen_name='balkian')
|
||||||
self.wq.next(['friends', 'list'])
|
self.wq.next(['friends', 'list'])
|
||||||
except TwitterQueueException:
|
except QueueException:
|
||||||
failed = True
|
failed = True
|
||||||
assert failed
|
assert failed
|
||||||
l2 = w1.get_limit(['friends', 'list'])
|
l2 = w1.get_limit(['friends', 'list'])
|
||||||
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
|
assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
|
||||||
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
|
assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)
|
||||||
time.sleep(w1.get_wait(['friends', 'list']))
|
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user