You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

204 lines
5.6 KiB
Python

import os
import sys
import ConfigParser
import os.path as op
from flask import Flask, url_for, redirect, render_template, request, abort
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, \
UserMixin, RoleMixin, login_required, current_user
from flask_security.utils import encrypt_password
import flask_admin
from flask_admin.contrib import sqla
from flask_admin.contrib import fileadmin
from flask_admin import helpers as admin_helpers
from flask_ini import FlaskIni
# Create the Flask application and bind the SQLAlchemy extension to it.
# NOTE(review): the extension is bound before the ini file is read; confirm
# the installed Flask-SQLAlchemy resolves its settings lazily.
app = Flask(__name__)
db = SQLAlchemy(app)

# Path of the ini configuration file. A single command-line argument
# overrides the default location.
config_path = '/boot/video_looper.ini'
if len(sys.argv) == 2:
    config_path = sys.argv[1]

# Read the ini file inside an application context and keep the parser on the
# app object as ``app.iniconfig``.
with app.app_context():
    app.iniconfig = FlaskIni()
    app.iniconfig.read(config_path)

# Absolute directory containing this module.
app_dir = os.path.realpath(os.path.dirname(__file__))

# Startup diagnostic. Written as a single-argument call so it is valid on
# both Python 2 and Python 3, and using ``.get`` so a missing key prints
# ``None`` instead of aborting start-up with a KeyError.
print(app.config.get("SECURITY_POST_LOGIN_VIEW"))
# Define models

# Association table for the many-to-many link between users and roles.
# Each row pairs one ``user.id`` with one ``role.id``.
_roles_users_user_id = db.Column(
    'user_id', db.Integer(), db.ForeignKey('user.id'))
_roles_users_role_id = db.Column(
    'role_id', db.Integer(), db.ForeignKey('role.id'))
roles_users = db.Table(
    'roles_users', _roles_users_user_id, _roles_users_role_id)
class Role(db.Model, RoleMixin):
    """Named role that can be attached to users."""

    id = db.Column(db.Integer(), primary_key=True)
    # Unique role name (at most 80 characters).
    name = db.Column(db.String(80), unique=True)
    # Optional free-text description (at most 255 characters).
    description = db.Column(db.String(255))

    def __repr__(self):
        """Return the role name, or a placeholder when no name is set.

        ``__repr__`` has to return a string. The ``name`` column is nullable,
        so returning it unconditionally raises ``TypeError`` for a role whose
        name has not been assigned yet.
        """
        if self.name is None:
            return '<Role unnamed>'
        return self.name
class User(db.Model, UserMixin):
    """User account with credentials, an active flag and assigned roles."""

    id = db.Column(db.Integer, primary_key=True)
    # Unique per account (at most 255 characters).
    email = db.Column(db.String(255), unique=True)
    password = db.Column(db.String(255))
    active = db.Column(db.Boolean())
    # Many-to-many link through ``roles_users``; the reverse side is exposed
    # on ``Role`` as the dynamically loaded ``users`` attribute.
    roles = db.relationship(
        'Role',
        secondary=roles_users,
        backref=db.backref('users', lazy='dynamic'),
    )
# Setup Flask-Security: the datastore maps the User and Role models onto the
# SQLAlchemy session, and the Security extension is attached to the app.
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, datastore=user_datastore)
# Create customized model view class
class MyModelView(sqla.ModelView):
    """Model view that only active, authenticated superusers may open."""

    @staticmethod
    def _user_flag(name):
        """Return ``current_user.<name>`` as a plain value.

        Flask-Login exposed ``is_active`` and ``is_authenticated`` as methods
        before 0.3 and as properties from 0.3 on. Calling a property value
        raises ``TypeError``, while reading a method without calling it is
        always truthy, so the attribute is only called when it is callable.

        :param name: attribute name to read from ``current_user``.
        """
        value = getattr(current_user, name)
        return value() if callable(value) else value

    def is_accessible(self):
        """Return True only for an active, authenticated superuser."""
        if not self._user_flag('is_active'):
            return False
        if not self._user_flag('is_authenticated'):
            return False
        return bool(current_user.has_role('superuser'))

    def _handle_view(self, name, **kwargs):
        """
        Override builtin _handle_view in order to redirect users when a view is not accessible.

        Returns None when the view is accessible, aborts with 403 for a
        logged-in user without permission, and otherwise returns a redirect
        to the login page carrying the requested URL as ``next``.
        """
        if self.is_accessible():
            return None
        if self._user_flag('is_authenticated'):
            # permission denied
            abort(403)
        # login
        return redirect(url_for('security.login', next=request.url))
# Flask views
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    landing_page = render_template('index.html')
    return landing_page
# Create admin
admin = flask_admin.Admin(
    app,
    name='Example: Auth',
    base_template='my_master.html',
)

