mirror of
				https://github.com/balkian/bitter.git
				synced 2025-10-26 05:08:22 +00:00 
			
		
		
		
	Compare commits
	
		
			20 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | ea848f1a78 | ||
|  | 030c41b826 | ||
|  | bba73091e4 | ||
|  | 80b58541e7 | ||
|  | 40a8b45231 | ||
|  | fadeced761 | ||
|  | bdb844d75f | ||
|  | 653487e2d7 | ||
|  | 02aec5eefa | ||
|  | e6b08c4ffb | ||
|  | 311b972b27 | ||
|  | 7724967285 | ||
|  | dd662acd22 | ||
|  | 5199d5b5aa | ||
|  | 6259013978 | ||
|  | 53bb7edabc | ||
|  | 57eb73b53b | ||
|  | 7c829ee102 | ||
|  | 27bc3557b2 | ||
|  | 9c82dea298 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -2,6 +2,7 @@ __pycache__ | ||||
| *.egg-info | ||||
| dist | ||||
| env | ||||
| .env | ||||
| __* | ||||
| .* | ||||
| *.pyc | ||||
|   | ||||
| @@ -2,6 +2,6 @@ | ||||
| From python:2.7-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
| RUN pip install ".[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
|   | ||||
| @@ -2,6 +2,6 @@ | ||||
| From python:3.4-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
| RUN pip install ".[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
|   | ||||
| @@ -2,6 +2,6 @@ | ||||
| From python:{{PYVERSION}}-onbuild | ||||
| Maintainer J. Fernando Sánchez @balkian | ||||
|  | ||||
| RUN pip install -e "/usr/src/app/[server]" | ||||
| RUN pip install ".[server]" | ||||
|  | ||||
| ENTRYPOINT ["bitter"] | ||||
|   | ||||
							
								
								
									
										8
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								Makefile
									
									
									
									
									
								
							| @@ -1,4 +1,4 @@ | ||||
| PYVERSIONS=3.4 2.7 | ||||
| PYVERSIONS=3.5 | ||||
| PYMAIN=$(firstword $(PYVERSIONS)) | ||||
| NAME=bitter | ||||
| REPO=balkian | ||||
| @@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template | ||||
| dev-%: | ||||
| 	@docker start $(NAME)-dev$* || (\ | ||||
| 		$(MAKE) build-$*; \ | ||||
| 		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||
| 		docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ | ||||
| 	)\ | ||||
|  | ||||
| 	docker exec -ti $(NAME)-dev$* bash | ||||
| @@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN)) | ||||
| testall: $(addprefix test-,$(PYVERSIONS)) | ||||
|  | ||||
| test-%: build-% | ||||
| 	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||
| 	docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; | ||||
|  | ||||
| pip_test-%: | ||||
| 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; | ||||
| @@ -71,6 +71,6 @@ pip_upload: | ||||
| pip_test: $(addprefix pip_test-,$(PYVERSIONS)) | ||||
|  | ||||
| run: build | ||||
| 	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
| 	docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' | ||||
|  | ||||
| .PHONY: test test-% build-% build test test_pip run | ||||
|   | ||||
							
								
								
									
										141
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										141
									
								
								README.md
									
									
									
									
									
								
							| @@ -1,4 +1,5 @@ | ||||
| #Description | ||||
| # Description | ||||
|  | ||||
| There are two parts to bitter. | ||||
| First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts). | ||||
| Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper. | ||||
| @@ -29,16 +30,146 @@ e.g. to get the latest 500 tweets by the python software foundation: | ||||
| ``` | ||||
| bitter api statuses/user_timeline --id thepsf --count 500 | ||||
| ``` | ||||
| # Credentials format | ||||
|  | ||||
|  | ||||
| ## Adding credentials | ||||
|  | ||||
| ``` | ||||
| {"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"} | ||||
| bitter --config <YOUR CONFIGURATION FILE> credentials add | ||||
| ``` | ||||
|  | ||||
| By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file: | ||||
| You can specify the parameters in the command or let the command line guide you through the process. | ||||
|  | ||||
| # Examples | ||||
|  | ||||
| ## Downloading a list of tweets | ||||
|  | ||||
| Bitter can download tweets from a list of tweets in a CSV file. | ||||
| The result is stored as individual json files in your folder of choice. | ||||
| You can even specify the column number for tweet ids. | ||||
| Bitter will not try to download  | ||||
|  | ||||
| ``` | ||||
| python -m bitter -c <credentials_file> ... | ||||
| Usage: bitter tweet get_all [OPTIONS] TWEETSFILE | ||||
|  | ||||
|   Download tweets from a list of tweets in a CSV file. The result is stored | ||||
|   as individual json files in your folder of choice. | ||||
|  | ||||
| Options: | ||||
|   -f, --folder TEXT | ||||
|   -d, --delimiter TEXT | ||||
|   -h, --header          Discard the first line (use it as a header) | ||||
|   -q, --quotechar TEXT | ||||
|   -c, --column INTEGER | ||||
|   --help                Show this message and exit. | ||||
|  | ||||
| ``` | ||||
|  | ||||
| For instance, this will download `tweet_ids.csv` in the `tweet_info` folder: | ||||
|  | ||||
| ``` | ||||
| bitter tweet get_all -f tweet_info tweet_ids.csv | ||||
| ``` | ||||
|  | ||||
| ## Downloading a list of users | ||||
|  | ||||
| Bitter downloads users and tweets in a similar way: | ||||
|  | ||||
| ``` | ||||
| Usage: bitter users get_all [OPTIONS] USERSFILE | ||||
|  | ||||
|   Download users from a list of user ids/screen names in a CSV file. The | ||||
|   result is stored as individual json files in your folder of choice. | ||||
|  | ||||
| Options: | ||||
|   -f, --folder TEXT | ||||
|   -d, --delimiter TEXT | ||||
|   -h, --header          Discard the first line (use it as a header) | ||||
|   -q, --quotechar TEXT | ||||
|   -c, --column INTEGER | ||||
|   --help                Show this message and exit. | ||||
| ``` | ||||
|  | ||||
| The only difference is that users can be downloaded via `screen_name` or `user_id`. | ||||
| This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways. | ||||
|  | ||||
| ## Downloading a stream | ||||
|  | ||||
| ``` | ||||
| Usage: bitter stream get [OPTIONS] | ||||
|  | ||||
| Options: | ||||
|   -l, --locations TEXT | ||||
|   -t, --track TEXT | ||||
|   -f, --file TEXT       File to store the stream of tweets. Default: standard output | ||||
|   -p, --politelyretry   Politely retry after a hangup/connection error | ||||
|   --help                Show this message and exit. | ||||
| ``` | ||||
|  | ||||
| ``` | ||||
| bitter --config .bitter.yaml stream get  | ||||
| ``` | ||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines | ||||
|  | ||||
|  | ||||
| ## REST queries | ||||
|  | ||||
| In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API. | ||||
|  | ||||
| ``` | ||||
| bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL] | ||||
| ``` | ||||
|  | ||||
| For instance: | ||||
|  | ||||
| ``` | ||||
| # Get 100 tweets that mentioned Obama after tweet 942689870501302300 | ||||
| bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama | ||||
| ``` | ||||
|  | ||||
| That is equivalent to this call to the api: `api/1.1/searc/tweets?since_id=942689870501302300&count=100&q=Obama`. | ||||
|  | ||||
|  | ||||
| The flags `--tweets` and `--users` are optional. | ||||
| If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API. | ||||
|  | ||||
| For example: | ||||
|  | ||||
| ``` | ||||
| # Download 1000 tweets, 100 tweets per call. | ||||
| bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets | ||||
| ``` | ||||
|  | ||||
| ``` | ||||
| # Download all the followers of @balkian | ||||
| bitter api 'followers/list' --_id balkian --users --max_count -1 | ||||
| ``` | ||||
|  | ||||
| Note that some reserved words (such as `id`) have to be preceeded by an underscore. | ||||
| This limitation is imposed by the python-twitter library. | ||||
|  | ||||
| # Configuration format | ||||
|  | ||||
| ``` | ||||
| credentials: | ||||
| - user: "balkian" | ||||
|   consumer_secret: "xxx" | ||||
|   consumer_key: "xxx" | ||||
|   token_key: "xxx" | ||||
|   token_secret: "xxx" | ||||
| - user: .... | ||||
| ``` | ||||
|  | ||||
| By default, bitter uses '~/.bitter.yaml', but you may choose a different file: | ||||
|  | ||||
| ``` | ||||
| python -m bitter --config <config_file> ... | ||||
| ``` | ||||
|  | ||||
| Or use an environment variable: | ||||
|  | ||||
| ``` | ||||
| export BITTER_CONFIG=$(cat myconfig.yaml) | ||||
| ``` | ||||
|  | ||||
| # Server | ||||
|   | ||||
							
								
								
									
										10
									
								
								bin/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								bin/README.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | ||||
| Scripts to process jsonlines | ||||
|  | ||||
| To get the jsonlines file, you can use the streaming API or the search api, like so: | ||||
|  | ||||
| ``` | ||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines | ||||
| ``` | ||||
|  | ||||
| To keep track of the query that generated the file, you can save the command in a text file. | ||||
| For instance, the example above is also in `example_query.sh`. | ||||
							
								
								
									
										1
									
								
								bin/example_query.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										1
									
								
								bin/example_query.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1 @@ | ||||
| python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines | ||||
							
								
								
									
										13
									
								
								bin/extract-hashtags.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										13
									
								
								bin/extract-hashtags.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,13 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| export FIELDS="created_at,id,text"  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.hashtags.csv | ||||
