Files
luntan/app/blueprints/admin.py

226 lines
10 KiB
Python

from datetime import datetime
from flask import Blueprint, render_template, redirect, url_for, request, flash
from flask_login import login_required, current_user, login_user
from werkzeug.security import check_password_hash, generate_password_hash
from ..extensions import db
from ..models import User, UserStatus, Post, ReviewStatus, Activity, ActivityStatus, ActivitySubmission, ReviewLog
from sqlalchemy import func
from ..services.notify import notify
bp = Blueprint("admin", __name__, url_prefix="/admin")
def role():
return current_user.role if current_user.is_authenticated else None
@bp.before_request
def guard():
if request.endpoint and request.endpoint.startswith("admin."):
r = role()
allowed_checker = {"admin.review_posts","admin.approve_post","admin.reject_post","admin.review_submissions","admin.approve_submission","admin.reject_submission"}
allowed_sub = {"admin.review_users","admin.approve_user","admin.reject_user","admin.manage_activities","admin.publish_activity","admin.close_activity"}
if request.endpoint in {"admin.dashboard","admin.change_password"}:
return None
if r == "admin":
return None
if r == "sub_admin" and (request.endpoint in allowed_checker or request.endpoint in allowed_sub):
return None
if r == "checker" and request.endpoint in allowed_checker:
return None
return redirect(url_for("admin.dashboard"))
@bp.route("/", methods=["GET","POST"])
def dashboard():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = User.query.filter(func.lower(User.username)==(username or "").lower(), User.role.in_(["admin","sub_admin","checker"])) .first()
if not user or not check_password_hash(user.password_hash, password) or user.status != UserStatus.approved:
flash("登录失败")
return render_template("admin/login.html")
login_user(user)
if user.role == "admin" and getattr(user, "must_change_password", False):
return redirect(url_for("admin.change_password"))
if not current_user.is_authenticated or role() not in {"admin","sub_admin","checker"}:
return render_template("admin/login.html")
if role()=="admin" and getattr(current_user, "must_change_password", False):
return redirect(url_for("admin.change_password"))
pending_users = User.query.filter_by(status=UserStatus.pending).count()
pending_posts = Post.query.filter_by(status=ReviewStatus.pending).count()
pending_subs = ActivitySubmission.query.filter_by(status=ReviewStatus.pending).count()
return render_template("admin/dashboard.html", pending_users=pending_users, pending_posts=pending_posts, pending_subs=pending_subs)
@bp.route("/change-password", methods=["GET","POST"])
@login_required
def change_password():
if role() != "admin":
return redirect(url_for("admin.dashboard"))
if request.method == "POST":
p1 = request.form.get("password")
p2 = request.form.get("confirm")
if not p1 or len(p1) < 8 or p1 != p2:
flash("密码至少8位且两次一致")
return render_template("admin/change_password.html")
current_user.password_hash = generate_password_hash(p1)
current_user.must_change_password = False
db.session.commit()
return redirect(url_for("admin.dashboard"))
return render_template("admin/change_password.html")
@bp.route("/sub-admin", methods=["GET","POST"])
@login_required
def sub_admin_page():
if role() != "admin":
return redirect(url_for("admin.dashboard"))
if request.method == "POST":
email = request.form.get("email")
username = request.form.get("username")
password = request.form.get("password")
if not email or not username or not password:
flash("请完整填写信息")
else:
if User.query.filter((User.email==email) | (User.username==username)).first():
flash("邮箱或用户名已存在")
else:
u = User(email=email, username=username, password_hash=generate_password_hash(password), role="sub_admin", status=UserStatus.approved)
db.session.add(u)
db.session.commit()
flash("副管理员已创建")
return redirect(url_for("admin.sub_admin_page"))
users = User.query.filter_by(role="sub_admin").all()
return render_template("admin/sub_admin.html", users=users)
@bp.route("/checker", methods=["GET","POST"])
@login_required
def checker_page():
if role() != "admin":
return redirect(url_for("admin.dashboard"))
if request.method == "POST":
email = request.form.get("email")
username = request.form.get("username")
password = request.form.get("password")
if not email or not username or not password:
flash("请完整填写信息")
else:
if User.query.filter((User.email==email) | (User.username==username)).first():
flash("邮箱或用户名已存在")
else:
u = User(email=email, username=username, password_hash=generate_password_hash(password), role="checker", status=UserStatus.approved)
db.session.add(u)
db.session.commit()
flash("审核员已创建")
return redirect(url_for("admin.checker_page"))
users = User.query.filter_by(role="checker").all()
return render_template("admin/checker.html", users=users)
@bp.route("/reviews/users")
@login_required
def review_users():
users = User.query.filter_by(status=UserStatus.pending).all()
return render_template("admin/reviews_users.html", users=users)
@bp.route("/reviews/users/<int:user_id>/approve", methods=["POST"])
@login_required
def approve_user(user_id):
u = User.query.get_or_404(user_id)
u.status = UserStatus.approved
db.session.add(ReviewLog(target_type="user", target_id=u.id, admin_id=current_user.id, action="approve"))
notify(u.id, "user_approved")
db.session.commit()
return redirect(url_for("admin.review_users"))
@bp.route("/reviews/users/<int:user_id>/reject", methods=["POST"])
@login_required
def reject_user(user_id):
u = User.query.get_or_404(user_id)
u.status = UserStatus.rejected
db.session.add(ReviewLog(target_type="user", target_id=u.id, admin_id=current_user.id, action="reject"))
notify(u.id, "user_rejected")
db.session.commit()
return redirect(url_for("admin.review_users"))
@bp.route("/reviews/posts")
@login_required
def review_posts():
posts = Post.query.filter_by(status=ReviewStatus.pending).all()
return render_template("admin/reviews_posts.html", posts=posts)
@bp.route("/reviews/posts/<int:post_id>/approve", methods=["POST"])
@login_required
def approve_post(post_id):
p = Post.query.get_or_404(post_id)
p.status = ReviewStatus.approved
p.published_at = datetime.utcnow()
db.session.add(ReviewLog(target_type="post", target_id=p.id, admin_id=current_user.id, action="approve"))
notify(p.user_id, "post_approved")
db.session.commit()
return redirect(url_for("admin.review_posts"))
@bp.route("/reviews/posts/<int:post_id>/reject", methods=["POST"])
@login_required
def reject_post(post_id):
p = Post.query.get_or_404(post_id)
p.status = ReviewStatus.rejected
db.session.add(ReviewLog(target_type="post", target_id=p.id, admin_id=current_user.id, action="reject"))
notify(p.user_id, "post_rejected")
db.session.commit()
return redirect(url_for("admin.review_posts"))
@bp.route("/activities", methods=["GET", "POST"])
@login_required
def manage_activities():
if request.method == "POST":
title = request.form.get("title")
theme = request.form.get("theme")
description = request.form.get("description")
a = Activity(title=title, theme=theme, description=description)
db.session.add(a)
db.session.commit()
flash("活动已创建")
return redirect(url_for("admin.manage_activities"))
acts = Activity.query.order_by(Activity.created_at.desc()).all()
return render_template("admin/activities.html", activities=acts)
@bp.route("/activities/<int:act_id>/publish", methods=["POST"])
@login_required
def publish_activity(act_id):
a = Activity.query.get_or_404(act_id)
a.status = ActivityStatus.published
db.session.add(ReviewLog(target_type="activity", target_id=a.id, admin_id=current_user.id, action="publish"))
db.session.commit()
return redirect(url_for("admin.manage_activities"))
@bp.route("/activities/<int:act_id>/close", methods=["POST"])
@login_required
def close_activity(act_id):
a = Activity.query.get_or_404(act_id)
a.status = ActivityStatus.closed
db.session.add(ReviewLog(target_type="activity", target_id=a.id, admin_id=current_user.id, action="close"))
db.session.commit()
return redirect(url_for("admin.manage_activities"))
@bp.route("/reviews/submissions")
@login_required
def review_submissions():
subs = ActivitySubmission.query.filter_by(status=ReviewStatus.pending).all()
return render_template("admin/reviews_submissions.html", submissions=subs)
@bp.route("/reviews/submissions/<int:sub_id>/approve", methods=["POST"])
@login_required
def approve_submission(sub_id):
s = ActivitySubmission.query.get_or_404(sub_id)
s.status = ReviewStatus.approved
db.session.add(ReviewLog(target_type="submission", target_id=s.id, admin_id=current_user.id, action="approve"))
notify(s.user_id, "submission_approved")
db.session.commit()
return redirect(url_for("admin.review_submissions"))
@bp.route("/reviews/submissions/<int:sub_id>/reject", methods=["POST"])
@login_required
def reject_submission(sub_id):
s = ActivitySubmission.query.get_or_404(sub_id)
s.status = ReviewStatus.rejected
db.session.add(ReviewLog(target_type="submission", target_id=s.id, admin_id=current_user.id, action="reject"))
notify(s.user_id, "submission_rejected")
db.session.commit()
return redirect(url_for("admin.review_submissions"))