Mirror of https://github.com/balkian/bitter.git, synced 2025-07-11 07:52:22 +00:00
Compare commits
1 commit
Commit c940709df8

.gitignore (vendored)
```
@@ -2,7 +2,6 @@ __pycache__
*.egg-info
dist
env
.env
__*
.*
*.pyc
```
```
@@ -2,6 +2,6 @@
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian

RUN pip install ".[server]"
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
```
```
@@ -2,6 +2,6 @@
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian

RUN pip install ".[server]"
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
```
```
@@ -2,6 +2,6 @@
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian

RUN pip install ".[server]"
RUN pip install -e "/usr/src/app/[server]"

ENTRYPOINT ["bitter"]
```
Makefile
```
@@ -1,4 +1,4 @@
PYVERSIONS=3.5
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian

@@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template
dev-%:
	@docker start $(NAME)-dev$* || (\
		$(MAKE) build-$*; \
		docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
	)\

	docker exec -ti $(NAME)-dev$* bash

@@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))

test-%: build-%
	docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;

pip_test-%:
	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;

@@ -71,6 +71,6 @@ pip_upload:
pip_test: $(addprefix pip_test-,$(PYVERSIONS))

run: build
	docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'

.PHONY: test test-% build-% build test test_pip run
```
README.md
@@ -1,5 +1,4 @@
# Description

#Description
There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.

@@ -23,153 +22,16 @@ wq = easy()

```
print(wq.users.show(user_name='balkian'))
```
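The same queue can also be called with the endpoint path itself, which is what the `bitter api` command does internally (see `AttrToFunc.__getitem__` in the `bitter/crawlers.py` diff below). A minimal sketch, assuming a working configuration with at least one credential:

```
# Hypothetical session; both calls hit the same REST endpoint.
from bitter import easy

wq = easy()                                   # picks up the default configuration
print(wq.users.show(user_name='balkian'))     # attribute-style call
print(wq['users/show'](user_name='balkian'))  # item-style call, endpoint path as a string
```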
You can also make custom calls to the API through the command line.
e.g. to get the latest 500 tweets by the python software foundation:

# Credentials format

```
bitter api statuses/user_timeline --id thepsf --count 500
{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"}
```

## Adding credentials
By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:

```
bitter --config <YOUR CONFIGURATION FILE> credentials add
```

You can specify the parameters in the command or let the command line guide you through the process.

# Examples

## Downloading a list of tweets

Bitter can download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.
You can even specify the column number for tweet ids.
Bitter will not try to download tweets that are already present.

```
Usage: bitter tweet get_all [OPTIONS] TWEETSFILE

  Download tweets from a list of tweets in a CSV file. The result is stored
  as individual json files in your folder of choice.

Options:
  -f, --folder TEXT
  -d, --delimiter TEXT
  -h, --header          Discard the first line (use it as a header)
  -q, --quotechar TEXT
  -c, --column INTEGER
  --help                Show this message and exit.
```

For instance, this will download the tweets listed in `tweet_ids.csv` into the `tweet_info` folder:

```
bitter tweet get_all -f tweet_info tweet_ids.csv
```
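The tweets file itself is plain CSV, one tweet per row, with the id in the column selected by `-c` (column 0 by default). A hypothetical `tweet_ids.csv`, with invented ids for illustration:

```
923432402262459000
940760125395566000
952102960837730000
```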
## Downloading a list of users

Bitter downloads users and tweets in a similar way:

```
Usage: bitter users get_all [OPTIONS] USERSFILE

  Download users from a list of user ids/screen names in a CSV file. The
  result is stored as individual json files in your folder of choice.

Options:
  -f, --folder TEXT
  -d, --delimiter TEXT
  -h, --header          Discard the first line (use it as a header)
  -q, --quotechar TEXT
  -c, --column INTEGER
  --help                Show this message and exit.
```

The only difference is that users can be downloaded via `screen_name` or `user_id`.
This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways.
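For example, a hypothetical users file that mixes both identifiers; if `balkian` and the invented id `123456` belong to the same account, that account is fetched twice:

```
balkian
123456
```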
## Downloading a stream

```
Usage: bitter stream get [OPTIONS]

Options:
  -l, --locations TEXT
  -t, --track TEXT
  -f, --file TEXT       File to store the stream of tweets. Default: standard output
  -p, --politelyretry   Politely retry after a hangup/connection error
  --help                Show this message and exit.
```

```
bitter --config .bitter.yaml stream get
```

```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```

## REST queries

In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.

```
bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
```

For instance:

```
# Get 100 tweets that mentioned Obama after tweet 942689870501302300
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
```

That is equivalent to this call to the API: `api/1.1/search/tweets?since_id=942689870501302300&count=100&q=Obama`.

The flags `--tweets` and `--users` are optional.
If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.

For example:

```
# Download 1000 tweets, 100 tweets per call.
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
```

```
# Download all the followers of @balkian
bitter api 'followers/list' --_id balkian --users --max_count -1
```

Note that some reserved words (such as `id`) have to be preceded by an underscore.
This limitation is imposed by the python-twitter library.
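The same query can also be issued from Python through the wrapper. A minimal sketch, assuming a valid configuration; the `_id` keyword mirrors the CLI's `--_id` workaround for the reserved word `id`, and the `statuses` key is the standard shape of a `/search/tweets` response:

```
# Hypothetical equivalent of the first CLI call above.
from bitter import easy

wq = easy()
resp = wq['/search/tweets'](since_id=942689870501302300, count=100, q='Obama')
for tweet in resp.get('statuses', []):
    print(tweet['id_str'], tweet['text'])
```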
# Configuration format

```
credentials:
  - user: "balkian"
    consumer_secret: "xxx"
    consumer_key: "xxx"
    token_key: "xxx"
    token_secret: "xxx"
  - user: ....
```

By default, bitter uses '~/.bitter.yaml', but you may choose a different file:

```
python -m bitter --config <config_file> ...
```

Or use an environment variable:

```
export BITTER_CONFIG=$(cat myconfig.yaml)
python -m bitter -c <credentials_file> ...
```
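This works because bitter only falls back to `BITTER_CONFIG` when no usable configuration file is given. A condensed sketch of the lookup, mirroring `read_config` in the `bitter/utils.py` diff below:

```
import io, os, yaml

def read_config(conffile=None):
    # An explicit file wins over the environment variable.
    if conffile:
        with open(os.path.expanduser(conffile)) as f:
            return yaml.safe_load(f) or {'credentials': []}
    # Escaped newlines are allowed so the variable can be set on one line.
    raw = os.environ['BITTER_CONFIG'].strip().replace('\\n', '\n')
    return yaml.safe_load(io.StringIO(raw)) or {'credentials': []}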
# Server
@@ -1,10 +0,0 @@
Scripts to process jsonlines

To get the jsonlines file, you can use the streaming API or the search api, like so:

```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```

To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.
```
@@ -1 +0,0 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
```
@@ -1,13 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export FIELDS="created_at,id,text"
for i in "$@"
do
    OUTPUT=$i.hashtags.csv
    echo "$FIELDS" > $OUTPUT
    pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done
```
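A hypothetical first few rows of the resulting `$i.hashtags.csv` (values invented for illustration; `jq`'s `@csv` filter quotes string fields):

```
created_at,id,text
"Mon Apr 02 11:05:53 +0000 2018","980751807259435008","bitter"
"Mon Apr 02 11:07:12 +0000 2018","980752139818991616","python"
```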
```
@@ -1,15 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

for i in "$@"
do
    REPLYOUTPUT=$i.replies.csv
    RTOUTPUT=$i.rts.csv
    echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
    echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
    pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
    pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done
```
```
@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'

export FIELDS="timestamp,track"

for i in "$@"
do
    OUTPUT=$i.limits.csv
    echo $FIELDS > $OUTPUT
    pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done
```
```
@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'

export FIELDS="id,url"

for i in "$@"
do
    OUTPUT=$i.media.csv
    echo $FIELDS > $OUTPUT
    pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done
```
```
@@ -1,28 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"

for i in "$@"
do
    OUTPUT=$i.users.csv
    echo \#$USER_FIELDS > $OUTPUT
    jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done
```
```
@@ -1,32 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"

for i in "$@"
do
    OUTPUT=$i.tweets.csv
    echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
    jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done
```
```
@@ -1,17 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'

for i in "$@"
do
    OUTPUT=$i.full_text.csv
    echo $HEADER > $OUTPUT
    jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
    sort -u $OUTPUT -o $OUTPUT
    sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done
```
```
@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
    echo "Usage: $0 <files to convert>"
    exit 1
fi

QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'

for i in "$@"
do
    OUTPUT=$i.text.csv
    echo $HEADER > $OUTPUT
    pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
    # sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done
```
```
@@ -1,10 +0,0 @@

if [ "$#" -lt 2 ]
then
    echo "Find edge lines in a file that contain one of the users in a user list."
    echo ""
    echo "Usage: $0 <file with edges> <file with the list of users>"
    exit 1
fi

pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'
```
```
@@ -1,23 +0,0 @@
import pandas as pd

def read_rts(rtsfile, tweetsfile):
    tweets = pd.read_csv(tweetsfile, index_col=0)
    rts = pd.read_csv(rtsfile, index_col=1)
    merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
    return merged.sort_values(by='count', ascending=False)


def read_tweets(tweetsfile):
    '''When the dataset is small enough, we can load tweets as-is'''
    with open(tweetsfile) as f:
        header = f.readline().strip().split(',')
    dtypes = {}
    for key in header:
        if key.endswith('_str') or key.endswith('.id'):
            dtypes[key] = object
    tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
    return tweets


if __name__ == '__main__':
    import argparse
```
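Hypothetical usage of the two helpers above, assuming the CSVs produced by the companion `*.tweets.csv` and `*.rts.csv` scripts:

```
# File names are invented for illustration.
tweets = read_tweets('mytweets.jsonlines.tweets.csv')
top_rts = read_rts('mytweets.jsonlines.rts.csv', 'mytweets.jsonlines.tweets.csv')
print(top_rts.head())   # most retweeted tweets first
```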
```
@@ -1 +0,0 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h
```
```
@@ -1,14 +0,0 @@
MAX_TAGS=100

function get_text {
    while read line
    do
        echo $line
        rtid=$(echo $line | awk -F"," '{print $2}')
        text=$(grep -m 1 $rtid *.text.csv)
        echo "$line - $text"
    done < "/dev/stdin"
}

cat "$@" | get_text
```
```
@@ -1,15 +0,0 @@
MAX_TAGS=100

function get_text {
    while read line
    do
        echo $line
        rtid=$(echo $line | awk '{print $2}')
        count=$(echo $line | awk '{print $1}')
        text=$(grep -m 1 $rtid *.text.csv)
        echo "$line - $text"
    done < "/dev/stdin"
}

cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text
```
```
@@ -1 +1 @@
0.10.3
0.7.1
```
```
@@ -6,12 +6,10 @@ http://github.com/balkian/bitter
import os

from .version import __version__
from . import config as bconf

def easy(conffile=bconf.CONFIG_FILE):
def easy(*args, **kwargs):
    from .crawlers import TwitterQueue

    return TwitterQueue.from_config(conffile=conffile)
    return TwitterQueue.from_credentials(*args, **kwargs)

__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
```
bitter/cli.py
```
@@ -21,192 +21,60 @@ if sys.version_info <= (3, 0):
    from contextlib2 import ExitStack
else:
    from contextlib import ExitStack


logger = logging.getLogger(__name__)

def serialize(function):
    '''Common options to serialize output to CSV or other formats'''

    @click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
    @click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
    @click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
    @click.option('--csv', help='Print each object as a csv row.', is_flag=True)
    @click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
    @click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
    @click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
    @click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
    def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
        it = function(**kwargs)
        outformat = 'json'
        if csv:
            outformat = 'csv'
        elif jsonlines:
            outformat = 'jsonlines'
        elif indented:
            outformat = 'indented'

        return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)

    return decorated


@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuration file.", default=bconf.CREDENTIALS)
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
    logging.basicConfig(level=getattr(logging, logging_level))
    ctx.obj = {}
    ctx.obj['VERBOSE'] = verbose
    bconf.CONFIG_FILE = config
    ctx.obj['CONFIG'] = config
    bconf.CREDENTIALS = credentials
    if os.path.exists(utils.get_config_path(credentials)):
        utils.copy_credentials_to_config(credentials, config)


@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
    if ctx.invoked_subcommand is not None:
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    for worker in wq.queue:
        print('#'*20)
        try:
            resp = worker.client.application.rate_limit_status()
            print(worker.name)
        except Exception as ex:
            print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )


@credentials.command('limits')
@click.option('--no_aggregate', is_flag=True, default=False,
              help=('Print limits from all workers. By default, limits are '
                    'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
              help=('Print all limits. By default, only limits that '
                    'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, no_aggregate, no_diff, url):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    limits = {}
    if url:
        print('URL is: {}'.format(url))
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        for urlimits in resp['resources'].values():
            for url, value in urlimits.items():
                if url not in limits:
                    limits[url] = {}
                glob = limits[url].get('global', {})
                limits[url][worker.name] = value
                for k in ['limit', 'remaining']:
                    if k not in glob:
                        glob[k] = 0
                    glob[k] += value[k]
                limits[url]['global'] = glob
    for url, lims in limits.items():
        worker_list = lims.keys() if no_aggregate else ['global', ]

        url_printed = False

        for worker in worker_list:
            vals = lims[worker]
            consumed = vals['limit'] - vals['remaining']
            if no_diff or consumed:
                if not url_printed:
                    print(url)
                    url_printed = True
                print('\t', worker, ':')
                print('\t\t', vals)


@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
    if not consumer_key:
        consumer_key = click.prompt('Please, enter your CONSUMER KEY')
    if not consumer_secret:
        consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
    if not token_key:
        token_key = click.prompt('Please, enter your ACCESS TOKEN')
    if not token_secret:
        token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
    utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
                          token_key=token_key, token_secret=token_secret)
    click.echo('Credentials added for {}'.format(user_name))

    utils.create_credentials(credentials)

@main.group()
@click.pass_context
@click.pass_context
def tweet(ctx):
    pass

@tweet.command('get')
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
@serialize
def get_tweet(tweetid, dry_run, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)

@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile')
def get_tweet(tweetid, write, folder, update):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    utils.download_tweet(wq, tweetid, write, folder, update)

@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first SKIP lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
    if update and not click.confirm('This may overwrite existing tweets. Continue?'):
        click.echo('Cancelling')
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)

    status = tqdm('Queried')
    failed = 0
    for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
                                               skip=skip, quotechar=quotechar, commentchar=commentchar,
                                               column=column, update=update, retry_failed=retry):
        status.update(1)
        if not obj:
            failed += 1
            status.set_description('Failed: %s. Queried' % failed, refresh=True)
            continue
        yield obj

def get_tweets(ctx, tweetsfile, folder):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    utils.download_tweets(wq, tweetsfile, folder)

@tweet.command('search')
@click.argument('query')
@serialize
@click.pass_context
@click.pass_context
def search(ctx, query):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    yield from utils.search_tweet(wq, query)
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    t = utils.search_tweet(wq, query)
    print(json.dumps(t, indent=2))

@tweet.command('timeline')
@click.argument('user')
@click.pass_context
@click.pass_context
def timeline(ctx, user):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    t = utils.user_timeline(wq, user)
    print(json.dumps(t, indent=2))

@@ -228,48 +96,43 @@ def list_users(ctx, db):

@users.command('get')
@click.argument('user')
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
@serialize
def get_user(user, dry_run, folder, update):
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    yield from utils.download_user(wq, user, not dry_run, folder, update)

@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
              is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
    if update and not click.confirm('This may overwrite existing users. Continue?'):
        click.echo('Cancelling')
def get_user(user, write, folder, update):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    if not write:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js)
        return
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
                                       update=update, retry_failed=retry,
                                       skip=skip, quotechar=quotechar,
                                       cache=not nocache,
                                       commentchar=commentchar,
                                       column=column):
        yield i
    if not os.path.exists(folder):
        os.makedirs(folder)
    file = os.path.join(folder, '%s.json' % user)
    if not update and os.path.exists(file) and os.path.isfile(file):
        print('User exists: %s' % user)
        return
    with open(file, 'w') as f:
        u = utils.get_user(wq, user)
        js = json.dumps(u, indent=2)
        print(js, file=f)

@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
    with open(usersfile) as f:
        for line in f:
            uid = line.strip()
            ctx.invoke(get_user, folder=folder, user=uid, write=True)

@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile')
@click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context
def crawl_users(ctx, usersfile, skip, until, threads, db):
    global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock

@@ -283,7 +146,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
        return ExitStack()


    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
                                                                                      len(wq.queue)))

@@ -374,9 +237,14 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
            speed = (collected-lastcollected)/10
            with statslock:
                lastcollected = collected

    logger.info('Done!')

@main.group('api')
def api():
    pass


@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')

@@ -425,7 +293,7 @@ def network_extractor(ctx, as_json):
    if as_json:
        import json
        print(json.dumps(follower_map, indent=4))



@extractor.command('users')
@click.pass_context

@@ -447,7 +315,7 @@ def users_extractor(ctx):
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
    print(locals())
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    dburi = ctx.obj['DBURI']
    utils.extract(wq,
                  recursive=recursive,

@@ -459,41 +327,31 @@
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    db = ctx.obj['DBURI']
    session = make_session(db)
    session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})


@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
              help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@api.command('limits')
@click.argument('url', required=False)
@click.pass_context
def api(ctx, cmd, tweets, users, api_args):
    opts = {}
    mappings = {
        'id': '_id'
    }
    i = iter(api_args)
    for k, v in zip(i, i):
        k = k.replace('--', '')
        if k in mappings:
            k = mappings[k]
        opts[k] = v
    wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
    if tweets:
        resp = utils.consume_tweets(wq[cmd], **opts)
    elif users:
        resp = utils.consume_users(wq[cmd], **opts)
    else:
        resp = wq[cmd](**opts)
        print(json.dumps(resp))
        return
    for i in resp:
        print(json.dumps(i))

def get_limits(ctx, url):
    wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
    for worker in wq.queue:
        resp = worker.client.application.rate_limit_status()
        print('#'*20)
        print(worker.name)
        if url:
            limit = 'NOT FOUND'
            print('URL is: {}'.format(url))
            cat = url.split('/')[1]
            if cat in resp['resources']:
                limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
            else:
                print('Cat {} not found'.format(cat))
            print('{}: {}'.format(url, limit))
        else:
            print(json.dumps(resp, indent=2))

@main.command('server')
@click.argument('CONSUMER_KEY', required=True)

@@ -504,20 +362,20 @@ def run_server(ctx, consumer_key, consumer_secret):
    bconf.CONSUMER_SECRET = consumer_secret
    from .webserver import app
    app.run(host='0.0.0.0')


@main.group()
@click.pass_context
@click.pass_context
def stream(ctx):
    pass

@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
    wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
    wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)

    query_args = {}
    if locations:

@@ -536,14 +394,10 @@ def get_stream(ctx, locations, track, file, politelyretry):
        iterator = wq.statuses.sample()
    else:
        iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
    try:
        for i in iterator:
            yield i
        if not politelyretry:
            return
    except Exception:
        if not politelyretry:
            raise ex
    for i in iterator:
        yield i
    if not politelyretry:
        return
    thishangup = time.time()
    if thishangup - lasthangup < 60:
        raise Exception('Too many hangups in a row.')

@@ -557,23 +411,23 @@ def get_stream(ctx, locations, track, file, politelyretry):
@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
@click.pass_context
def read_stream(ctx, file, tail):
    for tweet in utils.read_file(file, tail=tail):
        try:
            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
            print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
        except (KeyError, TypeError):
            print('Raw tweet: {}'.format(tweet))

@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
@click.pass_context
def tags_stream(ctx, file, limit):
    c = utils.get_hashtags(utils.read_file(file))
    for count, tag in c.most_common(limit):
        print(u'{} - {}'.format(count, tag))



if __name__ == '__main__':
    main()
```
```
@@ -3,7 +3,7 @@ Common configuration for other modules.
It is not elegant, but it works with flask and the oauth decorators.

Using this module allows you to change the config before loading any other module.
E.g.:
E.g.:

    import bitter.config as c
    c.CREDENTIALS="/tmp/credentials"

@@ -11,4 +11,3 @@ E.g.:
    app.run()
'''
CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'
```
```
@@ -10,7 +10,6 @@ from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
from functools import partial
try:
    import itertools.ifilter as filter
except ImportError:

@@ -39,9 +38,6 @@ class AttrToFunc(object):
        else:
            return extend_call(k)

    def __getitem__(self, k):
        return partial(self.handler, self.__uriparts+k.split('/'))

    def __call__(self, *args, **kwargs):
        # for i, a in enumerate(args)e
        #     kwargs[i] = a

@@ -58,18 +54,6 @@ class FromCredentialsMixin(object):
            wq.ready(cls.worker_class(cred["user"], cred))
        return wq

class FromConfigMixin(object):

    @classmethod
    def from_config(cls, config=None, conffile=None, max_workers=None):
        wq = cls()

        if not config:
            with utils.config(conffile) as c:
                config = c
        for cred in islice(config['credentials'], max_workers):
            wq.ready(cls.worker_class(cred["user"], cred))
        return wq

class TwitterWorker(object):
    api_class = None

@@ -91,12 +75,6 @@ class TwitterWorker(object):
            self._client = self.api_class(auth=auth)
        return self._client

    def __repr__(self):
        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
        if self.busy:
            msg += ' [busy]'
        return msg

class RestWorker(TwitterWorker):
    api_class = Twitter

@@ -115,14 +93,13 @@
    def get_wait(self, uriparts):
        limits = self.get_limit(uriparts)
        if limits.get('remaining', 1) > 0:
        if limits['remaining'] > 0:
            return 0
        reset = limits.get('reset', 0)
        now = time.time()
        return max(0, (reset-now))

    def get_limit(self, uriparts):
        uriparts = list(u for u in uriparts if u)
        uri = '/'+'/'.join(uriparts)
        for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
            if ix.startswith(uri):

@@ -155,7 +132,7 @@
class QueueException(BaseException):
    pass

class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
class QueueMixin(AttrToFunc, FromCredentialsMixin):
    def __init__(self, wait=True):
        logger.debug('Creating worker queue')
        self.queue = set()
```
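A worked sketch of the `RestWorker.get_wait` logic above: a worker with no calls remaining sleeps until the rate-limit reset timestamp; otherwise it proceeds immediately. The limit values are invented for illustration:

```
import time

limits = {'remaining': 0, 'reset': time.time() + 120}  # hypothetical limit entry
wait = 0 if limits['remaining'] > 0 else max(0, limits['reset'] - time.time())
print(round(wait))  # ~120 seconds until this worker may call the endpoint again
```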
```
@@ -3,13 +3,10 @@ import json

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import wraps

Base = declarative_base()

@@ -88,48 +85,34 @@ class ExtractorEntry(Base):
    user = Column(BigInteger, index=True)
    cursor = Column(BigInteger, default=-1)
    pending = Column(Boolean, default=False)
    errors = Column(Text, default="")
    busy = Column(Boolean, default=False)


class Search(Base):
    __tablename__ = 'search_queries'

    id = Column(Integer, primary_key=True, index=True, unique=True)
    endpoint = Column(Text, comment="Endpoint URL")
    attrs = Column(Text, comment="Text version of the dictionary of parameters")
    count = Column(Integer)
    current_count = Column(Integer)
    current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
    since_id = Column(BigInteger)

class SearchResults(Base):
    __tablename__ = 'search_results'
    id = Column(Integer, primary_key=True, index=True, unique=True)
    search_id = Column(ForeignKey('search_queries.id'))
    resource_id = Column(Text)

def memoize(f):
    memo = {}
    @wraps(f)
    def helper(self, **kwargs):
        st = dict_to_str(kwargs)
        key = (self.__uriparts, st)
        if key not in memo:
            memo[key] = f(self, **kwargs)
        return memo[key]
    return helper

def make_session(url):
    if not isinstance(url, str):
        print(url)
        raise Exception("FUCK")
    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
    engine = create_engine(url)#, echo=True)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    return session



def dict_to_str(args):
    return json.dumps(args, sort_keys=True)
def test(db='sqlite:///users.db'):

    from sqlalchemy import exists
    session = make_session(db)

    our_user = session.query(User).first()

    print(our_user.name)
    print(session.query(User).count())
    fake_user = User(name="Fake user")
    session.add(fake_user)
    session.commit()
    print(session.query(User).count())
    print(session.query(exists().where(User.name == "Fake user")).scalar())
    fake_committed = session.query(User).filter_by(name="Fake user").first()
    print(fake_committed.id)
    print(fake_committed.name)
    session.delete(fake_committed)
    session.commit()
    print(session.query(User).count())
    print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))
```
bitter/utils.py
```
@@ -3,9 +3,6 @@ from __future__ import print_function
import logging
import time
import json
import yaml
import csv
import io

import signal
import sys

@@ -13,23 +10,18 @@ import sqlalchemy
import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from multiprocessing import Queue

import queue
import threading
from select import select

import operator

from functools import partial, reduce

from tqdm import tqdm

from itertools import islice, chain
from contextlib import contextmanager

try:
    from itertools import izip_longest
except ImportError:
    from itertools import zip_longest

from collections import Counter
from random import choice

from builtins import map, filter

@@ -39,12 +31,6 @@ from bitter.models import Following, User, ExtractorEntry, make_session

from bitter import config

# Fix Python 2.x.
try:
    UNICODE_EXISTS = bool(type(unicode))
except NameError:
    unicode = lambda s: str(s)

logger = logging.getLogger(__name__)

@@ -52,100 +38,38 @@ def signal_handler(signal, frame):
    logger.info('You pressed Ctrl+C!')
    sys.exit(0)


def chunk(iterable, n):
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, n)), ())


def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
    source = chunk(source, chunksize)
    p = ThreadPool(numcpus*2)

    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as ex:
            print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)

    results = p.imap_unordered(wrapped_func, source)
    for i in chain.from_iterable(results):
    for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
        yield i
```
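Hypothetical usage of the `chunk`/`parallel` helpers above: the source is split into tuples of `chunksize` items, each tuple is handed to `func` on a thread pool, and the per-chunk results are flattened back into one stream.

```
def lookup(batch):
    # Stand-in for a per-chunk network call; receives a tuple of items.
    return [x * 2 for x in batch]

for result in parallel(lookup, range(10), chunksize=3):
    print(result)  # 0, 2, 4, ... in arbitrary order
```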
|
||||
|
||||
|
||||
def get_config_path(conf=None):
|
||||
if not conf:
|
||||
if config.CONFIG_FILE:
|
||||
conf = config.CONFIG_FILE
|
||||
def get_credentials_path(credfile=None):
|
||||
if not credfile:
|
||||
if config.CREDENTIALS:
|
||||
credfile = config.CREDENTIALS
|
||||
else:
|
||||
raise Exception('No valid config file')
|
||||
return os.path.expanduser(conf)
|
||||
|
||||
|
||||
def copy_credentials_to_config(credfile, conffile=None):
|
||||
p = get_config_path(credfile)
|
||||
with open(p) as old:
|
||||
for line in old:
|
||||
cred = json.loads(line.strip())
|
||||
add_credentials(conffile, **cred)
|
||||
|
||||
|
||||
def save_config(conf, conffile=None):
|
||||
with config(conffile) as c:
|
||||
c.clear()
|
||||
c.update(conf)
|
||||
|
||||
raise Exception('No valid credentials file')
|
||||
return os.path.expanduser(credfile)
|
||||
|
||||
@contextmanager
|
||||
def config(conffile=None):
|
||||
d = read_config(conffile)
|
||||
try:
|
||||
yield d
|
||||
finally:
|
||||
write_config(d, conffile)
|
||||
def credentials_file(credfile, *args, **kwargs):
|
||||
p = get_credentials_path(credfile)
|
||||
with open(p, *args, **kwargs) as f:
|
||||
yield f
|
||||
|
||||
def iter_credentials(credfile=None):
|
||||
with credentials_file(credfile) as f:
|
||||
for l in f:
|
||||
yield json.loads(l.strip())
|
||||
|
||||
def read_config(conffile):
|
||||
p = conffile and get_config_path(conffile)
|
||||
if p:
|
||||
if not os.path.exists(p):
|
||||
raise IOError('{} file does not exist.'.format(p))
|
||||
f = open(p, 'r')
|
||||
elif 'BITTER_CONFIG' not in os.environ:
|
||||
raise Exception('No config file or BITTER_CONFIG env variable.')
|
||||
else:
|
||||
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
|
||||
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
|
||||
|
||||
|
||||
def write_config(conf, conffile=None):
|
||||
if not conf:
|
||||
conf = {'credentials': []}
|
||||
if conffile:
|
||||
p = get_config_path(conffile)
|
||||
with open(p, 'w') as f:
|
||||
yaml.dump(conf, f)
|
||||
else:
|
||||
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
|
||||
|
||||
def iter_credentials(conffile=None):
|
||||
with config(conffile) as c:
|
||||
for i in c['credentials']:
|
||||
yield i
|
||||
|
||||
|
||||
def create_config_file(conffile=None):
|
||||
if not conffile:
|
||||
return
|
||||
conffile = get_config_path(conffile)
|
||||
with open(conffile, 'a'):
|
||||
pass
|
||||
write_config(None, conffile)
|
||||
|
||||
|
||||
def get_credentials(conffile=None, inverse=False, **kwargs):
|
||||
def get_credentials(credfile=None, inverse=False, **kwargs):
|
||||
creds = []
|
||||
for i in iter_credentials(conffile):
|
||||
for i in iter_credentials(credfile):
|
||||
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
|
||||
if matches and not inverse:
|
||||
creds.append(i)
|
||||
@ -153,23 +77,24 @@ def get_credentials(conffile=None, inverse=False, **kwargs):
|
||||
creds.append(i)
|
||||
return creds
|
||||
|
||||
def create_credentials(credfile=None):
|
||||
credfile = get_credentials_path(credfile)
|
||||
with credentials_file(credfile, 'a'):
|
||||
pass
|
||||
|
||||
def delete_credentials(conffile=None, **creds):
|
||||
tokeep = get_credentials(conffile, inverse=True, **creds)
|
||||
with config(conffile) as c:
|
||||
c['credentials'] = list(tokeep)
|
||||
def delete_credentials(credfile=None, **creds):
|
||||
tokeep = get_credentials(credfile, inverse=True, **creds)
|
||||
with credentials_file(credfile, 'w') as f:
|
||||
for i in tokeep:
|
||||
f.write(json.dumps(i))
|
||||
f.write('\n')
|
||||
|
||||
|
||||
def add_credentials(conffile=None, **creds):
|
||||
try:
|
||||
exist = get_credentials(conffile, **creds)
|
||||
except IOError:
|
||||
exist = False
|
||||
create_config_file(conffile)
|
||||
if exist:
|
||||
return
|
||||
with config(conffile) as c:
|
||||
c['credentials'].append(creds)
|
||||
def add_credentials(credfile=None, **creds):
|
||||
exist = get_credentials(credfile, **creds)
|
||||
if not exist:
|
||||
with credentials_file(credfile, 'a') as f:
|
||||
f.write(json.dumps(creds))
|
||||
f.write('\n')
|
||||
|
||||
|
||||
def get_hashtags(iter_tweets, best=None):
|
||||
@ -178,27 +103,19 @@ def get_hashtags(iter_tweets, best=None):
|
||||
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
|
||||
return c
|
||||
|
||||
|
||||
def read_file(filename, tail=False):
|
||||
if filename == '-':
|
||||
f = sys.stdin
|
||||
else:
|
||||
f = open(filename)
|
||||
try:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if line not in (None, '', '\n'):
|
||||
tweet = json.loads(line.strip())
|
||||
yield tweet
|
||||
else:
|
||||
if tail:
|
||||
time.sleep(1)
|
||||
else:
|
||||
return
|
||||
finally:
|
||||
if f != sys.stdin:
|
||||
close(f)
|
||||
|
||||
with open(filename) as f:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if line not in (None, '', '\n'):
|
||||
tweet = json.loads(line.strip())
|
||||
yield tweet
|
||||
else:
|
||||
if tail:
|
||||
time.sleep(1)
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
|
||||
t = 'name' if by_name else 'uid'
|
||||
@ -227,7 +144,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
|
||||
else:
|
||||
yield user
|
||||
|
||||
|
||||
def trim_user(user):
|
||||
if 'status' in user:
|
||||
del user['status']
|
||||
@ -241,22 +157,14 @@ def trim_user(user):
|
||||
return user
|
||||
|
||||
|
||||
def add_user(user, dburi=None, session=None, update=False):
|
||||
if not session:
|
||||
session = make_session(dburi)
|
||||
|
||||
def add_user(session, user, enqueue=False):
|
||||
user = trim_user(user)
|
||||
olduser = session.query(User).filter(User.id == user['id'])
|
||||
olduser = session.query(User).filter(User.id==user['id'])
|
||||
if olduser:
|
||||
if not update:
|
||||
return
|
||||
olduser.delete()
|
||||
nuser = User()
|
||||
for key, value in user.items():
|
||||
setattr(nuser, key, value)
|
||||
user = nuser
|
||||
if update:
|
||||
session.add(user)
|
||||
user = User(**user)
|
||||
session.add(user)
|
||||
if extract:
|
||||
logger.debug('Adding entry')
|
||||
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
|
||||
if not entry:
|
||||
@ -266,201 +174,131 @@ def add_user(user, dburi=None, session=None, update=False):
|
||||
entry.pending = True
|
||||
entry.cursor = -1
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
||||
def download_entry(wq, entry_id, dburi=None, recursive=False):
|
||||
session = make_session(dburi)
|
||||
if not session:
|
||||
raise Exception("Provide dburi or session")
|
||||
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
|
||||
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
|
||||
user = session.query(User).filter(User.id == entry.user).first()
|
||||
download_user(wq, session, user, entry, recursive)
|
||||
session.close()
|
||||
|
||||
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
|
||||
|
||||
total_followers = user.followers_count
|
||||
|
||||
if total_followers > max_followers:
|
||||
entry.pending = False
|
||||
logger.info("Too many followers for user: %s" % user.screen_name)
|
||||
session.add(entry)
|
||||
session.commit()
|
||||
return
|
||||
|
||||
if not entry:
|
||||
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
|
||||
session.add(entry)
|
||||
session.commit()
|
||||
|
||||
pending = True
|
||||
cursor = entry.cursor
|
||||
uid = user.id
|
||||
name = user.name
|
||||
|
||||
logger.info("#"*20)
|
||||
logger.info("Getting %s - %s" % (uid, name))
|
||||
logger.info("Cursor %s" % cursor)
|
||||
logger.info("Using account: %s" % wq.name)
|
||||
|
||||
_fetched_followers = 0
|
||||
|
||||
def fetched_followers():
|
||||
return session.query(Following).filter(Following.isfollowed==uid).count()
|
||||
|
||||
attempts = 0
|
||||
while cursor > 0 or fetched_followers() < total_followers:
|
||||
try:
|
||||
resp = wq.followers.ids(user_id=uid, cursor=cursor)
|
||||
except TwitterHTTPError as ex:
|
||||
attempts += 1
|
||||
if ex.e.code in (401, ) or attempts > 3:
|
||||
logger.info('Not authorized for user: {}'.format(uid))
|
||||
entry.errors = ex.message
|
||||
break
|
||||
if 'ids' not in resp:
|
||||
logger.info("Error with id %s %s" % (uid, resp))
|
||||
entry.pending = False
|
||||
entry.errors = "No ids in response: %s" % resp
|
||||
break
|
||||
|
||||
logger.info("New followers: %s" % len(resp['ids']))
|
||||
if recursive:
|
||||
newusers = get_users(wq, resp)
|
||||
for newuser in newusers:
|
||||
add_user(session=session, user=newuser)
|
||||
|
||||
if 'ids' not in resp or not resp['ids']:
|
||||
logger.info('NO IDS in response')
|
||||
break
|
||||
for i in resp['ids']:
|
||||
existing_user = session.query(Following).\
|
||||
filter(Following.isfollowed == uid).\
|
||||
filter(Following.follower == i).first()
|
||||
now = int(time.time())
|
||||
if existing_user:
|
||||
existing_user.created_at_stamp = now
|
||||
else:
|
||||
f = Following(isfollowed=uid,
|
||||
follower=i,
|
||||
created_at_stamp=now)
|
||||
session.add(f)
|
||||
|
||||
logger.info("Fetched: %s/%s followers" % (fetched_followers(),
|
||||
total_followers))
|
||||
entry.cursor = resp["next_cursor"]
|
||||
|
||||
session.add(entry)
|
||||
session.commit()
|
||||
|
||||
logger.info("Done getting followers for %s" % uid)
|
||||
|
||||
entry.pending = False
|
||||
entry.busy = False
|
||||
session.add(entry)
|
||||
session.commit()
|
||||
|
||||
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def classify_user(id_or_name, screen_names, user_ids):
|
||||
try:
|
||||
int(id_or_name)
|
||||
user_ids.append(id_or_name)
|
||||
logger.debug("Added user id")
|
||||
except ValueError:
|
||||
logger.debug("Added screen_name")
|
||||
screen_names.append(id_or_name.split('@')[-1])
|
||||
|
||||
|
||||
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
    signal.signal(signal.SIGINT, signal_handler)

    w = wq.next()
    if not dburi:
        dburi = 'sqlite:///%s.db' % extractor_name

    session = make_session(dburi)
    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
    session.commit()

    screen_names = []
    user_ids = []

    if not (user or initfile):
        logger.info('Using pending users from last session')

    def classify_user(id_or_name):
        try:
            int(user)
            user_ids.append(user)
            logger.info("Added user id")
        except ValueError:
            logger.info("Added screen_name")
            screen_names.append(user.split('@')[-1])

    if user:
        classify_user(user)

    elif initfile:
        logger.info("No user. I will open %s" % initfile)
        with open(initfile, 'r') as f:
            for line in f:
                user = line.strip().split(',')[0]
                classify_user(user)
    else:
        screen_names = []
        user_ids = []
        if user:
            classify_user(user, screen_names, user_ids)
        elif initfile:
            logger.info("No user. I will open %s" % initfile)
            with open(initfile, 'r') as f:
                for line in f:
                    user = line.strip().split(',')[0]
                    classify_user(user, screen_names, user_ids)
        logger.info('Using pending users from last session')

    def missing_user(ix, column=User.screen_name):
        res = session.query(User).filter(column == ix).count() == 0
        if res:
            logger.info("Missing user %s. Count: %s" % (ix, res))
        return res

    screen_names = list(filter(missing_user, screen_names))
    user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
    nusers = []
    logger.info("Missing user ids: %s" % user_ids)
    logger.info("Missing screen names: %s" % screen_names)
    if screen_names:
        nusers = list(get_users(wq, screen_names, by_name=True))
    if user_ids:
        nusers += list(get_users(wq, user_ids, by_name=False))
    nusers = list(get_users(wq, screen_names, by_name=True))
    if user_ids:
        nusers += list(get_users(wq, user_ids, by_name=False))

    for i in nusers:
        add_user(dburi=dburi, user=i)
    for i in nusers:
        add_user(session, i, enqueue=True)

    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
    logger.info('Total users: {}'.format(total_users))

    def pending_entries():
        pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
        logger.info('Pending: {}'.format(pending))
        return pending

    de = partial(download_entry, wq, dburi=dburi)
    pending = pending_entries(dburi)
    session.close()

    with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
        for i in tq:
            tq.write('Got {}'.format(i))
            logger.info("Got %s" % i)

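`extract` hands each pending entry to `download_entry` through `bitter.utils.parallel`, which spreads a function over an iterable and yields results as they complete. A hedged, self-contained sketch of that helper's behavior (the timing is a stand-in for real network calls):

```python
import time
from bitter import utils

def slow_double(i):
    time.sleep(0.1)            # simulate a slow API call
    return 2 * i

# parallel() returns a generator; results can arrive out of order
print(sorted(utils.parallel(slow_double, range(5))))  # [0, 2, 4, 6, 8]
```
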
def pending_entries(dburi):
    session = make_session(dburi)
    while True:
    while pending_entries() > 0:
        logger.info("Using account: %s" % w.name)
        candidate, entry = session.query(User, ExtractorEntry).\
            filter(ExtractorEntry.user == User.id).\
            filter(ExtractorEntry.pending == True).\
            filter(ExtractorEntry.busy == False).\
            order_by(User.followers_count).first()
        if candidate:
            entry.busy = True
            session.add(entry)
            session.commit()
            yield int(entry.id)
            continue
        if session.query(ExtractorEntry).\
                filter(ExtractorEntry.busy == True).count() > 0:
            time.sleep(1)
            continue
        logger.info("No more pending entries")
        break
    session.close()
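The generator doubles as a work queue: a row is flagged `busy` in the same transaction that selects it, so concurrent consumers never claim the same entry, and the loop sleeps while other workers are still busy. A stripped-down sketch of that claim-then-yield pattern, assuming `make_session` and `ExtractorEntry` from `bitter.models`:

```python
from bitter.models import ExtractorEntry, make_session

def claim_pending(dburi):
    """Yield ids of pending entries, marking each one busy before handing it out."""
    session = make_session(dburi)
    while True:
        entry = (session.query(ExtractorEntry)
                 .filter(ExtractorEntry.pending == True)
                 .filter(ExtractorEntry.busy == False)
                 .first())
        if not entry:
            break
        entry.busy = True          # claim it so other consumers skip this row
        session.add(entry)
        session.commit()
        yield int(entry.id)
    session.close()
```
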
        filter(ExtractorEntry.user == User.id).\
        filter(ExtractorEntry.pending == True).\
        order_by(User.followers_count).first()
    if not candidate:
        break
    pending = True
    cursor = entry.cursor
    uid = candidate.id
    uobject = session.query(User).filter(User.id == uid).first()
    name = uobject.screen_name if uobject else None

    logger.info("#" * 20)
    logger.info("Getting %s - %s" % (uid, name))
    logger.info("Cursor %s" % cursor)
    logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count(), total_users))
    try:
        resp = wq.followers.ids(user_id=uid, cursor=cursor)
    except TwitterHTTPError as ex:
        if ex.e.code in (401, ):
            logger.info('Not authorized for user: {}'.format(uid))
            resp = {}
    if 'ids' in resp:
        logger.info("New followers: %s" % len(resp['ids']))
        if recursive:
            newusers = get_users(wq, resp)
            for newuser in newusers:
                add_user(session, newuser, enqueue=True)
        for i in resp['ids']:
            existing_user = session.query(Following).\
                filter(Following.isfollowed == uid).\
                filter(Following.follower == i).first()
            now = int(time.time())
            if existing_user:
                existing_user.created_at_stamp = now
            else:
                f = Following(isfollowed=uid,
                              follower=i,
                              created_at_stamp=now)
                session.add(f)

        total_followers = candidate.followers_count
        fetched_followers = session.query(Following).filter(Following.isfollowed == uid).count()
        logger.info("Fetched: %s/%s followers" % (fetched_followers,
                                                  total_followers))
        cursor = resp["next_cursor"]
        if cursor > 0:
            pending = True
            logger.info("Getting more followers for %s" % uid)
        else:
            logger.info("Done getting followers for %s" % uid)
            cursor = -1
            pending = False
    else:
        logger.info("Error with id %s %s" % (uid, resp))
        pending = False

    entry.pending = pending
    entry.cursor = cursor
    logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))

    session.add(candidate)
    session.commit()

    sys.stdout.flush()

def get_tweet(c, tid):
    return c.statuses.show(id=tid)

def search_tweet(c, query):
    yield from c.search.tweets(q=query)['statuses']
    return c.search.tweets(q=query)

def user_timeline(c, query):
    try:

@ -475,329 +313,84 @@ def get_user(c, user):

    except ValueError:
        return c.users.lookup(screen_name=user)[0]

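These thin helpers map one-to-one onto Twitter REST endpoints, so they can also be exercised directly from the wrapper. A hedged usage sketch, assuming valid credentials are configured and using illustrative ids and queries:

```python
from bitter import easy

wq = easy()
tweet = wq.statuses.show(id=20)                            # a single status by id
hits = wq.search.tweets(q='#python', count=10)['statuses']  # one page of search results
for t in hits:
    print(t['id'], t['text'][:60])
```
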
def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
    tweet = cached_id(tweetid, folder)
    if update or not tweet:
        tweet = get_tweet(wq, tweetid)
    if cache and update:
        if tweet:
            js = json.dumps(tweet)
            write_json(js, folder)
    yield tweet


def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
    user = cached_id(userid, folder)
    if update or not user:
        user = get_user(wq, userid)
    if cache and update:
        if user:
            write_json(user, folder, aliases=[user['screen_name'], ])
    yield user


def cached_id(oid, folder):
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
    cached = cached_tweet(tweetid, folder)
    tweet = None
    file = os.path.join(folder, '%s.json' % oid)
    if update or not cached:
        tweet = get_tweet(wq, tweetid)
        js = json.dumps(tweet, indent=2)
    if write:
        if tweet:
            write_tweet_json(js, folder)
    else:
        print(js)

def cached_tweet(tweetid, folder):
    tweet = None
    file = os.path.join(folder, '%s.json' % tweetid)
    if os.path.exists(file) and os.path.isfile(file):
        try:
            # print('%s: Object exists' % oid)
            # print('%s: Tweet exists' % tweetid)
            with open(file) as f:
                tweet = json.load(f)
        except Exception as ex:
            logger.error('Error getting cached version of {}: {}'.format(oid, ex))
            logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
    return tweet

def write_json(js, folder, oid=None, aliases=[]):
    if not oid:
        oid = js['id']
    file = id_file(oid, folder)
def write_tweet_json(js, folder):
    tweetid = js['id']
    file = tweet_file(tweetid, folder)
    if not os.path.exists(folder):
        os.makedirs(folder)
    with open(file, 'w') as f:
        json.dump(js, f)
        logger.info('Written {} to file {}'.format(oid, file))
    for alias in aliases:
        os.symlink('%s.json' % oid, id_file(alias, folder))
        json.dump(js, f, indent=2)
    logger.info('Written {} to file {}'.format(tweetid, file))


def id_file(oid, folder):
    return os.path.join(folder, '%s.json' % oid)
def tweet_file(tweetid, folder):
    return os.path.join(folder, '%s.json' % tweetid)

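Together, `cached_id` and `write_json` implement a simple JSON-file cache keyed by object id, with screen-name symlinks as aliases. A hedged sketch of the same read-through pattern, assuming a `fetch` callable and a writable folder (both placeholders):

```python
import json
import os

def fetch_cached(oid, folder, fetch, update=False):
    """Return the object for `oid`, reading the JSON cache first
    and falling back to `fetch` on a miss (or when update=True)."""
    path = os.path.join(folder, '%s.json' % oid)
    if not update and os.path.isfile(path):
        with open(path) as f:
            return json.load(f)
    obj = fetch(oid)
    os.makedirs(folder, exist_ok=True)
    with open(path, 'w') as f:
        json.dump(obj, f, indent=2)
    return obj
```
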
def fail_file(oid, folder):
def tweet_fail_file(tweetid, folder):
    failsfolder = os.path.join(folder, 'failed')
    if not os.path.exists(failsfolder):
        os.makedirs(failsfolder)
    return os.path.join(failsfolder, '%s.failed' % oid)
    return os.path.join(failsfolder, '%s.failed' % tweetid)


def id_failed(oid, folder):
    return os.path.isfile(fail_file(oid, folder))
def tweet_failed(tweetid, folder):
    return os.path.isfile(tweet_fail_file(tweetid, folder))

def tweet_download_batch(wq, batch):
    tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
    for tid, tweet in tweets.items():
        yield tid, tweet

def user_download_batch(wq, batch):
    screen_names = []
    user_ids = []
    for elem in batch:
        try:
            int(elem)
            user_ids.append(str(elem))
        except ValueError:
            screen_names.append(elem.lower())
    args = {}
    if user_ids:
        args['user_id'] = ','.join(user_ids)
    if screen_names:
        args['screen_name'] = ','.join(screen_names)
    try:
        users = wq.users.lookup(**args)
    except TwitterHTTPError as ex:
        if ex.e.code in (404,):
            users = []
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
    def filter_line(line):
        tweetid = int(line)
        # print('Checking {}'.format(tweetid))
        if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
            yield None
        else:
            raise
    found_ids = []
    found_names = []
    for user in users:
        uid = user['id_str']
        if uid in user_ids:
            found_ids.append(uid)
            yield (uid, user)
        uname = user['screen_name'].lower()
        if uname in screen_names:
            found_names.append(uname)
            yield (uname, user)
    for uid in set(user_ids) - set(found_ids):
        yield (uid, None)
    for name in set(screen_names) - set(found_names):
        yield (name, None)
            yield line

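`user_download_batch` pairs every requested id or name with its result, yielding `(key, None)` for anything the API did not return, so callers can record failures; Twitter's `users/lookup` accepts up to 100 comma-separated ids or names per call. A hedged usage sketch (the id is illustrative):

```python
from bitter import easy

wq = easy()
for key, user in user_download_batch(wq, ['balkian', '15772978']):
    if user is None:
        print('not found:', key)
    else:
        print(key, '->', user['screen_name'])
```
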
def dump_result(oid, obj, folder, ignore_fails=True):
    if obj:
        try:
            write_json(obj, folder=folder, oid=oid)
            failed = fail_file(oid, folder)
            if os.path.exists(failed):
                os.remove(failed)
        except Exception as ex:
            logger.error('%s: %s' % (oid, ex))
            if not ignore_fails:
                raise
    else:
        logger.info('Object not recovered: {}'.format(oid))
        with open(fail_file(oid, folder), 'w') as f:
            print('Object not found', file=f)

def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
                  batch_method=tweet_download_batch):

    done = Queue()

    down = Queue()

    def filter_list(lst, done, down):
        print('filtering')
        for oid in lst:
            cached = cached_id(oid, folder)
            if (cached and not update):
                done.put((oid, cached))
            elif (id_failed(oid, folder) and not retry_failed):
                done.put((oid, None))
            else:
                down.put(oid)
        down.put(None)

    def download_results(batch_method, down, done):
        def gen():
            while True:
                r = down.get()
                if r is None:
                    down.close()
                    down.join_thread()
                    return
                yield r

        for t in parallel(batch_method, gen(), 100):
            done.put(t)

    def batch(*args, **kwargs):
        return batch_method(wq, *args, **kwargs)

    tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
    tc.start()
    td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
    td.start()

    def check_threads(ts, done):
        for t in ts:
            t.join()
        done.put(None)

    wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
    wait.start()

    while True:
        rec = done.get()

        if rec is None:
            done.close()
            done.join_thread()
            break

        oid, obj = rec
        if cache or (not obj):
            dump_result(oid, obj, folder, ignore_fails)
        yield rec

    wait.join()

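`download_list` is a three-stage pipeline: a filter thread splits already-cached or failed ids from ones still to download, a download thread batches the rest through the API, and the caller drains the `done` queue. A minimal sketch of the same queue-plus-sentinel pipeline in plain Python (using the stdlib `queue` instead of the multiprocessing queues above, and a trivial `work` function as a stand-in):

```python
import threading
from queue import Queue

def pipeline(items, work):
    down, done = Queue(), Queue()

    def producer():
        for item in items:
            down.put(item)
        down.put(None)                 # sentinel: no more input

    def consumer():
        while True:
            item = down.get()
            if item is None:
                done.put(None)         # propagate the sentinel
                return
            done.put(work(item))

    for target in (producer, consumer):
        threading.Thread(target=target, daemon=True).start()

    while True:
        r = done.get()
        if r is None:
            return
        yield r

print(list(pipeline(range(5), lambda x: x * x)))  # [0, 1, 4, 9, 16]
```
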
def download_tweets_file(*args, **kwargs):
    kwargs['batch_method'] = tweet_download_batch
    yield from download_file(*args, **kwargs)


def download_users_file(*args, **kwargs):
    kwargs['batch_method'] = user_download_batch
    yield from download_file(*args, **kwargs)

def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
                  quotechar='"', commentchar=None, batch_method=tweet_download_batch,
                  **kwargs):
    with open(csvfile) as f:
        if commentchar:
            f = (line for line in f if not line.startswith('#'))

        csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
        for n in range(skip):
            next(csvreader)

        def reader(r):
            for row in csvreader:
                if len(row) > column:
                    yield row[column].strip()

        for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
                                 **kwargs):
            yield res

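`download_file` reads one id per CSV row and streams `(id, object)` pairs back as they are resolved. A hedged usage sketch, assuming a hypothetical `tweets.csv` with tweet ids in the first column:

```python
from bitter import easy

wq = easy()
# tweets.csv is a placeholder: one tweet id per row, first column
for oid, tweet in download_tweets_file(wq, 'tweets.csv', folder='downloaded_tweets'):
    print(oid, 'ok' if tweet else 'failed')
```
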
def download_timeline(wq, user):
    return wq.statuses.user_timeline(id=user)

def _consume_feed(func, feed_control=None, **kwargs):
    '''
    Get all the tweets using pagination and a given method.
    It can be controlled with the `max_count` and `count` parameters.

    If max_count < 0 => Loop until the whole feed is consumed.
    If max_count == 0 => Only call the API once, with the default values.
    If max_count > 0 => Get max_count tweets from the feed.
    '''
    remaining = int(kwargs.pop('max_count', 0))
    count = int(kwargs.get('count', -1))
    limit = False

    # We need to at least perform a query, so we simulate a do-while
    # by running once with no limit and updating the condition at the end
    with tqdm(total=remaining) as pbar:
        while not limit:
            if remaining > 0 and ((count < 0) or (count > remaining)):
                kwargs['count'] = remaining
            resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
            if not resp:
                return
            for entry in resp:
                yield entry
            pbar.update(len(resp))
            limit = stop
            if remaining < 0:
                # If the loop was run with a negative remaining, it will only stop
                # when the control function tells it to.
                continue
            # Otherwise, check if we have already downloaded all the required items
            remaining -= len(resp)
            limit = limit or remaining <= 0

def consume_tweets(*args, **kwargs):
    return _consume_feed(*args, feed_control=_tweets_control, **kwargs)


def consume_users(*args, **kwargs):
    return _consume_feed(*args, feed_control=_users_control, **kwargs)

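The consumers take the endpoint itself as `func` and handle pagination transparently (`max_id` for tweets, `next_cursor` for users). A hedged usage sketch, assuming configured credentials; the query and screen name are illustrative:

```python
from bitter import easy

wq = easy()

# Up to 250 tweets matching a query, following max_id pages
for tweet in consume_tweets(wq.search.tweets, q='#python', max_count=250):
    print(tweet['id'], tweet['text'][:60])

# Up to 500 followers of a user, following next_cursor pages
for follower in consume_users(wq.followers.list, screen_name='balkian', max_count=500):
    print(follower['screen_name'])
```
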
def _tweets_control(func, apiargs, remaining=0, **kwargs):
    ''' Return a batch of entries and whether the feed should stop. '''

    resp = func(**apiargs)
    if not resp:
        return None, True
    # Update the arguments for the next call
    # Two options: either resp is a list, or a dict like:
    # {'statuses': ... 'search_metadata': ...}
    if isinstance(resp, dict) and 'search_metadata' in resp:
        resp = resp['statuses']
    max_id = min(s['id'] for s in resp) - 1
    apiargs['max_id'] = max_id
    return resp, False

def _users_control(func, apiargs, remaining=0, **kwargs):
    resp = func(**apiargs)
    stop = True
    # Update the arguments for the next call
    if 'next_cursor' in resp:
        cursor = resp['next_cursor']
        apiargs['cursor'] = cursor
        if int(cursor) != -1:
            stop = False
    return resp['users'], stop

def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
    outformat = outformat.lower()

    def do(out):

        if outformat == 'csv':
            writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
            if header != '':
                h = header
                if h is None:
                    h = delimiter.join(fields)
                print(h, file=out)
            attrs = list(token.strip().split('.') for token in fields)
            for obj in it:
                values = []
                for attr in attrs:
                    try:
                        values.append(reduce(operator.getitem, attr, obj))
                    except KeyError:
                        if not ignore_missing:
                            print('Key not present: {}'.format(attr), file=sys.stderr)
                        values.append(None)
                writer.writerow(values)
        elif outformat == 'jsonlines':
            for obj in it:
                print(json.dumps(obj, sort_keys=True), file=out)
        elif outformat == 'indented':
            for obj in it:
                print(json.dumps(obj, indent=4, sort_keys=True), file=out)
    def print_result(res):
        tid, tweet = res
        if tweet:
            try:
                write_tweet_json(tweet, folder=folder)
                yield 1
            except Exception as ex:
                logger.error('%s: %s' % (tid, ex))
                if not ignore_fails:
                    raise
        else:
            for obj in it:
                print(obj, file=out)
            logger.info('Tweet not recovered: {}'.format(tid))
            with open(tweet_fail_file(tid, folder), 'w') as f:
                print('Tweet not found', file=f)
            yield -1

    if outfile is sys.stdout:
        return do(sys.stdout)
    def download_batch(batch):
        tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
        return tweets.items()

    with open(outfile, 'w') as out:
        return do(out)
    with open(tweetsfile) as f:
        lines = map(lambda x: x.strip(), f)
        lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
        tweets = parallel(download_batch, lines_to_crawl, 100)
        for res in tqdm(parallel(print_result, tweets), desc='Queried'):
            pass

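`serialized` flattens nested JSON with dot-separated field paths: `user.screen_name` becomes `obj['user']['screen_name']` via `reduce(operator.getitem, ...)`. A self-contained sketch of that lookup with a tweet-shaped dict:

```python
import operator
from functools import reduce

tweet = {'id': 1, 'user': {'screen_name': 'balkian', 'followers_count': 42}}

for field in ('id', 'user.screen_name', 'user.followers_count'):
    path = field.split('.')
    print(field, '=', reduce(operator.getitem, path, tweet))
# id = 1
# user.screen_name = balkian
# user.followers_count = 42
```
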
@ -1,12 +0,0 @@
version: '2'
services:
  dev:
    build:
      context: .
      dockerfile: Dockerfile-3.4
    volumes:
      - '.:/usr/src/app'
    tty: yes
    working_dir: '/usr/src/app'
    entrypoint: '/bin/bash'
    command: ''
@ -2,4 +2,3 @@ sqlalchemy
twitter
click
tqdm
pyyaml
32
setup.py
@ -1,23 +1,29 @@
import pip
from setuptools import setup
from pip.req import parse_requirements

def parse_requirements(filename):
    """ load requirements from a pip requirements file """
    with open(filename, 'r') as f:
        lineiter = list(line.strip() for line in f)
    return [line for line in lineiter if line and not line.startswith("#")]

install_reqs = parse_requirements("requirements.txt")
py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt")
# parse_requirements() returns generator of pip.req.InstallRequirement objects
# pip 6 introduces the *required* session argument
try:
    install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
    py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession())
    test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
except AttributeError:
    install_reqs = parse_requirements("requirements.txt")
    py2_reqs = parse_requirements("requirements-py2.txt")
    test_reqs = parse_requirements("test-requirements.txt")

import sys
import os
import itertools
if sys.version_info <= (3, 0):
    install_reqs = install_reqs + py2_reqs
    install_reqs = itertools.chain(install_reqs, py2_reqs)

with open(os.path.join('bitter', 'VERSION'), 'r') as f:
    __version__ = f.read().strip()
# reqs is a list of requirement
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]

from bitter import __version__

setup(
    name="bitter",

@ -12,11 +12,7 @@ from bitter import config as c
class TestCrawlers(TestCase):

    def setUp(self):
        CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
        if os.path.exists(CONF_PATH):
            self.wq = easy(CONF_PATH)
        else:
            self.wq = easy()
        self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))

    def test_create_worker(self):
        assert len(self.wq.queue) == 1

@ -1,23 +0,0 @@
from unittest import TestCase

import os
import types

from bitter import utils
from bitter.models import *
from sqlalchemy import exists

class TestModels(TestCase):

    def setUp(self):
        self.session = make_session('sqlite://')

    def test_user(self):
        fake_user = User(name="Fake user", id=1548)
        self.session.add(fake_user)
        self.session.commit()
        fake_committed = self.session.query(User).filter_by(name="Fake user").first()
        assert fake_committed
        self.session.delete(fake_committed)
        self.session.commit()
        assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))
@ -8,63 +8,56 @@ from bitter import config as c

class TestUtils(TestCase):

    configfile = '/tmp/bitter.yaml'

    def setUp(self):
        c.CONFIG_FILE = self.configfile
        if os.path.exists(self.configfile):
            os.remove(self.configfile)
        assert not os.path.exists(self.configfile)
        utils.create_config_file(self.configfile)
        assert os.path.exists(self.configfile)
        self.credfile = '/tmp/credentials.txt'
        c.CREDENTIALS = self.credfile
        if os.path.exists(self.credfile):
            os.remove(self.credfile)
        utils.create_credentials(self.credfile)

    def test_create_credentials(self):
        assert os.path.exists(self.credfile)
        os.remove(self.credfile)
        utils.create_credentials()  # From config
        assert os.path.exists(self.credfile)

    def test_add_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile)
        assert utils.get_credentials(self.configfile, user="test")
        assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"
        utils.add_credentials(self.credfile, user="test")
        assert utils.get_credentials(self.credfile)
        assert utils.get_credentials(user="test")
        assert list(utils.get_credentials(user="test"))[0]["user"] == "test"

    def test_get_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test")
        assert not utils.get_credentials(self.configfile, user="test", inverse=True)
        utils.add_credentials(self.credfile, user="test")
        assert utils.get_credentials(user="test")
        assert not utils.get_credentials(user="test", inverse=True)

    def test_add_two_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        utils.add_credentials(self.configfile, user="test2")
        assert utils.get_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test2")
        utils.add_credentials(self.credfile, user="test")
        utils.add_credentials(self.credfile, user="test2")
        assert utils.get_credentials(user="test")
        assert utils.get_credentials(user="test2")

    def test_delete_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test")
        utils.delete_credentials(self.configfile, user="test")
        assert not utils.get_credentials(self.configfile, user="test")
        utils.add_credentials(self.credfile, user="test")
        assert utils.get_credentials(user="test")
        utils.delete_credentials(user="test")
        print(utils.get_credentials())
        assert not utils.get_credentials(user="test")

    def test_parallel(self):
        import time
        def echo(i):
            time.sleep(0.5)
            time.sleep(2)
            return i
        tic = time.time()
        resp = utils.parallel(echo, [1, 2, 3])
        resp = utils.parallel(echo, [1,2,3])
        assert isinstance(resp, types.GeneratorType)
        assert sorted(list(resp)) == [1, 2, 3]
        assert list(resp) == [1,2,3]
        toc = time.time()
        assert (tic-toc) < 600
        assert (tic-toc) < 6000
        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
        assert sorted(list(resp2)) == [1, 2, 3, 4]


class TestUtilsEnv(TestUtils):
    configfile = None

    def setUp(self):
        if 'BITTER_CONFIG' in os.environ:
            self.oldenv = os.environ['BITTER_CONFIG']
        os.environ['BITTER_CONFIG'] = ''

    def tearDown(self):
        if hasattr(self, 'oldenv'):
            os.environ['BITTER_CONFIG'] = self.oldenv
        assert list(resp2) == [1,2,3,4]

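The credential helpers exercised by these tests can also be scripted directly. A hedged sketch, assuming a throwaway path like the tests use:

```python
from bitter import utils, config as c

c.CREDENTIALS = '/tmp/credentials.txt'        # throwaway path, as in the tests
utils.create_credentials()                    # create an empty credentials file
utils.add_credentials('/tmp/credentials.txt', user="test")
print(list(utils.get_credentials(user="test")))  # entries for user "test"
utils.delete_credentials(user="test")
```
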