|   echo "$FIELDS" > $OUTPUT | ||||
|   pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT | ||||
| done | ||||
							
								
								
									
										15
									
								
								bin/extract-interactions.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										15
									
								
								bin/extract-interactions.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   REPLYOUTPUT=$i.replies.csv | ||||
|   RTOUTPUT=$i.rts.csv | ||||
|   echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT | ||||
|   echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT | ||||
|   pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT | ||||
|   pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT | ||||
| done | ||||
							
								
								
									
										16
									
								
								bin/extract-limits.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								bin/extract-limits.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,16 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv' | ||||
|  | ||||
| export FIELDS="timestamp,track" | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.limits.csv | ||||
|   echo $FIELDS > $OUTPUT | ||||
|   pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT | ||||
| done | ||||
							
								
								
									
										16
									
								
								bin/extract-media.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								bin/extract-media.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,16 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) |  contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv' | ||||
|  | ||||
| export FIELDS="id,url" | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.media.csv | ||||
|   echo $FIELDS > $OUTPUT | ||||
|   pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT | ||||
| done | ||||
							
								
								
									
										28
									
								
								bin/extract-users.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										28
									
								
								bin/extract-users.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,28 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| export USER_FIELDS="\$created_at,\ | ||||
| .id_str,\ | ||||
| .screen_name,\ | ||||
| .followers_count,\ | ||||
| .lang,\ | ||||
| .description,\ | ||||
| .statuses_count,\ | ||||
| .favourites_count,\ | ||||
| .friends_count,\ | ||||
| .created_at,\ | ||||
| .name,\ | ||||
| .location,\ | ||||
| .listed_count,\ | ||||
| .time_zone\ | ||||
| " | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.users.csv | ||||
|   echo \#$USER_FIELDS > $OUTPUT | ||||
|   jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT | ||||
| done | ||||
							
								
								
									
										32
									
								
								bin/extract.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										32
									
								
								bin/extract.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,32 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| FIELDS=".id_str,\ | ||||
|         .user.screen_name,\ | ||||
|         .user.id,\ | ||||
|         .favorite_count,\ | ||||
|         .retweet_count,\ | ||||
|         .quote_count,\ | ||||
|         .reply_count,\ | ||||
|         .created_at,\ | ||||
|         .lang,\ | ||||
|         .in_reply_to_user_id_str,\ | ||||
|         .in_reply_to_status_id_str,\ | ||||
|         .retweeted_status.id_str,\ | ||||
|         .retweeted_status.user.id,\ | ||||
|         .retweeted_status.favorite_count,\ | ||||
|         .retweeted_status.retweet_count,\ | ||||
|         .retweeted_status.quote_count,\ | ||||
|         .retweeted_status.reply_count,\ | ||||
|         .retweeted_status.created_at\ | ||||
| " | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.tweets.csv | ||||
|   echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT | ||||
|   jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT | ||||
| done | ||||
							
								
								
									
										17
									
								
								bin/extract_extended.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										17
									
								
								bin/extract_extended.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,17 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv' | ||||
| HEADER='rt_id,full_text' | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.full_text.csv | ||||
|   echo $HEADER > $OUTPUT | ||||
|   jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT | ||||
|   sort -u $OUTPUT -o $OUTPUT | ||||
|   sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT | ||||
| done | ||||
							
								
								
									
										16
									
								
								bin/extract_text.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								bin/extract_text.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,16 @@ | ||||
| if [ "$#" -lt 1 ] | ||||
| then | ||||
| 	echo "Usage: $0 <files to convert>" | ||||
| 	exit 1 | ||||
| fi | ||||
|  | ||||
| QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv' | ||||
| HEADER='id,text' | ||||
|  | ||||
| for i in "$@" | ||||
| do | ||||
|   OUTPUT=$i.text.csv | ||||
|   echo $HEADER > $OUTPUT | ||||
|   pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT | ||||
|  # sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT | ||||
| done | ||||
							
								
								
									
										10
									
								
								bin/filter-edges.sh
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								bin/filter-edges.sh
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | ||||
|  | ||||
| if [ "$#" -lt 2 ] | ||||
| then | ||||
|     echo "Find edge lines in a file that contain one of the users in a user list." | ||||
|     echo "" | ||||
| 	  echo "Usage: $0 <file with edges> <file with the list of users>" | ||||
| 	  exit 1 | ||||
| fi | ||||
|  | ||||
| pv -c -N 'read' "$1" |  grep -F -w -f "$2" |  pv -lc -N 'found' | ||||
							
								
								
									
										23
									
								
								bin/functions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								bin/functions.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | ||||
| import pandas as pd | ||||
|  | ||||
| def read_rts(rtsfile, tweetsfile): | ||||
|     tweets = pd.read_csv(tweetsfile, index_col=0) | ||||
|     rts = pd.read_csv(rtsfile, index_col=1) | ||||
|     merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True) | ||||
|     return merged.sort_values(by='count', ascending=False) | ||||
|  | ||||
|  | ||||
| def read_tweets(tweetsfile): | ||||
|     '''When the dataset is small enough, we can load tweets as-in''' | ||||
|     with open(tweetsfile) as f: | ||||
|         header = f.readline().strip().split(',') | ||||
|         dtypes = {} | ||||
|     for key in header: | ||||
|         if key.endswith('_str') or key.endswith('.id'): | ||||
|             dtypes[key] = object  | ||||
|             tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0) | ||||
|     return tweets | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     import argparse | ||||
							
								
								
									
										1
									
								
								bin/print-hashtags.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										1
									
								
								bin/print-hashtags.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1 @@ | ||||
| cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h  | ||||
							
								
								
									
										14
									
								
								bin/print-replies.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										14
									
								
								bin/print-replies.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,14 @@ | ||||
| MAX_TAGS=100 | ||||
|  | ||||
| function get_text { | ||||
|     while read line | ||||
|     do | ||||
|         echo $line | ||||
|         rtid=$(echo $line | awk -F"," '{print $2}') | ||||
|         text=$(grep -m 1 $rtid *.text.csv) | ||||
|         echo "$line - $text" | ||||
|     done < "/dev/stdin" | ||||
| } | ||||
|  | ||||
| cat "$@" | get_text | ||||
|  | ||||
							
								
								
									
										15
									
								
								bin/print-rts.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										15
									
								
								bin/print-rts.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| MAX_TAGS=100 | ||||
|  | ||||
| function get_text { | ||||
|     while read line | ||||
|     do | ||||
|         echo $line | ||||
|         rtid=$(echo $line | awk '{print $2}') | ||||
|         count=$(echo $line | awk '{print $1}') | ||||
|         text=$(grep -m 1 $rtid *.text.csv) | ||||
|         echo "$line - $text" | ||||
|     done < "/dev/stdin" | ||||
| } | ||||
|  | ||||
| cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text | ||||
|  | ||||
| @@ -1 +1 @@ | ||||
| 0.7.4 | ||||
| 0.10.3 | ||||
|   | ||||
| @@ -6,10 +6,12 @@ http://github.com/balkian/bitter | ||||
| import os | ||||
|  | ||||
| from .version import __version__ | ||||
| from . import config as bconf | ||||
|  | ||||
| def easy(*args, **kwargs): | ||||
| def easy(conffile=bconf.CONFIG_FILE): | ||||
|     from .crawlers import TwitterQueue | ||||
|     return TwitterQueue.from_credentials(*args, **kwargs) | ||||
|  | ||||
|     return TwitterQueue.from_config(conffile=conffile) | ||||
|  | ||||
| __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] | ||||
|  | ||||
|   | ||||
							
								
								
									
										327
									
								
								bitter/cli.py
									
									
									
									
									
								
							
							
						
						
									
										327
									
								
								bitter/cli.py
									
									
									
									
									
								
							| @@ -21,60 +21,192 @@ if sys.version_info <= (3, 0): | ||||
