mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-26 05:08:22 +00:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | c940709df8 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -2,7 +2,6 @@ __pycache__ | |||||||
| *.egg-info | *.egg-info | ||||||
| dist | dist | ||||||
| env | env | ||||||
| .env |  | ||||||
| __* | __* | ||||||
| .* | .* | ||||||
| *.pyc | *.pyc | ||||||
|   | |||||||
| @@ -2,6 +2,6 @@ | |||||||
| From python:3.4-onbuild | From python:3.4-onbuild | ||||||
| Maintainer J. Fernando Sánchez @balkian | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
| RUN pip install ".[server]" | RUN pip install -e "/usr/src/app/[server]" | ||||||
|  |  | ||||||
| ENTRYPOINT ["bitter"] | ENTRYPOINT ["bitter"] | ||||||
|   | |||||||
| @@ -2,6 +2,6 @@ | |||||||
| From python:{{PYVERSION}}-onbuild | From python:{{PYVERSION}}-onbuild | ||||||
| Maintainer J. Fernando Sánchez @balkian | Maintainer J. Fernando Sánchez @balkian | ||||||
|  |  | ||||||
| RUN pip install ".[server]" | RUN pip install -e "/usr/src/app/[server]" | ||||||
|  |  | ||||||
| ENTRYPOINT ["bitter"] | ENTRYPOINT ["bitter"] | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								Makefile
									
									
									
									
									
								
							| @@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template | |||||||
| dev-%: | dev-%: | ||||||
| 	@docker start $(NAME)-dev$* || (\ | 	@docker start $(NAME)-dev$* || (\ | ||||||
| 		$(MAKE) build-$*; \ | 		$(MAKE) build-$*; \ | ||||||
| 		docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | 		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||||
| 	)\ | 	)\ | ||||||
|  |  | ||||||
| 	docker exec -ti $(NAME)-dev$* bash | 	docker exec -ti $(NAME)-dev$* bash | ||||||
| @@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN)) | |||||||
| testall: $(addprefix test-,$(PYVERSIONS)) | testall: $(addprefix test-,$(PYVERSIONS)) | ||||||
|  |  | ||||||
| test-%: build-% | test-%: build-% | ||||||
| 	docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | 	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||||
|  |  | ||||||
| pip_test-%: | pip_test-%: | ||||||
| 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | ||||||
| @@ -71,6 +71,6 @@ pip_upload: | |||||||
| pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | ||||||
|  |  | ||||||
| run: build | run: build | ||||||
| 	docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | 	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||||
|  |  | ||||||
| .PHONY: test test-% build-% build test test_pip run | .PHONY: test test-% build-% build test test_pip run | ||||||
|   | |||||||
							
								
								
									
										148
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										148
									
								
								README.md
									
									
									
									
									
								
							| @@ -1,5 +1,4 @@ | |||||||
