mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-25 20:58:24 +00:00 
			
		
		
		
	Compare commits
	
		
			7 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | ea848f1a78 | ||
|  | 030c41b826 | ||
|  | bba73091e4 | ||
|  | 80b58541e7 | ||
|  | 40a8b45231 | ||
|  | fadeced761 | ||
|  | bdb844d75f | 
| @@ -2,6 +2,6 @@ | |||||||
| From python:2.7-onbuild | From python:2.7-onbuild | ||||||
| Maintainer J. Fernando Sánchez @balkian | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
| RUN pip install -e "/usr/src/app/[server]" | RUN pip install ".[server]" | ||||||
|  |  | ||||||
| ENTRYPOINT ["bitter"] | ENTRYPOINT ["bitter"] | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Makefile
									
									
									
									
									
								
							| @@ -1,4 +1,4 @@ | |||||||
| PYVERSIONS=3.4 2.7 | PYVERSIONS=3.5 | ||||||
| PYMAIN=$(firstword $(PYVERSIONS)) | PYMAIN=$(firstword $(PYVERSIONS)) | ||||||
| NAME=bitter | NAME=bitter | ||||||
| REPO=balkian | REPO=balkian | ||||||
|   | |||||||
| @@ -1 +1 @@ | |||||||
| 0.9.5 | 0.10.3 | ||||||
|   | |||||||
| @@ -6,10 +6,12 @@ http://github.com/balkian/bitter | |||||||
| import os | import os | ||||||
|  |  | ||||||
| from .version import __version__ | from .version import __version__ | ||||||
|  | from . import config as bconf | ||||||
|  |  | ||||||
| def easy(*args, **kwargs): | def easy(conffile=bconf.CONFIG_FILE): | ||||||
|     from .crawlers import TwitterQueue |     from .crawlers import TwitterQueue | ||||||
|     return TwitterQueue.from_credentials(*args, **kwargs) |  | ||||||
|  |     return TwitterQueue.from_config(conffile=conffile) | ||||||
|  |  | ||||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										172
									
								
								bitter/cli.py
									
									
									
									
									
								
							
							
						
						
									
										172
									
								
								bitter/cli.py
									
									
									
									
									
								
							| @@ -26,6 +26,32 @@ else: | |||||||