|     from contextlib2 import ExitStack | ||||
| else: | ||||
|     from contextlib import ExitStack | ||||
|      | ||||
|  | ||||
|  | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
| def serialize(function): | ||||
|     '''Common options to serialize output to CSV or other formats''' | ||||
|  | ||||
|     @click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str) | ||||
|     @click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True) | ||||
|     @click.option('--header', help='Header that will be printed at the beginning of the file', default=None) | ||||
|     @click.option('--csv', help='Print each object as a csv row.', is_flag=True) | ||||
|     @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True) | ||||
|     @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True) | ||||
|     @click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t') | ||||
|     @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout) | ||||
|     def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs): | ||||
|         it = function(**kwargs) | ||||
|         outformat = 'json' | ||||
|         if csv: | ||||
|             outformat = 'csv' | ||||
|         elif jsonlines: | ||||
|             outformat = 'jsonlines' | ||||
|         elif indented: | ||||
|             outformat = 'indented' | ||||
|  | ||||
|         return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter) | ||||
|  | ||||
|     return decorated | ||||
|  | ||||
|  | ||||
| @click.group() | ||||
| @click.option("--verbose", is_flag=True) | ||||
| @click.option("--logging_level", required=False, default='WARN') | ||||
| @click.option("--config", required=False) | ||||
| @click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json') | ||||
| @click.option('--config', show_default=True, default=bconf.CONFIG_FILE) | ||||
| @click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS) | ||||
| @click.pass_context | ||||
| def main(ctx, verbose, logging_level, config, credentials): | ||||
|     logging.basicConfig(level=getattr(logging, logging_level)) | ||||
|     ctx.obj = {} | ||||
|     ctx.obj['VERBOSE'] = verbose | ||||
|     ctx.obj['CONFIG'] = config | ||||
|     bconf.CONFIG_FILE = config | ||||
|     bconf.CREDENTIALS = credentials | ||||
|     utils.create_credentials(credentials) | ||||
|     if os.path.exists(utils.get_config_path(credentials)): | ||||
|       utils.copy_credentials_to_config(credentials, config) | ||||
|  | ||||
|  | ||||
| @main.group(invoke_without_command=True) | ||||
| @click.pass_context | ||||
| def credentials(ctx): | ||||
|     if ctx.invoked_subcommand is not None: | ||||
|         return | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     for worker in wq.queue: | ||||
|         print('#'*20) | ||||
|         try: | ||||
|             resp = worker.client.application.rate_limit_status() | ||||
|             print(worker.name) | ||||
|         except Exception as ex: | ||||
|             print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) ) | ||||
|  | ||||
|  | ||||
| @credentials.command('limits') | ||||
| @click.option('--no_aggregate', is_flag=True, default=False, | ||||
|               help=('Print limits from all workers. By default, limits are ' | ||||
|                     'aggregated (summed).')) | ||||
| @click.option('--no_diff', is_flag=True, default=False, | ||||
|               help=('Print all limits. By default, only limits that ' | ||||
|                     'have been consumed will be shown.')) | ||||
| @click.argument('url', required=False) | ||||
| @click.pass_context | ||||
| def get_limits(ctx, no_aggregate, no_diff, url): | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     limits = {} | ||||
|     if url: | ||||
|         print('URL is: {}'.format(url)) | ||||
|     for worker in wq.queue: | ||||
|         resp = worker.client.application.rate_limit_status() | ||||
|         for urlimits in resp['resources'].values(): | ||||
|             for url, value in urlimits.items(): | ||||
|                 if url not in limits: | ||||
|                     limits[url] = {} | ||||
|                 glob = limits[url].get('global', {}) | ||||
|                 limits[url][worker.name] = value | ||||
|                 for k in ['limit', 'remaining']: | ||||
|                     if k not in glob: | ||||
|                         glob[k] = 0 | ||||
|                     glob[k] += value[k] | ||||
|                 limits[url]['global'] = glob | ||||
|     for url, lims in limits.items(): | ||||
|         worker_list = lims.keys() if no_aggregate else ['global', ]  | ||||
|  | ||||
|         url_printed = False | ||||
|  | ||||
|         for worker in worker_list: | ||||
|             vals = lims[worker] | ||||
|             consumed = vals['limit'] - vals['remaining']  | ||||
|             if no_diff or consumed: | ||||
|                 if not url_printed: | ||||
|                     print(url) | ||||
|                     url_printed = True | ||||
|                 print('\t', worker, ':') | ||||
|                 print('\t\t', vals) | ||||
|  | ||||
|  | ||||
| @credentials.command('add') | ||||
| @click.option('--consumer_key', default=None) | ||||
| @click.option('--consumer_secret', default=None) | ||||
| @click.option('--token_key', default=None) | ||||
| @click.option('--token_secret', default=None) | ||||
| @click.argument('user_name') | ||||
| def add(user_name, consumer_key, consumer_secret, token_key, token_secret): | ||||
|     if not consumer_key: | ||||
|         consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY') | ||||
|     if not consumer_secret: | ||||
|         consumer_secret = click.prompt('Please, enter your CONSUMER SECRET') | ||||
|     if not token_key: | ||||
|         token_key = click.prompt('Please, enter your ACCESS TOKEN') | ||||
|     if not token_secret: | ||||
|         token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET') | ||||
|     utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret, | ||||
|                           token_key=token_key, token_secret=token_secret) | ||||
|     click.echo('Credentials added for {}'.format(user_name)) | ||||
|  | ||||
|  | ||||
| @main.group() | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def tweet(ctx): | ||||
|     pass | ||||
|  | ||||
| @tweet.command('get') | ||||
| @click.option('-w', '--write', is_flag=True, default=False) | ||||
| @click.option('-d', '--dry_run', is_flag=True, default=False) | ||||
| @click.option('-f', '--folder', default="tweets") | ||||
| @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) | ||||
| @click.argument('tweetid') | ||||
| def get_tweet(tweetid, write, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     utils.download_tweet(wq, tweetid, write, folder, update) | ||||
|          | ||||
| @tweet.command('get_all') | ||||
| @click.argument('tweetsfile', 'File with a list of tweets to look up') | ||||
| @serialize | ||||
| def get_tweet(tweetid, dry_run, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update) | ||||
|  | ||||
| @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. | ||||
| The result is stored as individual json files in your folder of choice.''') | ||||
| @click.argument('tweetsfile') | ||||
| @click.option('-f', '--folder', default="tweets") | ||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!') | ||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | ||||
| @click.option('-d', '--delimiter', default=",") | ||||
| @click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results') | ||||
| @click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0) | ||||
| @click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) | ||||
| @click.option('-q', '--quotechar', default='"') | ||||
| @click.option('-c', '--column', type=int, default=0) | ||||
| @serialize | ||||
| @click.pass_context | ||||
| def get_tweets(ctx, tweetsfile, folder): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     utils.download_tweets(wq, tweetsfile, folder) | ||||
| def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column): | ||||
|     if update and not click.confirm('This may overwrite existing tweets. Continue?'): | ||||
|         click.echo('Cancelling') | ||||
|         return | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|  | ||||
|     status = tqdm('Queried') | ||||
|     failed = 0 | ||||
|     for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache, | ||||
|                                                skip=skip, quotechar=quotechar, commentchar=commentchar, | ||||
|                                                column=column, update=update, retry_failed=retry): | ||||
|         status.update(1) | ||||
|         if not obj: | ||||
|             failed += 1 | ||||
|             status.set_description('Failed: %s. Queried' % failed, refresh=True) | ||||
|             continue | ||||
|         yield obj | ||||
|  | ||||
|  | ||||
| @tweet.command('search') | ||||
| @click.argument('query') | ||||
| @click.pass_context  | ||||
| @serialize | ||||
| @click.pass_context | ||||
| def search(ctx, query): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     t = utils.search_tweet(wq, query) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     yield from utils.search_tweet(wq, query) | ||||
|  | ||||
| @tweet.command('timeline') | ||||
| @click.argument('user') | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def timeline(ctx, user): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     t = utils.user_timeline(wq, user) | ||||
|     print(json.dumps(t, indent=2)) | ||||
|  | ||||
| @@ -96,43 +228,48 @@ def list_users(ctx, db): | ||||
|  | ||||
| @users.command('get') | ||||
| @click.argument('user') | ||||
| @click.option('-w', '--write', is_flag=True, default=False) | ||||
| @click.option('-d', '--dry_run', is_flag=True, default=False) | ||||
| @click.option('-f', '--folder', default="users") | ||||
| @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) | ||||
| def get_user(user, write, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     if not write: | ||||
|         u = utils.get_user(wq, user) | ||||
|         js = json.dumps(u, indent=2) | ||||
|         print(js) | ||||
|         return | ||||
|     if not os.path.exists(folder): | ||||
|         os.makedirs(folder) | ||||
|     file = os.path.join(folder, '%s.json' % user) | ||||
|     if not update and os.path.exists(file) and os.path.isfile(file): | ||||
|         print('User exists: %s' % user) | ||||
|         return | ||||
|     with open(file, 'w') as f: | ||||
|         u = utils.get_user(wq, user) | ||||
|         js = json.dumps(u, indent=2) | ||||
|         print(js, file=f) | ||||
| @serialize | ||||
| def get_user(user, dry_run, folder, update): | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     yield from utils.download_user(wq, user, not dry_run, folder, update) | ||||
|  | ||||
| @users.command('get_all') | ||||
| @click.argument('usersfile', 'File with a list of users to look up') | ||||
| @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file. | ||||
|                The result is stored as individual json files in your folder of choice.''') | ||||
| @click.argument('usersfile') | ||||
| @click.option('-f', '--folder', default="users") | ||||
| @click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!') | ||||
| @click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads') | ||||
| @click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results') | ||||
| @click.option('-d', '--delimiter', default=",") | ||||
| @click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)', | ||||
|               is_flag=True, default=False) | ||||
| @click.option('-q', '--quotechar', default='"') | ||||
| @click.option('--commentchar', help='Lines starting with this character will be ignored', default=None) | ||||
| @click.option('-c', '--column', type=int, default=0) | ||||
| @serialize | ||||
| @click.pass_context | ||||
| def get_users(ctx, usersfile, folder): | ||||
|     with open(usersfile) as f: | ||||
|         for line in f: | ||||
|             uid = line.strip() | ||||
|             ctx.invoke(get_user, folder=folder, user=uid, write=True) | ||||
| def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column): | ||||
|     if update and not click.confirm('This may overwrite existing users. Continue?'): | ||||
|         click.echo('Cancelling') | ||||
|         return | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter, | ||||
|                                        update=update, retry_failed=retry, | ||||
|                                        skip=skip, quotechar=quotechar, | ||||
|                                        cache=not nocache, | ||||
|                                        commentchar=commentchar, | ||||
|                                        column=column): | ||||
|         yield i | ||||
|  | ||||
| @users.command('crawl') | ||||
| @click.option('--db', required=True, help='Database to save all users.') | ||||
| @click.option('--skip', required=False, default=0, help='Skip N lines from the file.') | ||||
| @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') | ||||
| @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') | ||||
| @click.argument('usersfile', 'File with a list of users to look up') | ||||
| @click.argument('usersfile') | ||||
| @click.pass_context | ||||
| def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||
|     global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock | ||||
| @@ -146,7 +283,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||
|             return ExitStack() | ||||
|  | ||||
|  | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, | ||||
|                                                                                       len(wq.queue))) | ||||
|  | ||||
| @@ -237,7 +374,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db): | ||||
|             speed = (collected-lastcollected)/10 | ||||
|             with statslock: | ||||
|                 lastcollected = collected | ||||
|              | ||||
|  | ||||
|     logger.info('Done!') | ||||
|  | ||||
| @main.group('extractor') | ||||
| @@ -288,7 +425,7 @@ def network_extractor(ctx, as_json): | ||||
|     if as_json: | ||||
|         import json | ||||
|         print(json.dumps(follower_map, indent=4)) | ||||
|      | ||||
|  | ||||
|  | ||||
| @extractor.command('users') | ||||
| @click.pass_context | ||||
| @@ -310,7 +447,7 @@ def users_extractor(ctx): | ||||
| @click.pass_context | ||||
| def extract(ctx, recursive, user, name, initfile): | ||||
|     print(locals()) | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     dburi = ctx.obj['DBURI'] | ||||
|     utils.extract(wq, | ||||
|                   recursive=recursive, | ||||
| @@ -322,56 +459,40 @@ def extract(ctx, recursive, user, name, initfile): | ||||
| @extractor.command('reset') | ||||
| @click.pass_context | ||||
| def reset_extractor(ctx): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     db = ctx.obj['DBURI'] | ||||
|     session = make_session(db) | ||||
|     session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) | ||||
|  | ||||
| @main.command('limits') | ||||
| @click.argument('url', required=False) | ||||
| @click.pass_context | ||||
| def get_limits(ctx, url): | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     for worker in wq.queue: | ||||
|         resp = worker.client.application.rate_limit_status() | ||||
|         print('#'*20) | ||||
|         print(worker.name) | ||||
|         if url: | ||||
|             limit = 'NOT FOUND' | ||||
|             print('URL is: {}'.format(url)) | ||||
|             cat = url.split('/')[1] | ||||
|             if cat in resp['resources']: | ||||
|                 limit = resp['resources'][cat].get(url, None) or resp['resources'][cat] | ||||
|             else: | ||||
|                 print('Cat {} not found'.format(cat)) | ||||
|             print('{}: {}'.format(url, limit))            | ||||
|         else: | ||||
|             print(json.dumps(resp, indent=2)) | ||||
|  | ||||
|  | ||||
| @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False)) | ||||
| @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False), | ||||
|               help='''Issue a call to an endpoint of the Twitter API.''') | ||||
| @click.argument('cmd', nargs=1) | ||||
| @click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False) | ||||
| @click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False) | ||||
| @click.argument('api_args', nargs=-1, type=click.UNPROCESSED) | ||||
| @click.pass_context | ||||
| def api(ctx, cmd, api_args): | ||||
| def api(ctx, cmd, tweets, users, api_args): | ||||
|     opts = {} | ||||
|     mappings = { | ||||
|         'id': '_id' | ||||
|     } | ||||
|     i = iter(api_args) | ||||
|     for k, v in zip(i, i): | ||||
|         k = k.replace('--', '') | ||||
|         if k in mappings: | ||||
|             k = mappings[k] | ||||
|         opts[k] = v | ||||
|     wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) | ||||
|     resp = utils.consume_feed(wq[cmd], **opts) | ||||
|     # A hack to stream jsons | ||||
|     print('[') | ||||
|     first = True | ||||
|     wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) | ||||
|     if tweets: | ||||
|         resp = utils.consume_tweets(wq[cmd], **opts) | ||||
|     elif users: | ||||
|         resp = utils.consume_users(wq[cmd], **opts) | ||||
|     else: | ||||
|         resp = wq[cmd](**opts) | ||||
|         print(json.dumps(resp)) | ||||
|         return | ||||
|     for i in resp: | ||||
|         if not first: | ||||
|             print(',') | ||||
|         else: | ||||
|             first = False | ||||
|          | ||||
|         print(json.dumps(i, indent=2)) | ||||
|     print(']') | ||||
|         print(json.dumps(i)) | ||||
|  | ||||
|  | ||||
| @main.command('server') | ||||
| @@ -383,20 +504,20 @@ def run_server(ctx, consumer_key, consumer_secret): | ||||
|     bconf.CONSUMER_SECRET = consumer_secret | ||||
|     from .webserver import app | ||||
|     app.run(host='0.0.0.0') | ||||
|      | ||||
|  | ||||
| @main.group() | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def stream(ctx): | ||||
|     pass | ||||
|  | ||||
| @stream.command('get') | ||||
| @click.option('-l', '--locations', default=None) | ||||
| @click.option('-t', '--track', default=None) | ||||
| @click.option('-f', '--file', help='File to store the stream of tweets') | ||||
| @click.option('-f', '--file', default=None, help='File to store the stream of tweets') | ||||
| @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def get_stream(ctx, locations, track, file, politelyretry): | ||||
|     wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) | ||||
|     wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1) | ||||
|  | ||||
|     query_args = {} | ||||
|     if locations: | ||||
| @@ -415,10 +536,14 @@ def get_stream(ctx, locations, track, file, politelyretry): | ||||
|                 iterator = wq.statuses.sample() | ||||
|             else: | ||||
|                 iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") | ||||
|             for i in iterator: | ||||
|                 yield i | ||||
|             if not politelyretry: | ||||
|                 return | ||||
|             try: | ||||
|               for i in iterator: | ||||
|                   yield i | ||||
|               if not politelyretry: | ||||
|                   return | ||||
|             except Exception: | ||||
|                 if not politelyretry: | ||||
|                     raise ex | ||||
|             thishangup = time.time() | ||||
|             if thishangup - lasthangup < 60: | ||||
|                 raise Exception('Too many hangups in a row.') | ||||
| @@ -432,23 +557,23 @@ def get_stream(ctx, locations, track, file, politelyretry): | ||||
| @stream.command('read') | ||||
| @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||
| @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False) | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def read_stream(ctx, file, tail): | ||||
|     for tweet in utils.read_file(file, tail=tail): | ||||
|         try: | ||||
|             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) | ||||
|             print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) | ||||
|         except (KeyError, TypeError): | ||||
|             print('Raw tweet: {}'.format(tweet)) | ||||
|  | ||||
| @stream.command('tags') | ||||
| @click.option('-f', '--file', help='File to read the stream of tweets from', required=True) | ||||
| @click.argument('limit', required=False, default=None, type=int) | ||||
| @click.pass_context  | ||||
| @click.pass_context | ||||
| def tags_stream(ctx, file, limit): | ||||
|     c = utils.get_hashtags(utils.read_file(file)) | ||||
|     for count, tag in c.most_common(limit): | ||||
|         print(u'{} - {}'.format(count, tag)) | ||||
|      | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     main() | ||||
|   | ||||
| @@ -3,7 +3,7 @@ Common configuration for other modules. | ||||
| It is not elegant, but it works with flask and the oauth decorators. | ||||
|  | ||||
| Using this module allows you to change the config before loading any other module. | ||||
| E.g.:  | ||||
| E.g.: | ||||
|  | ||||
|     import bitter.config as c | ||||
|     c.CREDENTIALS="/tmp/credentials" | ||||
| @@ -11,3 +11,4 @@ E.g.: | ||||
|     app.run() | ||||
| ''' | ||||
| CREDENTIALS = '~/.bitter-credentials.json' | ||||
| CONFIG_FILE = '~/.bitter.yaml' | ||||
|   | ||||
| @@ -58,6 +58,18 @@ class FromCredentialsMixin(object): | ||||
|             wq.ready(cls.worker_class(cred["user"], cred)) | ||||
|         return wq | ||||
|      | ||||
| class FromConfigMixin(object): | ||||
|  | ||||
|     @classmethod | ||||
|     def from_config(cls, config=None, conffile=None, max_workers=None): | ||||
|         wq = cls() | ||||
|  | ||||
|         if not config: | ||||
|           with utils.config(conffile) as c: | ||||
|               config = c | ||||
|         for cred in islice(config['credentials'], max_workers): | ||||
|             wq.ready(cls.worker_class(cred["user"], cred)) | ||||
|         return wq | ||||
|  | ||||
| class TwitterWorker(object): | ||||
|     api_class = None | ||||
| @@ -103,13 +115,14 @@ class RestWorker(TwitterWorker): | ||||
|  | ||||
|     def get_wait(self, uriparts): | ||||
|         limits = self.get_limit(uriparts) | ||||
|         if limits['remaining'] > 0: | ||||
|         if limits.get('remaining', 1) > 0: | ||||
|             return 0 | ||||
|         reset = limits.get('reset', 0) | ||||
|         now = time.time() | ||||
|         return max(0, (reset-now)) | ||||
|  | ||||
|     def get_limit(self, uriparts): | ||||
|         uriparts = list(u for u in uriparts if u) | ||||
|         uri = '/'+'/'.join(uriparts) | ||||
|         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): | ||||
|             if ix.startswith(uri): | ||||
| @@ -142,7 +155,7 @@ class RestWorker(TwitterWorker): | ||||
| class QueueException(BaseException): | ||||
|     pass | ||||
|  | ||||
| class QueueMixin(AttrToFunc, FromCredentialsMixin): | ||||
| class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin): | ||||
|     def __init__(self, wait=True): | ||||
|         logger.debug('Creating worker queue') | ||||
|         self.queue = set() | ||||
|   | ||||
| @@ -3,11 +3,13 @@ import json | ||||
|  | ||||
| from sqlalchemy.ext.declarative import declarative_base | ||||
| from sqlalchemy.types import BigInteger, Integer, Text, Boolean | ||||
| from sqlalchemy.schema import ForeignKey | ||||
| from sqlalchemy.pool import SingletonThreadPool | ||||
| from sqlalchemy import Column, Index | ||||
|  | ||||
| from sqlalchemy import create_engine | ||||
| from sqlalchemy.orm import sessionmaker | ||||
| from functools import wraps | ||||
|  | ||||
| Base = declarative_base() | ||||
|  | ||||
| @@ -90,6 +92,34 @@ class ExtractorEntry(Base): | ||||
|     busy = Column(Boolean, default=False) | ||||
|  | ||||
|  | ||||
| class Search(Base): | ||||
|     __tablename__ = 'search_queries' | ||||
|  | ||||
|     id = Column(Integer, primary_key=True, index=True, unique=True) | ||||
|     endpoint = Column(Text, comment="Endpoint URL") | ||||
|     attrs = Column(Text, comment="Text version of the dictionary of parameters") | ||||
|     count = Column(Integer) | ||||
|     current_count = Column(Integer) | ||||
|     current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)') | ||||
|     since_id = Column(BigInteger) | ||||
|  | ||||
| class SearchResults(Base): | ||||
|     __tablename__ = 'search_results' | ||||
|     id = Column(Integer, primary_key=True, index=True, unique=True) | ||||
|     search_id = Column(ForeignKey('search_queries.id')) | ||||
|     resource_id = Column(Text) | ||||
|  | ||||
| def memoize(f): | ||||
|     memo = {} | ||||
|     @wraps(f) | ||||
|     def helper(self, **kwargs): | ||||
|         st = dict_to_str(kwargs) | ||||
|         key = (self.__uriparts, st) | ||||
|         if key not in memo: | ||||
|             memo[key] = f(self, **kwargs) | ||||
|         return memo[key] | ||||
|     return helper | ||||
|  | ||||
| def make_session(url): | ||||
|     if not isinstance(url, str): | ||||
|         print(url) | ||||
| @@ -100,24 +130,6 @@ def make_session(url): | ||||
|     session = Session() | ||||
|     return session | ||||
|  | ||||
| def test(db='sqlite:///users.db'): | ||||
|  | ||||
|     from sqlalchemy import exists | ||||
|     session = make_session(db) | ||||
|  | ||||
|     our_user = session.query(User).first()  | ||||
|  | ||||
|     print(our_user.name) | ||||
|     print(session.query(User).count()) | ||||
|     fake_user = User(name="Fake user") | ||||
|     session.add(fake_user) | ||||
|     session.commit() | ||||
|     print(session.query(User).count()) | ||||
|     print(session.query(exists().where(User.name == "Fake user")).scalar()) | ||||
|     fake_committed = session.query(User).filter_by(name="Fake user").first() | ||||
|     print(fake_committed.id) | ||||
|     print(fake_committed.name) | ||||
|     session.delete(fake_committed) | ||||
|     session.commit() | ||||
|     print(session.query(User).count()) | ||||
|     print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548))) | ||||
| def dict_to_str(args): | ||||
|     return json.dumps(args, sort_keys=True) | ||||
|   | ||||
							
								
								
									
										545
									
								
								bitter/utils.py
									
									
									
									
									
								
							
							
						
						
									
										545
									
								
								bitter/utils.py
									
									
									
									
									
								
							| @@ -3,6 +3,9 @@ from __future__ import print_function | ||||
| import logging | ||||
| import time | ||||
| import json | ||||
| import yaml | ||||
| import csv | ||||
| import io | ||||
|  | ||||
| import signal | ||||
| import sys | ||||
| @@ -10,8 +13,15 @@ import sqlalchemy | ||||
| import os | ||||
| import multiprocessing | ||||
| from multiprocessing.pool import ThreadPool | ||||
| from multiprocessing import Queue | ||||
|  | ||||
| from functools import partial | ||||
| import queue | ||||
| import threading | ||||
| from select import select | ||||
|  | ||||
| import operator | ||||
|  | ||||
| from functools import partial, reduce | ||||
|  | ||||
| from tqdm import tqdm | ||||
|  | ||||
| @@ -19,6 +29,7 @@ from itertools import islice, chain | ||||
| from contextlib import contextmanager | ||||
|  | ||||
| from collections import Counter | ||||
| from random import choice | ||||
|  | ||||
| from builtins import map, filter | ||||
|  | ||||
| @@ -28,6 +39,12 @@ from bitter.models import Following, User, ExtractorEntry, make_session | ||||
|  | ||||
| from bitter import config | ||||
|  | ||||
| # Fix Python 2.x. | ||||
| try: | ||||
|     UNICODE_EXISTS = bool(type(unicode)) | ||||
| except NameError: | ||||
|     unicode = lambda s: str(s) | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| @@ -44,36 +61,91 @@ def chunk(iterable, n): | ||||
| def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): | ||||
|     source = chunk(source, chunksize) | ||||
|     p = ThreadPool(numcpus*2) | ||||
|     results = p.imap_unordered(func, source, chunksize=int(1000/numcpus)) | ||||
|  | ||||
|     def wrapped_func(*args, **kwargs): | ||||
|         try: | ||||
|             return func(*args, **kwargs) | ||||
|         except Exception as ex: | ||||
|             print('Exception on parallel thread: {}'.format(ex), file=sys.stderr) | ||||
|  | ||||
|     results = p.imap_unordered(wrapped_func, source) | ||||
|     for i in chain.from_iterable(results): | ||||
|         yield i | ||||
|  | ||||
|  | ||||
| def get_credentials_path(credfile=None): | ||||
|     if not credfile: | ||||
|         if config.CREDENTIALS: | ||||
|             credfile = config.CREDENTIALS | ||||
| def get_config_path(conf=None): | ||||
|     if not conf: | ||||
|         if config.CONFIG_FILE: | ||||
|             conf = config.CONFIG_FILE | ||||
|         else: | ||||
|             raise Exception('No valid credentials file') | ||||
|     return os.path.expanduser(credfile) | ||||
|             raise Exception('No valid config file') | ||||
|     return os.path.expanduser(conf) | ||||
|  | ||||
|  | ||||
| def copy_credentials_to_config(credfile, conffile=None): | ||||
|       p = get_config_path(credfile) | ||||
|       with open(p) as old: | ||||
|           for line in old: | ||||
|               cred = json.loads(line.strip()) | ||||
|               add_credentials(conffile, **cred) | ||||
|  | ||||
|  | ||||
| def save_config(conf, conffile=None): | ||||
|     with config(conffile) as c: | ||||
|         c.clear() | ||||
|         c.update(conf) | ||||
|  | ||||
|  | ||||
| @contextmanager | ||||
| def credentials_file(credfile, *args, **kwargs): | ||||
|     p = get_credentials_path(credfile) | ||||
|     with open(p, *args, **kwargs) as f: | ||||
|         yield f | ||||
| def config(conffile=None): | ||||
|     d = read_config(conffile) | ||||
|     try: | ||||
|         yield d | ||||
|     finally: | ||||
|         write_config(d, conffile) | ||||
|  | ||||
|  | ||||
| def iter_credentials(credfile=None): | ||||
|     with credentials_file(credfile) as f: | ||||
|         for l in f: | ||||
|             yield json.loads(l.strip()) | ||||
| def read_config(conffile): | ||||
|     p = conffile and get_config_path(conffile) | ||||
|     if p: | ||||
|         if not os.path.exists(p): | ||||
|             raise IOError('{} file does not exist.'.format(p)) | ||||
|         f = open(p, 'r') | ||||
|     elif 'BITTER_CONFIG' not in os.environ: | ||||
|         raise Exception('No config file or BITTER_CONFIG env variable.') | ||||
|     else: | ||||
|         f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n')) | ||||
|     return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []} | ||||
|  | ||||
|  | ||||
| def get_credentials(credfile=None, inverse=False, **kwargs): | ||||
| def write_config(conf, conffile=None): | ||||
|     if not conf: | ||||
|         conf = {'credentials': []} | ||||
|     if conffile: | ||||
|         p = get_config_path(conffile) | ||||
|         with open(p, 'w') as f: | ||||
|             yaml.dump(conf, f) | ||||
|     else: | ||||
|         os.environ['BITTER_CONFIG'] = yaml.dump(conf) | ||||
|  | ||||
| def iter_credentials(conffile=None): | ||||
|     with config(conffile) as c: | ||||
|         for i in c['credentials']: | ||||
|             yield i | ||||
|  | ||||
|  | ||||
| def create_config_file(conffile=None): | ||||
|     if not conffile: | ||||
|         return | ||||
|     conffile = get_config_path(conffile) | ||||
|     with open(conffile, 'a'): | ||||
|         pass | ||||
|     write_config(None, conffile) | ||||
|  | ||||
|  | ||||
| def get_credentials(conffile=None, inverse=False, **kwargs): | ||||
|     creds = [] | ||||
|     for i in iter_credentials(credfile): | ||||
|     for i in iter_credentials(conffile): | ||||
|         matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) | ||||
|         if matches and not inverse: | ||||
|             creds.append(i) | ||||
| @@ -82,26 +154,22 @@ def get_credentials(credfile=None, inverse=False, **kwargs): | ||||
|     return creds | ||||
|  | ||||
|  | ||||
| def create_credentials(credfile=None): | ||||
|     credfile = get_credentials_path(credfile) | ||||
|     with credentials_file(credfile, 'a'): | ||||
|         pass | ||||
|  | ||||
|      | ||||
| def delete_credentials(credfile=None, **creds): | ||||
|     tokeep = get_credentials(credfile, inverse=True, **creds) | ||||
|     with credentials_file(credfile, 'w') as f: | ||||
|         for i in tokeep: | ||||
|             f.write(json.dumps(i)) | ||||
|             f.write('\n') | ||||
| def delete_credentials(conffile=None, **creds): | ||||
|     tokeep = get_credentials(conffile, inverse=True, **creds) | ||||
|     with config(conffile) as c: | ||||
|         c['credentials'] = list(tokeep) | ||||
|  | ||||
|  | ||||
| def add_credentials(credfile=None, **creds): | ||||
|     exist = get_credentials(credfile, **creds) | ||||
|     if not exist: | ||||
|         with credentials_file(credfile, 'a') as f: | ||||
|             f.write(json.dumps(creds)) | ||||
|             f.write('\n') | ||||
| def add_credentials(conffile=None, **creds): | ||||
|     try: | ||||
|         exist = get_credentials(conffile, **creds) | ||||
|     except IOError: | ||||
|         exist = False | ||||
|         create_config_file(conffile) | ||||
|     if exist: | ||||
|         return | ||||
|     with config(conffile) as c: | ||||
|         c['credentials'].append(creds) | ||||
|  | ||||
|  | ||||
| def get_hashtags(iter_tweets, best=None): | ||||
| @@ -112,17 +180,24 @@ def get_hashtags(iter_tweets, best=None): | ||||
|  | ||||
|  | ||||
| def read_file(filename, tail=False): | ||||
|     with open(filename) as f: | ||||
|         while True: | ||||
|             line = f.readline() | ||||
|             if line not in (None, '', '\n'): | ||||
|                 tweet = json.loads(line.strip()) | ||||
|                 yield tweet | ||||
|             else: | ||||
|                 if tail: | ||||
|                     time.sleep(1) | ||||
|                 else: | ||||
|                     return | ||||
|     if filename == '-': | ||||
|         f = sys.stdin | ||||
|     else: | ||||
|         f = open(filename) | ||||
|     try: | ||||
|       while True: | ||||
|           line = f.readline() | ||||
|           if line not in (None, '', '\n'): | ||||
|               tweet = json.loads(line.strip()) | ||||
|               yield tweet | ||||
|           else: | ||||
|               if tail: | ||||
|                   time.sleep(1) | ||||
|               else: | ||||
|                   return | ||||
|     finally: | ||||
|         if f != sys.stdin: | ||||
|           close(f) | ||||
|  | ||||
|  | ||||
| def get_users(wq, ulist, by_name=False, queue=None, max_users=100): | ||||
| @@ -204,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False): | ||||
|     download_user(wq, session, user, entry, recursive) | ||||
|     session.close() | ||||
|  | ||||
|  | ||||
| def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000): | ||||
| def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000): | ||||
|  | ||||
|     total_followers = user.followers_count | ||||
|  | ||||
| @@ -353,8 +427,11 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor | ||||
|     pending = pending_entries(dburi) | ||||
|     session.close() | ||||
|  | ||||
|     for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): | ||||
|         logger.info("Got %s" % i) | ||||
|     with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq: | ||||
|         for i in tq:  | ||||
|             tq.write('Got {}'.format(i)) | ||||
|             logger.info("Got %s" % i) | ||||
|  | ||||
|  | ||||
|  | ||||
| def pending_entries(dburi): | ||||
| @@ -383,7 +460,7 @@ def get_tweet(c, tid): | ||||
|     return c.statuses.show(id=tid) | ||||
|  | ||||
| def search_tweet(c, query): | ||||
|     return c.search.tweets(q=query) | ||||
|      yield from c.search.tweets(q=query)['statuses'] | ||||
|  | ||||
| def user_timeline(c, query): | ||||
|     try: | ||||
| @@ -398,117 +475,329 @@ def get_user(c, user): | ||||
|     except ValueError: | ||||
|         return c.users.lookup(screen_name=user)[0] | ||||
|  | ||||
| def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): | ||||
|     cached = cached_tweet(tweetid, folder) | ||||
|     tweet = None | ||||
|     if update or not cached: | ||||
| def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False): | ||||
|     tweet = cached_id(tweetid, folder) | ||||
|     if update or not tweet: | ||||
|         tweet = get_tweet(wq, tweetid) | ||||
|         js = json.dumps(tweet, indent=2) | ||||
|     if write: | ||||
|     if cache and update: | ||||
|         if tweet: | ||||
|             write_tweet_json(js, folder) | ||||
|     else: | ||||
|         print(js) | ||||
|             js = json.dumps(tweet) | ||||
|             write_json(js, folder) | ||||
|     yield tweet | ||||
|  | ||||
|  | ||||
| def cached_tweet(tweetid, folder): | ||||
| def download_user(wq, userid, cache=True, folder="downloaded_users", update=False): | ||||
|     user = cached_id(userid, folder) | ||||
|     if update or not user: | ||||
|         user = get_user(wq, userid) | ||||
|     if cache and update: | ||||
|         if user: | ||||
|             write_json(user, folder, aliases=[user['screen_name'], ]) | ||||
|     yield user | ||||
|  | ||||
|  | ||||
| def cached_id(oid, folder): | ||||
|     tweet = None | ||||
|     file = os.path.join(folder, '%s.json' % tweetid) | ||||
|     file = os.path.join(folder, '%s.json' % oid) | ||||
|     if os.path.exists(file) and os.path.isfile(file): | ||||
|         try: | ||||
|             # print('%s: Tweet exists' % tweetid) | ||||
|             # print('%s: Object exists' % oid) | ||||
|             with open(file) as f: | ||||
|                 tweet = json.load(f) | ||||
|         except Exception as ex: | ||||
|             logger.error('Error getting cached version of {}: {}'.format(tweetid, ex)) | ||||
|             logger.error('Error getting cached version of {}: {}'.format(oid, ex)) | ||||
|     return tweet | ||||
|  | ||||
| def write_tweet_json(js, folder): | ||||
|     tweetid = js['id'] | ||||
|     file = tweet_file(tweetid, folder) | ||||
| def write_json(js, folder, oid=None, aliases=[]): | ||||
|     if not oid: | ||||
|       oid = js['id'] | ||||
|     file = id_file(oid, folder) | ||||
|     if not os.path.exists(folder): | ||||
|         os.makedirs(folder) | ||||
|     with open(file, 'w') as f: | ||||
|         json.dump(js, f, indent=2) | ||||
|         logger.info('Written {} to file {}'.format(tweetid, file)) | ||||
|         json.dump(js, f) | ||||
|         logger.info('Written {} to file {}'.format(oid, file)) | ||||
|     for alias in aliases: | ||||
|         os.symlink('%s.json' % oid, id_file(alias, folder)) | ||||
|  | ||||
| def tweet_file(tweetid, folder): | ||||
|     return os.path.join(folder, '%s.json' % tweetid) | ||||
| def id_file(oid, folder): | ||||
|     return os.path.join(folder, '%s.json' % oid) | ||||
|  | ||||
| def tweet_fail_file(tweetid, folder): | ||||
| def fail_file(oid, folder): | ||||
|     failsfolder = os.path.join(folder, 'failed') | ||||
|     if not os.path.exists(failsfolder): | ||||
|         os.makedirs(failsfolder) | ||||
|     return os.path.join(failsfolder, '%s.failed' % tweetid) | ||||
|     return os.path.join(failsfolder, '%s.failed' % oid) | ||||
|  | ||||
| def tweet_failed(tweetid, folder): | ||||
|     return os.path.isfile(tweet_fail_file(tweetid, folder)) | ||||
| def id_failed(oid, folder): | ||||
|     return os.path.isfile(fail_file(oid, folder)) | ||||
|  | ||||
| def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True): | ||||
|     def filter_line(line): | ||||
|         tweetid = int(line) | ||||
|         # print('Checking {}'.format(tweetid)) | ||||
|         if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed): | ||||
|             yield None | ||||
| def tweet_download_batch(wq, batch): | ||||
|     tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] | ||||
|     for tid, tweet in tweets.items(): | ||||
|         yield tid, tweet | ||||
|  | ||||
| def user_download_batch(wq, batch): | ||||
|     screen_names = [] | ||||
|     user_ids = [] | ||||
|     for elem in batch: | ||||
|         try: | ||||
|             int(elem) | ||||
|             user_ids.append(str(elem)) | ||||
|         except ValueError: | ||||
|             screen_names.append(elem.lower()) | ||||
|     args = {} | ||||
|     if user_ids: | ||||
|         args['user_id'] = ','.join(user_ids) | ||||
|     if screen_names: | ||||
|         args['screen_name'] = ','.join(screen_names) | ||||
|     try: | ||||
|         users = wq.users.lookup(**args) | ||||
|     except TwitterHTTPError as ex: | ||||
|         if ex.e.code in (404,): | ||||
|             users = [] | ||||
|         else: | ||||
|             yield line | ||||
|             raise | ||||
|     found_ids = [] | ||||
|     found_names = [] | ||||
|     for user in users: | ||||
|         uid = user['id_str'] | ||||
|         if uid in user_ids: | ||||
|             found_ids.append(uid) | ||||
|             yield (uid, user) | ||||
|         uname = user['screen_name'].lower() | ||||
|         if uname in screen_names: | ||||
|             found_names.append(uname) | ||||
|             yield (uname, user) | ||||
|     for uid in set(user_ids) - set(found_ids): | ||||
|         yield (uid, None) | ||||
|     for name in set(screen_names) - set(found_names): | ||||
|         yield (name, None) | ||||
|  | ||||
|     def print_result(res): | ||||
|         tid, tweet = res | ||||
|         if tweet: | ||||
|             try: | ||||
|                 write_tweet_json(tweet, folder=folder) | ||||
|                 yield 1 | ||||
|             except Exception as ex: | ||||
|                 logger.error('%s: %s' % (tid, ex)) | ||||
|                 if not ignore_fails: | ||||
|                     raise | ||||
|         else: | ||||
|             logger.info('Tweet not recovered: {}'.format(tid)) | ||||
|             with open(tweet_fail_file(tid, folder), 'w') as f: | ||||
|                 print('Tweet not found', file=f) | ||||
|             yield -1 | ||||
|  | ||||
|     def download_batch(batch): | ||||
|         tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] | ||||
|         return tweets.items() | ||||
| def dump_result(oid, obj, folder, ignore_fails=True): | ||||
|     if obj: | ||||
|         try: | ||||
|             write_json(obj, folder=folder, oid=oid) | ||||
|             failed = fail_file(oid, folder) | ||||
|             if os.path.exists(failed): | ||||
|                 os.remove(failed) | ||||
|         except Exception as ex: | ||||
|             logger.error('%s: %s' % (oid, ex)) | ||||
|             if not ignore_fails: | ||||
|                 raise | ||||
|     else: | ||||
|         logger.info('Object not recovered: {}'.format(oid)) | ||||
|         with open(fail_file(oid, folder), 'w') as f: | ||||
|             print('Object not found', file=f) | ||||
|  | ||||
|  | ||||
| def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True, | ||||
|                   batch_method=tweet_download_batch): | ||||
|  | ||||
|     done = Queue() | ||||
|  | ||||
|     down = Queue() | ||||
|  | ||||
|  | ||||
|     def filter_list(lst, done, down): | ||||
|         print('filtering') | ||||
|         for oid in lst: | ||||
|             cached = cached_id(oid, folder) | ||||
|             if (cached and not update): | ||||
|                 done.put((oid, cached)) | ||||
|             elif (id_failed(oid, folder) and not retry_failed): | ||||
|                 done.put((oid, None)) | ||||
|             else: | ||||
|                 down.put(oid) | ||||
|         down.put(None) | ||||
|  | ||||
|     def download_results(batch_method, down, done): | ||||
|         def gen(): | ||||
|             while True: | ||||
|                 r = down.get() | ||||
|                 if r is None: | ||||
|                     down.close() | ||||
|                     down.join_thread() | ||||
|                     return | ||||
|                 yield r | ||||
|  | ||||
|         for t in parallel(batch_method, gen(), 100): | ||||
|             done.put(t) | ||||
|  | ||||
|     def batch(*args, **kwargs): | ||||
|         return batch_method(wq, *args, **kwargs) | ||||
|  | ||||
|     tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True) | ||||
|     tc.start() | ||||
|     td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True) | ||||
|     td.start() | ||||
|  | ||||
|     def check_threads(ts, done): | ||||
|         for t in ts: | ||||
|             t.join() | ||||
|         done.put(None) | ||||
|  | ||||
|     wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True) | ||||
|     wait.start() | ||||
|  | ||||
|     while True: | ||||
|         rec = done.get() | ||||
|  | ||||
|         if rec is None: | ||||
|             done.close() | ||||
|             done.join_thread() | ||||
|             break | ||||
|  | ||||
|         oid, obj = rec | ||||
|         if cache or (not obj): | ||||
|             dump_result(oid, obj, folder, ignore_fails) | ||||
|         yield rec | ||||
|  | ||||
|     wait.join() | ||||
|  | ||||
|  | ||||
| def download_tweets_file(*args, **kwargs): | ||||
|     kwargs['batch_method'] = tweet_download_batch | ||||
|     yield from download_file(*args, **kwargs) | ||||
|  | ||||
|  | ||||
| def download_users_file(*args, **kwargs): | ||||
|     kwargs['batch_method'] = user_download_batch | ||||
|     yield from download_file(*args, **kwargs) | ||||
|  | ||||
|  | ||||
| def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True, | ||||
|                   quotechar='"', commentchar=None, batch_method=tweet_download_batch, | ||||
|                   **kwargs): | ||||
|     with open(csvfile) as f: | ||||
|         if commentchar: | ||||
|             f = (line for line in f if not line.startswith('#')) | ||||
|  | ||||
|         csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar)) | ||||
|         for n in range(skip): | ||||
|             next(csvreader) | ||||
|  | ||||
|         def reader(r): | ||||
|             for row in csvreader: | ||||
|                 if len(row) > column: | ||||
|                     yield row[column].strip() | ||||
|  | ||||
|  | ||||
|         for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache, | ||||
|                                  **kwargs): | ||||
|             yield res | ||||
|  | ||||
|     with open(tweetsfile) as f: | ||||
|         lines = map(lambda x: x.strip(), f) | ||||
|         lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines')) | ||||
|         tweets = parallel(download_batch, lines_to_crawl, 100) | ||||
|         for res in tqdm(parallel(print_result, tweets), desc='Queried'): | ||||
|             pass | ||||
|  | ||||
| def download_timeline(wq, user): | ||||
|     return wq.statuses.user_timeline(id=user) | ||||
|  | ||||
|  | ||||
| def consume_feed(func, *args, **kwargs): | ||||
| def _consume_feed(func, feed_control=None, **kwargs): | ||||
|     ''' | ||||
|     Get all the tweets using pagination and a given method. | ||||
|     It can be controlled with the `count` parameter. | ||||
|  | ||||
|     If count < 0 => Loop until the whole feed is consumed. | ||||
|     If count == 0 => Only call the API once, with the default values. | ||||
|     If count > 0 => Get count tweets from the feed. | ||||
|     If max_count < 0 => Loop until the whole feed is consumed. | ||||
|     If max_count == 0 => Only call the API once, with the default values. | ||||
|     If max_count > 0 => Get max_count tweets from the feed. | ||||
|     ''' | ||||
|     remaining = int(kwargs.pop('count', 0)) | ||||
|     consume = remaining < 0 | ||||
|     remaining = int(kwargs.pop('max_count', 0)) | ||||
|     count = int(kwargs.get('count', -1)) | ||||
|     limit = False | ||||
|  | ||||
|     # Simulate a do-while by updating the condition at the end | ||||
|     while not limit: | ||||
|         if remaining > 0: | ||||
|             kwargs['count'] = remaining | ||||
|         resp = func(*args, **kwargs) | ||||
|         if not resp: | ||||
|             return | ||||
|         for t in resp: | ||||
|             yield t | ||||
|         if consume: | ||||
|             continue | ||||
|         remaining -= len(resp) | ||||
|         max_id = min(s['id'] for s in func(*args, **kwargs)) - 1 | ||||
|         kwargs['max_id'] = max_id | ||||
|         limit = remaining <= 0 | ||||
|     # We need to at least perform a query, so we simulate a do-while | ||||
|     # by running once with no limit and updating the condition at the end | ||||
|     with tqdm(total=remaining) as pbar: | ||||
|       while not limit: | ||||
|           if remaining > 0 and  ((count < 0) or (count > remaining)): | ||||
|               kwargs['count'] = remaining | ||||
|           resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count) | ||||
|           if not resp: | ||||
|               return | ||||
|           for entry in resp: | ||||
|               yield entry | ||||
|           pbar.update(len(resp)) | ||||
|           limit = stop | ||||
|           if remaining < 0: | ||||
|               # If the loop was run with a negative remaining, it will only stop | ||||
|               # when the control function tells it to. | ||||
|               continue | ||||
|           # Otherwise, check if we have already downloaded all the required items | ||||
|           remaining -= len(resp) | ||||
|           limit = limit or remaining <= 0 | ||||
|  | ||||
|  | ||||
| def consume_tweets(*args, **kwargs): | ||||
|     return _consume_feed(*args, feed_control=_tweets_control, **kwargs) | ||||
|  | ||||
|  | ||||
| def consume_users(*args, **kwargs): | ||||
|     return _consume_feed(*args, feed_control=_users_control, **kwargs) | ||||
|  | ||||
|  | ||||
| def _tweets_control(func, apiargs, remaining=0, **kwargs): | ||||
|     ''' Return a list of entries, the remaining ''' | ||||
|      | ||||
|     resp = func(**apiargs) | ||||
|     if not resp: | ||||
|         return None, True | ||||
|     # Update the arguments for the next call | ||||
|     # Two options: either resp is a list, or a dict like: | ||||
|     #    {'statuses': ... 'search_metadata': ...} | ||||
|     if isinstance(resp, dict) and 'search_metadata' in resp: | ||||
|         resp = resp['statuses'] | ||||
|     max_id = min(s['id'] for s in resp) - 1 | ||||
|     apiargs['max_id'] = max_id | ||||
|     return resp, False | ||||
|  | ||||
|  | ||||
| def _users_control(func, apiargs, remaining=0, **kwargs): | ||||
|     resp = func(**apiargs) | ||||
|     stop = True | ||||
|     # Update the arguments for the next call | ||||
|     if 'next_cursor' in resp: | ||||
|         cursor = resp['next_cursor'] | ||||
|         apiargs['cursor'] = cursor | ||||
|         if int(cursor) != -1: | ||||
|             stop = False | ||||
|     return resp['users'], stop | ||||
|  | ||||
|  | ||||
| def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'): | ||||
|     outformat = outformat.lower() | ||||
|     def do(out): | ||||
|  | ||||
|         if outformat == 'csv': | ||||
|             writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter) | ||||
|             if header != '': | ||||
|                 h = header | ||||
|                 if h is None: | ||||
|                     h = delimiter.join(fields) | ||||
|                 print(h, file=out) | ||||
|             attrs = list(token.strip().split('.') for token in fields) | ||||
|             for obj in it: | ||||
|                 values = [] | ||||
|                 for attr in attrs: | ||||
|                     try: | ||||
|                         values.append(reduce(operator.getitem, attr, obj)) | ||||
|                     except KeyError: | ||||
|                         if not ignore_missing: | ||||
|                             print('Key not present: {}'.format(attr), file=sys.stderr) | ||||
|                         values.append(None) | ||||
|                 writer.writerow(values) | ||||
|         elif outformat == 'jsonlines': | ||||
|             for obj in it: | ||||
|                 print(json.dumps(obj, sort_keys=True), file=out) | ||||
|         elif outformat == 'indented': | ||||
|             for obj in it: | ||||
|                 print(json.dumps(obj, indent=4, sort_keys=True), file=out) | ||||
|         else: | ||||
|             for obj in it: | ||||
|                 print(obj, file=out) | ||||
|  | ||||
|     if outfile is sys.stdout: | ||||
|         return do(sys.stdout) | ||||
|  | ||||
|     with open(outfile, 'w') as out: | ||||
|         return do(out) | ||||
|   | ||||
							
								
								
									
										12
									
								
								docker-compose.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								docker-compose.yml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,12 @@ | ||||
| version: '2' | ||||
| services: | ||||
|   dev: | ||||
|     build: | ||||
|       context: . | ||||
|       dockerfile: Dockerfile-3.4 | ||||
|     volumes: | ||||
|       - '.:/usr/src/app' | ||||
|     tty: yes | ||||
|     working_dir: '/usr/src/app' | ||||
|     entrypoint: '/bin/bash' | ||||
|     command: '' | ||||
| @@ -2,3 +2,4 @@ sqlalchemy | ||||
| twitter | ||||
| click | ||||
| tqdm | ||||
| pyyaml | ||||
|   | ||||
							
								
								
									
										32
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								setup.py
									
									
									
									
									
								
							| @@ -1,29 +1,23 @@ | ||||
| import pip | ||||
| from setuptools import setup | ||||
| from pip.req import parse_requirements | ||||
|  | ||||
| # parse_requirements() returns generator of pip.req.InstallRequirement objects | ||||
| # pip 6 introduces the *required* session argument | ||||
| try: | ||||
|     install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession()) | ||||
|     py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession()) | ||||
|     test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession()) | ||||
| except AttributeError: | ||||
|     install_reqs = parse_requirements("requirements.txt") | ||||
|     py2_reqs = parse_requirements("requirements-py2.txt") | ||||
|     test_reqs = parse_requirements("test-requirements.txt") | ||||
| def parse_requirements(filename): | ||||
|     """ load requirements from a pip requirements file """ | ||||
|     with open(filename, 'r') as f: | ||||
|         lineiter = list(line.strip() for line in f) | ||||
|     return [line for line in lineiter if line and not line.startswith("#")] | ||||
|  | ||||
| install_reqs = parse_requirements("requirements.txt") | ||||
| py2_reqs = parse_requirements("requirements-py2.txt") | ||||
| test_reqs = parse_requirements("test-requirements.txt") | ||||
|  | ||||
| import sys | ||||
| import os | ||||
| import itertools | ||||
| if sys.version_info <= (3, 0): | ||||
|     install_reqs = itertools.chain(install_reqs, py2_reqs) | ||||
|     install_reqs = install_reqs + py2_reqs | ||||
|  | ||||
| # reqs is a list of requirement | ||||
| # e.g. ['django==1.5.1', 'mezzanine==1.4.6'] | ||||
| install_reqs = [str(ir.req) for ir in install_reqs] | ||||
| test_reqs = [str(ir.req) for ir in test_reqs] | ||||
|  | ||||
| from bitter import __version__ | ||||
| with open(os.path.join('bitter', 'VERSION'), 'r') as f: | ||||
|     __version__ = f.read().strip() | ||||
|  | ||||
| setup( | ||||
|     name="bitter", | ||||
|   | ||||
| @@ -12,7 +12,11 @@ from bitter import config as c | ||||
| class TestCrawlers(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) | ||||
|         CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml') | ||||
|         if os.path.exists(CONF_PATH): | ||||
|             self.wq = easy(CONF_PATH) | ||||
|         else: | ||||
|             self.wq = easy() | ||||
|  | ||||
|     def test_create_worker(self): | ||||
|         assert len(self.wq.queue)==1 | ||||
|   | ||||
							
								
								
									
										23
									
								
								tests/test_models.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								tests/test_models.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | ||||
| from unittest import TestCase | ||||
|  | ||||
| import os | ||||
| import types | ||||
|  | ||||
| from bitter import utils | ||||
| from bitter.models import * | ||||
| from sqlalchemy import exists | ||||
|  | ||||
| class TestModels(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.session = make_session('sqlite://') | ||||
|  | ||||
|     def test_user(self): | ||||
|         fake_user = User(name="Fake user", id=1548) | ||||
|         self.session.add(fake_user) | ||||
|         self.session.commit() | ||||
|         fake_committed = self.session.query(User).filter_by(name="Fake user").first() | ||||
|         assert fake_committed | ||||
|         self.session.delete(fake_committed) | ||||
|         self.session.commit() | ||||
|         assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548)) | ||||
| @@ -8,56 +8,63 @@ from bitter import config as c | ||||
|  | ||||
| class TestUtils(TestCase): | ||||
|  | ||||
|     configfile = '/tmp/bitter.yaml' | ||||
|  | ||||
|     def setUp(self): | ||||
|         self.credfile = '/tmp/credentials.txt' | ||||
|         c.CREDENTIALS = self.credfile | ||||
|         if os.path.exists(self.credfile): | ||||
|             os.remove(self.credfile) | ||||
|         utils.create_credentials(self.credfile) | ||||
|         c.CONFIG_FILE = self.configfile | ||||
|         if os.path.exists(self.configfile): | ||||
|             os.remove(self.configfile) | ||||
|         assert not os.path.exists(self.configfile) | ||||
|         utils.create_config_file(self.configfile) | ||||
|         assert os.path.exists(self.configfile) | ||||
|          | ||||
|  | ||||
|     def test_create_credentials(self): | ||||
|         assert os.path.exists(self.credfile) | ||||
|         os.remove(self.credfile) | ||||
|         utils.create_credentials() # From config | ||||
|         assert os.path.exists(self.credfile) | ||||
|  | ||||
|     def test_add_credentials(self): | ||||
|         utils.add_credentials(self.credfile, user="test") | ||||
|         assert utils.get_credentials(self.credfile) | ||||
|         assert utils.get_credentials(user="test") | ||||
|         assert list(utils.get_credentials(user="test"))[0]["user"] == "test" | ||||
|         utils.add_credentials(self.configfile, user="test") | ||||
|         assert utils.get_credentials(self.configfile) | ||||
|         assert utils.get_credentials(self.configfile, user="test") | ||||
|         assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test" | ||||
|  | ||||
|     def test_get_credentials(self): | ||||
|         utils.add_credentials(self.credfile, user="test") | ||||
|         assert utils.get_credentials(user="test") | ||||
|         assert not utils.get_credentials(user="test", inverse=True) | ||||
|         utils.add_credentials(self.configfile, user="test") | ||||
|         assert utils.get_credentials(self.configfile, user="test") | ||||
|         assert not utils.get_credentials(self.configfile, user="test", inverse=True) | ||||
|  | ||||
|     def test_add_two_credentials(self): | ||||
|         utils.add_credentials(self.credfile, user="test") | ||||
|         utils.add_credentials(self.credfile, user="test2") | ||||
|         assert utils.get_credentials(user="test") | ||||
|         assert utils.get_credentials(user="test2") | ||||
|         utils.add_credentials(self.configfile, user="test") | ||||
|         utils.add_credentials(self.configfile, user="test2") | ||||
|         assert utils.get_credentials(self.configfile, user="test") | ||||
|         assert utils.get_credentials(self.configfile, user="test2") | ||||
|  | ||||
|  | ||||
|     def test_delete_credentials(self): | ||||
|         utils.add_credentials(self.credfile, user="test") | ||||
|         assert utils.get_credentials(user="test") | ||||
|         utils.delete_credentials(user="test") | ||||
|         print(utils.get_credentials()) | ||||
|         assert not utils.get_credentials(user="test") | ||||
|         utils.add_credentials(self.configfile, user="test") | ||||
|         assert utils.get_credentials(self.configfile, user="test") | ||||
|         utils.delete_credentials(self.configfile, user="test") | ||||
|         assert not utils.get_credentials(self.configfile, user="test") | ||||
|  | ||||
|     def test_parallel(self): | ||||
|         import time | ||||
|         def echo(i): | ||||
|             time.sleep(2) | ||||
|             time.sleep(0.5) | ||||
|             return i | ||||
|         tic = time.time() | ||||
|         resp = utils.parallel(echo, [1,2,3]) | ||||
|         resp = utils.parallel(echo, [1, 2, 3]) | ||||
|         assert isinstance(resp, types.GeneratorType) | ||||
|         assert list(resp) == [1,2,3] | ||||
|         assert sorted(list(resp)) == [1, 2, 3] | ||||
|         toc = time.time() | ||||
|         assert (tic-toc) < 6000 | ||||
|         assert (tic-toc) < 600 | ||||
|         resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) | ||||
|         assert list(resp2) == [1,2,3,4] | ||||
|          | ||||
|         assert sorted(list(resp2)) == [1, 2, 3, 4] | ||||
|  | ||||
|  | ||||
| class TestUtilsEnv(TestUtils): | ||||
|     configfile = None | ||||
|  | ||||
|     def setUp(self): | ||||
|         if 'BITTER_CONFIG' in os.environ: | ||||
|           self.oldenv = os.environ['BITTER_CONFIG'] | ||||
|         os.environ['BITTER_CONFIG'] = '' | ||||
|  | ||||
|     def tearDown(self): | ||||
|         if hasattr(self, 'oldenv'): | ||||
|             os.environ['BITTER_CONFIG'] = self.oldenv | ||||
|   | ||||
		Reference in New Issue
	
	Block a user