"""Flask-Admin front end with Flask-Security authentication and a file manager.

The application reads its settings from an ini file (``/boot/video_looper.ini``
by default, or the path given as the single command-line argument), exposes
admin views for the ``User`` and ``Role`` models, and a file manager rooted at
the ``[directory] path`` entry of that ini file.
"""
import os
import sys
import ConfigParser
import os.path as op

from flask import Flask, url_for, redirect, render_template, request, abort
from flask_sqlalchemy import SQLAlchemy
from flask_security import (
    Security, SQLAlchemyUserDatastore,
    UserMixin, RoleMixin, login_required, current_user,
)
from flask_security.utils import encrypt_password
import flask_admin
from flask_admin.contrib import sqla
from flask_admin.contrib import fileadmin
from flask_admin import helpers as admin_helpers
from flask_ini import FlaskIni

# Create Flask application
app = Flask(__name__)
db = SQLAlchemy(app)

# Default ini location; a single command-line argument overrides it.
config_path = '/boot/video_looper.ini'
if len(sys.argv) == 2:
    config_path = sys.argv[1]

with app.app_context():
    app.iniconfig = FlaskIni()
    app.iniconfig.read(config_path)

# Directory containing this module; the database file is resolved against it
# in the ``__main__`` block at the bottom of the file.
app_dir = os.path.realpath(os.path.dirname(__file__))

# Single-argument call form: prints the same text on Python 2 and also parses
# on Python 3.  Indexing (not .get) is kept, so a missing key still raises.
print(app.config["SECURITY_POST_LOGIN_VIEW"])


# Define models
roles_users = db.Table(
    'roles_users',
    db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('role.id'))
)


class Role(db.Model, RoleMixin):
    """A named role that can be attached to users through ``roles_users``."""

    id = db.Column(db.Integer(), primary_key=True)
    name = db.Column(db.String(80), unique=True)
    description = db.Column(db.String(255))

    def __repr__(self):
        return self.name


class User(db.Model, UserMixin):
    """An account identified by ``email`` with zero or more roles."""

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True)
    password = db.Column(db.String(255))
    active = db.Column(db.Boolean())
    roles = db.relationship('Role', secondary=roles_users,
                            backref=db.backref('users', lazy='dynamic'))


# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)


# Create customized model view class
class MyModelView(sqla.ModelView):
    """Model view that only active, authenticated superusers may open."""

    def is_accessible(self):
        """Return True only for an active, authenticated 'superuser'."""
        # Same evaluation order as before: is_active() first, then
        # is_authenticated(), short-circuiting on the first falsy result.
        if not (current_user.is_active() and current_user.is_authenticated()):
            return False
        return bool(current_user.has_role('superuser'))

    def _handle_view(self, name, **kwargs):
        """
        Override builtin _handle_view in order to redirect users when a view
        is not accessible.

        Returns None when access is allowed, aborts with 403 for a logged-in
        user without permission, and otherwise returns a redirect to the
        login page carrying the current URL as ``next``.
        """
        if self.is_accessible():
            return None

        if current_user.is_authenticated():
            # permission denied
            abort(403)

        # login
        return redirect(url_for('security.login', next=request.url))


# Flask views
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


# Create admin
admin = flask_admin.Admin(app, 'Example: Auth', base_template='my_master.html')

# Add model views
admin.add_view(MyModelView(Role, db.session))
admin.add_view(MyModelView(User, db.session))


# define a context processor for merging flask-admin's template context into the
# flask-security views.
@security.context_processor
def security_context_processor():
    """Expose the admin base template, index view and helpers to templates."""
    return dict(
        admin_base_template=admin.base_template,
        admin_view=admin.index_view,
        h=admin_helpers,
    )


def build_sample_db():
    """
    Populate a small db with some example entries.

    Drops and recreates every table, creates the 'user' and 'superuser'
    roles, an 'admin' account holding both roles, and one regular account
    per entry in the sample lists below.
    """
    emails = [
        'rpi',
    ]
    passwords = [
        'rpi'
    ]

    with app.app_context():
        # Schema operations run inside the application context together with
        # the session work, so they do not rely on an implicitly bound app.
        db.drop_all()
        db.create_all()

        user_role = Role(name='user')
        super_user_role = Role(name='superuser')
        db.session.add(user_role)
        db.session.add(super_user_role)
        db.session.commit()

        user_datastore.create_user(
            email='admin',
            password=encrypt_password('admin'),
            roles=[user_role, super_user_role]
        )

        # The two lists are parallel: one password per email.
        for email, password in zip(emails, passwords):
            user_datastore.create_user(
                email=email,
                password=encrypt_password(password),
                roles=[user_role, ]
            )
        db.session.commit()


class VideoAdmin(fileadmin.FileAdmin):
    """File manager rooted at the ini ``[directory] path`` setting.

    Superusers browse the configured directory itself; every other user is
    given a sub-directory named after their email, created on first access.
    """

    def __init__(self):
        super(VideoAdmin, self).__init__(
            app.iniconfig.get("directory", "path"), '/files/', name='Files')

    def get_base_path(self):
        """Return the directory the current user is allowed to browse."""
        return self.user_path

    @property
    def user_path(self):
        """Directory for the current user, created if it does not exist."""
        if current_user.has_role('superuser'):
            return self.base_dir

        # NOTE(review): the email is joined into the path as-is; confirm that
        # account emails cannot contain path separators or '..' components.
        path = op.normpath('{}/{}'.format(self.base_dir, current_user.email))
        if not op.exists(path):
            os.makedirs(path)
        return path

    @property
    def base_dir(self):
        """The configured root directory (``[directory] path``)."""
        return app.iniconfig.get("directory", "path")

    def is_accessible_path(self, path):
        """Accept every path; no per-path restriction is applied here."""
        return True

    def update_base(self):
        """Set the root directory's access/modification times to now."""
        # presumably signals an external watcher that the content changed --
        # TODO confirm against whatever consumes this directory.
        os.utime(self.base_dir, None)

    def on_file_upload(self, *args, **kwargs):
        self.update_base()

    def on_edit_file(self, *args, **kwargs):
        self.update_base()

    def on_file_delete(self, *args, **kwargs):
        self.update_base()

    def on_rename(self, *args, **kwargs):
        self.update_base()


videoadmin = VideoAdmin()
admin.add_view(videoadmin)


if __name__ == '__main__':
    # Build a sample db on the fly, if one does not exist yet.
    database_path = os.path.join(app_dir, app.config['DATABASE_FILE'])
    if not os.path.exists(database_path):
        build_sample_db()

    # NOTE(review): debug=True enables the interactive debugger; make sure
    # HOST is never set to a publicly reachable interface with this on.
    app.run(host=app.config.get("HOST", "localhost"),
            port=app.config.get("PORT", 5000),
            debug=True)