|  |  | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  | def serialize(function): | ||||||
|  |     '''Common options to serialize output to CSV or other formats''' | ||||||
|  |  | ||||||
|  |     @click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str) | ||||||
|  |     @click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True) | ||||||
|  |     @click.option('--header', help='Header that will be printed at the beginning of the file', default=None) | ||||||
|  |     @click.option('--csv', help='Print each object as a csv row.', is_flag=True) | ||||||
|  |     @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True) | ||||||
|  |     @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True) | ||||||
|  |     @click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t') | ||||||
|  |     @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout) | ||||||
|  |     def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs): | ||||||
|  |         it = function(**kwargs) | ||||||
|  |         outformat = 'json' | ||||||
|  |         if csv: | ||||||
|  |             outformat = 'csv' | ||||||
|  |         elif jsonlines: | ||||||
|  |             outformat = 'jsonlines' | ||||||
|  |         elif indented: | ||||||
|  |             outformat = 'indented' | ||||||
|  |  | ||||||
|  |         return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter) | ||||||
|  |  | ||||||
|  |     return decorated | ||||||
|  |  | ||||||
|  |  | ||||||
| @click.group() | @click.group() | ||||||
| @click.option("--verbose", is_flag=True) | @click.option("--verbose", is_flag=True) | ||||||
| @click.option("--logging_level", required=False, default='WARN') | @click.option("--logging_level", required=False, default='WARN') | ||||||
| @@ -45,6 +71,8 @@ def main(ctx, verbose, logging_level, config, credentials): | |||||||
| @main.group(invoke_without_command=True) | @main.group(invoke_without_command=True) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def credentials(ctx): | def credentials(ctx): | ||||||
|  |     if ctx.invoked_subcommand is not None: | ||||||
|  |         return | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     for worker in wq.queue: |     for worker in wq.queue: | ||||||
|         print('#'*20) |         print('#'*20) | ||||||
| @@ -56,44 +84,47 @@ def credentials(ctx): | |||||||
|  |  | ||||||
|  |  | ||||||
| @credentials.command('limits') | @credentials.command('limits') | ||||||
| @click.option('--all', type=bool, default=False, required=False, | @click.option('--no_aggregate', is_flag=True, default=False, | ||||||
|               help=('Print all limits. By default, it only limits that ' |               help=('Print limits from all workers. By default, limits are ' | ||||||
|  |                     'aggregated (summed).')) | ||||||
|  | @click.option('--no_diff', is_flag=True, default=False, | ||||||
|  |               help=('Print all limits. By default, only limits that ' | ||||||
|                     'have been consumed will be shown.')) |                     'have been consumed will be shown.')) | ||||||
| @click.argument('url', required=False) | @click.argument('url', required=False) | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_limits(ctx, all, url): | def get_limits(ctx, no_aggregate, no_diff, url): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     total = {} |     limits = {} | ||||||
|  |     if url: | ||||||
|  |         print('URL is: {}'.format(url)) | ||||||
|     for worker in wq.queue: |     for worker in wq.queue: | ||||||
|         resp = worker.client.application.rate_limit_status() |         resp = worker.client.application.rate_limit_status() | ||||||
|         print('#'*20) |         for urlimits in resp['resources'].values(): | ||||||
|         print(worker.name) |             for url, value in urlimits.items(): | ||||||
|         if url: |                 if url not in limits: | ||||||
|             limit = 'NOT FOUND' |                     limits[url] = {} | ||||||
|             print('URL is: {}'.format(url)) |                 glob = limits[url].get('global', {}) | ||||||
|             cat = url.split('/')[1] |                 limits[url][worker.name] = value | ||||||
|             if cat in resp['resources']: |                 for k in ['limit', 'remaining']: | ||||||
|                 limit = resp['resources'][cat].get(url, None) or resp['resources'][cat] |                     if k not in glob: | ||||||
|             else: |                         glob[k] = 0 | ||||||
|                 print('Cat {} not found'.format(cat)) |                     glob[k] += value[k] | ||||||
|                 continue |                 limits[url]['global'] = glob | ||||||
|             for k in limit: |     for url, lims in limits.items(): | ||||||
|                 total[k] = total.get(k, 0) + limit[k] |         worker_list = lims.keys() if no_aggregate else ['global', ]  | ||||||
|             print('{}: {}'.format(url, limit)) |  | ||||||
|             continue |         url_printed = False | ||||||
|         nres = {} |  | ||||||
|         if not all: |         for worker in worker_list: | ||||||
|             for res, urls in resp['resources'].items(): |             vals = lims[worker] | ||||||
|                 nurls = {} |             consumed = vals['limit'] - vals['remaining']  | ||||||
|                 for u, limits in urls.items(): |             if no_diff or consumed: | ||||||
|                     if limits['limit'] != limits['remaining']: |                 if not url_printed: | ||||||
|                         nurls[u] = limits |                     print(url) | ||||||
|                 if nurls: |                     url_printed = True | ||||||
|                     nres[res] = nurls |                 print('\t', worker, ':') | ||||||
|             resp = nres |                 print('\t\t', vals) | ||||||
|         print(json.dumps(resp, indent=2)) |  | ||||||
|     if url: |  | ||||||
|         print('Total for {}: {}'.format(url, total)) |  | ||||||
|  |  | ||||||
| @credentials.command('add') | @credentials.command('add') | ||||||
| @click.option('--consumer_key', default=None) | @click.option('--consumer_key', default=None) | ||||||
| @@ -121,27 +152,30 @@ def tweet(ctx): | |||||||
|     pass |     pass | ||||||
|  |  | ||||||
| @tweet.command('get') | @tweet.command('get') | ||||||
| @click.option('-w', '--write', is_flag=True, default=False) | @click.option('-d', '--dry_run', is_flag=True, default=False) | ||||||
| @click.option('-f', '--folder', default="tweets") | @click.option('-f', '--folder', default="tweets") | ||||||
| @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | ||||||
| @click.argument('tweetid') | @click.argument('tweetid') | ||||||
| def get_tweet(tweetid, write, folder, update): | @serialize | ||||||
|  | def get_tweet(tweetid, dry_run, folder, update): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     utils.download_tweet(wq, tweetid, write, folder, update) |     yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update) | ||||||
|  |  | ||||||
| @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. | @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. | ||||||
| The result is stored as individual json files in your folder of choice.''') | The result is stored as individual json files in your folder of choice.''') | ||||||
| @click.argument('tweetsfile', 'File with a list of tweets to look up') | @click.argument('tweetsfile') | ||||||
| @click.option('-f', '--folder', default="tweets") | @click.option('-f', '--folder', default="tweets") | ||||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!') | @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!') | ||||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | ||||||
| @click.option('-d', '--delimiter', default=",") | @click.option('-d', '--delimiter', default=",") | ||||||
| @click.option('-h', '--header', help='Discard the first line (use it as a header)', | @click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results') | ||||||
|               is_flag=True, default=False) | @click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0) | ||||||
|  | @click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) | ||||||
| @click.option('-q', '--quotechar', default='"') | @click.option('-q', '--quotechar', default='"') | ||||||
| @click.option('-c', '--column', type=int, default=0) | @click.option('-c', '--column', type=int, default=0) | ||||||
|  | @serialize | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column): | def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column): | ||||||
|     if update and not click.confirm('This may overwrite existing tweets. Continue?'): |     if update and not click.confirm('This may overwrite existing tweets. Continue?'): | ||||||
|         click.echo('Cancelling') |         click.echo('Cancelling') | ||||||
|         return |         return | ||||||
| @@ -149,22 +183,24 @@ def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotec | |||||||
|  |  | ||||||
|     status = tqdm('Queried') |     status = tqdm('Queried') | ||||||
|     failed = 0 |     failed = 0 | ||||||
|     for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter, |     for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache, | ||||||
|                                         batch_method=utils.tweet_download_batch, |                                                skip=skip, quotechar=quotechar, commentchar=commentchar, | ||||||
|                                         header=header, quotechar=quotechar, |                                                column=column, update=update, retry_failed=retry): | ||||||
|                                         column=column, update=update, retry_failed=retry): |  | ||||||
|         status.update(1) |         status.update(1) | ||||||
|         if not obj: |         if not obj: | ||||||
|             failed += 1 |             failed += 1 | ||||||
|             status.set_description('Failed: %s. Queried' % failed, refresh=True) |             status.set_description('Failed: %s. Queried' % failed, refresh=True) | ||||||
|  |             continue | ||||||
|  |         yield obj | ||||||
|  |  | ||||||
|  |  | ||||||
| @tweet.command('search') | @tweet.command('search') | ||||||
| @click.argument('query') | @click.argument('query') | ||||||
|  | @serialize | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def search(ctx, query): | def search(ctx, query): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     t = utils.search_tweet(wq, query) |     yield from utils.search_tweet(wq, query) | ||||||
|     print(json.dumps(t, indent=2)) |  | ||||||
|  |  | ||||||
| @tweet.command('timeline') | @tweet.command('timeline') | ||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @@ -192,57 +228,48 @@ def list_users(ctx, db): | |||||||
|  |  | ||||||
| @users.command('get') | @users.command('get') | ||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @click.option('-w', '--write', is_flag=True, default=False) | @click.option('-d', '--dry_run', is_flag=True, default=False) | ||||||
| @click.option('-f', '--folder', default="users") | @click.option('-f', '--folder', default="users") | ||||||
| @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | ||||||
| def get_user(user, write, folder, update): | @serialize | ||||||
|  | def get_user(user, dry_run, folder, update): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     if not write: |     yield from utils.download_user(wq, user, not dry_run, folder, update) | ||||||
|         u = utils.get_user(wq, user) |  | ||||||
|         js = json.dumps(u, indent=2) |  | ||||||
|         print(js) |  | ||||||
|         return |  | ||||||
|     if not os.path.exists(folder): |  | ||||||
|         os.makedirs(folder) |  | ||||||
|     file = os.path.join(folder, '%s.json' % user) |  | ||||||
|     if not update and os.path.exists(file) and os.path.isfile(file): |  | ||||||
|         print('User exists: %s' % user) |  | ||||||
|         return |  | ||||||
|     with open(file, 'w') as f: |  | ||||||
|         u = utils.get_user(wq, user) |  | ||||||
|         js = json.dumps(u, indent=2) |  | ||||||
|         print(js, file=f) |  | ||||||
|  |  | ||||||
| @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file. | @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file. | ||||||
|                The result is stored as individual json files in your folder of choice.''') |                The result is stored as individual json files in your folder of choice.''') | ||||||
| @click.argument('usersfile', 'File with a list of users to look up') | @click.argument('usersfile') | ||||||
| @click.option('-f', '--folder', default="users") | @click.option('-f', '--folder', default="users") | ||||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!') | @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!') | ||||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | ||||||
|  | @click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results') | ||||||
| @click.option('-d', '--delimiter', default=",") | @click.option('-d', '--delimiter', default=",") | ||||||
| @click.option('-h', '--header', help='Discard the first line (use it as a header)', | @click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)', | ||||||
|               is_flag=True, default=False) |               is_flag=True, default=False) | ||||||
| @click.option('-q', '--quotechar', default='"') | @click.option('-q', '--quotechar', default='"') | ||||||
|  | @click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) | ||||||
| @click.option('-c', '--column', type=int, default=0) | @click.option('-c', '--column', type=int, default=0) | ||||||
|  | @serialize | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column): | def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column): | ||||||
|     if update and not click.confirm('This may overwrite existing users. Continue?'): |     if update and not click.confirm('This may overwrite existing users. Continue?'): | ||||||
|         click.echo('Cancelling') |         click.echo('Cancelling') | ||||||
|         return |         return | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||||
|     for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter, |     for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter, | ||||||
|                                  batch_method=utils.user_download_batch, |                                        update=update, retry_failed=retry, | ||||||
|                                  update=update, retry_failed=retry, |                                        skip=skip, quotechar=quotechar, | ||||||
|                                  header=header, quotechar=quotechar, |                                        cache=not nocache, | ||||||
|                                  column=column): |                                        commentchar=commentchar, | ||||||
|         pass |                                        column=column): | ||||||
|  |         yield i | ||||||
|  |  | ||||||
| @users.command('crawl') | @users.command('crawl') | ||||||
| @click.option('--db', required=True, help='Database to save all users.') | @click.option('--db', required=True, help='Database to save all users.') | ||||||
| @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | ||||||
| @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | ||||||
| @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | ||||||
| @click.argument('usersfile', 'File with a list of users to look up') | @click.argument('usersfile') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def crawl_users(ctx, usersfile, skip, until, threads, db): | def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||||
|     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock |     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock | ||||||
| @@ -432,7 +459,6 @@ def extract(ctx, recursive, user, name, initfile): | |||||||
| @extractor.command('reset') | @extractor.command('reset') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def reset_extractor(ctx): | def reset_extractor(ctx): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |  | ||||||
|     db = ctx.obj['DBURI'] |     db = ctx.obj['DBURI'] | ||||||
|     session = make_session(db) |     session = make_session(db) | ||||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) |     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||||
|   | |||||||
							
								
								
									
										128
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										128
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -19,7 +19,9 @@ import queue | |||||||
| import threading | import threading | ||||||
| from select import select | from select import select | ||||||
|  |  | ||||||
| from functools import partial | import operator | ||||||
|  |  | ||||||
|  | from functools import partial, reduce | ||||||
|  |  | ||||||
| from tqdm import tqdm | from tqdm import tqdm | ||||||
|  |  | ||||||
| @@ -59,7 +61,14 @@ def chunk(iterable, n): | |||||||
| def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||||
|     source = chunk(source, chunksize) |     source = chunk(source, chunksize) | ||||||
|     p = ThreadPool(numcpus*2) |     p = ThreadPool(numcpus*2) | ||||||
|     results = p.imap_unordered(func, source) |  | ||||||
|  |     def wrapped_func(*args, **kwargs): | ||||||
|  |         try: | ||||||
|  |             return func(*args, **kwargs) | ||||||
|  |         except Exception as ex: | ||||||
|  |             print('Exception on parallel thread: {}'.format(ex), file=sys.stderr) | ||||||
|  |  | ||||||
|  |     results = p.imap_unordered(wrapped_func, source) | ||||||
|     for i in chain.from_iterable(results): |     for i in chain.from_iterable(results): | ||||||
|         yield i |         yield i | ||||||
|  |  | ||||||
| @@ -106,7 +115,7 @@ def read_config(conffile): | |||||||
|         raise Exception('No config file or BITTER_CONFIG env variable.') |         raise Exception('No config file or BITTER_CONFIG env variable.') | ||||||
|     else: |     else: | ||||||
|         f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) |         f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) | ||||||
|     return yaml.load(f) or {'credentials': []} |     return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []} | ||||||
|  |  | ||||||
|  |  | ||||||
| def write_config(conf, conffile=None): | def write_config(conf, conffile=None): | ||||||
| @@ -270,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False): | |||||||
|     download_user(wq, session, user, entry, recursive) |     download_user(wq, session, user, entry, recursive) | ||||||
|     session.close() |     session.close() | ||||||
|  |  | ||||||
|  | def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000): | ||||||
| def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): |  | ||||||
|  |  | ||||||
|     total_followers = user.followers_count |     total_followers = user.followers_count | ||||||
|  |  | ||||||
| @@ -419,8 +427,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor | |||||||
|     pending = pending_entries(dburi) |     pending = pending_entries(dburi) | ||||||
|     session.close() |     session.close() | ||||||
|  |  | ||||||
|     for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): |     with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq: | ||||||
|         logger.info("Got %s" % i) |         for i in tq:  | ||||||
|  |             tq.write('Got {}'.format(i)) | ||||||
|  |             logger.info("Got %s" % i) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def pending_entries(dburi): | def pending_entries(dburi): | ||||||
| @@ -449,7 +460,7 @@ def get_tweet(c, tid): | |||||||
|     return c.statuses.show(id=tid) |     return c.statuses.show(id=tid) | ||||||
|  |  | ||||||
| def search_tweet(c, query): | def search_tweet(c, query): | ||||||
|     return c.search.tweets(q=query) |      yield from c.search.tweets(q=query)['statuses'] | ||||||
|  |  | ||||||
| def user_timeline(c, query): | def user_timeline(c, query): | ||||||
|     try: |     try: | ||||||
| @@ -464,17 +475,25 @@ def get_user(c, user): | |||||||
|     except ValueError: |     except ValueError: | ||||||
|         return c.users.lookup(screen_name=user)[0] |         return c.users.lookup(screen_name=user)[0] | ||||||
|  |  | ||||||
| def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False): | ||||||
|     cached = cached_id(tweetid, folder) |     tweet = cached_id(tweetid, folder) | ||||||
|     tweet = None |     if update or not tweet: | ||||||
|     if update or not cached: |  | ||||||
|         tweet = get_tweet(wq, tweetid) |         tweet = get_tweet(wq, tweetid) | ||||||
|         js = json.dumps(tweet) |     if cache and update: | ||||||
|     if write: |  | ||||||
|         if tweet: |         if tweet: | ||||||
|  |             js = json.dumps(tweet) | ||||||
|             write_json(js, folder) |             write_json(js, folder) | ||||||
|     else: |     yield tweet | ||||||
|         print(js) |  | ||||||
|  |  | ||||||
|  | def download_user(wq, userid, cache=True, folder="downloaded_users", update=False): | ||||||
|  |     user = cached_id(userid, folder) | ||||||
|  |     if update or not user: | ||||||
|  |         user = get_user(wq, userid) | ||||||
|  |     if cache and update: | ||||||
|  |         if user: | ||||||
|  |             write_json(user, folder, aliases=[user['screen_name'], ]) | ||||||
|  |     yield user | ||||||
|  |  | ||||||
|  |  | ||||||
| def cached_id(oid, folder): | def cached_id(oid, folder): | ||||||
| @@ -489,7 +508,7 @@ def cached_id(oid, folder): | |||||||
|             logger.error('Error getting cached version of {}: {}'.format(oid, ex)) |             logger.error('Error getting cached version of {}: {}'.format(oid, ex)) | ||||||
|     return tweet |     return tweet | ||||||
|  |  | ||||||
| def write_json(js, folder, oid=None): | def write_json(js, folder, oid=None, aliases=[]): | ||||||
|     if not oid: |     if not oid: | ||||||
|       oid = js['id'] |       oid = js['id'] | ||||||
|     file = id_file(oid, folder) |     file = id_file(oid, folder) | ||||||
| @@ -498,6 +517,8 @@ def write_json(js, folder, oid=None): | |||||||
|     with open(file, 'w') as f: |     with open(file, 'w') as f: | ||||||
|         json.dump(js, f) |         json.dump(js, f) | ||||||
|         logger.info('Written {} to file {}'.format(oid, file)) |         logger.info('Written {} to file {}'.format(oid, file)) | ||||||
|  |     for alias in aliases: | ||||||
|  |         os.symlink('%s.json' % oid, id_file(alias, folder)) | ||||||
|  |  | ||||||
| def id_file(oid, folder): | def id_file(oid, folder): | ||||||
|     return os.path.join(folder, '%s.json' % oid) |     return os.path.join(folder, '%s.json' % oid) | ||||||
| @@ -570,7 +591,8 @@ def dump_result(oid, obj, folder, ignore_fails=True): | |||||||
|         with open(fail_file(oid, folder), 'w') as f: |         with open(fail_file(oid, folder), 'w') as f: | ||||||
|             print('Object not found', file=f) |             print('Object not found', file=f) | ||||||
|  |  | ||||||
| def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, |  | ||||||
|  | def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True, | ||||||
|                   batch_method=tweet_download_batch): |                   batch_method=tweet_download_batch): | ||||||
|  |  | ||||||
|     done = Queue() |     done = Queue() | ||||||
| @@ -581,7 +603,6 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail | |||||||
|     def filter_list(lst, done, down): |     def filter_list(lst, done, down): | ||||||
|         print('filtering') |         print('filtering') | ||||||
|         for oid in lst: |         for oid in lst: | ||||||
|             # print('Checking {}'.format(line)) |  | ||||||
|             cached = cached_id(oid, folder) |             cached = cached_id(oid, folder) | ||||||
|             if (cached and not update): |             if (cached and not update): | ||||||
|                 done.put((oid, cached)) |                 done.put((oid, cached)) | ||||||
| @@ -595,7 +616,9 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail | |||||||
|         def gen(): |         def gen(): | ||||||
|             while True: |             while True: | ||||||
|                 r = down.get() |                 r = down.get() | ||||||
|                 if not r: |                 if r is None: | ||||||
|  |                     down.close() | ||||||
|  |                     down.join_thread() | ||||||
|                     return |                     return | ||||||
|                 yield r |                 yield r | ||||||
|  |  | ||||||
| @@ -622,21 +645,37 @@ def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fail | |||||||
|         rec = done.get() |         rec = done.get() | ||||||
|  |  | ||||||
|         if rec is None: |         if rec is None: | ||||||
|  |             done.close() | ||||||
|  |             done.join_thread() | ||||||
|             break |             break | ||||||
|  |  | ||||||
|         oid, obj = rec |         oid, obj = rec | ||||||
|         dump_result(oid, obj, folder, ignore_fails) |         if cache or (not obj): | ||||||
|  |             dump_result(oid, obj, folder, ignore_fails) | ||||||
|         yield rec |         yield rec | ||||||
|  |  | ||||||
|     wait.join() |     wait.join() | ||||||
|  |  | ||||||
|  |  | ||||||
def download_tweets_file(*args, **kwargs):
    """Yield the results of ``download_file`` using the tweet batch downloader.

    All positional and keyword arguments are forwarded to ``download_file``
    unchanged, except ``batch_method``, which is always set to
    ``tweet_download_batch`` (any value supplied by the caller is replaced).
    """
    kwargs.update(batch_method=tweet_download_batch)
    yield from download_file(*args, **kwargs)