# Add model views (Role first, then User, both on the shared session).
admin.add_view(MyModelView(Role, db.session))
admin.add_view(MyModelView(User, db.session))
# define a context processor for merging flask-admin's template context into the
# flask-security views.
@security.context_processor
def security_context_processor():
    """Return the Flask-Admin values made available to Flask-Security templates.

    The mapping carries the admin base template, the admin index view and
    the Flask-Admin helpers module (under the key ``h``).
    """
    return {
        'admin_base_template': admin.base_template,
        'admin_view': admin.index_view,
        'h': admin_helpers,
    }
def build_sample_db():
    """
    Populate a small db with some example entries.

    Drops and recreates all tables, then creates the ``user`` and
    ``superuser`` roles, an ``admin`` account holding both roles and an
    ``rpi`` account holding only the ``user`` role.

    NOTE(review): the account names and passwords are hard-coded defaults;
    change them after the first start.
    """
    # (email, password) pairs for accounts that only get the 'user' role.
    regular_accounts = [
        ('rpi', 'rpi'),
    ]

    # Everything touching the database or password hashing runs inside the
    # application context, including the schema reset, so the function does
    # not depend on an implicitly bound application.
    with app.app_context():
        db.drop_all()
        db.create_all()

        user_role = Role(name='user')
        super_user_role = Role(name='superuser')
        db.session.add(user_role)
        db.session.add(super_user_role)
        db.session.commit()

        user_datastore.create_user(
            email='admin',
            password=encrypt_password('admin'),
            roles=[user_role, super_user_role]
        )

        for email, password in regular_accounts:
            user_datastore.create_user(
                email=email,
                password=encrypt_password(password),
                roles=[user_role]
            )

        db.session.commit()
class VideoAdmin(fileadmin.FileAdmin):
    """File manager rooted at the directory configured under ``[directory] path``.

    Superusers work directly in the configured directory. Every other
    logged-in user works in a sub-directory named after their email, which
    is created on first use.
    """

    def __init__(self):
        super(VideoAdmin, self).__init__(self.base_dir,
                                         '/files/', name='Files')

    def is_accessible(self):
        """Allow only logged-in users to open the file manager.

        Without this check an anonymous request reaches ``user_path`` and
        fails on ``current_user.email`` instead of being denied cleanly.
        """
        authenticated = current_user.is_authenticated
        # Older Flask-Login versions expose this as a method, newer ones as
        # a property; support both.
        if callable(authenticated):
            authenticated = authenticated()
        return bool(authenticated)

    def get_base_path(self):
        """Return the directory the current user is allowed to manage."""
        return self.user_path

    @property
    def user_path(self):
        """Directory for the current user, created if it does not exist.

        Aborts with 403 when the email-derived path would not lie strictly
        inside the configured base directory (for example an email
        containing ``..``).
        """
        if current_user.has_role('superuser'):
            return self.base_dir

        base = op.normpath(self.base_dir)
        path = op.normpath('{}/{}'.format(self.base_dir, current_user.email))
        # ``op.join(base, '')`` yields the base with exactly one trailing
        # separator, so a sibling such as ``<base>-other`` does not match.
        if not path.startswith(op.join(base, '')):
            abort(403)

        if not op.exists(path):
            try:
                os.makedirs(path)
            except OSError:
                # A concurrent request may have created it in the meantime.
                if not op.isdir(path):
                    raise
        return path

    @property
    def base_dir(self):
        """Configured root directory, read from the ini file on each access."""
        return app.iniconfig.get("directory", "path")

    def is_accessible_path(self, path):
        """Accept every path; no per-path restriction is applied here."""
        return True

    def update_base(self):
        """Set the base directory's access and modification times to now.

        NOTE(review): presumably lets an external watcher notice changes;
        confirm against the consumer of this directory.
        """
        os.utime(self.base_dir, None)

    # Each file operation hook refreshes the base directory's timestamps.
    def on_file_upload(self, *args, **kwargs):
        self.update_base()

    def on_edit_file(self, *args, **kwargs):
        self.update_base()

    def on_file_delete(self, *args, **kwargs):
        self.update_base()

    def on_rename(self, *args, **kwargs):
        self.update_base()
# Register the file manager view with the admin interface.
videoadmin = VideoAdmin()
admin.add_view(videoadmin)

if __name__ == '__main__':
    # Build a sample db on the fly, if one does not exist yet.
    database_path = os.path.join(app_dir, app.config['DATABASE_FILE'])
    if not os.path.exists(database_path):
        build_sample_db()

    # The port is coerced to int because a value loaded from the ini file
    # may arrive as a string (assumption about FlaskIni - confirm), and the
    # development server needs an integer port.
    # NOTE(review): debug=True enables the interactive debugger; do not
    # combine it with a HOST that is reachable from untrusted networks.
    app.run(host=app.config.get("HOST", "localhost"),
            port=int(app.config.get("PORT", 5000)),
            debug=True)