Mirror of https://github.com/balkian/bitter.git (synced 2025-10-25 20:58:24 +00:00)

# Compare commits (29 commits)
| SHA1 |
|---|
| 653487e2d7 |
| 02aec5eefa |
| e6b08c4ffb |
| 311b972b27 |
| 7724967285 |
| dd662acd22 |
| 5199d5b5aa |
| 6259013978 |
| 53bb7edabc |
| 57eb73b53b |
| 7c829ee102 |
| 27bc3557b2 |
| 9c82dea298 |
| cf766a6bf3 |
| e65f6836b3 |
| 1cb86abbdd |
| b212a46ab7 |
| 0a0d8fd5f1 |
| e3a78968da |
| 67ef307cce |
| cb054ac365 |
| bdc4690240 |
| c0309a1e52 |
| 4afdd6807d |
| 38605ba2c8 |
| 738823c8a2 |
| 3f42879751 |
| 35f0c6376d |
| 2036d51d96 |
## .gitignore (vendored, 2 lines changed)

```
@@ -1,6 +1,8 @@
__pycache__
*.egg-info
dist
env
.env
__*
.*
*.pyc
```
## Dockerfile-2.7 (new file, 7 lines)

```
# onbuild copies . to /usr/src/app/
FROM python:2.7-onbuild
MAINTAINER J. Fernando Sánchez @balkian

RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
```
## Dockerfile-3.4 (new file, 7 lines)

```
# onbuild copies . to /usr/src/app/
FROM python:3.4-onbuild
MAINTAINER J. Fernando Sánchez @balkian

RUN pip install ".[server]"

ENTRYPOINT ["bitter"]
```
## Dockerfile.template (new file, 7 lines)

```
# onbuild copies . to /usr/src/app/
FROM python:{{PYVERSION}}-onbuild
MAINTAINER J. Fernando Sánchez @balkian

RUN pip install ".[server]"

ENTRYPOINT ["bitter"]
```
## MANIFEST.in

```
@@ -1,7 +1,10 @@
include requirements.txt
include requirements-py2.txt
include test-requirements.txt
include README.md
include bitter/VERSION
graft bitter/templates
graft bitter/static
graft test
include tests/test*
global-exclude *.pyc
global-exclude __pycache__
```
## Makefile (new file, 76 lines)

```
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)


all: build run

dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))

Dockerfile-%: Dockerfile.template
	sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*


dev-%:
	@docker start $(NAME)-dev$* || (\
		$(MAKE) build-$*; \
		docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
	)\

	docker exec -ti $(NAME)-dev$* bash

dev: dev-$(PYMAIN)

build: $(addprefix build-, $(PYMAIN))

buildall: $(addprefix build-, $(PYVERSIONS))

build-%: Dockerfile-%
	docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;

test: $(addprefix test-,$(PYMAIN))

testall: $(addprefix test-,$(PYVERSIONS))

test-%: build-%
	docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;

pip_test-%:
	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;

dist/$(NAME)-$(VERSION).tar.gz:
	docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;

sdist: dist/$(NAME)-$(VERSION).tar.gz


upload-%: test-%
	docker push '$(REPO)/$(NAME):$(VERSION)-python$*'

upload: testall $(addprefix upload-,$(PYVERSIONS))
	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
	docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'

clean:
	@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
	@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true

upload_git:
	git commit -a
	git tag ${VERSION}
	git push --tags origin master

pip_upload:
	python setup.py sdist upload ;

pip_test: $(addprefix pip_test-,$(PYVERSIONS))

run: build
	docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'

.PHONY: test test-% build-% build pip_test run
```
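The `Dockerfile-%` pattern rule above is the core of the multi-version build: it stamps each entry of `PYVERSIONS` into `Dockerfile.template` with `sed`. In case the Make syntax is unfamiliar, here is a minimal Python sketch of the same templating step (file names as in this repo):

```python
# Sketch of what `make dockerfiles` does: render Dockerfile.template
# once per supported Python version, as the Dockerfile-% rule does with sed.
pyversions = ['3.4', '2.7']  # mirrors PYVERSIONS in the Makefile

with open('Dockerfile.template') as f:
    template = f.read()

for version in pyversions:
    with open('Dockerfile-' + version, 'w') as out:
        out.write(template.replace('{{PYVERSION}}', version))
```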
## README.md (152 lines changed)

````
@@ -1,4 +1,5 @@
-#Description
+# Description

There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.

@@ -17,21 +18,158 @@ or

Programmatically:

```python
-from bitter.crawlers import TwitterQueue
-wq = TwitterQueue.from_credentials()
+from bitter import easy
+wq = easy()
print(wq.users.show(user_name='balkian'))
```

-# Credentials format
+You can also make custom calls to the API through the command line,
+e.g. to get the latest 500 tweets by the python software foundation:

```
-{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"}
+bitter api statuses/user_timeline --id thepsf --count 500
```

-By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:
+## Adding credentials

```
-python -m bitter -c <credentials_file> ...
+bitter --config <YOUR CONFIGURATION FILE> credentials add
```

+You can specify the parameters in the command or let the command line guide you through the process.

+# Examples

+## Downloading a list of tweets

+Bitter can download tweets from a list of tweet ids in a CSV file.
+The result is stored as individual json files in your folder of choice.
+You can even specify the column number for tweet ids.
+Bitter will not try to download tweets that are already present, unless you ask it to update them.

+```
+Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
+
+  Download tweets from a list of tweets in a CSV file. The result is stored
+  as individual json files in your folder of choice.
+
+Options:
+  -f, --folder TEXT
+  -d, --delimiter TEXT
+  -h, --header           Discard the first line (use it as a header)
+  -q, --quotechar TEXT
+  -c, --column INTEGER
+  --help                 Show this message and exit.
+```

+For instance, this will download the tweets listed in `tweet_ids.csv` into the `tweet_info` folder:

+```
+bitter tweet get_all -f tweet_info tweet_ids.csv
+```

+## Downloading a list of users

+Bitter downloads users and tweets in a similar way:

+```
+Usage: bitter users get_all [OPTIONS] USERSFILE
+
+  Download users from a list of user ids/screen names in a CSV file. The
+  result is stored as individual json files in your folder of choice.
+
+Options:
+  -f, --folder TEXT
+  -d, --delimiter TEXT
+  -h, --header           Discard the first line (use it as a header)
+  -q, --quotechar TEXT
+  -c, --column INTEGER
+  --help                 Show this message and exit.
+```

+The only difference is that users can be downloaded via `screen_name` or `user_id`.
+This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both forms.

+## Downloading a stream

+```
+Usage: bitter stream get [OPTIONS]
+
+Options:
+  -l, --locations TEXT
+  -t, --track TEXT
+  -f, --file TEXT       File to store the stream of tweets. Default: standard output
+  -p, --politelyretry   Politely retry after a hangup/connection error
+  --help                Show this message and exit.
+```

+```
+bitter --config .bitter.yaml stream get
+```

+```
+python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
+```

+## REST queries

+In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.

+```
+bitter api <URL endpoint> --parameter VALUE ... [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
+```

+For instance:

+```
+# Get 100 tweets that mentioned Obama after tweet 942689870501302300
+bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
+```

+That is equivalent to this call to the API: `api/1.1/search/tweets?since_id=942689870501302300&count=100&q=Obama`.

+The flags `--tweets` and `--users` are optional.
+If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.

+For example:

+```
+# Download 1000 tweets, 100 tweets per call.
+bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
+```

+```
+# Download all the followers of @balkian
+bitter api 'followers/list' --_id balkian --users --max_count -1
+```

+Note that some reserved words (such as `id`) have to be preceded by an underscore.
+This limitation is imposed by the python-twitter library.

+# Configuration format

+```
+credentials:
+- user: "balkian"
+  consumer_secret: "xxx"
+  consumer_key: "xxx"
+  token_key: "xxx"
+  token_secret: "xxx"
+- user: ....
+```

+By default, bitter uses '~/.bitter.yaml', but you may choose a different file:

+```
+python -m bitter --config <config_file> ...
+```

+Or use an environment variable:

+```
+export BITTER_CONFIG=$(cat myconfig.yaml)
+```

# Server
````
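The `credentials add` subcommand described above ultimately calls helpers in `bitter.utils` (shown later in this diff). A minimal sketch of doing the same from Python, assuming a config file at the default `~/.bitter.yaml`:

```python
# Sketch: manage credentials programmatically instead of via the CLI.
# add_credentials/get_credentials are the helpers from bitter/utils.py.
from bitter import utils

# Append a credential set to the YAML config (created if missing).
utils.add_credentials('~/.bitter.yaml', user='balkian',
                      consumer_key='xxx', consumer_secret='xxx',
                      token_key='xxx', token_secret='xxx')

# List every credential stored in that config file.
for cred in utils.get_credentials('~/.bitter.yaml'):
    print(cred['user'])
```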
## bin/README.md (new file, 10 lines)

````
Scripts to process jsonlines

To get the jsonlines file, you can use the streaming API or the search api, like so:

```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```

To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.
````
## bin/example_query.sh (new executable file, 1 line)

```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
## bin/extract-hashtags.sh (new executable file, 13 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export FIELDS="created_at,id,text"
for i in "$@"
do
    OUTPUT=$i.hashtags.csv
    echo "$FIELDS" > $OUTPUT
    pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done
```
## bin/extract-interactions.sh (new executable file, 15 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

for i in "$@"
do
    REPLYOUTPUT=$i.replies.csv
    RTOUTPUT=$i.rts.csv
    echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
    echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
    pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
    pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done
```
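For readers unfamiliar with jq, here is a rough Python equivalent of the two filters above, producing the same reply and retweet CSVs (the input file name is just an example):

```python
# Rough Python equivalent of the jq filters in extract-interactions.sh:
# emit one CSV row per reply and per retweet found in a jsonlines dump.
import csv
import json

with open('mytweets.jsonlines') as f, \
     open('replies.csv', 'w', newline='') as replies, \
     open('rts.csv', 'w', newline='') as rts:
    reply_w = csv.writer(replies)
    rt_w = csv.writer(rts)
    reply_w.writerow(['created_at', 'id', 'user_id', 'reply_user_id'])
    rt_w.writerow(['created_at', 'id', 'user_id', 'rt_user_id'])
    for line in f:
        tweet = json.loads(line)
        if tweet.get('in_reply_to_user_id_str'):
            reply_w.writerow([tweet['created_at'], tweet['id_str'],
                              tweet['user']['id_str'],
                              tweet['in_reply_to_user_id_str']])
        rt = tweet.get('retweeted_status')
        if rt:
            rt_w.writerow([tweet['created_at'], rt['id_str'],
                           tweet['user']['id_str'], rt['user']['id_str']])
```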
## bin/extract-limits.sh (new executable file, 16 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'

export FIELDS="timestamp,track"

for i in "$@"
do
    OUTPUT=$i.limits.csv
    echo $FIELDS > $OUTPUT
    pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done
```
## bin/extract-media.sh (new executable file, 16 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'

export FIELDS="id,url"

for i in "$@"
do
    OUTPUT=$i.media.csv
    echo $FIELDS > $OUTPUT
    pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done
```
## bin/extract-users.sh (new executable file, 28 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"

for i in "$@"
do
    OUTPUT=$i.users.csv
    echo \#$USER_FIELDS > $OUTPUT
    jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done
```
## bin/extract.sh (new executable file, 32 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"

for i in "$@"
do
    OUTPUT=$i.tweets.csv
    echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
    jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done
```
## bin/extract_extended.sh (new executable file, 17 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'

for i in "$@"
do
    OUTPUT=$i.full_text.csv
    echo $HEADER > $OUTPUT
    jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
    sort -u $OUTPUT -o $OUTPUT
    sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done
```
## bin/extract_text.sh (new executable file, 16 lines)

```
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'

for i in "$@"
do
    OUTPUT=$i.text.csv
    echo $HEADER > $OUTPUT
    pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
    # sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done
```
## bin/filter-edges.sh (new file, 10 lines)

```
if [ "$#" -lt 2 ]
then
    echo "Find edge lines in a file that contain one of the users in a user list."
    echo ""
    echo "Usage: $0 <file with edges> <file with the list of users>"
    exit 1
fi

pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'
```
## bin/functions.py (new file, 23 lines)

```
import pandas as pd

def read_rts(rtsfile, tweetsfile):
    tweets = pd.read_csv(tweetsfile, index_col=0)
    rts = pd.read_csv(rtsfile, index_col=1)
    merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
    return merged.sort_values(by='count', ascending=False)


def read_tweets(tweetsfile):
    '''When the dataset is small enough, we can load tweets as-is'''
    with open(tweetsfile) as f:
        header = f.readline().strip().split(',')
    dtypes = {}
    for key in header:
        if key.endswith('_str') or key.endswith('.id'):
            dtypes[key] = object
    tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
    return tweets


if __name__ == '__main__':
    import argparse
```
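A short usage sketch for these helpers; the CSV names are hypothetical outputs of `extract.sh` and `extract-interactions.sh`:

```python
# Hypothetical file names: any tweets/rts CSVs produced by the
# extract scripts in this folder will do.
from functions import read_rts, read_tweets

tweets = read_tweets('mytweets.jsonlines.tweets.csv')  # id columns kept as strings
rts = read_rts('mytweets.jsonlines.rts.csv', 'mytweets.jsonlines.tweets.csv')
print(rts.head(10))  # most retweeted tweets first
```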
## bin/print-hashtags.sh (new executable file, 1 line)

```
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h
```
## bin/print-replies.sh (new executable file, 14 lines)

```
MAX_TAGS=100

function get_text {
    while read line
    do
        echo $line
        rtid=$(echo $line | awk -F"," '{print $2}')
        text=$(grep -m 1 $rtid *.text.csv)
        echo "$line - $text"
    done < "/dev/stdin"
}

cat "$@" | get_text
```
## bin/print-rts.sh (new executable file, 15 lines)

```
MAX_TAGS=100

function get_text {
    while read line
    do
        echo $line
        rtid=$(echo $line | awk '{print $2}')
        count=$(echo $line | awk '{print $1}')
        text=$(grep -m 1 $rtid *.text.csv)
        echo "$line - $text"
    done < "/dev/stdin"
}

cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text
```
## bitter/VERSION

```
@@ -1 +1 @@
-0.5.4
+0.9.5
```
## bitter/__init__.py

```
@@ -3,15 +3,14 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter
"""

try:
    from future.standard_library import install_aliases
    install_aliases()
except ImportError:
    # Avoid problems at setup.py and py3.x
    pass

import os

from .version import __version__

def easy(*args, **kwargs):
    from .crawlers import TwitterQueue
    return TwitterQueue.from_credentials(*args, **kwargs)

__all__ = ['cli', 'config', 'crawlers', 'models', 'utils']
```
## bitter/cli.py (352 lines changed)

```
@@ -1,3 +1,5 @@
from __future__ import print_function

import click
import json
import os
@@ -6,10 +8,12 @@ import time
import sqlalchemy.types
import threading
import sqlite3
from tqdm import tqdm

from sqlalchemy import exists

from bitter import utils, models, crawlers
from bitter import config as bconf
from bitter.models import make_session, User, ExtractorEntry, Following

import sys
@@ -17,7 +21,7 @@
if sys.version_info <= (3, 0):
    from contextlib2 import ExitStack
else:
    from contextlib import ExitStack


logger = logging.getLogger(__name__)
@@ -25,47 +29,149 @@ logger = logging.getLogger(__name__)
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuration file.", default=bconf.CREDENTIALS)
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
    logging.basicConfig(level=getattr(logging, logging_level))
    ctx.obj = {}
    ctx.obj['VERBOSE'] = verbose
    ctx.obj['CONFIG'] = config
    ctx.obj['CREDENTIALS'] = credentials
    utils.create_credentials(credentials)
    bconf.CONFIG_FILE = config
    bconf.CREDENTIALS = credentials
    if os.path.exists(utils.get_config_path(credentials)):
        utils.copy_credentials_to_config(credentials, config)


@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    for worker in wq.queue:
        print('#'*20)
        try:
            resp = worker.client.application.rate_limit_status()
            print(worker.name)
        except Exception as ex:
            print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex))


@credentials.command('limits')
@click.option('--all', type=bool, default=False, required=False,
              help=('Print all limits. By default, only limits that '
                    'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, all, url):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    total = {}
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        print('#'*20)
        print(worker.name)
        if url:
            limit = 'NOT FOUND'
            print('URL is: {}'.format(url))
            cat = url.split('/')[1]
            if cat in resp['resources']:
                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
            else:
                print('Cat {} not found'.format(cat))
                continue
            for k in limit:
                total[k] = total.get(k, 0) + limit[k]
            print('{}: {}'.format(url, limit))
            continue
        nres = {}
        if not all:
            for res, urls in resp['resources'].items():
                nurls = {}
                for u, limits in urls.items():
                    if limits['limit'] != limits['remaining']:
                        nurls[u] = limits
                if nurls:
                    nres[res] = nurls
            resp = nres
        print(json.dumps(resp, indent=2))
    if url:
        print('Total for {}: {}'.format(url, total))

@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
    if not consumer_key:
        consumer_key = click.prompt('Please, enter your CONSUMER KEY')
    if not consumer_secret:
        consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
    if not token_key:
        token_key = click.prompt('Please, enter your ACCESS TOKEN')
    if not token_secret:
        token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
    utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
                          token_key=token_key, token_secret=token_secret)
    click.echo('Credentials added for {}'.format(user_name))


@main.group()
@click.pass_context
def tweet(ctx):
    pass

@tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
@click.pass_context
def get_tweet(ctx, tweetid):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    t = utils.get_tweet(wq, tweetid)
    print(json.dumps(t, indent=2))

def get_tweet(tweetid, write, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    utils.download_tweet(wq, tweetid, write, folder, update)

@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
              is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column):
    if update and not click.confirm('This may overwrite existing tweets. Continue?'):
        click.echo('Cancelling')
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)

    status = tqdm(desc='Queried')
    failed = 0
    for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
                                        batch_method=utils.tweet_download_batch,
                                        header=header, quotechar=quotechar,
                                        column=column, update=update, retry_failed=retry):
        status.update(1)
        if not obj:
            failed += 1
            status.set_description('Failed: %s. Queried' % failed, refresh=True)

@tweet.command('search')
@click.argument('query')
@click.pass_context
def get_tweet(ctx, query):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    t = utils.search_tweet(c.client, query)
def search(ctx, query):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    t = utils.search_tweet(wq, query)
    print(json.dumps(t, indent=2))

@tweet.command('timeline')
@click.argument('user')
@click.pass_context
def get_tweet(ctx, user):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    t = utils.user_timeline(c.client, user)
def timeline(ctx, user):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    t = utils.user_timeline(wq, user)
    print(json.dumps(t, indent=2))

@main.group()
@@ -84,23 +190,61 @@ def list_users(ctx, db):
        for j in i.__dict__:
            print('\t{}: {}'.format(j, getattr(i, j)))

@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    c = wq.next()
    u = utils.get_user(c.client, user)
    print(json.dumps(u, indent=2))

@users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    if not write:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js)
        return
    if not os.path.exists(folder):
        os.makedirs(folder)
    file = os.path.join(folder, '%s.json' % user)
    if not update and os.path.exists(file) and os.path.isfile(file):
        print('User exists: %s' % user)
        return
    with open(file, 'w') as f:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js, file=f)

@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
              is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column):
    if update and not click.confirm('This may overwrite existing users. Continue?'):
        click.echo('Cancelling')
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
                                 batch_method=utils.user_download_batch,
                                 update=update, retry_failed=retry,
                                 header=header, quotechar=quotechar,
                                 column=column):
        pass

@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db):
def crawl_users(ctx, usersfile, skip, until, threads, db):
    global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock

    if '://' not in db:
@@ -112,7 +256,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
            return ExitStack()

    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
                                                                                      len(wq.queue)))
@@ -203,14 +347,9 @@ def get_users(ctx, usersfile, skip, until, threads, db):
            speed = (collected - lastcollected) / 10
            with statslock:
                lastcollected = collected

    logger.info('Done!')

@main.group('api')
def api():
    pass

@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')
@@ -259,7 +398,7 @@ def network_extractor(ctx, as_json):
    if as_json:
        import json
        print(json.dumps(follower_map, indent=4))


@extractor.command('users')
@click.pass_context
@@ -281,7 +420,7 @@ def users_extractor(ctx):
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
    print(locals())
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    dburi = ctx.obj['DBURI']
    utils.extract(wq,
                  recursive=recursive,
@@ -293,43 +432,122 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    db = ctx.obj['DBURI']
    session = make_session(db)
    session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending': False})

@api.command('limits')
@click.argument('url', required=False)

@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
              help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def get_limits(ctx, url):
    wq = crawlers.TwitterQueue.from_credentials(ctx.obj['CREDENTIALS'])
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        print('#'*20)
        print(worker.name)
        if url:
            limit = 'NOT FOUND'
            print('URL is: {}'.format(url))
            cat = url.split('/')[1]
            if cat in resp['resources']:
                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
            else:
                print('Cat {} not found'.format(cat))
            print('{}: {}'.format(url, limit))
        else:
            print(json.dumps(resp, indent=2))
def api(ctx, cmd, tweets, users, api_args):
    opts = {}
    mappings = {
        'id': '_id'
    }
    i = iter(api_args)
    for k, v in zip(i, i):
        k = k.replace('--', '')
        if k in mappings:
            k = mappings[k]
        opts[k] = v
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    if tweets:
        resp = utils.consume_tweets(wq[cmd], **opts)
    elif users:
        resp = utils.consume_users(wq[cmd], **opts)
    else:
        resp = wq[cmd](**opts)
        print(json.dumps(resp))
        return
    for i in resp:
        print(json.dumps(i))


@main.command('server')
@click.argument('CONSUMER_KEY', required=True)
@click.argument('CONSUMER_SECRET', required=True)
@click.pass_context
def run_server(ctx, consumer_key, consumer_secret):
    from . import config
    config.CONSUMER_KEY = consumer_key
    config.CONSUMER_SECRET = consumer_secret
    bconf.CONSUMER_KEY = consumer_key
    bconf.CONSUMER_SECRET = consumer_secret
    from .webserver import app
    app.run()
    app.run(host='0.0.0.0')

@main.group()
@click.pass_context
def stream(ctx):
    pass

@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
    wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)

    query_args = {}
    if locations:
        query_args['locations'] = locations
    if track:
        query_args['track'] = track
    if not file:
        file = sys.stdout
    else:
        file = open(file, 'a')

    def insist():
        lasthangup = time.time()
        while True:
            if not query_args:
                iterator = wq.statuses.sample()
            else:
                iterator = wq.statuses.filter(**query_args)  # e.g. "-4.25,40.16,-3.40,40.75"
            try:
                for i in iterator:
                    yield i
                if not politelyretry:
                    return
            except Exception as ex:
                if not politelyretry:
                    raise ex
            thishangup = time.time()
            if thishangup - lasthangup < 60:
                raise Exception('Too many hangups in a row.')
            time.sleep(3)

    for tweet in tqdm(insist()):
        print(json.dumps(tweet), file=file)
    if file != sys.stdout:
        file.close()

@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
    for tweet in utils.read_file(file, tail=tail):
        try:
            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
        except (KeyError, TypeError):
            print('Raw tweet: {}'.format(tweet))

@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
    c = utils.get_hashtags(utils.read_file(file))
    for count, tag in c.most_common(limit):
        print(u'{} - {}'.format(count, tag))


if __name__ == '__main__':
    main()
```
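The generic `api` command above turns the raw, unprocessed arguments into keyword options by zipping an iterator with itself, which consumes the argument list pairwise. A standalone sketch of that trick (the argument list here is a made-up example):

```python
# Pairwise consumption: zip(i, i) pulls two items from the same iterator
# per loop, so ['--q', 'Obama', '--count', '100'] becomes option pairs.
api_args = ['--q', 'Obama', '--count', '100', '--id', 'balkian']
mappings = {'id': '_id'}  # reserved words get an underscore prefix

opts = {}
i = iter(api_args)
for k, v in zip(i, i):
    k = k.replace('--', '')
    opts[mappings.get(k, k)] = v

print(opts)  # {'q': 'Obama', 'count': '100', '_id': 'balkian'}
```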
## bitter/config.py

```
@@ -3,7 +3,7 @@ Common configuration for other modules.
It is not elegant, but it works with flask and the oauth decorators.

Using this module allows you to change the config before loading any other module.
E.g.:

    import bitter.config as c
    c.CREDENTIALS="/tmp/credentials"
@@ -11,3 +11,4 @@ E.g.:
    app.run()
'''
CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'
```
## bitter/crawlers.py

```
@@ -9,6 +9,13 @@ logger = logging.getLogger(__name__)
from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
from functools import partial
try:
    import itertools.ifilter as filter
except ImportError:
    pass

from . import utils
from . import config

@@ -32,35 +39,123 @@ class AttrToFunc(object):
        else:
            return extend_call(k)

    def __getitem__(self, k):
        return partial(self.handler, self.__uriparts + k.split('/'))

    def __call__(self, *args, **kwargs):
        # for i, a in enumerate(args):
        #     kwargs[i] = a
        return self.handler(self.__uriparts, *args, **kwargs)


class FromCredentialsMixin(object):

    @classmethod
    def from_credentials(cls, cred_file=None, max_workers=None):
        wq = cls()

        for cred in islice(utils.get_credentials(cred_file), max_workers):
            wq.ready(cls.worker_class(cred["user"], cred))
        return wq

class FromConfigMixin(object):

    @classmethod
    def from_config(cls, config=None, conffile=None, max_workers=None):
        wq = cls()

        if not config:
            with utils.config(conffile) as c:
                config = c
        for cred in islice(config['credentials'], max_workers):
            wq.ready(cls.worker_class(cred["user"], cred))
        return wq

class TwitterWorker(object):
    def __init__(self, name, client):
    api_class = None

    def __init__(self, name, creds):
        self.name = name
        self.client = client
        self.throttled_time = False
        self._client = None
        self.cred = creds
        self._lock = Lock()
        self.busy = False

    @property
    def throttled(self):
        if not self.throttled_time:
            return False
        t = time.time()
        delta = self.throttled_time - t
        if delta > 0:
            return True
        return False
    def client(self):
        if not self._client:
            auth = OAuth(self.cred['token_key'],
                         self.cred['token_secret'],
                         self.cred['consumer_key'],
                         self.cred['consumer_secret'])
            self._client = self.api_class(auth=auth)
        return self._client

    def throttle_until(self, epoch=None):
        self.throttled_time = int(epoch)
        logger.info("Worker %s throttled for %s seconds" % (self.name, str(epoch - time.time())))
    def __repr__(self):
        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
        if self.busy:
            msg += ' [busy]'
        return msg

class RestWorker(TwitterWorker):
    api_class = Twitter

    def __init__(self, *args, **kwargs):
        super(RestWorker, self).__init__(*args, **kwargs)
        self._limits = None

    @property
    def limits(self):
        if not self._limits:
            self._limits = self.client.application.rate_limit_status()
        return self._limits

    def is_limited(self, uriparts):
        return self.get_wait(uriparts) > 0

    def get_wait(self, uriparts):
        limits = self.get_limit(uriparts)
        if limits.get('remaining', 1) > 0:
            return 0
        reset = limits.get('reset', 0)
        now = time.time()
        return max(0, (reset - now))

    def get_limit(self, uriparts):
        uriparts = list(u for u in uriparts if u)
        uri = '/' + '/'.join(uriparts)
        for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
            if ix.startswith(uri):
                return i
        return {}

    def set_limit(self, uriparts, value):
        uri = '/' + '/'.join(uriparts)
        if 'resources' not in self.limits:
            self.limits['resources'] = {}
        resources = self.limits['resources']
        if uriparts[0] not in resources:
            resources[uriparts[0]] = {}
        resource = resources[uriparts[0]]
        resource[uri] = value

    def update_limits(self, uriparts, remaining, reset, limit):
        self.set_limit(uriparts, {'remaining': remaining,
                                  'reset': reset,
                                  'limit': limit})

    def update_limits_from_headers(self, uriparts, headers):
        reset = float(headers.get('X-Rate-Limit-Reset', time.time() + 30))
        remaining = int(headers.get('X-Rate-Limit-Remaining', 0))
        limit = int(headers.get('X-Rate-Limit-Limit', -1))
        self.update_limits(uriparts=uriparts, remaining=remaining, reset=reset, limit=limit)


class TwitterQueue(AttrToFunc):

class QueueException(BaseException):
    pass

class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
    def __init__(self, wait=True):
        logger.debug('Creating worker queue')
        self.queue = set()
@@ -71,77 +166,119 @@ class TwitterQueue(AttrToFunc):
    def ready(self, worker):
        self.queue.add(worker)

class TwitterQueue(QueueMixin):

    worker_class = RestWorker

    def handle_call(self, uriparts, *args, **kwargs):
        logger.debug('Called: {}'.format(uriparts))
        logger.debug('With: {} {}'.format(args, kwargs))
        while True:
        patience = 1
        while patience:
            c = None
            try:
                c = self.next()
                c = self.next(uriparts)
                c._lock.acquire()
                c.busy = True
                logger.debug('Next: {}'.format(c.name))
                ping = time.time()
                resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
                pong = time.time()
                c.update_limits_from_headers(uriparts, resp.headers)
                logger.debug('Took: {}'.format(pong - ping))
                return resp
            except TwitterHTTPError as ex:
                if ex.e.code in (429, 502, 503, 504):
                    limit = ex.e.headers.get('X-Rate-Limit-Reset', time.time() + 30)
                    logger.info('{} limited'.format(c.name))
                    c.throttle_until(limit)
                    c.update_limits_from_headers(uriparts, ex.e.headers)
                    continue
                else:
                    raise
            except urllib.error.URLError as ex:
                time.sleep(5)
                logger.info('Something fishy happened: {}'.format(ex))
                raise
            finally:
                if c:
                    c.busy = False
                    c._lock.release()

            if not self.wait:
                patience -= 1

    @property
    def client(self):
        return self.next().client
    def get_wait(self, uriparts):
        # Stop as soon as one is available to avoid initiating the rest
        for i in self.queue:
            if not i.busy and i.get_wait(uriparts) == 0:
                return 0
        # If none is available, let's see how much we have to wait
        available = filter(lambda x: not x.busy, self.queue)
        diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
        return diff

    @classmethod
    def from_credentials(self, cred_file=None):
        wq = TwitterQueue()

        for cred in utils.get_credentials(cred_file):
            c = Twitter(auth=OAuth(cred['token_key'],
                                   cred['token_secret'],
                                   cred['consumer_key'],
                                   cred['consumer_secret']))
            wq.ready(TwitterWorker(cred["user"], c))
        return wq

    def _next(self):
    def _next(self, uriparts):
        logger.debug('Getting next available')
        s = list(self.queue)
        random.shuffle(s)
        for worker in s:
            if not worker.throttled and not worker.busy:
            if not worker.is_limited(uriparts) and not worker.busy:
                return worker
        raise Exception('No worker is available')
        raise QueueException('No worker is available')

    def next(self):
    def next(self, uriparts):
        if not self.wait:
            return self._next()
            return self._next(uriparts)
        while True:
            try:
                return self._next()
                return self._next(uriparts)
            except Exception:
            except QueueException:
                available = filter(lambda x: not x.busy, self.queue)
                if available:
                    first_worker = min(available, key=lambda x: x.throttled_time)
                    diff = first_worker.throttled_time - time.time()
                    diff = self.get_wait(uriparts)
                    logger.info("All workers are throttled. Waiting %s seconds" % diff)
                else:
                    diff = 5
                    logger.info("All workers are busy. Waiting %s seconds" % diff)
                time.sleep(diff)

class StreamWorker(TwitterWorker):
    api_class = TwitterStream

    def __init__(self, *args, **kwargs):
        super(StreamWorker, self).__init__(*args, **kwargs)

class StreamQueue(QueueMixin):
    worker_class = StreamWorker

    def __init__(self, wait=True):
        logger.debug('Creating worker queue')
        self.queue = set()
        self.index = 0
        self.wait = wait
        AttrToFunc.__init__(self, handler=self.handle_call)

    def handle_call(self, uriparts, *args, **kwargs):
        logger.debug('Called: {}'.format(uriparts))
        logger.debug('With: {} {}'.format(args, kwargs))
        c = None
        c = self.next(uriparts)
        c._lock.acquire()
        c.busy = True
        logger.debug('Next: {}'.format(c.name))
        ping = time.time()
        resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
        for i in resp:
            yield i
        pong = time.time()
        logger.debug('Listening for: {}'.format(pong - ping))
        c.busy = False
        c._lock.release()

    def next(self, uriparts):
        logger.debug('Getting next available')
        s = list(self.queue)
        random.shuffle(s)
        for worker in s:
            if not worker.busy:
                return worker
        raise QueueException('No worker is available')
```
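Putting the pieces together: `AttrToFunc` records attribute accesses as URI parts, and `TwitterQueue.handle_call` picks a worker whose rate limit allows the request. A minimal sketch of how a call flows, with method names taken from this diff; the endpoint arguments are only examples:

```python
# Sketch: how a TwitterQueue call is dispatched (names from this diff).
from bitter.crawlers import TwitterQueue

wq = TwitterQueue.from_config(conffile='~/.bitter.yaml')

# Attribute access builds up URI parts: ('users', 'show') -> users/show.
# handle_call() then picks a worker that is not rate-limited for that
# endpoint, waiting (or rotating credentials) when all are throttled.
user = wq.users.show(screen_name='balkian')

# Bracket syntax does the same for endpoints with slashes in them:
tweets = wq['search/tweets'](q='bitter', count=10)
```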
## bitter/models.py

```
@@ -3,10 +3,13 @@ import json

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import wraps

Base = declarative_base()

@@ -85,34 +88,48 @@ class ExtractorEntry(Base):
    user = Column(BigInteger, index=True)
    cursor = Column(BigInteger, default=-1)
    pending = Column(Boolean, default=False)
    errors = Column(Text, default="")
    busy = Column(Boolean, default=False)


class Search(Base):
    __tablename__ = 'search_queries'

    id = Column(Integer, primary_key=True, index=True, unique=True)
    endpoint = Column(Text, comment="Endpoint URL")
    attrs = Column(Text, comment="Text version of the dictionary of parameters")
    count = Column(Integer)
    current_count = Column(Integer)
    current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
    since_id = Column(BigInteger)

class SearchResults(Base):
    __tablename__ = 'search_results'
    id = Column(Integer, primary_key=True, index=True, unique=True)
    search_id = Column(ForeignKey('search_queries.id'))
    resource_id = Column(Text)

def memoize(f):
    memo = {}
    @wraps(f)
    def helper(self, **kwargs):
        st = dict_to_str(kwargs)
        key = (self.__uriparts, st)
        if key not in memo:
            memo[key] = f(self, **kwargs)
        return memo[key]
    return helper

def make_session(url):
    engine = create_engine(url)#, echo=True)
    if not isinstance(url, str):
        print(url)
        raise Exception("Database URL must be a string")
    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    return session


def test(db='sqlite:///users.db'):

    from sqlalchemy import exists
    session = make_session(db)

    our_user = session.query(User).first()

    print(our_user.name)
    print(session.query(User).count())
    fake_user = User(name="Fake user")
    session.add(fake_user)
    session.commit()
    print(session.query(User).count())
    print(session.query(exists().where(User.name == "Fake user")).scalar())
    fake_committed = session.query(User).filter_by(name="Fake user").first()
    print(fake_committed.id)
    print(fake_committed.name)
    session.delete(fake_committed)
    session.commit()
    print(session.query(User).count())
    print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))
def dict_to_str(args):
    return json.dumps(args, sort_keys=True)
```
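The `memoize` helper keys its cache on the caller's URI parts plus a canonical JSON dump of the keyword arguments, which is why `dict_to_str` sorts keys. A small sketch of the caching behaviour; the `Endpoint` class here is hypothetical, only to give `memoize` something to wrap:

```python
# Hypothetical class to exercise memoize: real callers are API wrappers
# whose uriparts identify the endpoint being cached.
import json
from functools import wraps

def dict_to_str(args):
    return json.dumps(args, sort_keys=True)

def memoize(f):
    memo = {}
    @wraps(f)
    def helper(self, **kwargs):
        key = (self.uriparts, dict_to_str(kwargs))
        if key not in memo:
            memo[key] = f(self, **kwargs)
        return memo[key]
    return helper

class Endpoint:  # hypothetical stand-in for an API wrapper
    uriparts = ('users', 'show')

    @memoize
    def fetch(self, **kwargs):
        print('network call!')  # only printed on the first call
        return {'args': kwargs}

e = Endpoint()
e.fetch(screen_name='balkian')  # prints 'network call!'
e.fetch(screen_name='balkian')  # served from the cache
```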
## bitter/utils.py (740 lines changed)

```
@@ -1,6 +1,11 @@
from __future__ import print_function

import logging
import time
import json
import yaml
import csv
import io

import signal
import sys
@@ -8,10 +13,23 @@ import sqlalchemy
import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from multiprocessing import Queue

from itertools import islice
import queue
import threading
from select import select

from functools import partial

from tqdm import tqdm

from itertools import islice, chain
from contextlib import contextmanager
from itertools import zip_longest

from collections import Counter
from random import choice

from builtins import map, filter

from twitter import TwitterHTTPError

@@ -19,6 +37,12 @@ from bitter.models import Following, User, ExtractorEntry, make_session

from bitter import config

# Fix Python 2.x.
try:
    UNICODE_EXISTS = bool(type(unicode))
except NameError:
    unicode = lambda s: str(s)

logger = logging.getLogger(__name__)

@@ -26,39 +50,93 @@ def signal_handler(signal, frame):
    logger.info('You pressed Ctrl+C!')
    sys.exit(0)

def chunk(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
    if chunksize:
        source = chunk(source, chunksize)
    p = ThreadPool(numcpus)
    for i in p.imap(func, source):
def chunk(iterable, n):
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())


def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
    source = chunk(source, chunksize)
    p = ThreadPool(numcpus*2)
    results = p.imap_unordered(func, source)
    for i in chain.from_iterable(results):
        yield i
```
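A quick sketch of how the new `chunk`/`parallel` pair behaves: `chunk` lazily groups an iterable into n-sized tuples, and `parallel` fans those tuples out to a thread pool and flattens the per-batch results. The worker function below is a made-up example; note that it must return an iterable per batch:

```python
# chunk/parallel as defined above; the worker below is a made-up example.
from itertools import chain, islice
from multiprocessing.pool import ThreadPool

def chunk(iterable, n):
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())

def parallel(func, source, chunksize=1, numcpus=4):
    source = chunk(source, chunksize)
    p = ThreadPool(numcpus * 2)
    for i in chain.from_iterable(p.imap_unordered(func, source)):
        yield i

def double_batch(batch):          # receives a tuple of items, not one item
    return [x * 2 for x in batch]

print(sorted(parallel(double_batch, range(10), chunksize=3)))
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The diff for bitter/utils.py continues below.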
def get_credentials_path(credfile=None):
|
||||
if not credfile:
|
||||
if config.CREDENTIALS:
|
||||
credfile = config.CREDENTIALS
|
||||
|
||||
def get_config_path(conf=None):
|
||||
if not conf:
|
||||
if config.CONFIG_FILE:
|
||||
conf = config.CONFIG_FILE
|
||||
else:
|
||||
raise Exception('No valid credentials file')
|
||||
return os.path.expanduser(credfile)
|
||||
raise Exception('No valid config file')
|
||||
return os.path.expanduser(conf)
|
||||
|
||||
|
||||
def copy_credentials_to_config(credfile, conffile=None):
|
||||
p = get_config_path(credfile)
|
||||
with open(p) as old:
|
||||
for line in old:
|
||||
cred = json.loads(line.strip())
|
||||
add_credentials(conffile, **cred)
|
||||
|
||||
|
||||
def save_config(conf, conffile=None):
|
||||
with config(conffile) as c:
|
||||
c.clear()
|
||||
c.update(conf)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def credentials_file(credfile, *args, **kwargs):
|
||||
p = get_credentials_path(credfile)
|
||||
with open(p, *args, **kwargs) as f:
|
||||
yield f
|
||||
def config(conffile=None):
|
||||
d = read_config(conffile)
|
||||
try:
|
||||
yield d
|
||||
finally:
|
||||
write_config(d, conffile)
|
||||
|
||||
def iter_credentials(credfile=None):
|
||||
with credentials_file(credfile) as f:
|
||||
for l in f:
|
||||
yield json.loads(l.strip())
|
||||
|
||||
def get_credentials(credfile=None, inverse=False, **kwargs):
|
||||
def read_config(conffile):
|
||||
p = conffile and get_config_path(conffile)
|
||||
if p:
|
||||
if not os.path.exists(p):
|
||||
raise IOError('{} file does not exist.'.format(p))
|
||||
f = open(p, 'r')
|
||||
elif 'BITTER_CONFIG' not in os.environ:
|
||||
raise Exception('No config file or BITTER_CONFIG env variable.')
|
||||
else:
|
||||
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
|
||||
return yaml.load(f) or {'credentials': []}
|
||||
|
||||
|
||||
def write_config(conf, conffile=None):
|
||||
if not conf:
|
||||
conf = {'credentials': []}
|
||||
if conffile:
|
||||
p = get_config_path(conffile)
|
||||
with open(p, 'w') as f:
|
||||
yaml.dump(conf, f)
|
||||
else:
|
||||
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
|
||||
|
||||
def iter_credentials(conffile=None):
|
||||
with config(conffile) as c:
|
||||
for i in c['credentials']:
|
||||
yield i
|
||||
|
||||
|
||||
def create_config_file(conffile=None):
|
||||
if not conffile:
|
||||
return
|
||||
conffile = get_config_path(conffile)
|
||||
with open(conffile, 'a'):
|
||||
pass
|
||||
write_config(None, conffile)
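
The configuration now lives in a single YAML document, either on disk or in the `BITTER_CONFIG` environment variable, and the `config()` context manager reads it on entry and persists any mutation on exit. A minimal sketch (the `/tmp/bitter.yaml` path is just an example):

    from bitter import utils

    utils.create_config_file('/tmp/bitter.yaml')
    with utils.config('/tmp/bitter.yaml') as c:
        c['credentials'].append({'user': 'test'})
    # On exit, write_config() has already dumped the updated YAML back to disk.
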

+def get_credentials(conffile=None, inverse=False, **kwargs):
     creds = []
-    for i in iter_credentials(credfile):
+    for i in iter_credentials(conffile):
         matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
         if matches and not inverse:
             creds.append(i)
@@ -66,24 +144,51 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
             creds.append(i)
     return creds

-def create_credentials(credfile=None):
-    credfile = get_credentials_path(credfile)
-    with credentials_file(credfile, 'a'):
-        pass
-
-def delete_credentials(credfile=None, **creds):
-    tokeep = get_credentials(credfile, inverse=True, **creds)
-    with credentials_file(credfile, 'w') as f:
-        for i in tokeep:
-            f.write(json.dumps(i))
-            f.write('\n')
+def delete_credentials(conffile=None, **creds):
+    tokeep = get_credentials(conffile, inverse=True, **creds)
+    with config(conffile) as c:
+        c['credentials'] = list(tokeep)

-def add_credentials(credfile=None, **creds):
-    exist = get_credentials(credfile, **creds)
-    if not exist:
-        with credentials_file(credfile, 'a') as f:
-            f.write(json.dumps(creds))
-            f.write('\n')
+def add_credentials(conffile=None, **creds):
+    try:
+        exist = get_credentials(conffile, **creds)
+    except IOError:
+        exist = False
+        create_config_file(conffile)
+    if exist:
+        return
+    with config(conffile) as c:
+        c['credentials'].append(creds)
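
Credentials are now plain entries in the config's `credentials` list, so adding, querying and deleting them become list operations behind the same API. For instance:

    from bitter import utils

    utils.add_credentials('/tmp/bitter.yaml', user='test')
    assert utils.get_credentials('/tmp/bitter.yaml', user='test')
    utils.delete_credentials('/tmp/bitter.yaml', user='test')
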

+def get_hashtags(iter_tweets, best=None):
+    c = Counter()
+    for tweet in iter_tweets:
+        c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
+    return c
+
+
+def read_file(filename, tail=False):
+    if filename == '-':
+        f = sys.stdin
+    else:
+        f = open(filename)
+    try:
+        while True:
+            line = f.readline()
+            if line not in (None, '', '\n'):
+                tweet = json.loads(line.strip())
+                yield tweet
+            else:
+                if tail:
+                    time.sleep(1)
+                else:
+                    return
+    finally:
+        if f != sys.stdin:
+            f.close()
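
`read_file` streams JSON tweets line by line (optionally tailing a growing file), which composes directly with `get_hashtags`. A small sketch, assuming a hypothetical `tweets.jsonl` file with one tweet object per line:

    from bitter import utils

    tweets = utils.read_file('tweets.jsonl')    # hypothetical input file
    top = utils.get_hashtags(tweets)
    print(top.most_common(10))                  # Counter of hashtag texts
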

 def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
@@ -113,6 +218,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
         else:
             yield user

+
 def trim_user(user):
     if 'status' in user:
         del user['status']
@@ -126,142 +232,218 @@ def trim_user(user):
     return user

-def add_user(session, user, enqueue=False):
+def add_user(user, dburi=None, session=None, update=False):
+    if not session:
+        session = make_session(dburi)
+
     user = trim_user(user)
-    olduser = session.query(User).filter(User.id==user['id'])
+    olduser = session.query(User).filter(User.id == user['id'])
     if olduser:
+        if not update:
+            return
         olduser.delete()
-    user = User(**user)
-    session.add(user)
-    if extract:
-        logging.debug('Adding entry')
+    nuser = User()
+    for key, value in user.items():
+        setattr(nuser, key, value)
+    user = nuser
+    if update:
+        session.add(user)
+    logger.debug('Adding entry')
     entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
     if not entry:
         entry = ExtractorEntry(user=user.id)
         session.add(entry)
-    logging.debug(entry.pending)
+    logger.debug(entry.pending)
     entry.pending = True
     entry.cursor = -1
     session.commit()
+    session.close()
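
`add_user` can now open its own session from a database URI, so callers no longer have to manage SQLAlchemy sessions themselves. A usage sketch (the sqlite URI is an example):

    from bitter.utils import add_user

    user = {'id': 1548, 'name': 'Fake user', 'followers_count': 10}
    add_user(user, dburi='sqlite:///bitter.db', update=True)
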

+def download_entry(wq, entry_id, dburi=None, recursive=False):
+    session = make_session(dburi)
+    if not session:
+        raise Exception("Provide dburi or session")
+    logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
+    entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
+    user = session.query(User).filter(User.id == entry.user).first()
+    download_user(wq, session, user, entry, recursive)
+    session.close()
+
+
+def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
+
+    total_followers = user.followers_count
+
+    if total_followers > max_followers:
+        entry.pending = False
+        logger.info("Too many followers for user: %s" % user.screen_name)
+        session.add(entry)
+        session.commit()
+        return
+
+    if not entry:
+        entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
+        session.add(entry)
+        session.commit()
+
+    pending = True
+    cursor = entry.cursor
+    uid = user.id
+    name = user.name
+
+    logger.info("#"*20)
+    logger.info("Getting %s - %s" % (uid, name))
+    logger.info("Cursor %s" % cursor)
+    logger.info("Using account: %s" % wq.name)
+
+    _fetched_followers = 0
+
+    def fetched_followers():
+        return session.query(Following).filter(Following.isfollowed==uid).count()
+
+    attempts = 0
+    while cursor > 0 or fetched_followers() < total_followers:
+        try:
+            resp = wq.followers.ids(user_id=uid, cursor=cursor)
+        except TwitterHTTPError as ex:
+            attempts += 1
+            if ex.e.code in (401, ) or attempts > 3:
+                logger.info('Not authorized for user: {}'.format(uid))
+                entry.errors = ex.message
+                break
+        if 'ids' not in resp:
+            logger.info("Error with id %s %s" % (uid, resp))
+            entry.pending = False
+            entry.errors = "No ids in response: %s" % resp
+            break
+
+        logger.info("New followers: %s" % len(resp['ids']))
+        if recursive:
+            newusers = get_users(wq, resp)
+            for newuser in newusers:
+                add_user(session=session, user=newuser)
+
+        if 'ids' not in resp or not resp['ids']:
+            logger.info('NO IDS in response')
+            break
+        for i in resp['ids']:
+            existing_user = session.query(Following).\
+                filter(Following.isfollowed == uid).\
+                filter(Following.follower == i).first()
+            now = int(time.time())
+            if existing_user:
+                existing_user.created_at_stamp = now
+            else:
+                f = Following(isfollowed=uid,
+                              follower=i,
+                              created_at_stamp=now)
+                session.add(f)
+
+        logger.info("Fetched: %s/%s followers" % (fetched_followers(),
+                                                  total_followers))
+        entry.cursor = resp["next_cursor"]
+
+        session.add(entry)
+        session.commit()
+
+    logger.info("Done getting followers for %s" % uid)
+
+    entry.pending = False
+    entry.busy = False
+    session.add(entry)
+    session.commit()
+
+    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
+    sys.stdout.flush()

+def classify_user(id_or_name, screen_names, user_ids):
+    try:
+        int(id_or_name)
+        user_ids.append(id_or_name)
+        logger.debug("Added user id")
+    except ValueError:
+        logger.debug("Added screen_name")
+        screen_names.append(id_or_name.split('@')[-1])
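
`classify_user` is now a module-level helper that sorts raw identifiers into numeric ids and screen names (stripping a leading '@'). For example:

    screen_names, user_ids = [], []
    for ident in ('balkian', '1548', '@bitter'):
        classify_user(ident, screen_names, user_ids)
    # user_ids == ['1548']; screen_names == ['balkian', 'bitter']
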

 # TODO: adapt to the crawler
 def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
     signal.signal(signal.SIGINT, signal_handler)

-    w = wq.next()
     if not dburi:
         dburi = 'sqlite:///%s.db' % extractor_name

     session = make_session(dburi)
+    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
+    session.commit()

-    screen_names = []
-    user_ids = []
-
-    def classify_user(id_or_name):
-        try:
-            int(user)
-            user_ids.append(user)
-            logger.info("Added user id")
-        except ValueError:
-            logger.info("Added screen_name")
-            screen_names.append(user.split('@')[-1])
-
-    if user:
-        classify_user(user)
-    elif initfile:
-        logger.info("No user. I will open %s" % initfile)
-        with open(initfile, 'r') as f:
-            for line in f:
-                user = line.strip().split(',')[0]
-                classify_user(user)
-    else:
+    if not (user or initfile):
         logger.info('Using pending users from last session')
+    else:
+        screen_names = []
+        user_ids = []
+        if user:
+            classify_user(user, screen_names, user_ids)
+        elif initfile:
+            logger.info("No user. I will open %s" % initfile)
+            with open(initfile, 'r') as f:
+                for line in f:
+                    user = line.strip().split(',')[0]
+                    classify_user(user, screen_names, user_ids)

+        def missing_user(ix, column=User.screen_name):
+            res = session.query(User).filter(column == ix).count() == 0
+            if res:
+                logger.info("Missing user %s. Count: %s" % (ix, res))
+            return res

-    nusers = list(get_users(wq, screen_names, by_name=True))
-    if user_ids:
-        nusers += list(get_users(wq, user_ids, by_name=False))
+        screen_names = list(filter(missing_user, screen_names))
+        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
+        nusers = []
+        logger.info("Missing user ids: %s" % user_ids)
+        logger.info("Missing screen names: %s" % screen_names)
+        if screen_names:
+            nusers = list(get_users(wq, screen_names, by_name=True))
+        if user_ids:
+            nusers += list(get_users(wq, user_ids, by_name=False))

-    for i in nusers:
-        add_user(session, i, enqueue=True)
+        for i in nusers:
+            add_user(dburi=dburi, user=i)

     total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
-    logging.info('Total users: {}'.format(total_users))
-
-    def pending_entries():
-        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
-        logging.info('Pending: {}'.format(pending))
-        return pending
+    logger.info('Total users: {}'.format(total_users))

-    while pending_entries() > 0:
-        logger.info("Using account: %s" % w.name)
+    de = partial(download_entry, wq, dburi=dburi)
+    pending = pending_entries(dburi)
+    session.close()
+
+    for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
+        logger.info("Got %s" % i)
+
+
+def pending_entries(dburi):
+    session = make_session(dburi)
+    while True:
         candidate, entry = session.query(User, ExtractorEntry).\
             filter(ExtractorEntry.user == User.id).\
             filter(ExtractorEntry.pending == True).\
-            order_by(User.followers_count).first()
-        if not candidate:
-            break
-        pending = True
-        cursor = entry.cursor
-        uid = candidate.id
-        uobject = session.query(User).filter(User.id==uid).first()
-        name = uobject.screen_name if uobject else None
-
-        logger.info("#"*20)
-        logger.info("Getting %s - %s" % (uid, name))
-        logger.info("Cursor %s" % cursor)
-        logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
-        try:
-            resp = wq.followers.ids(user_id=uid, cursor=cursor)
-        except TwitterHTTPError as ex:
-            if ex.e.code in (401, ):
-                logger.info('Not authorized for user: {}'.format(uid))
-                resp = {}
-        if 'ids' in resp:
-            logger.info("New followers: %s" % len(resp['ids']))
-            if recursive:
-                newusers = get_users(wq, resp)
-                for user in newusers:
-                    add_user(session, newuser, enqueue=True)
-            for i in resp['ids']:
-                existing_user = session.query(Following).\
-                    filter(Following.isfollowed==uid).\
-                    filter(Following.follower==i).first()
-                now = int(time.time())
-                if existing_user:
-                    existing_user.created_at_stamp = now
-                else:
-                    f = Following(isfollowed=uid,
-                                  follower=i,
-                                  created_at_stamp=now)
-                    session.add(f)
-
-            total_followers = candidate.followers_count
-            fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
-            logger.info("Fetched: %s/%s followers" % (fetched_followers,
-                                                      total_followers))
-            cursor = resp["next_cursor"]
-            if cursor > 0:
-                pending = True
-                logger.info("Getting more followers for %s" % uid)
-            else:
-                logger.info("Done getting followers for %s" % uid)
-                cursor = -1
-                pending = False
-        else:
-            logger.info("Error with id %s %s" % (uid, resp))
-            pending = False
-
-        entry.pending = pending
-        entry.cursor = cursor
-        logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
-
-        session.add(candidate)
-        session.commit()
-
-        sys.stdout.flush()
+            filter(ExtractorEntry.busy == False).\
+            order_by(User.followers_count).first()
+        if candidate:
+            entry.busy = True
+            session.add(entry)
+            session.commit()
+            yield int(entry.id)
+            continue
+        if session.query(ExtractorEntry).\
+                filter(ExtractorEntry.busy == True).count() > 0:
+            time.sleep(1)
+            continue
+        logger.info("No more pending entries")
+        break
+    session.close()
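
`extract` now just seeds the database and then drives `download_entry` in parallel, pulling work from the `pending_entries` generator, which marks each entry busy before yielding it so concurrent workers never pick the same user. A sketch of a typical invocation (the extractor name is an example):

    from bitter import easy
    from bitter.utils import extract

    wq = easy()                      # worker queue built from the bitter config
    extract(wq, user='balkian', extractor_name='myextract', recursive=False)
    # Followers end up in sqlite:///myextract.db
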

 def get_tweet(c, tid):
     return c.statuses.show(id=tid)
@@ -281,3 +463,263 @@ def get_user(c, user):
         return c.users.lookup(user_id=user)[0]
     except ValueError:
         return c.users.lookup(screen_name=user)[0]

+def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
+    cached = cached_id(tweetid, folder)
+    tweet = None
+    if update or not cached:
+        tweet = get_tweet(wq, tweetid)
+        js = json.dumps(tweet)
+    if write:
+        if tweet:
+            write_json(js, folder)
+    else:
+        print(js)
+
+
+def cached_id(oid, folder):
+    tweet = None
+    file = os.path.join(folder, '%s.json' % oid)
+    if os.path.exists(file) and os.path.isfile(file):
+        try:
+            # print('%s: Object exists' % oid)
+            with open(file) as f:
+                tweet = json.load(f)
+        except Exception as ex:
+            logger.error('Error getting cached version of {}: {}'.format(oid, ex))
+    return tweet
+
+
+def write_json(js, folder, oid=None):
+    if not oid:
+        oid = js['id']
+    file = id_file(oid, folder)
+    if not os.path.exists(folder):
+        os.makedirs(folder)
+    with open(file, 'w') as f:
+        json.dump(js, f)
+    logger.info('Written {} to file {}'.format(oid, file))
+
+
+def id_file(oid, folder):
+    return os.path.join(folder, '%s.json' % oid)
+
+
+def fail_file(oid, folder):
+    failsfolder = os.path.join(folder, 'failed')
+    if not os.path.exists(failsfolder):
+        os.makedirs(failsfolder)
+    return os.path.join(failsfolder, '%s.failed' % oid)
+
+
+def id_failed(oid, folder):
+    return os.path.isfile(fail_file(oid, folder))
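
Downloads are cached on disk with one JSON file per object, plus an empty marker file for ids that could not be fetched, so re-runs skip both successes and known failures. The layout, in short:

    # <folder>/<id>.json           -> cached object (read back by cached_id)
    # <folder>/failed/<id>.failed  -> failure marker (checked by id_failed)
    print(id_file('12345', 'downloaded_tweets'))    # downloaded_tweets/12345.json
    print(fail_file('12345', 'downloaded_tweets'))  # also creates the failed/ folder
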
+
+
+def tweet_download_batch(wq, batch):
+    tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
+    for tid, tweet in tweets.items():
+        yield tid, tweet
+
+
+def user_download_batch(wq, batch):
+    screen_names = []
+    user_ids = []
+    for elem in batch:
+        try:
+            int(elem)
+            user_ids.append(str(elem))
+        except ValueError:
+            screen_names.append(elem.lower())
+    args = {}
+    if user_ids:
+        args['user_id'] = ','.join(user_ids)
+    if screen_names:
+        args['screen_name'] = ','.join(screen_names)
+    try:
+        users = wq.users.lookup(**args)
+    except TwitterHTTPError as ex:
+        if ex.e.code in (404,):
+            users = []
+        else:
+            raise
+    found_ids = []
+    found_names = []
+    for user in users:
+        uid = user['id_str']
+        if uid in user_ids:
+            found_ids.append(uid)
+            yield (uid, user)
+        uname = user['screen_name'].lower()
+        if uname in screen_names:
+            found_names.append(uname)
+            yield (uname, user)
+    for uid in set(user_ids) - set(found_ids):
+        yield (uid, None)
+    for name in set(screen_names) - set(found_names):
+        yield (name, None)
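
Each batch function resolves a whole batch of identifiers in one API call and yields (key, object) pairs, with None for anything the API did not return, so callers can tell misses apart from errors. A sketch, assuming a configured `wq`:

    for key, user in user_download_batch(wq, ['balkian', '1548']):
        if user is None:
            print('Not found:', key)
        else:
            print(key, '->', user['id_str'])
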
+
+
+def dump_result(oid, obj, folder, ignore_fails=True):
+    if obj:
+        try:
+            write_json(obj, folder=folder, oid=oid)
+            failed = fail_file(oid, folder)
+            if os.path.exists(failed):
+                os.remove(failed)
+        except Exception as ex:
+            logger.error('%s: %s' % (oid, ex))
+            if not ignore_fails:
+                raise
+    else:
+        logger.info('Object not recovered: {}'.format(oid))
+        with open(fail_file(oid, folder), 'w') as f:
+            print('Object not found', file=f)
+
+
+def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
+                  batch_method=tweet_download_batch):
+
+    done = Queue()
+    down = Queue()
+
+    def filter_list(lst, done, down):
+        print('filtering')
+        for oid in lst:
+            # print('Checking {}'.format(line))
+            cached = cached_id(oid, folder)
+            if (cached and not update):
+                done.put((oid, cached))
+            elif (id_failed(oid, folder) and not retry_failed):
+                done.put((oid, None))
+            else:
+                down.put(oid)
+        down.put(None)
+
+    def download_results(batch_method, down, done):
+        def gen():
+            while True:
+                r = down.get()
+                if not r:
+                    return
+                yield r
+
+        for t in parallel(batch_method, gen(), 100):
+            done.put(t)
+
+    def batch(*args, **kwargs):
+        return batch_method(wq, *args, **kwargs)
+
+    tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
+    tc.start()
+    td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
+    td.start()
+
+    def check_threads(ts, done):
+        for t in ts:
+            t.join()
+        done.put(None)
+
+    wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
+    wait.start()
+
+    while True:
+        rec = done.get()
+
+        if rec is None:
+            break
+
+        oid, obj = rec
+        dump_result(oid, obj, folder, ignore_fails)
+        yield rec
+
+    wait.join()
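
`download_list` is a three-stage pipeline: one thread filters out already-cached or already-failed ids, another downloads the rest in batches of 100 through `parallel()`, and the main generator persists each result as it arrives. Typical use (a sketch; 'downloaded_users' is an example folder):

    for oid, obj in download_list(wq, ['balkian', '1548'], 'downloaded_users',
                                  batch_method=user_download_batch):
        print(oid, 'done' if obj else 'failed')
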
+
+
+def download_file(wq, csvfile, folder, column=0, delimiter=',',
+                  header=False, quotechar='"', batch_method=tweet_download_batch,
+                  **kwargs):
+    with open(csvfile) as f:
+        csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
+        if header:
+            next(csvreader)
+
+        def reader(r):
+            for row in r:
+                if len(row) > column:
+                    yield row[column].strip()
+
+        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
+                                 **kwargs):
+            yield res
+
+
+def download_timeline(wq, user):
+    return wq.statuses.user_timeline(id=user)
+
+
+def _consume_feed(func, feed_control=None, **kwargs):
+    '''
+    Get all the tweets using pagination and a given method.
+    It can be controlled with the `count` parameter.
+
+    If max_count < 0 => Loop until the whole feed is consumed.
+    If max_count == 0 => Only call the API once, with the default values.
+    If max_count > 0 => Get max_count tweets from the feed.
+    '''
+    remaining = int(kwargs.pop('max_count', 0))
+    count = int(kwargs.get('count', -1))
+    limit = False
+
+    # We need to perform at least one query, so we simulate a do-while
+    # by running once with no limit and updating the condition at the end
+    with tqdm(total=remaining) as pbar:
+        while not limit:
+            if remaining > 0 and ((count < 0) or (count > remaining)):
+                kwargs['count'] = remaining
+            resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
+            if not resp:
+                return
+            for entry in resp:
+                yield entry
+            pbar.update(len(resp))
+            limit = stop
+            if remaining < 0:
+                # If the loop was run with a negative remaining, it will only stop
+                # when the control function tells it to.
+                continue
+            # Otherwise, check if we have already downloaded all the required items
+            remaining -= len(resp)
+            limit = limit or remaining <= 0
+
+
+def consume_tweets(*args, **kwargs):
+    return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
+
+
+def consume_users(*args, **kwargs):
+    return _consume_feed(*args, feed_control=_users_control, **kwargs)
+
+
+def _tweets_control(func, apiargs, remaining=0, **kwargs):
+    ''' Return a list of entries, the remaining '''
+
+    resp = func(**apiargs)
+    if not resp:
+        return None, True
+    # Update the arguments for the next call.
+    # Two options: either resp is a list, or a dict like:
+    # {'statuses': ... 'search_metadata': ...}
+    if isinstance(resp, dict) and 'search_metadata' in resp:
+        resp = resp['statuses']
+    max_id = min(s['id'] for s in resp) - 1
+    apiargs['max_id'] = max_id
+    return resp, False
+
+
+def _users_control(func, apiargs, remaining=0, **kwargs):
+    resp = func(**apiargs)
+    stop = True
+    # Update the arguments for the next call
+    if 'next_cursor' in resp:
+        cursor = resp['next_cursor']
+        apiargs['cursor'] = cursor
+        if int(cursor) != -1:
+            stop = False
+    return resp['users'], stop
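
The two `_*_control` helpers implement Twitter's two pagination styles: max_id-based paging for tweet feeds and cursor-based paging for user lists, while `_consume_feed` only tracks how many items are still wanted. For example (a sketch, assuming a configured `wq` and that the underlying twitter client exposes these endpoints):

    # ~200 tweets from a search feed, paginated via max_id
    for tweet in consume_tweets(wq.search.tweets, q='#python', max_count=200):
        print(tweet['id'])

    # All followers of a user, paginated via cursors
    for user in consume_users(wq.followers.list, screen_name='balkian', max_count=-1):
        print(user['screen_name'])
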

12 docker-compose.yml Normal file
@@ -0,0 +1,12 @@
+version: '2'
+services:
+    dev:
+        build:
+            context: .
+            dockerfile: Dockerfile-3.4
+        volumes:
+            - '.:/usr/src/app'
+        tty: yes
+        working_dir: '/usr/src/app'
+        entrypoint: '/bin/bash'
+        command: ''

requirements.txt
@@ -1,3 +1,5 @@
 sqlalchemy
 twitter
 click
+tqdm
+pyyaml

4 setup.cfg Normal file
@@ -0,0 +1,4 @@
+[metadata]
+description-file = README.md
+[aliases]
+test=pytest

45 setup.py
@@ -1,29 +1,23 @@
-import pip
 from setuptools import setup
-from pip.req import parse_requirements
-
-# parse_requirements() returns generator of pip.req.InstallRequirement objects
-# pip 6 introduces the *required* session argument
-try:
-    install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
-    py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession())
-    test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
-except AttributeError:
-    install_reqs = parse_requirements("requirements.txt")
-    py2_reqs = parse_requirements("requirements-py2.txt")
-    test_reqs = parse_requirements("test-requirements.txt")
+
+def parse_requirements(filename):
+    """ load requirements from a pip requirements file """
+    with open(filename, 'r') as f:
+        lineiter = list(line.strip() for line in f)
+    return [line for line in lineiter if line and not line.startswith("#")]
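
Parsing the requirements files by hand removes the dependency on pip's private parse_requirements API, whose signature changed across pip releases (hence the old try/except). The new helper simply returns the non-comment lines:

    # e.g., with the requirements.txt from this commit:
    # parse_requirements('requirements.txt')
    # -> ['sqlalchemy', 'twitter', 'click', 'tqdm', 'pyyaml']
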
+
+install_reqs = parse_requirements("requirements.txt")
+py2_reqs = parse_requirements("requirements-py2.txt")
+test_reqs = parse_requirements("test-requirements.txt")

 import sys
 import os
-import itertools
 if sys.version_info <= (3, 0):
-    install_reqs = itertools.chain(install_reqs, py2_reqs)
+    install_reqs = install_reqs + py2_reqs

-# reqs is a list of requirement
-# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
-install_reqs = [str(ir.req) for ir in install_reqs]
-test_reqs = [str(ir.req) for ir in test_reqs]
-
-from bitter import __version__
+with open(os.path.join('bitter', 'VERSION'), 'r') as f:
+    __version__ = f.read().strip()

 setup(
     name="bitter",
@@ -38,10 +32,19 @@ setup(
     extras_require = {
         'server': ['flask', 'flask-oauthlib']
     },
+    test_suite="tests",
+    setup_requires=['pytest-runner',],
     include_package_data=True,
     entry_points="""
         [console_scripts]
         bitter=bitter.cli:main
-    """
+    """,
+    classifiers=[
+        'Development Status :: 4 - Beta',
+        'Intended Audience :: Developers',
+        'Intended Audience :: Science/Research',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 2',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+    ]
 )

76 tests/test_crawlers.py Normal file
@@ -0,0 +1,76 @@
+from unittest import TestCase
+
+import os
+import types
+import datetime
+import time
+
+from bitter import utils, easy
+from bitter.crawlers import QueueException
+from bitter import config as c
+
+
+class TestCrawlers(TestCase):
+
+    def setUp(self):
+        CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
+        if os.path.exists(CONF_PATH):
+            self.wq = easy(CONF_PATH)
+        else:
+            self.wq = easy()
+
+    def test_create_worker(self):
+        assert len(self.wq.queue) == 1
+
+    def test_get_limits(self):
+        w1 = list(self.wq.queue)[0]
+        print(w1.limits)
+        limitslook = w1.get_limit(['statuses', 'lookup'])
+        assert limitslook['remaining'] == limitslook['limit']
+
+    def test_set_limits(self):
+        w1 = list(self.wq.queue)[0]
+        w1.set_limit(['test', 'test2'], {'remaining': 0})
+        assert w1.get_limit(['test', 'test2']) == {'remaining': 0}
+
+    def test_await(self):
+        w1 = list(self.wq.queue)[0]
+        w1.set_limit(['test', 'wait'], {'remaining': 0, 'reset': time.time()+2})
+        assert w1.get_wait(['test', 'wait']) > 1
+        time.sleep(2)
+        assert w1.get_wait(['test', 'wait']) == 0
+        assert w1.get_wait(['statuses', 'lookup']) == 0
+
+    def test_is_limited(self):
+        w1 = list(self.wq.queue)[0]
+        assert not w1.is_limited(['statuses', 'lookup'])
+        w1.set_limit(['test', 'limited'], {'remaining': 0, 'reset': time.time()+100})
+        assert w1.is_limited(['test', 'limited'])
+
+    def test_call(self):
+        w1 = list(self.wq.queue)[0]
+        l1 = w1.get_limit(['users', 'lookup'])
+        resp = self.wq.users.lookup(screen_name='balkian')
+        l2 = w1.get_limit(['users', 'lookup'])
+        assert l1['remaining'] - l2['remaining'] == 1
+
+    def test_consume(self):
+        w1 = list(self.wq.queue)[0]
+        l1 = w1.get_limit(['friends', 'list'])
+        self.wq.wait = False
+        for i in range(l1['remaining']):
+            print(i)
+            resp = self.wq.friends.list(screen_name='balkian')
+        # l2 = w1.get_limit(['users', 'lookup'])
+        # assert l2['remaining'] == 0
+        # self.wq.users.lookup(screen_name='balkian')
+
+        failed = False
+        try:
+            # resp = self.wq.friends.list(screen_name='balkian')
+            self.wq.next(['friends', 'list'])
+        except QueueException:
+            failed = True
+        assert failed
+        l2 = w1.get_limit(['friends', 'list'])
+        assert self.wq.get_wait(['friends', 'list']) > (l2['reset'] - time.time())
+        assert self.wq.get_wait(['friends', 'list']) < (l2['reset'] - time.time() + 2)

23 tests/test_models.py Normal file
@@ -0,0 +1,23 @@
+from unittest import TestCase
+
+import os
+import types
+
+from bitter import utils
+from bitter.models import *
+from sqlalchemy import exists
+
+
+class TestModels(TestCase):
+
+    def setUp(self):
+        self.session = make_session('sqlite://')
+
+    def test_user(self):
+        fake_user = User(name="Fake user", id=1548)
+        self.session.add(fake_user)
+        self.session.commit()
+        fake_committed = self.session.query(User).filter_by(name="Fake user").first()
+        assert fake_committed
+        self.session.delete(fake_committed)
+        self.session.commit()
+        assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

tests/test_utils.py
@@ -8,54 +8,63 @@ from bitter import config as c

 class TestUtils(TestCase):

+    configfile = '/tmp/bitter.yaml'
+
     def setUp(self):
-        self.credfile = '/tmp/credentials.txt'
-        c.CREDENTIALS = self.credfile
-        if os.path.exists(self.credfile):
-            os.remove(self.credfile)
-        utils.create_credentials(self.credfile)
+        c.CONFIG_FILE = self.configfile
+        if os.path.exists(self.configfile):
+            os.remove(self.configfile)
+        assert not os.path.exists(self.configfile)
+        utils.create_config_file(self.configfile)
+        assert os.path.exists(self.configfile)

-    def test_create_credentials(self):
-        assert os.path.exists(self.credfile)
-        os.remove(self.credfile)
-        utils.create_credentials()  # From config
-        assert os.path.exists(self.credfile)
-
     def test_add_credentials(self):
-        utils.add_credentials(self.credfile, user="test")
-        assert utils.get_credentials(self.credfile)
-        assert utils.get_credentials(user="test")
-        assert list(utils.get_credentials(user="test"))[0]["user"] == "test"
+        utils.add_credentials(self.configfile, user="test")
+        assert utils.get_credentials(self.configfile)
+        assert utils.get_credentials(self.configfile, user="test")
+        assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"

     def test_get_credentials(self):
-        utils.add_credentials(self.credfile, user="test")
-        assert utils.get_credentials(user="test")
-        assert not utils.get_credentials(user="test", inverse=True)
+        utils.add_credentials(self.configfile, user="test")
+        assert utils.get_credentials(self.configfile, user="test")
+        assert not utils.get_credentials(self.configfile, user="test", inverse=True)

     def test_add_two_credentials(self):
-        utils.add_credentials(self.credfile, user="test")
-        utils.add_credentials(self.credfile, user="test2")
-        assert utils.get_credentials(user="test")
-        assert utils.get_credentials(user="test2")
+        utils.add_credentials(self.configfile, user="test")
+        utils.add_credentials(self.configfile, user="test2")
+        assert utils.get_credentials(self.configfile, user="test")
+        assert utils.get_credentials(self.configfile, user="test2")

     def test_delete_credentials(self):
-        utils.add_credentials(self.credfile, user="test")
-        assert utils.get_credentials(user="test")
-        utils.delete_credentials(user="test")
-        print(utils.get_credentials())
-        assert not utils.get_credentials(user="test")
+        utils.add_credentials(self.configfile, user="test")
+        assert utils.get_credentials(self.configfile, user="test")
+        utils.delete_credentials(self.configfile, user="test")
+        assert not utils.get_credentials(self.configfile, user="test")

     def test_parallel(self):
         import time
         def echo(i):
-            time.sleep(2)
+            time.sleep(0.5)
             return i
         tic = time.time()
         resp = utils.parallel(echo, [1,2,3])
         assert isinstance(resp, types.GeneratorType)
         assert list(resp) == [1,2,3]
         toc = time.time()
-        assert (toc-tic) < 6000
+        assert (toc-tic) < 600
+        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
+        assert list(resp2) == [1,2, 3,4]
+
+
+class TestUtilsEnv(TestUtils):
+    configfile = None
+
+    def setUp(self):
+        if 'BITTER_CONFIG' in os.environ:
+            self.oldenv = os.environ['BITTER_CONFIG']
+        os.environ['BITTER_CONFIG'] = ''
+
+    def tearDown(self):
+        if hasattr(self, 'oldenv'):
+            os.environ['BITTER_CONFIG'] = self.oldenv