| # Description | #Description | ||||||
|  |  | ||||||
| There are two parts to bitter. | There are two parts to bitter. | ||||||
| First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts). | First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts). | ||||||
| Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper. | Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper. | ||||||
| @@ -23,153 +22,16 @@ wq = easy() | |||||||
| print(wq.users.show(user_name='balkian')) | print(wq.users.show(user_name='balkian')) | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
|  | # Credentials format | ||||||
| You can also make custom calls to the API through the command line. |  | ||||||
| e.g. to get the latest 500 tweets by the python software foundation: |  | ||||||
|  |  | ||||||
| ``` | ``` | ||||||
| bitter api statuses/user_timeline --id thepsf --count 500 | {"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"} | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
|  | By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file: | ||||||
| ## Adding credentials |  | ||||||
|  |  | ||||||
| ``` | ``` | ||||||
| bitter --config <YOUR CONFIGURATION FILE> credentials add | python -m bitter -c <credentials_file> ... | ||||||
| ``` |  | ||||||
|  |  | ||||||
| You can specify the parameters in the command or let the command line guide you through the process. |  | ||||||
|  |  | ||||||
| # Examples |  | ||||||
|  |  | ||||||
| ## Downloading a list of tweets |  | ||||||
|  |  | ||||||
| Bitter can download tweets from a list of tweets in a CSV file. |  | ||||||
| The result is stored as individual json files in your folder of choice. |  | ||||||
| You can even specify the column number for tweet ids. |  | ||||||
| Bitter will not try to download  |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| Usage: bitter tweet get_all [OPTIONS] TWEETSFILE |  | ||||||
|  |  | ||||||
|   Download tweets from a list of tweets in a CSV file. The result is stored |  | ||||||
|   as individual json files in your folder of choice. |  | ||||||
|  |  | ||||||
| Options: |  | ||||||
|   -f, --folder TEXT |  | ||||||
|   -d, --delimiter TEXT |  | ||||||
|   -h, --header          Discard the first line (use it as a header) |  | ||||||
|   -q, --quotechar TEXT |  | ||||||
|   -c, --column INTEGER |  | ||||||
|   --help                Show this message and exit. |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| For instance, this will download `tweet_ids.csv` in the `tweet_info` folder: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| bitter tweet get_all -f tweet_info tweet_ids.csv |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ## Downloading a list of users |  | ||||||
|  |  | ||||||
| Bitter downloads users and tweets in a similar way: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| Usage: bitter users get_all [OPTIONS] USERSFILE |  | ||||||
|  |  | ||||||
|   Download users from a list of user ids/screen names in a CSV file. The |  | ||||||
|   result is stored as individual json files in your folder of choice. |  | ||||||
|  |  | ||||||
| Options: |  | ||||||
|   -f, --folder TEXT |  | ||||||
|   -d, --delimiter TEXT |  | ||||||
|   -h, --header          Discard the first line (use it as a header) |  | ||||||
|   -q, --quotechar TEXT |  | ||||||
|   -c, --column INTEGER |  | ||||||
|   --help                Show this message and exit. |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| The only difference is that users can be downloaded via `screen_name` or `user_id`. |  | ||||||
| This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways. |  | ||||||
|  |  | ||||||
| ## Downloading a stream |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| Usage: bitter stream get [OPTIONS] |  | ||||||
|  |  | ||||||
| Options: |  | ||||||
|   -l, --locations TEXT |  | ||||||
|   -t, --track TEXT |  | ||||||
|   -f, --file TEXT       File to store the stream of tweets. Default: standard output |  | ||||||
|   -p, --politelyretry   Politely retry after a hangup/connection error |  | ||||||
|   --help                Show this message and exit. |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| bitter --config .bitter.yaml stream get  |  | ||||||
| ``` |  | ||||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines |  | ||||||
|  |  | ||||||
|  |  | ||||||
| ## REST queries |  | ||||||
|  |  | ||||||
| In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API. |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL] |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| For instance: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| # Get 100 tweets that mentioned Obama after tweet 942689870501302300 |  | ||||||
| bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| That is equivalent to this call to the api: `api/1.1/searc/tweets?since_id=942689870501302300&count=100&q=Obama`. |  | ||||||
|  |  | ||||||
|  |  | ||||||
| The flags `--tweets` and `--users` are optional. |  | ||||||
| If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API. |  | ||||||
|  |  | ||||||
| For example: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| # Download 1000 tweets, 100 tweets per call. |  | ||||||
| bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| # Download all the followers of @balkian |  | ||||||
| bitter api 'followers/list' --_id balkian --users --max_count -1 |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| Note that some reserved words (such as `id`) have to be preceeded by an underscore. |  | ||||||
| This limitation is imposed by the python-twitter library. |  | ||||||
|  |  | ||||||
| # Configuration format |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| credentials: |  | ||||||
| - user: "balkian" |  | ||||||
|   consumer_secret: "xxx" |  | ||||||
|   consumer_key: "xxx" |  | ||||||
|   token_key: "xxx" |  | ||||||
|   token_secret: "xxx" |  | ||||||
| - user: .... |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| By default, bitter uses '~/.bitter.yaml', but you may choose a different file: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| python -m bitter --config <config_file> ... |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| Or use an environment variable: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| export BITTER_CONFIG=$(cat myconfig.yaml) |  | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| # Server | # Server | ||||||
|   | |||||||
| @@ -1,10 +0,0 @@ | |||||||
| Scripts to process jsonlines |  | ||||||
|  |  | ||||||
| To get the jsonlines file, you can use the streaming API or the search api, like so: |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| To keep track of the query that generated the file, you can save the command in a text file. |  | ||||||
| For instance, the example above is also in `example_query.sh`. |  | ||||||
| @@ -1 +0,0 @@ | |||||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines |  | ||||||
| @@ -1,13 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| export FIELDS="created_at,id,text"  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.hashtags.csv |  | ||||||
|   echo "$FIELDS" > $OUTPUT |  | ||||||
|   pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,15 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   REPLYOUTPUT=$i.replies.csv |  | ||||||
|   RTOUTPUT=$i.rts.csv |  | ||||||
|   echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT |  | ||||||
|   echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT |  | ||||||
|   pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT |  | ||||||
|   pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,16 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv' |  | ||||||
|  |  | ||||||
| export FIELDS="timestamp,track" |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.limits.csv |  | ||||||
|   echo $FIELDS > $OUTPUT |  | ||||||
|   pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,16 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) |  contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv' |  | ||||||
|  |  | ||||||
| export FIELDS="id,url" |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.media.csv |  | ||||||
|   echo $FIELDS > $OUTPUT |  | ||||||
|   pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,28 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| export USER_FIELDS="\$created_at,\ |  | ||||||
| .id_str,\ |  | ||||||
| .screen_name,\ |  | ||||||
| .followers_count,\ |  | ||||||
| .lang,\ |  | ||||||
| .description,\ |  | ||||||
| .statuses_count,\ |  | ||||||
| .favourites_count,\ |  | ||||||
| .friends_count,\ |  | ||||||
| .created_at,\ |  | ||||||
| .name,\ |  | ||||||
| .location,\ |  | ||||||
| .listed_count,\ |  | ||||||
| .time_zone\ |  | ||||||
| " |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.users.csv |  | ||||||
|   echo \#$USER_FIELDS > $OUTPUT |  | ||||||
|   jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,32 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| FIELDS=".id_str,\ |  | ||||||
|         .user.screen_name,\ |  | ||||||
|         .user.id,\ |  | ||||||
|         .favorite_count,\ |  | ||||||
|         .retweet_count,\ |  | ||||||
|         .quote_count,\ |  | ||||||
|         .reply_count,\ |  | ||||||
|         .created_at,\ |  | ||||||
|         .lang,\ |  | ||||||
|         .in_reply_to_user_id_str,\ |  | ||||||
|         .in_reply_to_status_id_str,\ |  | ||||||
|         .retweeted_status.id_str,\ |  | ||||||
|         .retweeted_status.user.id,\ |  | ||||||
|         .retweeted_status.favorite_count,\ |  | ||||||
|         .retweeted_status.retweet_count,\ |  | ||||||
|         .retweeted_status.quote_count,\ |  | ||||||
|         .retweeted_status.reply_count,\ |  | ||||||
|         .retweeted_status.created_at\ |  | ||||||
| " |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.tweets.csv |  | ||||||
|   echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT |  | ||||||
|   jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,17 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv' |  | ||||||
| HEADER='rt_id,full_text' |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.full_text.csv |  | ||||||
|   echo $HEADER > $OUTPUT |  | ||||||
|   jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT |  | ||||||
|   sort -u $OUTPUT -o $OUTPUT |  | ||||||
|   sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,16 +0,0 @@ | |||||||
| if [ "$#" -lt 1 ] |  | ||||||
| then |  | ||||||
| 	echo "Usage: $0 <files to convert>" |  | ||||||
| 	exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv' |  | ||||||
| HEADER='id,text' |  | ||||||
|  |  | ||||||
| for i in "$@" |  | ||||||
| do |  | ||||||
|   OUTPUT=$i.text.csv |  | ||||||
|   echo $HEADER > $OUTPUT |  | ||||||
|   pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT |  | ||||||
|  # sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT |  | ||||||
| done |  | ||||||
| @@ -1,10 +0,0 @@ | |||||||
|  |  | ||||||
| if [ "$#" -lt 2 ] |  | ||||||
| then |  | ||||||
|     echo "Find edge lines in a file that contain one of the users in a user list." |  | ||||||
|     echo "" |  | ||||||
| 	  echo "Usage: $0 <file with edges> <file with the list of users>" |  | ||||||
| 	  exit 1 |  | ||||||
| fi |  | ||||||
|  |  | ||||||
| pv -c -N 'read' "$1" |  grep -F -w -f "$2" |  pv -lc -N 'found' |  | ||||||
| @@ -1,23 +0,0 @@ | |||||||
| import pandas as pd |  | ||||||
|  |  | ||||||
| def read_rts(rtsfile, tweetsfile): |  | ||||||
|     tweets = pd.read_csv(tweetsfile, index_col=0) |  | ||||||
|     rts = pd.read_csv(rtsfile, index_col=1) |  | ||||||
|     merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True) |  | ||||||
|     return merged.sort_values(by='count', ascending=False) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def read_tweets(tweetsfile): |  | ||||||
|     '''When the dataset is small enough, we can load tweets as-in''' |  | ||||||
|     with open(tweetsfile) as f: |  | ||||||
|         header = f.readline().strip().split(',') |  | ||||||
|         dtypes = {} |  | ||||||
|     for key in header: |  | ||||||
|         if key.endswith('_str') or key.endswith('.id'): |  | ||||||
|             dtypes[key] = object  |  | ||||||
|             tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0) |  | ||||||
|     return tweets |  | ||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == '__main__': |  | ||||||
|     import argparse |  | ||||||
| @@ -1 +0,0 @@ | |||||||
| cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h  |  | ||||||
| @@ -1,14 +0,0 @@ | |||||||
| MAX_TAGS=100 |  | ||||||
|  |  | ||||||
| function get_text { |  | ||||||
|     while read line |  | ||||||
|     do |  | ||||||
|         echo $line |  | ||||||
|         rtid=$(echo $line | awk -F"," '{print $2}') |  | ||||||
|         text=$(grep -m 1 $rtid *.text.csv) |  | ||||||
|         echo "$line - $text" |  | ||||||
|     done < "/dev/stdin" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| cat "$@" | get_text |  | ||||||
|  |  | ||||||
| @@ -1,15 +0,0 @@ | |||||||
| MAX_TAGS=100 |  | ||||||
|  |  | ||||||
| function get_text { |  | ||||||
|     while read line |  | ||||||
|     do |  | ||||||
|         echo $line |  | ||||||
|         rtid=$(echo $line | awk '{print $2}') |  | ||||||
|         count=$(echo $line | awk '{print $1}') |  | ||||||
|         text=$(grep -m 1 $rtid *.text.csv) |  | ||||||
|         echo "$line - $text" |  | ||||||
|     done < "/dev/stdin" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text |  | ||||||
|  |  | ||||||
| @@ -1 +1 @@ | |||||||
| 0.9.5 | 0.7.1 | ||||||
|   | |||||||
							
								
								
									
										214
									
								
								bitter/cli.py
									
									
									
									
									
								
							
							
						
						
									
										214
									
								
								bitter/cli.py
									
									
									
									
									
								
							| @@ -29,91 +29,16 @@ logger = logging.getLogger(__name__) | |||||||
| @click.group() | @click.group() | ||||||
| @click.option("--verbose", is_flag=True) | @click.option("--verbose", is_flag=True) | ||||||
| @click.option("--logging_level", required=False, default='WARN') | @click.option("--logging_level", required=False, default='WARN') | ||||||
| @click.option('--config', show_default=True, default=bconf.CONFIG_FILE) | @click.option("--config", required=False) | ||||||
| @click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS) | @click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def main(ctx, verbose, logging_level, config, credentials): | def main(ctx, verbose, logging_level, config, credentials): | ||||||
|     logging.basicConfig(level=getattr(logging, logging_level)) |     logging.basicConfig(level=getattr(logging, logging_level)) | ||||||
|     ctx.obj = {} |     ctx.obj = {} | ||||||
|     ctx.obj['VERBOSE'] = verbose |     ctx.obj['VERBOSE'] = verbose | ||||||
|     bconf.CONFIG_FILE = config |     ctx.obj['CONFIG'] = config | ||||||
|     bconf.CREDENTIALS = credentials |     bconf.CREDENTIALS = credentials | ||||||
|     if os.path.exists(utils.get_config_path(credentials)): |     utils.create_credentials(credentials) | ||||||
|       utils.copy_credentials_to_config(credentials, config) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.group(invoke_without_command=True) |  | ||||||
| @click.pass_context |  | ||||||
| def credentials(ctx): |  | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |  | ||||||
|     for worker in wq.queue: |  | ||||||
|         print('#'*20) |  | ||||||
|         try: |  | ||||||
|             resp = worker.client.application.rate_limit_status() |  | ||||||
|             print(worker.name) |  | ||||||
|         except Exception as ex: |  | ||||||
|             print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) ) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @credentials.command('limits') |  | ||||||
| @click.option('--all', type=bool, default=False, required=False, |  | ||||||
|               help=('Print all limits. By default, it only limits that ' |  | ||||||
|                     'have been consumed will be shown.')) |  | ||||||
| @click.argument('url', required=False) |  | ||||||
| @click.pass_context |  | ||||||
| def get_limits(ctx, all, url): |  | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |  | ||||||
|     total = {} |  | ||||||
|     for worker in wq.queue: |  | ||||||
|         resp = worker.client.application.rate_limit_status() |  | ||||||
|         print('#'*20) |  | ||||||
|         print(worker.name) |  | ||||||
|         if url: |  | ||||||
|             limit = 'NOT FOUND' |  | ||||||
|             print('URL is: {}'.format(url)) |  | ||||||
|             cat = url.split('/')[1] |  | ||||||
|             if cat in resp['resources']: |  | ||||||
|                 limit = resp['resources'][cat].get(url, None) or resp['resources'][cat] |  | ||||||
|             else: |  | ||||||
|                 print('Cat {} not found'.format(cat)) |  | ||||||
|                 continue |  | ||||||
|             for k in limit: |  | ||||||
|                 total[k] = total.get(k, 0) + limit[k] |  | ||||||
|             print('{}: {}'.format(url, limit)) |  | ||||||
|             continue |  | ||||||
|         nres = {} |  | ||||||
|         if not all: |  | ||||||
|             for res, urls in resp['resources'].items(): |  | ||||||
|                 nurls = {} |  | ||||||
|                 for u, limits in urls.items(): |  | ||||||
|                     if limits['limit'] != limits['remaining']: |  | ||||||
|                         nurls[u] = limits |  | ||||||
|                 if nurls: |  | ||||||
|                     nres[res] = nurls |  | ||||||
|             resp = nres |  | ||||||
|         print(json.dumps(resp, indent=2)) |  | ||||||
|     if url: |  | ||||||
|         print('Total for {}: {}'.format(url, total)) |  | ||||||
|  |  | ||||||
| @credentials.command('add') |  | ||||||
| @click.option('--consumer_key', default=None) |  | ||||||
| @click.option('--consumer_secret', default=None) |  | ||||||
| @click.option('--token_key', default=None) |  | ||||||
| @click.option('--token_secret', default=None) |  | ||||||
| @click.argument('user_name') |  | ||||||
| def add(user_name, consumer_key, consumer_secret, token_key, token_secret): |  | ||||||
|     if not consumer_key: |  | ||||||
|         consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY') |  | ||||||
|     if not consumer_secret: |  | ||||||
|         consumer_secret = click.prompt('Please, enter your CONSUMER SECRET') |  | ||||||
|     if not token_key: |  | ||||||
|         token_key = click.prompt('Please, enter your ACCESS TOKEN') |  | ||||||
|     if not token_secret: |  | ||||||
|         token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET') |  | ||||||
|     utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret, |  | ||||||
|                           token_key=token_key, token_secret=token_secret) |  | ||||||
|     click.echo('Credentials added for {}'.format(user_name)) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.group() | @main.group() | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| @@ -126,43 +51,22 @@ def tweet(ctx): | |||||||
| @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | ||||||
| @click.argument('tweetid') | @click.argument('tweetid') | ||||||
| def get_tweet(tweetid, write, folder, update): | def get_tweet(tweetid, write, folder, update): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     utils.download_tweet(wq, tweetid, write, folder, update) |     utils.download_tweet(wq, tweetid, write, folder, update) | ||||||
|          |          | ||||||
| @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. | @tweet.command('get_all') | ||||||
| The result is stored as individual json files in your folder of choice.''') |  | ||||||
| @click.argument('tweetsfile', 'File with a list of tweets to look up') | @click.argument('tweetsfile', 'File with a list of tweets to look up') | ||||||
| @click.option('-f', '--folder', default="tweets") | @click.option('-f', '--folder', default="tweets") | ||||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!') |  | ||||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') |  | ||||||
| @click.option('-d', '--delimiter', default=",") |  | ||||||
| @click.option('-h', '--header', help='Discard the first line (use it as a header)', |  | ||||||
|               is_flag=True, default=False) |  | ||||||
| @click.option('-q', '--quotechar', default='"') |  | ||||||
| @click.option('-c', '--column', type=int, default=0) |  | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column): | def get_tweets(ctx, tweetsfile, folder): | ||||||
|     if update and not click.confirm('This may overwrite existing tweets. Continue?'): |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|         click.echo('Cancelling') |     utils.download_tweets(wq, tweetsfile, folder) | ||||||
|         return |  | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |  | ||||||
|  |  | ||||||
|     status = tqdm('Queried') |  | ||||||
|     failed = 0 |  | ||||||
|     for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter, |  | ||||||
|                                         batch_method=utils.tweet_download_batch, |  | ||||||
|                                         header=header, quotechar=quotechar, |  | ||||||
|                                         column=column, update=update, retry_failed=retry): |  | ||||||
|         status.update(1) |  | ||||||
|         if not obj: |  | ||||||
|             failed += 1 |  | ||||||
|             status.set_description('Failed: %s. Queried' % failed, refresh=True) |  | ||||||
|  |  | ||||||
| @tweet.command('search') | @tweet.command('search') | ||||||
| @click.argument('query') | @click.argument('query') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def search(ctx, query): | def search(ctx, query): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     t = utils.search_tweet(wq, query) |     t = utils.search_tweet(wq, query) | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
|  |  | ||||||
| @@ -170,7 +74,7 @@ def search(ctx, query): | |||||||
| @click.argument('user') | @click.argument('user') | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def timeline(ctx, user): | def timeline(ctx, user): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     t = utils.user_timeline(wq, user) |     t = utils.user_timeline(wq, user) | ||||||
|     print(json.dumps(t, indent=2)) |     print(json.dumps(t, indent=2)) | ||||||
|  |  | ||||||
| @@ -196,7 +100,7 @@ def list_users(ctx, db): | |||||||
| @click.option('-f', '--folder', default="users") | @click.option('-f', '--folder', default="users") | ||||||
| @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | ||||||
| def get_user(user, write, folder, update): | def get_user(user, write, folder, update): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     if not write: |     if not write: | ||||||
|         u = utils.get_user(wq, user) |         u = utils.get_user(wq, user) | ||||||
|         js = json.dumps(u, indent=2) |         js = json.dumps(u, indent=2) | ||||||
| @@ -213,29 +117,15 @@ def get_user(user, write, folder, update): | |||||||
|         js = json.dumps(u, indent=2) |         js = json.dumps(u, indent=2) | ||||||
|         print(js, file=f) |         print(js, file=f) | ||||||
|  |  | ||||||
| @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file. | @users.command('get_all') | ||||||
|                The result is stored as individual json files in your folder of choice.''') |  | ||||||
| @click.argument('usersfile', 'File with a list of users to look up') | @click.argument('usersfile', 'File with a list of users to look up') | ||||||
| @click.option('-f', '--folder', default="users") | @click.option('-f', '--folder', default="users") | ||||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!') |  | ||||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') |  | ||||||
| @click.option('-d', '--delimiter', default=",") |  | ||||||
| @click.option('-h', '--header', help='Discard the first line (use it as a header)', |  | ||||||
|               is_flag=True, default=False) |  | ||||||
| @click.option('-q', '--quotechar', default='"') |  | ||||||
| @click.option('-c', '--column', type=int, default=0) |  | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column): | def get_users(ctx, usersfile, folder): | ||||||
|     if update and not click.confirm('This may overwrite existing users. Continue?'): |     with open(usersfile) as f: | ||||||
|         click.echo('Cancelling') |         for line in f: | ||||||
|         return |             uid = line.strip() | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |             ctx.invoke(get_user, folder=folder, user=uid, write=True) | ||||||
|     for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter, |  | ||||||
|                                  batch_method=utils.user_download_batch, |  | ||||||
|                                  update=update, retry_failed=retry, |  | ||||||
|                                  header=header, quotechar=quotechar, |  | ||||||
|                                  column=column): |  | ||||||
|         pass |  | ||||||
|  |  | ||||||
| @users.command('crawl') | @users.command('crawl') | ||||||
| @click.option('--db', required=True, help='Database to save all users.') | @click.option('--db', required=True, help='Database to save all users.') | ||||||
| @@ -256,7 +146,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): | |||||||
|             return ExitStack() |             return ExitStack() | ||||||
|  |  | ||||||
|  |  | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, |     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, | ||||||
|                                                                                       len(wq.queue))) |                                                                                       len(wq.queue))) | ||||||
|  |  | ||||||
| @@ -350,6 +240,11 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): | |||||||
|              |              | ||||||
|     logger.info('Done!') |     logger.info('Done!') | ||||||
|  |  | ||||||
|  | @main.group('api') | ||||||
|  | def api(): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.group('extractor') | @main.group('extractor') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| @click.option('--db', required=True, help='Database of users.') | @click.option('--db', required=True, help='Database of users.') | ||||||
| @@ -420,7 +315,7 @@ def users_extractor(ctx): | |||||||
| @click.pass_context | @click.pass_context | ||||||
| def extract(ctx, recursive, user, name, initfile): | def extract(ctx, recursive, user, name, initfile): | ||||||
|     print(locals()) |     print(locals()) | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     dburi = ctx.obj['DBURI'] |     dburi = ctx.obj['DBURI'] | ||||||
|     utils.extract(wq, |     utils.extract(wq, | ||||||
|                   recursive=recursive, |                   recursive=recursive, | ||||||
| @@ -432,42 +327,31 @@ def extract(ctx, recursive, user, name, initfile): | |||||||
| @extractor.command('reset') | @extractor.command('reset') | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def reset_extractor(ctx): | def reset_extractor(ctx): | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     db = ctx.obj['DBURI'] |     db = ctx.obj['DBURI'] | ||||||
|     session = make_session(db) |     session = make_session(db) | ||||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) |     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||||
|  |  | ||||||
|  | @api.command('limits') | ||||||
| @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False), | @click.argument('url', required=False) | ||||||
|               help='''Issue a call to an endpoint of the Twitter API.''') |  | ||||||
| @click.argument('cmd', nargs=1) |  | ||||||
| @click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False) |  | ||||||
| @click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False) |  | ||||||
| @click.argument('api_args', nargs=-1, type=click.UNPROCESSED) |  | ||||||
| @click.pass_context | @click.pass_context | ||||||
| def api(ctx, cmd, tweets, users, api_args): | def get_limits(ctx, url): | ||||||
|     opts = {} |     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||||
|     mappings = { |     for worker in wq.queue: | ||||||
|         'id': '_id' |         resp = worker.client.application.rate_limit_status() | ||||||
|     } |         print('#'*20) | ||||||
|     i = iter(api_args) |         print(worker.name) | ||||||
|     for k, v in zip(i, i): |         if url: | ||||||
|         k = k.replace('--', '') |             limit = 'NOT FOUND' | ||||||
|         if k in mappings: |             print('URL is: {}'.format(url)) | ||||||
|             k = mappings[k] |             cat = url.split('/')[1] | ||||||
|         opts[k] = v |             if cat in resp['resources']: | ||||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) |                 limit = resp['resources'][cat].get(url, None) or resp['resources'][cat] | ||||||
|     if tweets: |  | ||||||
|         resp = utils.consume_tweets(wq[cmd], **opts) |  | ||||||
|     elif users: |  | ||||||
|         resp = utils.consume_users(wq[cmd], **opts) |  | ||||||
|             else: |             else: | ||||||
|         resp = wq[cmd](**opts) |                 print('Cat {} not found'.format(cat)) | ||||||
|         print(json.dumps(resp)) |             print('{}: {}'.format(url, limit))            | ||||||
|         return |         else: | ||||||
|     for i in resp: |             print(json.dumps(resp, indent=2)) | ||||||
|         print(json.dumps(i)) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @main.command('server') | @main.command('server') | ||||||
| @click.argument('CONSUMER_KEY', required=True) | @click.argument('CONSUMER_KEY', required=True) | ||||||
| @@ -487,11 +371,11 @@ def stream(ctx): | |||||||
| @stream.command('get') | @stream.command('get') | ||||||
| @click.option('-l', '--locations', default=None) | @click.option('-l', '--locations', default=None) | ||||||
| @click.option('-t', '--track', default=None) | @click.option('-t', '--track', default=None) | ||||||
| @click.option('-f', '--file', default=None, help='File to store the stream of tweets') | @click.option('-f', '--file', help='File to store the stream of tweets') | ||||||
| @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) | @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) | ||||||
| @click.pass_context  | @click.pass_context  | ||||||
| def get_stream(ctx, locations, track, file, politelyretry): | def get_stream(ctx, locations, track, file, politelyretry): | ||||||
|     wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1) |     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) | ||||||
|  |  | ||||||
|     query_args = {} |     query_args = {} | ||||||
|     if locations: |     if locations: | ||||||
| @@ -510,14 +394,10 @@ def get_stream(ctx, locations, track, file, politelyretry): | |||||||
|                 iterator = wq.statuses.sample() |                 iterator = wq.statuses.sample() | ||||||
|             else: |             else: | ||||||
|                 iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") |                 iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") | ||||||
|             try: |  | ||||||
|             for i in iterator: |             for i in iterator: | ||||||
|                 yield i |                 yield i | ||||||
|             if not politelyretry: |             if not politelyretry: | ||||||
|                 return |                 return | ||||||
|             except Exception: |  | ||||||
|                 if not politelyretry: |  | ||||||
|                     raise ex |  | ||||||
|             thishangup = time.time() |             thishangup = time.time() | ||||||
|             if thishangup - lasthangup < 60: |             if thishangup - lasthangup < 60: | ||||||
|                 raise Exception('Too many hangups in a row.') |                 raise Exception('Too many hangups in a row.') | ||||||
| @@ -535,7 +415,7 @@ def get_stream(ctx, locations, track, file, politelyretry): | |||||||
| def read_stream(ctx, file, tail): | def read_stream(ctx, file, tail): | ||||||
|     for tweet in utils.read_file(file, tail=tail): |     for tweet in utils.read_file(file, tail=tail): | ||||||
|         try: |         try: | ||||||
|             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) |             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) | ||||||
|         except (KeyError, TypeError): |         except (KeyError, TypeError): | ||||||
|             print('Raw tweet: {}'.format(tweet)) |             print('Raw tweet: {}'.format(tweet)) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -11,4 +11,3 @@ E.g.: | |||||||
|     app.run() |     app.run() | ||||||
| ''' | ''' | ||||||
| CREDENTIALS = '~/.bitter-credentials.json' | CREDENTIALS = '~/.bitter-credentials.json' | ||||||
| CONFIG_FILE = '~/.bitter.yaml' |  | ||||||
|   | |||||||
| @@ -10,7 +10,6 @@ from twitter import * | |||||||
| from collections import OrderedDict | from collections import OrderedDict | ||||||
| from threading import Lock | from threading import Lock | ||||||
| from itertools import islice | from itertools import islice | ||||||
| from functools import partial |  | ||||||
| try: | try: | ||||||
|     import itertools.ifilter as filter |     import itertools.ifilter as filter | ||||||
| except ImportError: | except ImportError: | ||||||
| @@ -39,9 +38,6 @@ class AttrToFunc(object): | |||||||
|         else: |         else: | ||||||
|             return extend_call(k) |             return extend_call(k) | ||||||
|  |  | ||||||
|     def __getitem__(self, k): |  | ||||||
|         return partial(self.handler, self.__uriparts+k.split('/')) |  | ||||||
|  |  | ||||||
|     def __call__(self, *args, **kwargs): |     def __call__(self, *args, **kwargs): | ||||||
|         # for i, a in enumerate(args)e |         # for i, a in enumerate(args)e | ||||||
|         #     kwargs[i] = a |         #     kwargs[i] = a | ||||||
| @@ -58,18 +54,6 @@ class FromCredentialsMixin(object): | |||||||
|             wq.ready(cls.worker_class(cred["user"], cred)) |             wq.ready(cls.worker_class(cred["user"], cred)) | ||||||
|         return wq |         return wq | ||||||
|      |      | ||||||
| class FromConfigMixin(object): |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def from_config(cls, config=None, conffile=None, max_workers=None): |  | ||||||
|         wq = cls() |  | ||||||
|  |  | ||||||
|         if not config: |  | ||||||
|           with utils.config(conffile) as c: |  | ||||||
|               config = c |  | ||||||
|         for cred in islice(config['credentials'], max_workers): |  | ||||||
|             wq.ready(cls.worker_class(cred["user"], cred)) |  | ||||||
|         return wq |  | ||||||
|  |  | ||||||
| class TwitterWorker(object): | class TwitterWorker(object): | ||||||
|     api_class = None |     api_class = None | ||||||
| @@ -91,12 +75,6 @@ class TwitterWorker(object): | |||||||
|             self._client = self.api_class(auth=auth) |             self._client = self.api_class(auth=auth) | ||||||
|         return self._client |         return self._client | ||||||
|  |  | ||||||
|     def __repr__(self): |  | ||||||
|         msg = '<{} for {}>'.format(self.__class__.__name__, self.name) |  | ||||||
|         if self.busy: |  | ||||||
|             msg += ' [busy]' |  | ||||||
|         return msg |  | ||||||
|  |  | ||||||
| class RestWorker(TwitterWorker): | class RestWorker(TwitterWorker): | ||||||
|     api_class = Twitter |     api_class = Twitter | ||||||
|  |  | ||||||
| @@ -115,14 +93,13 @@ class RestWorker(TwitterWorker): | |||||||
|  |  | ||||||
|     def get_wait(self, uriparts): |     def get_wait(self, uriparts): | ||||||
|         limits = self.get_limit(uriparts) |         limits = self.get_limit(uriparts) | ||||||
|         if limits.get('remaining', 1) > 0: |         if limits['remaining'] > 0: | ||||||
|             return 0 |             return 0 | ||||||
|         reset = limits.get('reset', 0) |         reset = limits.get('reset', 0) | ||||||
|         now = time.time() |         now = time.time() | ||||||
|         return max(0, (reset-now)) |         return max(0, (reset-now)) | ||||||
|  |  | ||||||
|     def get_limit(self, uriparts): |     def get_limit(self, uriparts): | ||||||
|         uriparts = list(u for u in uriparts if u) |  | ||||||
|         uri = '/'+'/'.join(uriparts) |         uri = '/'+'/'.join(uriparts) | ||||||
|         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): |         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): | ||||||
|             if ix.startswith(uri): |             if ix.startswith(uri): | ||||||
| @@ -155,7 +132,7 @@ class RestWorker(TwitterWorker): | |||||||
| class QueueException(BaseException): | class QueueException(BaseException): | ||||||
|     pass |     pass | ||||||
|  |  | ||||||
| class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin): | class QueueMixin(AttrToFunc, FromCredentialsMixin): | ||||||
|     def __init__(self, wait=True): |     def __init__(self, wait=True): | ||||||
|         logger.debug('Creating worker queue') |         logger.debug('Creating worker queue') | ||||||
|         self.queue = set() |         self.queue = set() | ||||||
|   | |||||||
| @@ -3,13 +3,10 @@ import json | |||||||
|  |  | ||||||
| from sqlalchemy.ext.declarative import declarative_base | from sqlalchemy.ext.declarative import declarative_base | ||||||
| from sqlalchemy.types import BigInteger, Integer, Text, Boolean | from sqlalchemy.types import BigInteger, Integer, Text, Boolean | ||||||
| from sqlalchemy.schema import ForeignKey |  | ||||||
| from sqlalchemy.pool import SingletonThreadPool |  | ||||||
| from sqlalchemy import Column, Index | from sqlalchemy import Column, Index | ||||||
|  |  | ||||||
| from sqlalchemy import create_engine | from sqlalchemy import create_engine | ||||||
| from sqlalchemy.orm import sessionmaker | from sqlalchemy.orm import sessionmaker | ||||||
| from functools import wraps |  | ||||||
|  |  | ||||||
| Base = declarative_base() | Base = declarative_base() | ||||||
|  |  | ||||||
| @@ -88,48 +85,34 @@ class ExtractorEntry(Base): | |||||||
|     user = Column(BigInteger, index=True) |     user = Column(BigInteger, index=True) | ||||||
|     cursor = Column(BigInteger, default=-1) |     cursor = Column(BigInteger, default=-1) | ||||||
|     pending = Column(Boolean, default=False) |     pending = Column(Boolean, default=False) | ||||||
|     errors = Column(Text, default="") |  | ||||||
|     busy = Column(Boolean, default=False) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Search(Base): |  | ||||||
|     __tablename__ = 'search_queries' |  | ||||||
|  |  | ||||||
|     id = Column(Integer, primary_key=True, index=True, unique=True) |  | ||||||
|     endpoint = Column(Text, comment="Endpoint URL") |  | ||||||
|     attrs = Column(Text, comment="Text version of the dictionary of parameters") |  | ||||||
|     count = Column(Integer) |  | ||||||
|     current_count = Column(Integer) |  | ||||||
|     current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)') |  | ||||||
|     since_id = Column(BigInteger) |  | ||||||
|  |  | ||||||
| class SearchResults(Base): |  | ||||||
|     __tablename__ = 'search_results' |  | ||||||
|     id = Column(Integer, primary_key=True, index=True, unique=True) |  | ||||||
|     search_id = Column(ForeignKey('search_queries.id')) |  | ||||||
|     resource_id = Column(Text) |  | ||||||
|  |  | ||||||
| def memoize(f): |  | ||||||
|     memo = {} |  | ||||||
|     @wraps(f) |  | ||||||
|     def helper(self, **kwargs): |  | ||||||
|         st = dict_to_str(kwargs) |  | ||||||
|         key = (self.__uriparts, st) |  | ||||||
|         if key not in memo: |  | ||||||
|             memo[key] = f(self, **kwargs) |  | ||||||
|         return memo[key] |  | ||||||
|     return helper |  | ||||||
|  |  | ||||||
| def make_session(url): | def make_session(url): | ||||||
|     if not isinstance(url, str): |     engine = create_engine(url)#, echo=True) | ||||||
|         print(url) |  | ||||||
|         raise Exception("FUCK") |  | ||||||
|     engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True) |  | ||||||
|     Base.metadata.create_all(engine) |     Base.metadata.create_all(engine) | ||||||
|     Session = sessionmaker(bind=engine) |     Session = sessionmaker(bind=engine) | ||||||
|     session = Session() |     session = Session() | ||||||
|     return session |     return session | ||||||
|      |      | ||||||
|  |  | ||||||
| def dict_to_str(args): |  | ||||||
|     return json.dumps(args, sort_keys=True) | def test(db='sqlite:///users.db'): | ||||||
|  |  | ||||||
|  |     from sqlalchemy import exists | ||||||
|  |     session = make_session(db) | ||||||
|  |  | ||||||
|  |     our_user = session.query(User).first()  | ||||||
|  |  | ||||||
|  |     print(our_user.name) | ||||||
|  |     print(session.query(User).count()) | ||||||
|  |     fake_user = User(name="Fake user") | ||||||
|  |     session.add(fake_user) | ||||||
|  |     session.commit() | ||||||
|  |     print(session.query(User).count()) | ||||||
|  |     print(session.query(exists().where(User.name == "Fake user")).scalar()) | ||||||
|  |     fake_committed = session.query(User).filter_by(name="Fake user").first() | ||||||
|  |     print(fake_committed.id) | ||||||
|  |     print(fake_committed.name) | ||||||
|  |     session.delete(fake_committed) | ||||||
|  |     session.commit() | ||||||
|  |     print(session.query(User).count()) | ||||||
|  |     print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548))) | ||||||
|   | |||||||
							
								
								
									
										663
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										663
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -3,9 +3,6 @@ from __future__ import print_function | |||||||
| import logging | import logging | ||||||
| import time | import time | ||||||
| import json | import json | ||||||
| import yaml |  | ||||||
| import csv |  | ||||||
| import io |  | ||||||
|  |  | ||||||
| import signal | import signal | ||||||
| import sys | import sys | ||||||
| @@ -13,21 +10,18 @@ import sqlalchemy | |||||||
| import os | import os | ||||||
| import multiprocessing | import multiprocessing | ||||||
| from multiprocessing.pool import ThreadPool | from multiprocessing.pool import ThreadPool | ||||||
| from multiprocessing import Queue |  | ||||||
|  |  | ||||||
| import queue |  | ||||||
| import threading |  | ||||||
| from select import select |  | ||||||
|  |  | ||||||
| from functools import partial |  | ||||||
|  |  | ||||||
| from tqdm import tqdm | from tqdm import tqdm | ||||||
|  |  | ||||||
| from itertools import islice, chain | from itertools import islice, chain | ||||||
| from contextlib import contextmanager | from contextlib import contextmanager | ||||||
|  |  | ||||||
|  | try: | ||||||
|  |     from itertools import izip_longest | ||||||
|  | except ImportError: | ||||||
|  |     from itertools import zip_longest | ||||||
|  |  | ||||||
| from collections import Counter | from collections import Counter | ||||||
| from random import choice |  | ||||||
|  |  | ||||||
| from builtins import map, filter | from builtins import map, filter | ||||||
|  |  | ||||||
| @@ -37,12 +31,6 @@ from bitter.models import Following, User, ExtractorEntry, make_session | |||||||
|  |  | ||||||
| from bitter import config | from bitter import config | ||||||
|  |  | ||||||
| # Fix Python 2.x. |  | ||||||
| try: |  | ||||||
|     UNICODE_EXISTS = bool(type(unicode)) |  | ||||||
| except NameError: |  | ||||||
|     unicode = lambda s: str(s) |  | ||||||
|  |  | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -50,93 +38,38 @@ def signal_handler(signal, frame): | |||||||
|     logger.info('You pressed Ctrl+C!') |     logger.info('You pressed Ctrl+C!') | ||||||
|     sys.exit(0) |     sys.exit(0) | ||||||
|  |  | ||||||
|  |  | ||||||
| def chunk(iterable, n): | def chunk(iterable, n): | ||||||
|     it = iter(iterable) |     it = iter(iterable) | ||||||
|     return iter(lambda: tuple(islice(it, n)), ()) |     return iter(lambda: tuple(islice(it, n)), ()) | ||||||
|  |  | ||||||
|  |  | ||||||
| def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||||
|     source = chunk(source, chunksize) |     source = chunk(source, chunksize) | ||||||
|     p = ThreadPool(numcpus*2) |     p = ThreadPool(numcpus*2) | ||||||
|     results = p.imap_unordered(func, source) |     for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))): | ||||||
|     for i in chain.from_iterable(results): |  | ||||||
|         yield i |         yield i | ||||||
|  |  | ||||||
|  | def get_credentials_path(credfile=None): | ||||||
| def get_config_path(conf=None): |     if not credfile: | ||||||
|     if not conf: |         if config.CREDENTIALS: | ||||||
|         if config.CONFIG_FILE: |             credfile = config.CREDENTIALS | ||||||
|             conf = config.CONFIG_FILE |  | ||||||
|         else: |         else: | ||||||
|             raise Exception('No valid config file') |             raise Exception('No valid credentials file') | ||||||
|     return os.path.expanduser(conf) |     return os.path.expanduser(credfile) | ||||||
|  |  | ||||||
|  |  | ||||||
| def copy_credentials_to_config(credfile, conffile=None): |  | ||||||
|       p = get_config_path(credfile) |  | ||||||
|       with open(p) as old: |  | ||||||
|           for line in old: |  | ||||||
|               cred = json.loads(line.strip()) |  | ||||||
|               add_credentials(conffile, **cred) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def save_config(conf, conffile=None): |  | ||||||
|     with config(conffile) as c: |  | ||||||
|         c.clear() |  | ||||||
|         c.update(conf) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @contextmanager | @contextmanager | ||||||
| def config(conffile=None): | def credentials_file(credfile, *args, **kwargs): | ||||||
|     d = read_config(conffile) |     p = get_credentials_path(credfile) | ||||||
|     try: |     with open(p, *args, **kwargs) as f: | ||||||
|         yield d |         yield f | ||||||
|     finally: |  | ||||||
|         write_config(d, conffile) |  | ||||||
|  |  | ||||||
|  | def iter_credentials(credfile=None): | ||||||
|  |     with credentials_file(credfile) as f: | ||||||
|  |         for l in f: | ||||||
|  |             yield json.loads(l.strip()) | ||||||
|  |  | ||||||
| def read_config(conffile): | def get_credentials(credfile=None, inverse=False, **kwargs): | ||||||
|     p = conffile and get_config_path(conffile) |  | ||||||
|     if p: |  | ||||||
|         if not os.path.exists(p): |  | ||||||
|             raise IOError('{} file does not exist.'.format(p)) |  | ||||||
|         f = open(p, 'r') |  | ||||||
|     elif 'BITTER_CONFIG' not in os.environ: |  | ||||||
|         raise Exception('No config file or BITTER_CONFIG env variable.') |  | ||||||
|     else: |  | ||||||
|         f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) |  | ||||||
|     return yaml.load(f) or {'credentials': []} |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def write_config(conf, conffile=None): |  | ||||||
|     if not conf: |  | ||||||
|         conf = {'credentials': []} |  | ||||||
|     if conffile: |  | ||||||
|         p = get_config_path(conffile) |  | ||||||
|         with open(p, 'w') as f: |  | ||||||
|             yaml.dump(conf, f) |  | ||||||
|     else: |  | ||||||
|         os.environ['BITTER_CONFIG'] = yaml.dump(conf) |  | ||||||
|  |  | ||||||
| def iter_credentials(conffile=None): |  | ||||||
|     with config(conffile) as c: |  | ||||||
|         for i in c['credentials']: |  | ||||||
|             yield i |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def create_config_file(conffile=None): |  | ||||||
|     if not conffile: |  | ||||||
|         return |  | ||||||
|     conffile = get_config_path(conffile) |  | ||||||
|     with open(conffile, 'a'): |  | ||||||
|         pass |  | ||||||
|     write_config(None, conffile) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_credentials(conffile=None, inverse=False, **kwargs): |  | ||||||
|     creds = [] |     creds = [] | ||||||
|     for i in iter_credentials(conffile): |     for i in iter_credentials(credfile): | ||||||
|         matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) |         matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) | ||||||
|         if matches and not inverse: |         if matches and not inverse: | ||||||
|             creds.append(i) |             creds.append(i) | ||||||
| @@ -144,23 +77,24 @@ def get_credentials(conffile=None, inverse=False, **kwargs): | |||||||
|             creds.append(i) |             creds.append(i) | ||||||
|     return creds |     return creds | ||||||
|  |  | ||||||
|  | def create_credentials(credfile=None): | ||||||
|  |     credfile = get_credentials_path(credfile) | ||||||
|  |     with credentials_file(credfile, 'a'): | ||||||
|  |         pass | ||||||
|  |  | ||||||
| def delete_credentials(conffile=None, **creds): | def delete_credentials(credfile=None, **creds): | ||||||
|     tokeep = get_credentials(conffile, inverse=True, **creds) |     tokeep = get_credentials(credfile, inverse=True, **creds) | ||||||
|     with config(conffile) as c: |     with credentials_file(credfile, 'w') as f: | ||||||
|         c['credentials'] = list(tokeep) |         for i in tokeep: | ||||||
|  |             f.write(json.dumps(i)) | ||||||
|  |             f.write('\n') | ||||||
|  |  | ||||||
|  | def add_credentials(credfile=None, **creds): | ||||||
| def add_credentials(conffile=None, **creds): |     exist = get_credentials(credfile, **creds) | ||||||
|     try: |     if not exist: | ||||||
|         exist = get_credentials(conffile, **creds) |         with credentials_file(credfile, 'a') as f: | ||||||
|     except IOError: |             f.write(json.dumps(creds)) | ||||||
|         exist = False |             f.write('\n') | ||||||
|         create_config_file(conffile) |  | ||||||
|     if exist: |  | ||||||
|         return |  | ||||||
|     with config(conffile) as c: |  | ||||||
|         c['credentials'].append(creds) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_hashtags(iter_tweets, best=None): | def get_hashtags(iter_tweets, best=None): | ||||||
| @@ -169,13 +103,8 @@ def get_hashtags(iter_tweets, best=None): | |||||||
|         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) |         c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) | ||||||
|     return c |     return c | ||||||
|  |  | ||||||
|  |  | ||||||
| def read_file(filename, tail=False): | def read_file(filename, tail=False): | ||||||
|     if filename == '-': |     with open(filename) as f: | ||||||
|         f = sys.stdin |  | ||||||
|     else: |  | ||||||
|         f = open(filename) |  | ||||||
|     try: |  | ||||||
|         while True: |         while True: | ||||||
|             line = f.readline() |             line = f.readline() | ||||||
|             if line not in (None, '', '\n'): |             if line not in (None, '', '\n'): | ||||||
| @@ -186,9 +115,6 @@ def read_file(filename, tail=False): | |||||||
|                     time.sleep(1) |                     time.sleep(1) | ||||||
|                 else: |                 else: | ||||||
|                     return |                     return | ||||||
|     finally: |  | ||||||
|         if f != sys.stdin: |  | ||||||
|           close(f) |  | ||||||
|      |      | ||||||
|  |  | ||||||
| def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | ||||||
| @@ -218,7 +144,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | |||||||
|             else: |             else: | ||||||
|                 yield user |                 yield user | ||||||
|  |  | ||||||
|  |  | ||||||
| def trim_user(user): | def trim_user(user): | ||||||
|     if 'status' in user: |     if 'status' in user: | ||||||
|         del user['status'] |         del user['status'] | ||||||
| @@ -232,22 +157,14 @@ def trim_user(user): | |||||||
|     return user |     return user | ||||||
|  |  | ||||||
|  |  | ||||||
| def add_user(user, dburi=None, session=None, update=False): | def add_user(session, user, enqueue=False): | ||||||
|     if not session: |  | ||||||
|         session = make_session(dburi) |  | ||||||
|  |  | ||||||
|     user = trim_user(user) |     user = trim_user(user) | ||||||
|     olduser = session.query(User).filter(User.id == user['id']) |     olduser = session.query(User).filter(User.id==user['id']) | ||||||
|     if olduser: |     if olduser: | ||||||
|         if not update: |  | ||||||
|             return |  | ||||||
|         olduser.delete() |         olduser.delete() | ||||||
|     nuser = User() |     user = User(**user) | ||||||
|     for key, value in user.items(): |  | ||||||
|         setattr(nuser, key, value) |  | ||||||
|     user = nuser |  | ||||||
|     if update: |  | ||||||
|     session.add(user) |     session.add(user) | ||||||
|  |     if extract: | ||||||
|         logger.debug('Adding entry') |         logger.debug('Adding entry') | ||||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() |         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() | ||||||
|         if not entry: |         if not entry: | ||||||
| @@ -257,80 +174,91 @@ def add_user(user, dburi=None, session=None, update=False): | |||||||
|         entry.pending = True |         entry.pending = True | ||||||
|         entry.cursor = -1 |         entry.cursor = -1 | ||||||
|         session.commit() |         session.commit() | ||||||
|     session.close() |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def download_entry(wq, entry_id, dburi=None, recursive=False): | # TODO: adapt to the crawler | ||||||
|  | def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): | ||||||
|  |     signal.signal(signal.SIGINT, signal_handler) | ||||||
|  |  | ||||||
|  |     w = wq.next() | ||||||
|  |     if not dburi: | ||||||
|  |         dburi = 'sqlite:///%s.db' % extractor_name | ||||||
|  |  | ||||||
|     session = make_session(dburi) |     session = make_session(dburi) | ||||||
|     if not session: |  | ||||||
|         raise Exception("Provide dburi or session") |     screen_names = [] | ||||||
|     logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id))) |     user_ids = [] | ||||||
|     entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first() |  | ||||||
|     user = session.query(User).filter(User.id == entry.user).first() |     def classify_user(id_or_name): | ||||||
|     download_user(wq, session, user, entry, recursive) |         try: | ||||||
|     session.close() |             int(user) | ||||||
|  |             user_ids.append(user) | ||||||
|  |             logger.info("Added user id") | ||||||
|  |         except ValueError: | ||||||
|  |             logger.info("Added screen_name") | ||||||
|  |             screen_names.append(user.split('@')[-1]) | ||||||
|  |  | ||||||
|  |     if user: | ||||||
|  |         classify_user(user) | ||||||
|  |  | ||||||
|  |     elif initfile: | ||||||
|  |         logger.info("No user. I will open %s" % initfile) | ||||||
|  |         with open(initfile, 'r') as f: | ||||||
|  |             for line in f: | ||||||
|  |                 user = line.strip().split(',')[0] | ||||||
|  |                 classify_user(user) | ||||||
|  |     else: | ||||||
|  |         logger.info('Using pending users from last session') | ||||||
|  |  | ||||||
|  |  | ||||||
| def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): |     nusers = list(get_users(wq, screen_names, by_name=True)) | ||||||
|  |     if user_ids: | ||||||
|  |         nusers += list(get_users(wq, user_ids, by_name=False)) | ||||||
|  |  | ||||||
|     total_followers = user.followers_count |     for i in nusers: | ||||||
|  |         add_user(session, i, enqueue=True) | ||||||
|  |  | ||||||
|     if total_followers > max_followers: |     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() | ||||||
|         entry.pending = False |     logger.info('Total users: {}'.format(total_users)) | ||||||
|         logger.info("Too many followers for user: %s" % user.screen_name) |     def pending_entries(): | ||||||
|         session.add(entry) |         pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count() | ||||||
|         session.commit() |         logger.info('Pending: {}'.format(pending)) | ||||||
|         return |         return pending | ||||||
|  |  | ||||||
|     if not entry: |  | ||||||
|         entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id) |  | ||||||
|     session.add(entry) |  | ||||||
|     session.commit() |  | ||||||
|  |  | ||||||
|  |     while pending_entries() > 0: | ||||||
|  |         logger.info("Using account: %s" % w.name) | ||||||
|  |         candidate, entry = session.query(User, ExtractorEntry).\ | ||||||
|  |                            filter(ExtractorEntry.user == User.id).\ | ||||||
|  |                            filter(ExtractorEntry.pending == True).\ | ||||||
|  |                            order_by(User.followers_count).first() | ||||||
|  |         if not candidate: | ||||||
|  |             break | ||||||
|         pending = True |         pending = True | ||||||
|         cursor = entry.cursor |         cursor = entry.cursor | ||||||
|     uid = user.id |         uid = candidate.id | ||||||
|     name = user.name |         uobject = session.query(User).filter(User.id==uid).first() | ||||||
|  |         name = uobject.screen_name if uobject else None | ||||||
|  |  | ||||||
|         logger.info("#"*20) |         logger.info("#"*20) | ||||||
|         logger.info("Getting %s - %s" % (uid, name)) |         logger.info("Getting %s - %s" % (uid, name)) | ||||||
|         logger.info("Cursor %s" % cursor) |         logger.info("Cursor %s" % cursor) | ||||||
|     logger.info("Using account: %s" % wq.name) |         logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users)) | ||||||
|  |  | ||||||
|     _fetched_followers = 0 |  | ||||||
|  |  | ||||||
|     def fetched_followers(): |  | ||||||
|         return session.query(Following).filter(Following.isfollowed==uid).count() |  | ||||||
|  |  | ||||||
|     attempts = 0 |  | ||||||
|     while cursor > 0 or fetched_followers() < total_followers: |  | ||||||
|         try: |         try: | ||||||
|             resp = wq.followers.ids(user_id=uid, cursor=cursor) |             resp = wq.followers.ids(user_id=uid, cursor=cursor) | ||||||
|         except TwitterHTTPError as ex: |         except TwitterHTTPError as ex: | ||||||
|             attempts += 1 |             if ex.e.code in (401, ): | ||||||
|             if ex.e.code in (401, ) or attempts > 3: |  | ||||||
|                 logger.info('Not authorized for user: {}'.format(uid)) |                 logger.info('Not authorized for user: {}'.format(uid)) | ||||||
|                 entry.errors = ex.message |                 resp = {} | ||||||
|                 break |         if 'ids' in resp: | ||||||
|         if 'ids' not in resp: |  | ||||||
|             logger.info("Error with id %s %s" % (uid, resp)) |  | ||||||
|             entry.pending = False |  | ||||||
|             entry.errors = "No ids in response: %s" % resp |  | ||||||
|             break |  | ||||||
|  |  | ||||||
|             logger.info("New followers: %s" % len(resp['ids'])) |             logger.info("New followers: %s" % len(resp['ids'])) | ||||||
|             if recursive: |             if recursive: | ||||||
|                 newusers = get_users(wq, resp) |                 newusers = get_users(wq, resp) | ||||||
|             for newuser in newusers: |                 for user in newusers: | ||||||
|                 add_user(session=session, user=newuser) |                     add_user(session, newuser, enqueue=True) | ||||||
|  |  | ||||||
|         if 'ids' not in resp or not resp['ids']: |  | ||||||
|             logger.info('NO IDS in response') |  | ||||||
|             break |  | ||||||
|             for i in resp['ids']: |             for i in resp['ids']: | ||||||
|                 existing_user = session.query(Following).\ |                 existing_user = session.query(Following).\ | ||||||
|                             filter(Following.isfollowed == uid).\ |                                 filter(Following.isfollowed==uid).\ | ||||||
|                             filter(Following.follower == i).first() |                                 filter(Following.follower==i).first() | ||||||
|                 now = int(time.time()) |                 now = int(time.time()) | ||||||
|                 if existing_user: |                 if existing_user: | ||||||
|                     existing_user.created_at_stamp = now |                     existing_user.created_at_stamp = now | ||||||
| @@ -340,111 +268,32 @@ def download_user(wq, session, user, entry=None, recursive=False, max_followers= | |||||||
|                                   created_at_stamp=now) |                                   created_at_stamp=now) | ||||||
|                     session.add(f) |                     session.add(f) | ||||||
|  |  | ||||||
|         logger.info("Fetched: %s/%s followers" % (fetched_followers(), |             total_followers = candidate.followers_count | ||||||
|  |             fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count() | ||||||
|  |             logger.info("Fetched: %s/%s followers" % (fetched_followers, | ||||||
|                                                       total_followers)) |                                                       total_followers)) | ||||||
|         entry.cursor = resp["next_cursor"] |             cursor = resp["next_cursor"] | ||||||
|  |             if cursor > 0: | ||||||
|         session.add(entry) |                 pending = True | ||||||
|         session.commit() |                 logger.info("Getting more followers for %s" % uid) | ||||||
|  |             else: | ||||||
|                 logger.info("Done getting followers for %s" % uid) |                 logger.info("Done getting followers for %s" % uid) | ||||||
|  |                 cursor = -1 | ||||||
|  |                 pending = False | ||||||
|  |         else: | ||||||
|  |             logger.info("Error with id %s %s" % (uid, resp)) | ||||||
|  |             pending = False | ||||||
|  |  | ||||||
|     entry.pending = False |         entry.pending = pending | ||||||
|     entry.busy = False |         entry.cursor = cursor | ||||||
|     session.add(entry) |         logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) | ||||||
|  |  | ||||||
|  |         session.add(candidate) | ||||||
|         session.commit() |         session.commit() | ||||||
|  |  | ||||||
|     logger.debug('Entry: {} - {}'.format(entry.user, entry.pending)) |  | ||||||
|         sys.stdout.flush() |         sys.stdout.flush() | ||||||
|  |  | ||||||
|  |  | ||||||
| def classify_user(id_or_name, screen_names, user_ids): |  | ||||||
|     try: |  | ||||||
|         int(id_or_name) |  | ||||||
|         user_ids.append(id_or_name) |  | ||||||
|         logger.debug("Added user id") |  | ||||||
|     except ValueError: |  | ||||||
|         logger.debug("Added screen_name") |  | ||||||
|         screen_names.append(id_or_name.split('@')[-1]) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): |  | ||||||
|     signal.signal(signal.SIGINT, signal_handler) |  | ||||||
|  |  | ||||||
|     if not dburi: |  | ||||||
|         dburi = 'sqlite:///%s.db' % extractor_name |  | ||||||
|  |  | ||||||
|     session = make_session(dburi) |  | ||||||
|     session.query(ExtractorEntry).update({ExtractorEntry.busy: False}) |  | ||||||
|     session.commit() |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     if not (user or initfile): |  | ||||||
|         logger.info('Using pending users from last session') |  | ||||||
|     else: |  | ||||||
|         screen_names = [] |  | ||||||
|         user_ids = [] |  | ||||||
|         if user: |  | ||||||
|             classify_user(user, screen_names, user_ids) |  | ||||||
|         elif initfile: |  | ||||||
|             logger.info("No user. I will open %s" % initfile) |  | ||||||
|             with open(initfile, 'r') as f: |  | ||||||
|                 for line in f: |  | ||||||
|                     user = line.strip().split(',')[0] |  | ||||||
|                     classify_user(user, screen_names, user_ids) |  | ||||||
|  |  | ||||||
|         def missing_user(ix, column=User.screen_name): |  | ||||||
|             res = session.query(User).filter(column == ix).count() == 0 |  | ||||||
|             if res: |  | ||||||
|                 logger.info("Missing user %s. Count: %s" % (ix, res)) |  | ||||||
|             return res |  | ||||||
|  |  | ||||||
|         screen_names = list(filter(missing_user, screen_names)) |  | ||||||
|         user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids)) |  | ||||||
|         nusers = [] |  | ||||||
|         logger.info("Missing user ids: %s" % user_ids) |  | ||||||
|         logger.info("Missing screen names: %s" % screen_names) |  | ||||||
|         if screen_names: |  | ||||||
|             nusers = list(get_users(wq, screen_names, by_name=True)) |  | ||||||
|         if user_ids: |  | ||||||
|             nusers += list(get_users(wq, user_ids, by_name=False)) |  | ||||||
|  |  | ||||||
|         for i in nusers: |  | ||||||
|             add_user(dburi=dburi, user=i) |  | ||||||
|  |  | ||||||
|     total_users = session.query(sqlalchemy.func.count(User.id)).scalar() |  | ||||||
|     logger.info('Total users: {}'.format(total_users)) |  | ||||||
|  |  | ||||||
|     de = partial(download_entry, wq, dburi=dburi) |  | ||||||
|     pending = pending_entries(dburi) |  | ||||||
|     session.close() |  | ||||||
|  |  | ||||||
|     for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): |  | ||||||
|         logger.info("Got %s" % i) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def pending_entries(dburi): |  | ||||||
|     session = make_session(dburi) |  | ||||||
|     while True: |  | ||||||
|         candidate, entry = session.query(User, ExtractorEntry).\ |  | ||||||
|                         filter(ExtractorEntry.user == User.id).\ |  | ||||||
|                         filter(ExtractorEntry.pending == True).\ |  | ||||||
|                         filter(ExtractorEntry.busy == False).\ |  | ||||||
|                         order_by(User.followers_count).first() |  | ||||||
|         if candidate: |  | ||||||
|             entry.busy = True |  | ||||||
|             session.add(entry) |  | ||||||
|             session.commit() |  | ||||||
|             yield int(entry.id) |  | ||||||
|             continue |  | ||||||
|         if session.query(ExtractorEntry).\ |  | ||||||
|             filter(ExtractorEntry.busy == True).count() > 0: |  | ||||||
|             time.sleep(1) |  | ||||||
|             continue |  | ||||||
|         logger.info("No more pending entries") |  | ||||||
|         break |  | ||||||
|     session.close() |  | ||||||
|  |  | ||||||
| def get_tweet(c, tid): | def get_tweet(c, tid): | ||||||
|     return c.statuses.show(id=tid) |     return c.statuses.show(id=tid) | ||||||
|  |  | ||||||
| @@ -465,261 +314,83 @@ def get_user(c, user): | |||||||
|         return c.users.lookup(screen_name=user)[0] |         return c.users.lookup(screen_name=user)[0] | ||||||
|  |  | ||||||
| def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | ||||||
|     cached = cached_id(tweetid, folder) |     cached = cached_tweet(tweetid, folder) | ||||||
|     tweet = None |     tweet = None | ||||||
|     if update or not cached: |     if update or not cached: | ||||||
|         tweet = get_tweet(wq, tweetid) |         tweet = get_tweet(wq, tweetid) | ||||||
|         js = json.dumps(tweet) |         js = json.dumps(tweet, indent=2) | ||||||
|     if write: |     if write: | ||||||
|         if tweet: |         if tweet: | ||||||
|             write_json(js, folder) |             write_tweet_json(js, folder) | ||||||
|     else: |     else: | ||||||
|         print(js) |         print(js) | ||||||
|  |  | ||||||
|  |  | ||||||
| def cached_id(oid, folder): | def cached_tweet(tweetid, folder): | ||||||
|     tweet = None |     tweet = None | ||||||
|     file = os.path.join(folder, '%s.json' % oid) |     file = os.path.join(folder, '%s.json' % tweetid) | ||||||
|     if os.path.exists(file) and os.path.isfile(file): |     if os.path.exists(file) and os.path.isfile(file): | ||||||
|         try: |         try: | ||||||
|             # print('%s: Object exists' % oid) |             # print('%s: Tweet exists' % tweetid) | ||||||
|             with open(file) as f: |             with open(file) as f: | ||||||
|                 tweet = json.load(f) |                 tweet = json.load(f) | ||||||
|         except Exception as ex: |         except Exception as ex: | ||||||
|             logger.error('Error getting cached version of {}: {}'.format(oid, ex)) |             logger.error('Error getting cached version of {}: {}'.format(tweetid, ex)) | ||||||
|     return tweet |     return tweet | ||||||
|  |  | ||||||
| def write_json(js, folder, oid=None): | def write_tweet_json(js, folder): | ||||||
|     if not oid: |     tweetid = js['id'] | ||||||
|       oid = js['id'] |     file = tweet_file(tweetid, folder) | ||||||
|     file = id_file(oid, folder) |  | ||||||
|     if not os.path.exists(folder): |     if not os.path.exists(folder): | ||||||
|         os.makedirs(folder) |         os.makedirs(folder) | ||||||
|     with open(file, 'w') as f: |     with open(file, 'w') as f: | ||||||
|         json.dump(js, f) |         json.dump(js, f, indent=2) | ||||||
|         logger.info('Written {} to file {}'.format(oid, file)) |         logger.info('Written {} to file {}'.format(tweetid, file)) | ||||||
|  |  | ||||||
| def id_file(oid, folder): | def tweet_file(tweetid, folder): | ||||||
|     return os.path.join(folder, '%s.json' % oid) |     return os.path.join(folder, '%s.json' % tweetid) | ||||||
|  |  | ||||||
| def fail_file(oid, folder): | def tweet_fail_file(tweetid, folder): | ||||||
|     failsfolder = os.path.join(folder, 'failed') |     failsfolder = os.path.join(folder, 'failed') | ||||||
|     if not os.path.exists(failsfolder): |     if not os.path.exists(failsfolder): | ||||||
|         os.makedirs(failsfolder) |         os.makedirs(failsfolder) | ||||||
|     return os.path.join(failsfolder, '%s.failed' % oid) |     return os.path.join(failsfolder, '%s.failed' % tweetid) | ||||||
|  |  | ||||||
| def id_failed(oid, folder): | def tweet_failed(tweetid, folder): | ||||||
|     return os.path.isfile(fail_file(oid, folder)) |     return os.path.isfile(tweet_fail_file(tweetid, folder)) | ||||||
|  |  | ||||||
| def tweet_download_batch(wq, batch): | def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True): | ||||||
|     tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] |     def filter_line(line): | ||||||
|     for tid, tweet in tweets.items(): |         tweetid = int(line) | ||||||
|         yield tid, tweet |         # print('Checking {}'.format(tweetid)) | ||||||
|  |         if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed): | ||||||
| def user_download_batch(wq, batch): |             yield None | ||||||
|     screen_names = [] |  | ||||||
|     user_ids = [] |  | ||||||
|     for elem in batch: |  | ||||||
|         try: |  | ||||||
|             int(elem) |  | ||||||
|             user_ids.append(str(elem)) |  | ||||||
|         except ValueError: |  | ||||||
|             screen_names.append(elem.lower()) |  | ||||||
|     args = {} |  | ||||||
|     if user_ids: |  | ||||||
|         args['user_id'] = ','.join(user_ids) |  | ||||||
|     if screen_names: |  | ||||||
|         args['screen_name'] = ','.join(screen_names) |  | ||||||
|     try: |  | ||||||
|         users = wq.users.lookup(**args) |  | ||||||
|     except TwitterHTTPError as ex: |  | ||||||
|         if ex.e.code in (404,): |  | ||||||
|             users = [] |  | ||||||
|         else: |         else: | ||||||
|             raise |             yield line | ||||||
|     found_ids = [] |  | ||||||
|     found_names = [] |  | ||||||
|     for user in users: |  | ||||||
|         uid = user['id_str'] |  | ||||||
|         if uid in user_ids: |  | ||||||
|             found_ids.append(uid) |  | ||||||
|             yield (uid, user) |  | ||||||
|         uname = user['screen_name'].lower() |  | ||||||
|         if uname in screen_names: |  | ||||||
|             found_names.append(uname) |  | ||||||
|             yield (uname, user) |  | ||||||
|     for uid in set(user_ids) - set(found_ids): |  | ||||||
|         yield (uid, None) |  | ||||||
|     for name in set(screen_names) - set(found_names): |  | ||||||
|         yield (name, None) |  | ||||||
|  |  | ||||||
|  |     def print_result(res): | ||||||
| def dump_result(oid, obj, folder, ignore_fails=True): |         tid, tweet = res | ||||||
|     if obj: |         if tweet: | ||||||
|             try: |             try: | ||||||
|             write_json(obj, folder=folder, oid=oid) |                 write_tweet_json(tweet, folder=folder) | ||||||
|             failed = fail_file(oid, folder) |                 yield 1 | ||||||
|             if os.path.exists(failed): |  | ||||||
|                 os.remove(failed) |  | ||||||
|             except Exception as ex: |             except Exception as ex: | ||||||
|             logger.error('%s: %s' % (oid, ex)) |                 logger.error('%s: %s' % (tid, ex)) | ||||||
|                 if not ignore_fails: |                 if not ignore_fails: | ||||||
|                     raise |                     raise | ||||||
|         else: |         else: | ||||||
|         logger.info('Object not recovered: {}'.format(oid)) |             logger.info('Tweet not recovered: {}'.format(tid)) | ||||||
|         with open(fail_file(oid, folder), 'w') as f: |             with open(tweet_fail_file(tid, folder), 'w') as f: | ||||||
|             print('Object not found', file=f) |                 print('Tweet not found', file=f) | ||||||
|  |             yield -1 | ||||||
|  |  | ||||||
| def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, |     def download_batch(batch): | ||||||
|                   batch_method=tweet_download_batch): |         tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] | ||||||
|  |         return tweets.items() | ||||||
|  |  | ||||||
|     done = Queue() |     with open(tweetsfile) as f: | ||||||
|  |         lines = map(lambda x: x.strip(), f) | ||||||
|     down = Queue() |         lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines')) | ||||||
|  |         tweets = parallel(download_batch, lines_to_crawl, 100) | ||||||
|  |         for res in tqdm(parallel(print_result, tweets), desc='Queried'): | ||||||
|     def filter_list(lst, done, down): |             pass | ||||||
|         print('filtering') |  | ||||||
|         for oid in lst: |  | ||||||
|             # print('Checking {}'.format(line)) |  | ||||||
|             cached = cached_id(oid, folder) |  | ||||||
|             if (cached and not update): |  | ||||||
|                 done.put((oid, cached)) |  | ||||||
|             elif (id_failed(oid, folder) and not retry_failed): |  | ||||||
|                 done.put((oid, None)) |  | ||||||
|             else: |  | ||||||
|                 down.put(oid) |  | ||||||
|         down.put(None) |  | ||||||
|  |  | ||||||
|     def download_results(batch_method, down, done): |  | ||||||
|         def gen(): |  | ||||||
|             while True: |  | ||||||
|                 r = down.get() |  | ||||||
|                 if not r: |  | ||||||
|                     return |  | ||||||
|                 yield r |  | ||||||
|  |  | ||||||
|         for t in parallel(batch_method, gen(), 100): |  | ||||||
|             done.put(t) |  | ||||||
|  |  | ||||||
|     def batch(*args, **kwargs): |  | ||||||
|         return batch_method(wq, *args, **kwargs) |  | ||||||
|  |  | ||||||
|     tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True) |  | ||||||
|     tc.start() |  | ||||||
|     td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True) |  | ||||||
|     td.start() |  | ||||||
|  |  | ||||||
|     def check_threads(ts, done): |  | ||||||
|         for t in ts: |  | ||||||
|             t.join() |  | ||||||
|         done.put(None) |  | ||||||
|  |  | ||||||
|     wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True) |  | ||||||
|     wait.start() |  | ||||||
|  |  | ||||||
|     while True: |  | ||||||
|         rec = done.get() |  | ||||||
|  |  | ||||||
|         if rec is None: |  | ||||||
|             break |  | ||||||
|  |  | ||||||
|         oid, obj = rec |  | ||||||
|         dump_result(oid, obj, folder, ignore_fails) |  | ||||||
|         yield rec |  | ||||||
|  |  | ||||||
|     wait.join() |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def download_file(wq, csvfile, folder, column=0, delimiter=',', |  | ||||||
|                   header=False, quotechar='"', batch_method=tweet_download_batch, |  | ||||||
|                   **kwargs): |  | ||||||
|     with open(csvfile) as f: |  | ||||||
|         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) |  | ||||||
|         if header: |  | ||||||
|             next(csvreader) |  | ||||||
|  |  | ||||||
|         def reader(r): |  | ||||||
|             for row in csvreader: |  | ||||||
|                 if len(row) > column: |  | ||||||
|                     yield row[column].strip() |  | ||||||
|  |  | ||||||
|  |  | ||||||
|         for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, |  | ||||||
|                                  **kwargs): |  | ||||||
|             yield res |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def download_timeline(wq, user): |  | ||||||
|     return wq.statuses.user_timeline(id=user) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def _consume_feed(func, feed_control=None, **kwargs): |  | ||||||
|     ''' |  | ||||||
|     Get all the tweets using pagination and a given method. |  | ||||||
|     It can be controlled with the `count` parameter. |  | ||||||
|  |  | ||||||
|     If max_count < 0 => Loop until the whole feed is consumed. |  | ||||||
|     If max_count == 0 => Only call the API once, with the default values. |  | ||||||
|     If max_count > 0 => Get max_count tweets from the feed. |  | ||||||
|     ''' |  | ||||||
|     remaining = int(kwargs.pop('max_count', 0)) |  | ||||||
|     count = int(kwargs.get('count', -1)) |  | ||||||
|     limit = False |  | ||||||
|  |  | ||||||
|     # We need to at least perform a query, so we simulate a do-while |  | ||||||
|     # by running once with no limit and updating the condition at the end |  | ||||||
|     with tqdm(total=remaining) as pbar: |  | ||||||
|       while not limit: |  | ||||||
|           if remaining > 0 and  ((count < 0) or (count > remaining)): |  | ||||||
|               kwargs['count'] = remaining |  | ||||||
|           resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count) |  | ||||||
|           if not resp: |  | ||||||
|               return |  | ||||||
|           for entry in resp: |  | ||||||
|               yield entry |  | ||||||
|           pbar.update(len(resp)) |  | ||||||
|           limit = stop |  | ||||||
|           if remaining < 0: |  | ||||||
|               # If the loop was run with a negative remaining, it will only stop |  | ||||||
|               # when the control function tells it to. |  | ||||||
|               continue |  | ||||||
|           # Otherwise, check if we have already downloaded all the required items |  | ||||||
|           remaining -= len(resp) |  | ||||||
|           limit = limit or remaining <= 0 |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def consume_tweets(*args, **kwargs): |  | ||||||
|     return _consume_feed(*args, feed_control=_tweets_control, **kwargs) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def consume_users(*args, **kwargs): |  | ||||||
|     return _consume_feed(*args, feed_control=_users_control, **kwargs) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def _tweets_control(func, apiargs, remaining=0, **kwargs): |  | ||||||
|     ''' Return a list of entries, the remaining ''' |  | ||||||
|      |  | ||||||
|     resp = func(**apiargs) |  | ||||||
|     if not resp: |  | ||||||
|         return None, True |  | ||||||
|     # Update the arguments for the next call |  | ||||||
|     # Two options: either resp is a list, or a dict like: |  | ||||||
|     #    {'statuses': ... 'search_metadata': ...} |  | ||||||
|     if isinstance(resp, dict) and 'search_metadata' in resp: |  | ||||||
|         resp = resp['statuses'] |  | ||||||
|     max_id = min(s['id'] for s in resp) - 1 |  | ||||||
|     apiargs['max_id'] = max_id |  | ||||||
|     return resp, False |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def _users_control(func, apiargs, remaining=0, **kwargs): |  | ||||||
|     resp = func(**apiargs) |  | ||||||
|     stop = True |  | ||||||
|     # Update the arguments for the next call |  | ||||||
|     if 'next_cursor' in resp: |  | ||||||
|         cursor = resp['next_cursor'] |  | ||||||
|         apiargs['cursor'] = cursor |  | ||||||
|         if int(cursor) != -1: |  | ||||||
|             stop = False |  | ||||||
|     return resp['users'], stop |  | ||||||
|   | |||||||
| @@ -1,12 +0,0 @@ | |||||||
| version: '2' |  | ||||||
| services: |  | ||||||
|   dev: |  | ||||||
|     build: |  | ||||||
|       context: . |  | ||||||
|       dockerfile: Dockerfile-3.4 |  | ||||||
|     volumes: |  | ||||||
|       - '.:/usr/src/app' |  | ||||||
|     tty: yes |  | ||||||
|     working_dir: '/usr/src/app' |  | ||||||
|     entrypoint: '/bin/bash' |  | ||||||
|     command: '' |  | ||||||
| @@ -2,4 +2,3 @@ sqlalchemy | |||||||
| twitter | twitter | ||||||
| click | click | ||||||
| tqdm | tqdm | ||||||
| pyyaml |  | ||||||
|   | |||||||
							
								
								
									
										32
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								setup.py
									
									
									
									
									
								
							| @@ -1,23 +1,29 @@ | |||||||
