From 30d46c4edc59d44f79ac07911be23f46eb0fb738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=2EFernando=20S=C3=A1nchez?= Date: Fri, 15 Mar 2013 16:11:46 +0100 Subject: [PATCH] Clumsy threading --- myego.py | 226 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 124 insertions(+), 102 deletions(-) diff --git a/myego.py b/myego.py index 09b47c1..a1c8225 100644 --- a/myego.py +++ b/myego.py @@ -6,124 +6,146 @@ import math import signal import sys import argparse +import threading from twitter import Twitter, OAuth, TwitterHTTPError from httplib import IncompleteRead -parser = argparse.ArgumentParser(description='Get an ego network for a given ID.') +class Fetcher(threading.Thread): + def __init__(self, fname, credentials): + self.t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN, + credentials.ACCESS_TOKEN_SECRET, + credentials.CONSUMER_KEY, + credentials.CONSUMER_SECRET)) + self.fname=fname + threading.Thread.__init__(self) -parser.add_argument('NAME', nargs='?', metavar='id', - default='me', type=str, help='Name of the user to be used as a center') + def run(self): + while True: + timetowait = 0 + try: + lock.acquire() + print "Iteration! for Fetcher %s" % self.fname + nextuser=None + minimumscore=1000 + unknowns = [person for person in pending if person not in userobject] + for i in xrange(0,int(math.ceil(len(unknowns)/100.0))): + piece = unknowns[i*100:(i+1)*100] + self.getinfo(piece) + for user in pending.copy(): + userscore = userobject[user]['followers_count']**distance[user] + if(userscore < minimumscore): + minimumscore = userscore + nextuser=user + self.explore_user(nextuser) + pending.remove(nextuser) + sh.sync() + timetowait = 5 + except TwitterHTTPError as ex: + print "Exception %s - %s" % (ex,type(ex)) + if ex.e.code/10 == 42: + print "Sleeping for 1 minute" + timetowait = 60 + except IncompleteRead: + print "IncompleteRead!" + timetowait = 5 + except Exception as ex: + print "Exception" + raise ex + finally: + lock.release() + if timetowait: + time.sleep(timetowait) -parser.add_argument('--new','-n',action='store_true', help='Start a new search.') + def getinfo(self,piece): + piecestr = ",".join(map(str,piece)) if type(piece) != str and len(piece)>1 else piece[0] + look=self.t.users.lookup(user_id=piecestr) + retrievedids=[] + for ob in look: + uid=ob['id'] + userobject[uid]=ob + retrievedids.append(uid) + print "Got info for %s" % repr(piece) + if type(piece) != int: + print "Total: %s" % len(piece) + print "Difference: %s" % [person for person in piece if person not in retrievedids] + else: + print "Total: 1" -args = parser.parse_args() + def explore_user(self,uid): + t = self.t + print "Exploring uid %s" % uid + newdist = distance[uid]+ 1 + try: + follos = t.followers.ids(user_id=uid)['ids'] + except TwitterHTTPError as ex: + print "Error code %s" % ex.e.code + if ex.e.code == 401: # Private Twitter + return + raise ex + followers[uid]=follos + for follo in follos: + if follo not in followers.keys(): + pending.add(follo) + distance[follo] = newdist -print args.new -print args.NAME +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Get an ego network for a given ID.') -sh = shelve.open('twits_shelf-%s.db' % args.NAME,writeback=True) -keys=['followers','distance','userobject'] + parser.add_argument('NAME', nargs='?', metavar='id', + default='me', type=str, help='Name of the user to be used as a center') -for key in keys: - if key not in sh or args.new: - sh[key]={} + parser.add_argument('--new','-n',action='store_true', help='Start a new search.') -followers = sh['followers'] -distance = sh['distance'] -pending = set() -userobject = sh['userobject'] + args = parser.parse_args() -t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN, - credentials.ACCESS_TOKEN_SECRET, - credentials.CONSUMER_KEY, - credentials.CONSUMER_SECRET)) + print args.new + print args.NAME -if not args.new: - print 'Recovering state.' - for key in followers: - for follower in followers[key]: - if follower not in followers: - pending.add(follower) + sh = shelve.open('twits_shelf-%s.db' % args.NAME,writeback=True) + keys=['followers','distance','userobject'] -if args.new or len(pending)<1: - if args.NAME != 'me': - user = t.users.lookup(screen_name=args.NAME)[0] - uid = user['id'] - distance[uid] = 0 - pending.add(uid) - else: - creds = t.account.verify_credentials() - myid = creds['id'] - distance[myid]=0 - pending.add(myid) + for key in keys: + if key not in sh or args.new: + sh[key]={} -print 'Pending is now: %s' %pending + followers = sh['followers'] + distance = sh['distance'] + pending = set() + userobject = sh['userobject'] -def getinfo(piece): - piecestr = ",".join(map(str,piece)) if type(piece) != str and len(piece)>1 else piece[0] - look=t.users.lookup(user_id=piecestr) - retrievedids=[] - for ob in look: - uid=ob['id'] - userobject[uid]=ob - retrievedids.append(uid) - print "Got info for %s" % repr(piece) - if type(piece) != int: - print "Total: %s" % len(piece) - print "Difference: %s" % [person for person in piece if person not in retrievedids] - else: - print "Total: 1" + lock = threading.RLock() -def explore_user(t,uid): - print "Exploring uid %s" % uid - newdist = distance[uid]+ 1 - try: - follos = t.followers.ids(user_id=uid)['ids'] - except TwitterHTTPError as ex: - print "Error code %s" % ex.e.code - if ex.e.code == 401: # Private Twitter - return - raise ex - followers[uid]=follos - for follo in follos: - if follo not in followers.keys(): - pending.add(follo) - distance[follo] = newdist + t = Twitter(auth=OAuth(credentials.ACCESS_TOKEN, + credentials.ACCESS_TOKEN_SECRET, + credentials.CONSUMER_KEY, + credentials.CONSUMER_SECRET)) -def signal_handler(signal, frame): - print 'You pressed Ctrl+C!' - sh.close() - sys.exit(0) -signal.signal(signal.SIGINT, signal_handler) + if not args.new: + print 'Recovering state.' + for key in followers: + for follower in followers[key]: + if follower not in followers: + pending.add(follower) -while True: - try: - print "Iteration!" - nextuser=None - minimumscore=1000 - unknowns = [person for person in pending if person not in userobject] - for i in xrange(0,int(math.ceil(len(unknowns)/100.0))): - piece = unknowns[i*100:(i+1)*100] - getinfo(piece) - for user in pending.copy(): - userscore = userobject[user]['followers_count']**distance[user] - if(userscore < minimumscore): - minimumscore = userscore - nextuser=user - explore_user(t,nextuser) - pending.remove(nextuser) - sh.sync() - time.sleep(5) - except TwitterHTTPError as ex: - print "Exception %s - %s" % (ex,type(ex)) - if ex.e.code/10 == 42: - print "Sleeping for 1 minute" - time.sleep(60) - except IncompleteRead: - print "IncompleteRead!" - time.sleep(5) - except Exception as ex: - print "Exception" - sh.close() - raise ex + if args.new or len(pending)<1: + if args.NAME != 'me': + user = t.users.lookup(screen_name=args.NAME)[0] + uid = user['id'] + distance[uid] = 0 + pending.add(uid) + else: + creds = t.account.verify_credentials() + myid = creds['id'] + distance[myid]=0 + pending.add(myid) + + print 'Pending is now: %s' %pending + + def signal_handler(signal, frame): + print 'You pressed Ctrl+C!' + sh.close() + sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) + f1 = Fetcher('f1',credentials) + f1.run()