204 lines
5.6 KiB
Python
204 lines
5.6 KiB
Python
import os
|
|
import sys
|
|
import ConfigParser
|
|
import os.path as op
|
|
from flask import Flask, url_for, redirect, render_template, request, abort
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_security import Security, SQLAlchemyUserDatastore, \
|
|
UserMixin, RoleMixin, login_required, current_user
|
|
from flask_security.utils import encrypt_password
|
|
import flask_admin
|
|
from flask_admin.contrib import sqla
|
|
from flask_admin.contrib import fileadmin
|
|
from flask_admin import helpers as admin_helpers
|
|
from flask_ini import FlaskIni
|
|
|
|
|
|
# Create Flask application
|
|
app = Flask(__name__)
|
|
db = SQLAlchemy(app)
|
|
|
|
config_path = '/boot/video_looper.ini'
|
|
if len(sys.argv) == 2:
|
|
config_path = sys.argv[1]
|
|
with app.app_context():
|
|
app.iniconfig = FlaskIni()
|
|
app.iniconfig.read(config_path)
|
|
# Build a sample db on the fly, if one does not exist yet.
|
|
app_dir = os.path.realpath(os.path.dirname(__file__))
|
|
database_path = os.path.join(app_dir, app.config['DATABASE_FILE'])
|
|
if not os.path.exists(database_path):
|
|
build_sample_db()
|
|
|
|
print app.config["SECURITY_POST_LOGIN_VIEW"]
|
|
# Define models
|
|
roles_users = db.Table(
|
|
'roles_users',
|
|
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
|
|
db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))
|
|
)
|
|
|
|
|
|
class Role(db.Model, RoleMixin):
|
|
id = db.Column(db.Integer(), primary_key=True)
|
|
name = db.Column(db.String(80), unique=True)
|
|
description = db.Column(db.String(255))
|
|
|
|
def __repr__(self):
|
|
return self.name
|
|
|
|
class User(db.Model, UserMixin):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
email = db.Column(db.String(255), unique=True)
|
|
password = db.Column(db.String(255))
|
|
active = db.Column(db.Boolean())
|
|
roles = db.relationship('Role', secondary=roles_users,
|
|
backref=db.backref('users', lazy='dynamic'))
|
|
|
|
|
|
# Setup Flask-Security
|
|
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
|
security = Security(app, user_datastore)
|
|
|
|
|
|
# Create customized model view class
|
|
class MyModelView(sqla.ModelView):
|
|
|
|
def is_accessible(self):
|
|
if not current_user.is_active() or not current_user.is_authenticated():
|
|
return False
|
|
|
|
if current_user.has_role('superuser'):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _handle_view(self, name, **kwargs):
|
|
"""
|
|
Override builtin _handle_view in order to redirect users when a view is not accessible.
|
|
"""
|
|
if not self.is_accessible():
|
|
if current_user.is_authenticated():
|
|
# permission denied
|
|
abort(403)
|
|
else:
|
|
# login
|
|
return redirect(url_for('security.login', next=request.url))
|
|
|
|
# Flask views
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
# Create admin
|
|
admin = flask_admin.Admin(app, 'Example: Auth', base_template='my_master.html')
|
|
|
|
# Add model views
|
|
admin.add_view(MyModelView(Role, db.session))
|
|
admin.add_view(MyModelView(User, db.session))
|
|
|
|
# define a context processor for merging flask-admin's template context into the
|
|
# flask-security views.
|
|
@security.context_processor
|
|
def security_context_processor():
|
|
return dict(
|
|
admin_base_template=admin.base_template,
|
|
admin_view=admin.index_view,
|
|
h=admin_helpers,
|
|
)
|
|
|
|
|
|
def build_sample_db():
|
|
"""
|
|
Populate a small db with some example entries.
|
|
"""
|
|
|
|
import string
|
|
import random
|
|
|
|
db.drop_all()
|
|
db.create_all()
|
|
|
|
with app.app_context():
|
|
user_role = Role(name='user')
|
|
super_user_role = Role(name='superuser')
|
|
db.session.add(user_role)
|
|
db.session.add(super_user_role)
|
|
db.session.commit()
|
|
|
|
test_user = user_datastore.create_user(
|
|
email='admin',
|
|
password=encrypt_password('admin'),
|
|
roles=[user_role, super_user_role]
|
|
)
|
|
|
|
emails = [
|
|
'rpi',
|
|
]
|
|
|
|
passwords = [
|
|
'rpi'
|
|
|
|
]
|
|
|
|
for i in range(len(emails)):
|
|
#tmp_email = first_names[i].lower() + "." + last_names[i].lower() + "@example.com"
|
|
#tmp_pass = ''.join(random.choice(string.ascii_lowercase + string.digits) for i in range(10))
|
|
user_datastore.create_user(
|
|
email=emails[i],
|
|
password=encrypt_password(passwords[i]),
|
|
roles=[user_role, ]
|
|
)
|
|
db.session.commit()
|
|
return
|
|
|
|
|
|
class VideoAdmin(fileadmin.FileAdmin):
|
|
def __init__(self):
|
|
super(VideoAdmin, self).__init__(app.iniconfig.get("directory", "path"),
|
|
'/files/', name='Files')
|
|
|
|
def get_base_path(self):
|
|
return self.user_path
|
|
|
|
@property
|
|
def user_path(self):
|
|
if current_user.has_role('superuser'):
|
|
return self.base_dir
|
|
path = op.normpath('{}/{}'.format(app.iniconfig.get("directory", "path"),
|
|
current_user.email))
|
|
if not op.exists(path):
|
|
os.makedirs(path)
|
|
return path
|
|
|
|
@property
|
|
def base_dir(self):
|
|
return app.iniconfig.get("directory", "path")
|
|
|
|
def is_accessible_path(self, path):
|
|
return True
|
|
|
|
def update_base(self):
|
|
os.utime(self.base_dir, None)
|
|
|
|
def on_file_upload(self, *args, **kwargs):
|
|
self.update_base()
|
|
|
|
def on_edit_file(self, *args, **kwargs):
|
|
self.update_base()
|
|
|
|
def on_file_delete(self, *args, **kwargs):
|
|
self.update_base()
|
|
|
|
def on_rename(self, *args, **kwargs):
|
|
self.update_base()
|
|
|
|
|
|
videoadmin = VideoAdmin()
|
|
admin.add_view(videoadmin)
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host=app.config.get("HOST", "localhost"),
|
|
port=app.config.get("PORT", 5000),
|
|
debug=True)
|