|  | import pip | ||||||
| from setuptools import setup | from setuptools import setup | ||||||
|  | from pip.req import parse_requirements | ||||||
|  |  | ||||||
| def parse_requirements(filename): | # parse_requirements() returns generator of pip.req.InstallRequirement objects | ||||||
|     """ load requirements from a pip requirements file """ | # pip 6 introduces the *required* session argument | ||||||
|     with open(filename, 'r') as f: | try: | ||||||
|         lineiter = list(line.strip() for line in f) |     install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession()) | ||||||
|     return [line for line in lineiter if line and not line.startswith("#")] |     py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession()) | ||||||
|  |     test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession()) | ||||||
| install_reqs = parse_requirements("requirements.txt") | except AttributeError: | ||||||
| py2_reqs = parse_requirements("requirements-py2.txt") |     install_reqs = parse_requirements("requirements.txt") | ||||||
| test_reqs = parse_requirements("test-requirements.txt") |     py2_reqs = parse_requirements("requirements-py2.txt") | ||||||
|  |     test_reqs = parse_requirements("test-requirements.txt") | ||||||
|  |  | ||||||
| import sys | import sys | ||||||
| import os |  | ||||||
| import itertools | import itertools | ||||||
| if sys.version_info <= (3, 0): | if sys.version_info <= (3, 0): | ||||||
|     install_reqs = install_reqs + py2_reqs |     install_reqs = itertools.chain(install_reqs, py2_reqs) | ||||||
|  |  | ||||||
| with open(os.path.join('bitter', 'VERSION'), 'r') as f: | # reqs is a list of requirement | ||||||
|     __version__ = f.read().strip() | # e.g. ['django==1.5.1', 'mezzanine==1.4.6'] | ||||||
|  | install_reqs = [str(ir.req) for ir in install_reqs] | ||||||
|  | test_reqs = [str(ir.req) for ir in test_reqs] | ||||||
|  |  | ||||||
|  | from bitter import __version__ | ||||||
|  |  | ||||||
| setup( | setup( | ||||||
|     name="bitter", |     name="bitter", | ||||||
|   | |||||||
| @@ -12,11 +12,7 @@ from bitter import config as c | |||||||
| class TestCrawlers(TestCase): | class TestCrawlers(TestCase): | ||||||
|  |  | ||||||
|     def setUp(self): |     def setUp(self): | ||||||
|         CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml') |         self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||||
|         if os.path.exists(CONF_PATH): |  | ||||||
|             self.wq = easy(CONF_PATH) |  | ||||||
|         else: |  | ||||||
|             self.wq = easy() |  | ||||||
|  |  | ||||||
|     def test_create_worker(self): |     def test_create_worker(self): | ||||||
|         assert len(self.wq.queue)==1 |         assert len(self.wq.queue)==1 | ||||||
|   | |||||||
| @@ -1,23 +0,0 @@ | |||||||
| from unittest import TestCase |  | ||||||
|  |  | ||||||
| import os |  | ||||||
| import types |  | ||||||
|  |  | ||||||
| from bitter import utils |  | ||||||
| from bitter.models import * |  | ||||||
| from sqlalchemy import exists |  | ||||||
|  |  | ||||||
| class TestModels(TestCase): |  | ||||||
|  |  | ||||||
|     def setUp(self): |  | ||||||
|         self.session = make_session('sqlite://') |  | ||||||
|  |  | ||||||
|     def test_user(self): |  | ||||||
|         fake_user = User(name="Fake user", id=1548) |  | ||||||
|         self.session.add(fake_user) |  | ||||||
|         self.session.commit() |  | ||||||
|         fake_committed = self.session.query(User).filter_by(name="Fake user").first() |  | ||||||
|         assert fake_committed |  | ||||||
|         self.session.delete(fake_committed) |  | ||||||
|         self.session.commit() |  | ||||||
|         assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548)) |  | ||||||
| @@ -8,63 +8,56 @@ from bitter import config as c | |||||||
|  |  | ||||||
| class TestUtils(TestCase): | class TestUtils(TestCase): | ||||||
|  |  | ||||||
|     configfile = '/tmp/bitter.yaml' |  | ||||||
|  |  | ||||||
|     def setUp(self): |     def setUp(self): | ||||||
|         c.CONFIG_FILE = self.configfile |         self.credfile = '/tmp/credentials.txt' | ||||||
|         if os.path.exists(self.configfile): |         c.CREDENTIALS = self.credfile | ||||||
|             os.remove(self.configfile) |         if os.path.exists(self.credfile): | ||||||
|         assert not os.path.exists(self.configfile) |             os.remove(self.credfile) | ||||||
|         utils.create_config_file(self.configfile) |         utils.create_credentials(self.credfile) | ||||||
|         assert os.path.exists(self.configfile) |          | ||||||
|  |  | ||||||
|  |     def test_create_credentials(self): | ||||||
|  |         assert os.path.exists(self.credfile) | ||||||
|  |         os.remove(self.credfile) | ||||||
|  |         utils.create_credentials() # From config | ||||||
|  |         assert os.path.exists(self.credfile) | ||||||
|  |  | ||||||
|     def test_add_credentials(self): |     def test_add_credentials(self): | ||||||
|         utils.add_credentials(self.configfile, user="test") |         utils.add_credentials(self.credfile, user="test") | ||||||
|         assert utils.get_credentials(self.configfile) |         assert utils.get_credentials(self.credfile) | ||||||
|         assert utils.get_credentials(self.configfile, user="test") |         assert utils.get_credentials(user="test") | ||||||
|         assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test" |         assert list(utils.get_credentials(user="test"))[0]["user"] == "test" | ||||||
|  |  | ||||||
|     def test_get_credentials(self): |     def test_get_credentials(self): | ||||||
|         utils.add_credentials(self.configfile, user="test") |         utils.add_credentials(self.credfile, user="test") | ||||||
|         assert utils.get_credentials(self.configfile, user="test") |         assert utils.get_credentials(user="test") | ||||||
|         assert not utils.get_credentials(self.configfile, user="test", inverse=True) |         assert not utils.get_credentials(user="test", inverse=True) | ||||||
|  |  | ||||||
|     def test_add_two_credentials(self): |     def test_add_two_credentials(self): | ||||||
|         utils.add_credentials(self.configfile, user="test") |         utils.add_credentials(self.credfile, user="test") | ||||||
|         utils.add_credentials(self.configfile, user="test2") |         utils.add_credentials(self.credfile, user="test2") | ||||||
|         assert utils.get_credentials(self.configfile, user="test") |         assert utils.get_credentials(user="test") | ||||||
|         assert utils.get_credentials(self.configfile, user="test2") |         assert utils.get_credentials(user="test2") | ||||||
|  |  | ||||||
|  |  | ||||||
|     def test_delete_credentials(self): |     def test_delete_credentials(self): | ||||||
|         utils.add_credentials(self.configfile, user="test") |         utils.add_credentials(self.credfile, user="test") | ||||||
|         assert utils.get_credentials(self.configfile, user="test") |         assert utils.get_credentials(user="test") | ||||||
|         utils.delete_credentials(self.configfile, user="test") |         utils.delete_credentials(user="test") | ||||||
|         assert not utils.get_credentials(self.configfile, user="test") |         print(utils.get_credentials()) | ||||||
|  |         assert not utils.get_credentials(user="test") | ||||||
|  |  | ||||||
|     def test_parallel(self): |     def test_parallel(self): | ||||||
|         import time |         import time | ||||||
|         def echo(i): |         def echo(i): | ||||||
|             time.sleep(0.5) |             time.sleep(2) | ||||||
|             return i |             return i | ||||||
|         tic = time.time() |         tic = time.time() | ||||||
|         resp = utils.parallel(echo, [1,2,3]) |         resp = utils.parallel(echo, [1,2,3]) | ||||||
|         assert isinstance(resp, types.GeneratorType) |         assert isinstance(resp, types.GeneratorType) | ||||||
|         assert list(resp) == [1,2,3] |         assert list(resp) == [1,2,3] | ||||||
|         toc = time.time() |         toc = time.time() | ||||||
|         assert (tic-toc) < 600 |         assert (tic-toc) < 6000 | ||||||
|         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) |         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||||
|         assert list(resp2) == [1,2, 3,4] |         assert list(resp2) == [1,2,3,4] | ||||||
|          |          | ||||||
|  |  | ||||||
| class TestUtilsEnv(TestUtils): |  | ||||||
|     configfile = None |  | ||||||
|  |  | ||||||
|     def setUp(self): |  | ||||||
|         if 'BITTER_CONFIG' in os.environ: |  | ||||||
|           self.oldenv = os.environ['BITTER_CONFIG'] |  | ||||||
|         os.environ['BITTER_CONFIG'] = '' |  | ||||||
|  |  | ||||||
|     def tearDown(self): |  | ||||||
|         if hasattr(self, 'oldenv'): |  | ||||||
|             os.environ['BITTER_CONFIG'] = self.oldenv |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user