1
0
mirror of https://github.com/balkian/TwitterDigger.git synced 2024-11-22 00:02:28 +00:00

Clumsy threading

This commit is contained in:
J.Fernando Sánchez 2013-03-15 16:11:46 +01:00
parent 36cd334b2e
commit 30d46c4edc

184
myego.py
View File

@ -6,62 +6,59 @@ import math
import signal import signal
import sys import sys
import argparse import argparse
import threading
from twitter import Twitter, OAuth, TwitterHTTPError from twitter import Twitter, OAuth, TwitterHTTPError
from httplib import IncompleteRead from httplib import IncompleteRead
parser = argparse.ArgumentParser(description='Get an ego network for a given ID.') class Fetcher(threading.Thread):
# Fetcher.__init__ (right-hand column of this side-by-side diff). Where a line
# carries two statements, the left one is old module-level setup code; the same
# statements reappear later under the `__main__` guard.
# Builds a Twitter client for this fetcher from the four OAuth fields of
# `credentials`, keeps `fname` for run()'s progress message, and then
# initialises the threading.Thread base class.
def __init__(self, fname, credentials):
parser.add_argument('NAME', nargs='?', metavar='id', self.t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN,
default='me', type=str, help='Name of the user to be used as a center')
parser.add_argument('--new','-n',action='store_true', help='Start a new search.')
args = parser.parse_args()
print args.new
print args.NAME
sh = shelve.open('twits_shelf-%s.db' % args.NAME,writeback=True)
keys=['followers','distance','userobject']
for key in keys:
if key not in sh or args.new:
sh[key]={}
followers = sh['followers']
distance = sh['distance']
pending = set()
userobject = sh['userobject']
t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN,
credentials.ACCESS_TOKEN_SECRET, credentials.ACCESS_TOKEN_SECRET,
credentials.CONSUMER_KEY, credentials.CONSUMER_KEY,
credentials.CONSUMER_SECRET)) credentials.CONSUMER_SECRET))
# Right-hand column only: store the fetcher's label, then set up the Thread base.
self.fname=fname
threading.Thread.__init__(self)
# Fetcher.run (right-hand column). On the next six lines the left-hand
# statement is old state-recovery code and the right-hand statement is the
# start of the new method body; after that the lines are new code only.
# Each pass of the loop acquires the shared `lock`, picks one pending user,
# explores it, syncs the shelf, releases the lock in `finally`, and only then
# sleeps for `timetowait` seconds.
if not args.new: def run(self):
print 'Recovering state.' while True:
for key in followers: timetowait = 0
for follower in followers[key]: try:
if follower not in followers: lock.acquire()
pending.add(follower) print "Iteration! for Fetcher %s" % self.fname
# Selection state: best candidate so far and the score it must beat (starts at 1000).
nextuser=None
minimumscore=1000
# Look up pending users that have no stored user object yet, 100 ids per getinfo call.
unknowns = [person for person in pending if person not in userobject]
for i in xrange(0,int(math.ceil(len(unknowns)/100.0))):
piece = unknowns[i*100:(i+1)*100]
self.getinfo(piece)
# Score is followers_count ** distance; keep the user with the strictly smallest score.
for user in pending.copy():
userscore = userobject[user]['followers_count']**distance[user]
if(userscore < minimumscore):
minimumscore = userscore
nextuser=user
# NOTE(review): if no pending user scores below 1000, nextuser is still None
# here and explore_user(None) is called -- confirm whether that is intended.
self.explore_user(nextuser)
pending.remove(nextuser)
sh.sync()
# Successful pass: wait 5 seconds before the next one.
timetowait = 5
except TwitterHTTPError as ex:
print "Exception %s - %s" % (ex,type(ex))
# The file uses Python 2 print statements, so `/` on ints is integer division:
# this matches codes 420-429 (assuming ex.e.code is an int -- TODO confirm).
# NOTE(review): for any other code timetowait stays 0, so the loop retries at once.
if ex.e.code/10 == 42:
print "Sleeping for 1 minute"
timetowait = 60
except IncompleteRead:
print "IncompleteRead!"
timetowait = 5
# Anything else is printed and re-raised, which ends run(); `finally` still
# releases the lock first.
except Exception as ex:
print "Exception"
raise ex
finally:
lock.release()
# The sleep happens after the lock has been released.
if timetowait:
time.sleep(timetowait)
if args.new or len(pending)<1: def getinfo(self,piece):
if args.NAME != 'me':
user = t.users.lookup(screen_name=args.NAME)[0]
uid = user['id']
distance[uid] = 0
pending.add(uid)
else:
creds = t.account.verify_credentials()
myid = creds['id']
distance[myid]=0
pending.add(myid)
print 'Pending is now: %s' %pending
def getinfo(piece):
piecestr = ",".join(map(str,piece)) if type(piece) != str and len(piece)>1 else piece[0] piecestr = ",".join(map(str,piece)) if type(piece) != str and len(piece)>1 else piece[0]
look=t.users.lookup(user_id=piecestr) look=self.t.users.lookup(user_id=piecestr)
retrievedids=[] retrievedids=[]
for ob in look: for ob in look:
uid=ob['id'] uid=ob['id']
@ -74,7 +71,8 @@ def getinfo(piece):
else: else:
print "Total: 1" print "Total: 1"
def explore_user(t,uid): def explore_user(self,uid):
t = self.t
print "Exploring uid %s" % uid print "Exploring uid %s" % uid
newdist = distance[uid]+ 1 newdist = distance[uid]+ 1
try: try:
@ -90,40 +88,64 @@ def explore_user(t,uid):
pending.add(follo) pending.add(follo)
distance[follo] = newdist distance[follo] = newdist
# Script entry section (right-hand column of this side-by-side diff). The first
# line below is fused: the old `signal_handler` header on the left, the new
# `__main__` guard on the right.
def signal_handler(signal, frame): if __name__ == "__main__":
# Command line: optional positional NAME (default 'me') and a --new/-n flag.
parser = argparse.ArgumentParser(description='Get an ego network for a given ID.')
parser.add_argument('NAME', nargs='?', metavar='id',
default='me', type=str, help='Name of the user to be used as a center')
parser.add_argument('--new','-n',action='store_true', help='Start a new search.')
args = parser.parse_args()
print args.new
print args.NAME
# One shelf file per NAME; each of the three top-level entries is reset to an
# empty dict when it is missing or when --new was given.
sh = shelve.open('twits_shelf-%s.db' % args.NAME,writeback=True)
keys=['followers','distance','userobject']
for key in keys:
if key not in sh or args.new:
sh[key]={}
# State that Fetcher.run reads by bare name (pending, distance, userobject,
# lock, sh) -- presumably as module globals; indentation is lost in this view.
followers = sh['followers']
distance = sh['distance']
pending = set()
userobject = sh['userobject']
lock = threading.RLock()
# Client used for the seed lookup below; each Fetcher builds its own in __init__.
t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN,
credentials.ACCESS_TOKEN_SECRET,
credentials.CONSUMER_KEY,
credentials.CONSUMER_SECRET))
# Resume: every stored follower id that is not itself a key of `followers`
# goes back into `pending`.
if not args.new:
print 'Recovering state.'
for key in followers:
for follower in followers[key]:
if follower not in followers:
pending.add(follower)
# Seed the search at distance 0 with the named user's id, or with the
# authenticated account's id when NAME is 'me'.
if args.new or len(pending)<1:
if args.NAME != 'me':
user = t.users.lookup(screen_name=args.NAME)[0]
uid = user['id']
distance[uid] = 0
pending.add(uid)
else:
creds = t.account.verify_credentials()
myid = creds['id']
distance[myid]=0
pending.add(myid)
print 'Pending is now: %s' %pending
# SIGINT handler: prints a message, closes the shelf and exits with status 0.
# The next five lines are fused; both columns carry the same statements.
def signal_handler(signal, frame):
print 'You pressed Ctrl+C!' print 'You pressed Ctrl+C!'
sh.close() sh.close()
sys.exit(0) sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
# NOTE(review): run() is called directly rather than start(), so the fetch loop
# executes in the calling thread and no new thread is started.
f1 = Fetcher('f1',credentials)
while True: f1.run()
# Left-hand column only from here on: this appears to be the old top-level loop
# body, whose statements match the body of Fetcher.run in the new version.
try:
print "Iteration!"
nextuser=None
minimumscore=1000
unknowns = [person for person in pending if person not in userobject]
for i in xrange(0,int(math.ceil(len(unknowns)/100.0))):
piece = unknowns[i*100:(i+1)*100]
getinfo(piece)
for user in pending.copy():
userscore = userobject[user]['followers_count']**distance[user]
if(userscore < minimumscore):
minimumscore = userscore
nextuser=user
explore_user(t,nextuser)
pending.remove(nextuser)
sh.sync()
time.sleep(5)
except TwitterHTTPError as ex:
print "Exception %s - %s" % (ex,type(ex))
if ex.e.code/10 == 42:
print "Sleeping for 1 minute"
time.sleep(60)
except IncompleteRead:
print "IncompleteRead!"
time.sleep(5)
except Exception as ex:
print "Exception"
sh.close()
raise ex