|  |  | ||||||
|  |  | ||||||
def download_users_file(*args, **kwargs):
    """Yield the results of ``download_file`` using the user batch downloader.

    All positional and keyword arguments are forwarded to ``download_file``
    unchanged, except ``batch_method``, which is always set to
    ``user_download_batch`` (any value supplied by the caller is replaced).
    """
    kwargs.update(batch_method=user_download_batch)
    yield from download_file(*args, **kwargs)
|  |  | ||||||
|  |  | ||||||
|  | def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True, | ||||||
|  |                   quotechar='"', commentchar=None, batch_method=tweet_download_batch, | ||||||
|                   **kwargs): |                   **kwargs): | ||||||
|     with open(csvfile) as f: |     with open(csvfile) as f: | ||||||
|  |         if commentchar: | ||||||
|  |             f = (line for line in f if not line.startswith('#')) | ||||||
|  |  | ||||||
|         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) |         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) | ||||||
|         if header: |         for n in range(skip): | ||||||
|             next(csvreader) |             next(csvreader) | ||||||
|  |  | ||||||
|         def reader(r): |         def reader(r): | ||||||
| @@ -645,7 +684,7 @@ def download_file(wq, csvfile, folder, column=0, delimiter=',', | |||||||
|                     yield row[column].strip() |                     yield row[column].strip() | ||||||
|  |  | ||||||
|  |  | ||||||
|         for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, |         for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache, | ||||||
|                                  **kwargs): |                                  **kwargs): | ||||||
|             yield res |             yield res | ||||||
|  |  | ||||||
| @@ -723,3 +762,42 @@ def _users_control(func, apiargs, remaining=0, **kwargs): | |||||||
|         if int(cursor) != -1: |         if int(cursor) != -1: | ||||||
|             stop = False |             stop = False | ||||||
|     return resp['users'], stop |     return resp['users'], stop | ||||||
|  |  | ||||||
|  |  | ||||||
def serialized(it, outfile, outformat='csv', fields=(), header=None, ignore_missing=False, delimiter='\t'):
    """Write every object produced by *it* to *outfile* in the given format.

    Parameters:
        it: iterable of objects to write. For ``csv`` they must support
            item access (``obj[key]``); for the JSON formats they must be
            JSON-serializable.
        outfile: a path to open for writing, or an already open text stream
            (anything with a ``write`` method, e.g. ``sys.stdout``). Streams
            are written to directly and are not closed.
        outformat: case-insensitive format name. ``csv`` writes one fully
            quoted row per object, ``jsonlines`` one compact JSON document
            per line, ``indented`` one pretty-printed JSON document per
            object; any other value falls back to ``print``-ing each object.
        fields: for ``csv`` only, the columns to extract. Each entry is a
            dotted path (``'user.name'``) resolved by successive item access.
        header: for ``csv`` only. ``None`` builds the header line by joining
            *fields* with *delimiter*; ``''`` suppresses the header; any
            other value is printed verbatim as the first line.
        ignore_missing: when false, a message is printed to ``sys.stderr``
            for every field that cannot be resolved. The cell is written as
            ``None`` (an empty value) in either case.
        delimiter: column separator used for ``csv`` output.

    Returns:
        None.
    """
    outformat = outformat.lower()

    def do(out):
        if outformat == 'csv':
            writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
            if header != '':
                h = header
                if h is None:
                    h = delimiter.join(fields)
                print(h, file=out)
            attrs = [token.strip().split('.') for token in fields]
            for obj in it:
                values = []
                for attr in attrs:
                    try:
                        values.append(reduce(operator.getitem, attr, obj))
                    except (KeyError, IndexError, TypeError):
                        # KeyError: key absent from a mapping.
                        # TypeError/IndexError: the path crosses a value that
                        # cannot be indexed that way (e.g. None or a list).
                        # Both mean "field not available" for this object.
                        if not ignore_missing:
                            print('Key not present: {}'.format(attr), file=sys.stderr)
                        values.append(None)
                writer.writerow(values)
        elif outformat == 'jsonlines':
            for obj in it:
                print(json.dumps(obj, sort_keys=True), file=out)
        elif outformat == 'indented':
            for obj in it:
                print(json.dumps(obj, indent=4, sort_keys=True), file=out)
        else:
            for obj in it:
                print(obj, file=out)

    # Any open text stream (sys.stdout included) is used as-is; the caller
    # keeps ownership of it, so it is not closed here.
    if hasattr(outfile, 'write'):
        return do(outfile)

    # The csv module emits its own line terminators, so its files must be
    # opened with newline='' to avoid doubled carriage returns on platforms
    # that translate '\n' on write.
    newline = '' if outformat == 'csv' else None
    with open(outfile, 'w', newline=newline) as out:
        return do(out)
|   | |||||||
| @@ -48,13 +48,13 @@ class TestUtils(TestCase): | |||||||
|             time.sleep(0.5) |             time.sleep(0.5) | ||||||
|             return i |             return i | ||||||
|         tic = time.time() |         tic = time.time() | ||||||
|         resp = utils.parallel(echo, [1,2,3]) |         resp = utils.parallel(echo, [1, 2, 3]) | ||||||
|         assert isinstance(resp, types.GeneratorType) |         assert isinstance(resp, types.GeneratorType) | ||||||
|         assert list(resp) == [1,2,3] |         assert sorted(list(resp)) == [1, 2, 3] | ||||||
|         toc = time.time() |         toc = time.time() | ||||||
|         assert (tic-toc) < 600 |         assert (tic-toc) < 600 | ||||||
|         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) |         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||||
|         assert list(resp2) == [1,2, 3,4] |         assert sorted(list(resp2)) == [1, 2, 3, 4] | ||||||
|  |  | ||||||
|  |  | ||||||
| class TestUtilsEnv(TestUtils): | class TestUtilsEnv(TestUtils): | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user