refactor: remove tjaf dependency; add local TJA parser

This commit is contained in:
2025-11-22 21:29:13 +08:00
commit 66d8ed5c6f
299 changed files with 29006 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
{
"postCreateCommand": ""
}

4
.dockerignore Normal file
View File

@@ -0,0 +1,4 @@
/.git
/__pycache__
/public/songs

17
.gitattributes vendored Normal file
View File

@@ -0,0 +1,17 @@
# Auto detect text files and perform LF normalization
* text=auto
# Custom for Visual Studio
*.cs diff=csharp
# Standard to msysgit
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain

3
.github/ISSUE_TEMPLATE.md vendored Normal file
View File

@@ -0,0 +1,3 @@
<!--下記の問題を説明してください。 スクリーンショットと診断情報を含めてください。-->
<!--Describe the problem you are having below. Please include a screenshot and the diagnostic information.-->

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
__pycache__/
*.pyc
.DS_Store
# build/version artifacts
version.json
# large assets not meant for VCS
public/songs/
public/songs/**
public/preview/
*.log

6
Dockerfile Normal file
View File

@@ -0,0 +1,6 @@
FROM python:3.13.2
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENV PYTHONUNBUFFERED 1
CMD ["gunicorn", "app:app", "--access-logfile", "-", "--bind", "0.0.0.0"]

41
README.md Normal file
View File

@@ -0,0 +1,41 @@
# 太鼓ウェブ
この太鼓ウェブは改良版です
## デバッグの開始
依存関係をインストールします
```bash
pip install -r requirements.txt
```
データベースを起動します
```bash
docker run --detach \
--name taiko-web-mongo-debug \
--volume taiko-web-mongo-debug:/data/db \
--publish 27017:27017 \
mongo
```
```bash
docker run --detach \
--name taiko-web-redis-debug \
--volume taiko-web-redis-debug:/data \
--publish 6379:6379 \
redis
```
サーバーを起動してください
```bash
flask run
```
## デプロイ
今すぐデプロイ!
- https://taikoapp.uk/

895
app.py Normal file
View File

@@ -0,0 +1,895 @@
#!/usr/bin/env python3
import base64
import bcrypt
import hashlib
try:
import config
except ModuleNotFoundError:
raise FileNotFoundError('No such file or directory: \'config.py\'. Copy the example config file config.example.py to config.py')
import json
import re
import requests
import schema
import os
import time
# -- カスタム --
import traceback
import pprint
import pathlib
import shutil
from flask_limiter import Limiter
import flask
import tjaf
# ----
from functools import wraps
from flask import Flask, g, jsonify, render_template, request, abort, redirect, session, flash, make_response, send_from_directory
from flask_caching import Cache
from flask_session import Session
from flask_wtf.csrf import CSRFProtect, generate_csrf, CSRFError
from ffmpy import FFmpeg
from pymongo import MongoClient
from redis import Redis
def take_config(name, required=False):
if hasattr(config, name):
return getattr(config, name)
elif required:
raise ValueError('Required option is not defined in the config.py file: {}'.format(name))
else:
return None
app = Flask(__name__)
def get_remote_address() -> str:
return flask.request.headers.get("CF-Connecting-IP") or flask.request.headers.get("X-Forwarded-For") or flask.request.remote_addr or "127.0.0.1"
limiter = Limiter(
get_remote_address,
app=app,
# default_limits=[],
# storage_uri="memory://",
# Redis
storage_uri=os.environ.get("REDIS_URI", "redis://127.0.0.1:6379/"),
# Redis cluster
# storage_uri="redis+cluster://localhost:7000,localhost:7001,localhost:70002",
# Memcached
# storage_uri="memcached://localhost:11211",
# Memcached Cluster
# storage_uri="memcached://localhost:11211,localhost:11212,localhost:11213",
# MongoDB
# storage_uri="mongodb://localhost:27017",
# Etcd
# storage_uri="etcd://localhost:2379",
strategy="fixed-window", # or "moving-window"
)
client = MongoClient(host=os.environ.get("TAIKO_WEB_MONGO_HOST") or take_config('MONGO', required=True)['host'])
basedir = take_config('BASEDIR') or '/'
app.secret_key = take_config('SECRET_KEY') or 'change-me'
app.config['SESSION_TYPE'] = 'redis'
redis_config = take_config('REDIS', required=True)
redis_config['CACHE_REDIS_HOST'] = os.environ.get("TAIKO_WEB_REDIS_HOST") or redis_config['CACHE_REDIS_HOST']
app.config['SESSION_REDIS'] = Redis(
host=redis_config['CACHE_REDIS_HOST'],
port=redis_config['CACHE_REDIS_PORT'],
password=redis_config['CACHE_REDIS_PASSWORD'],
db=redis_config['CACHE_REDIS_DB']
)
app.cache = Cache(app, config=redis_config)
sess = Session()
sess.init_app(app)
#csrf = CSRFProtect(app)
db = client[take_config('MONGO', required=True)['database']]
db.users.create_index('username', unique=True)
db.songs.create_index('id', unique=True)
db.scores.create_index('username')
class HashException(Exception):
pass
def api_error(message):
return jsonify({'status': 'error', 'message': message})
def generate_hash(id, form):
md5 = hashlib.md5()
if form['type'] == 'tja':
urls = ['%s%s/main.tja' % (take_config('SONGS_BASEURL', required=True), id)]
else:
urls = []
for diff in ['easy', 'normal', 'hard', 'oni', 'ura']:
if form['course_' + diff]:
urls.append('%s%s/%s.osu' % (take_config('SONGS_BASEURL', required=True), id, diff))
for url in urls:
if url.startswith("http://") or url.startswith("https://"):
resp = requests.get(url)
if resp.status_code != 200:
raise HashException('Invalid response from %s (status code %s)' % (resp.url, resp.status_code))
md5.update(resp.content)
else:
if url.startswith(basedir):
url = url[len(basedir):]
path = os.path.normpath(os.path.join("public", url))
if not os.path.isfile(path):
raise HashException("File not found: %s" % (os.path.abspath(path)))
with open(path, "rb") as file:
md5.update(file.read())
return base64.b64encode(md5.digest())[:-2].decode('utf-8')
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('username'):
return api_error('not_logged_in')
return f(*args, **kwargs)
return decorated_function
def admin_required(level):
def decorated_function(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not session.get('username'):
return abort(403)
user = db.users.find_one({'username': session.get('username')})
if user['user_level'] < level:
return abort(403)
return f(*args, **kwargs)
return wrapper
return decorated_function
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
return api_error('invalid_csrf')
@app.before_request
def before_request_func():
if session.get('session_id'):
if not db.users.find_one({'session_id': session.get('session_id')}):
session.clear()
def get_config(credentials=False):
config_out = {
'basedir': basedir,
'songs_baseurl': take_config('SONGS_BASEURL', required=True),
'assets_baseurl': take_config('ASSETS_BASEURL', required=True),
'email': take_config('EMAIL'),
'accounts': take_config('ACCOUNTS'),
'custom_js': take_config('CUSTOM_JS'),
'plugins': take_config('PLUGINS') and [x for x in take_config('PLUGINS') if x['url']],
'preview_type': take_config('PREVIEW_TYPE') or 'mp3',
'multiplayer_url': take_config('MULTIPLAYER_URL')
}
relative_urls = ['songs_baseurl', 'assets_baseurl']
for name in relative_urls:
if not config_out[name].startswith("/") and not config_out[name].startswith("http://") and not config_out[name].startswith("https://"):
config_out[name] = basedir + config_out[name]
if credentials:
google_credentials = take_config('GOOGLE_CREDENTIALS')
min_level = google_credentials['min_level'] or 0
if not session.get('username'):
user_level = 0
else:
user = db.users.find_one({'username': session.get('username')})
user_level = user['user_level']
if user_level >= min_level:
config_out['google_credentials'] = google_credentials
else:
config_out['google_credentials'] = {
'gdrive_enabled': False
}
if not config_out.get('songs_baseurl'):
config_out['songs_baseurl'] = ''.join([request.host_url, 'songs']) + '/'
if not config_out.get('assets_baseurl'):
config_out['assets_baseurl'] = ''.join([request.host_url, 'assets']) + '/'
config_out['_version'] = get_version()
return config_out
def get_version():
version = {'commit': None, 'commit_short': '', 'version': None, 'url': take_config('URL')}
if os.path.isfile('version.json'):
try:
ver = json.load(open('version.json', 'r'))
except ValueError:
print('Invalid version.json file')
return version
for key in version.keys():
if ver.get(key):
version[key] = ver.get(key)
return version
def get_db_don(user):
don_body_fill = user['don_body_fill'] if 'don_body_fill' in user else get_default_don('body_fill')
don_face_fill = user['don_face_fill'] if 'don_face_fill' in user else get_default_don('face_fill')
return {'body_fill': don_body_fill, 'face_fill': don_face_fill}
def get_default_don(part=None):
if part == None:
return {
'body_fill': get_default_don('body_fill'),
'face_fill': get_default_don('face_fill')
}
elif part == 'body_fill':
return '#5fb7c1'
elif part == 'face_fill':
return '#ff5724'
def is_hex(input):
try:
int(input, 16)
return True
except ValueError:
return False
@app.route(basedir)
def route_index():
version = get_version()
return render_template('index.html', version=version, config=get_config())
@app.route(basedir + 'api/csrftoken')
def route_csrftoken():
return jsonify({'status': 'ok', 'token': generate_csrf()})
@app.route(basedir + 'admin')
@admin_required(level=50)
def route_admin():
return redirect(basedir + 'admin/songs')
@app.route(basedir + 'admin/songs')
@admin_required(level=50)
def route_admin_songs():
songs = sorted(list(db.songs.find({})), key=lambda x: x['id'])
categories = db.categories.find({})
user = db.users.find_one({'username': session['username']})
return render_template('admin_songs.html', songs=songs, admin=user, categories=list(categories), config=get_config())
@app.route(basedir + 'admin/songs/<int:id>')
@admin_required(level=50)
def route_admin_songs_id(id):
song = db.songs.find_one({'id': id})
if not song:
return abort(404)
categories = list(db.categories.find({}))
song_skins = list(db.song_skins.find({}))
makers = list(db.makers.find({}))
user = db.users.find_one({'username': session['username']})
return render_template('admin_song_detail.html',
song=song, categories=categories, song_skins=song_skins, makers=makers, admin=user, config=get_config())
@app.route(basedir + 'admin/songs/new')
@admin_required(level=100)
def route_admin_songs_new():
categories = list(db.categories.find({}))
song_skins = list(db.song_skins.find({}))
makers = list(db.makers.find({}))
seq = db.seq.find_one({'name': 'songs'})
seq_new = seq['value'] + 1 if seq else 1
return render_template('admin_song_new.html', categories=categories, song_skins=song_skins, makers=makers, config=get_config(), id=seq_new)
@app.route(basedir + 'admin/songs/new', methods=['POST'])
@admin_required(level=100)
def route_admin_songs_new_post():
output = {'title_lang': {}, 'subtitle_lang': {}, 'courses': {}}
output['enabled'] = True if request.form.get('enabled') else False
output['title'] = request.form.get('title') or None
output['subtitle'] = request.form.get('subtitle') or None
for lang in ['ja', 'en', 'cn', 'tw', 'ko']:
output['title_lang'][lang] = request.form.get('title_%s' % lang) or None
output['subtitle_lang'][lang] = request.form.get('subtitle_%s' % lang) or None
for course in ['easy', 'normal', 'hard', 'oni', 'ura']:
if request.form.get('course_%s' % course):
output['courses'][course] = {'stars': int(request.form.get('course_%s' % course)),
'branch': True if request.form.get('branch_%s' % course) else False}
else:
output['courses'][course] = None
output['category_id'] = int(request.form.get('category_id')) or None
output['type'] = request.form.get('type')
output['music_type'] = request.form.get('music_type')
output['offset'] = float(request.form.get('offset')) or None
output['skin_id'] = int(request.form.get('skin_id')) or None
output['preview'] = float(request.form.get('preview')) or None
output['volume'] = float(request.form.get('volume')) or None
output['maker_id'] = int(request.form.get('maker_id')) or None
output['lyrics'] = True if request.form.get('lyrics') else False
output['hash'] = request.form.get('hash')
seq = db.seq.find_one({'name': 'songs'})
seq_new = seq['value'] + 1 if seq else 1
hash_error = False
if request.form.get('gen_hash'):
try:
output['hash'] = generate_hash(seq_new, request.form)
except HashException as e:
hash_error = True
flash('An error occurred: %s' % str(e), 'error')
output['id'] = seq_new
output['order'] = seq_new
db.songs.insert_one(output)
if not hash_error:
flash('Song created.')
db.seq.update_one({'name': 'songs'}, {'$set': {'value': seq_new}}, upsert=True)
return redirect(basedir + 'admin/songs/%s' % str(seq_new))
@app.route(basedir + 'admin/songs/<int:id>', methods=['POST'])
@admin_required(level=50)
def route_admin_songs_id_post(id):
song = db.songs.find_one({'id': id})
if not song:
return abort(404)
user = db.users.find_one({'username': session['username']})
user_level = user['user_level']
output = {'title_lang': {}, 'subtitle_lang': {}, 'courses': {}}
if user_level >= 100:
output['enabled'] = True if request.form.get('enabled') else False
output['title'] = request.form.get('title') or None
output['subtitle'] = request.form.get('subtitle') or None
for lang in ['ja', 'en', 'cn', 'tw', 'ko']:
output['title_lang'][lang] = request.form.get('title_%s' % lang) or None
output['subtitle_lang'][lang] = request.form.get('subtitle_%s' % lang) or None
for course in ['easy', 'normal', 'hard', 'oni', 'ura']:
if request.form.get('course_%s' % course):
output['courses'][course] = {'stars': int(request.form.get('course_%s' % course)),
'branch': True if request.form.get('branch_%s' % course) else False}
else:
output['courses'][course] = None
output['category_id'] = int(request.form.get('category_id')) or None
output['type'] = request.form.get('type')
output['music_type'] = request.form.get('music_type')
output['offset'] = float(request.form.get('offset')) or None
output['skin_id'] = int(request.form.get('skin_id')) or None
output['preview'] = float(request.form.get('preview')) or None
output['volume'] = float(request.form.get('volume')) or None
output['maker_id'] = int(request.form.get('maker_id')) or None
output['lyrics'] = True if request.form.get('lyrics') else False
output['hash'] = request.form.get('hash')
hash_error = False
if request.form.get('gen_hash'):
try:
output['hash'] = generate_hash(id, request.form)
except HashException as e:
hash_error = True
flash('An error occurred: %s' % str(e), 'error')
db.songs.update_one({'id': id}, {'$set': output})
if not hash_error:
flash('Changes saved.')
return redirect(basedir + 'admin/songs/%s' % id)
@app.route(basedir + 'admin/songs/<int:id>/delete', methods=['POST'])
@limiter.limit("1 per day")
@admin_required(level=100)
def route_admin_songs_id_delete(id):
song = db.songs.find_one({'id': id})
if not song:
return abort(404)
db.songs.delete_one({'id': id})
flash('Song deleted.')
return redirect(basedir + 'admin/songs')
@app.route(basedir + 'admin/users')
@admin_required(level=50)
def route_admin_users():
user = db.users.find_one({'username': session.get('username')})
max_level = user['user_level'] - 1
return render_template('admin_users.html', config=get_config(), max_level=max_level, username='', level='')
@app.route(basedir + 'admin/users', methods=['POST'])
@admin_required(level=50)
def route_admin_users_post():
admin_name = session.get('username')
admin = db.users.find_one({'username': admin_name})
max_level = admin['user_level'] - 1
username = request.form.get('username')
try:
level = int(request.form.get('level')) or 0
except ValueError:
level = 0
user = db.users.find_one({'username_lower': username.lower()})
if not user:
flash('Error: User was not found.')
elif admin['username'] == user['username']:
flash('Error: You cannot modify your own level.')
else:
user_level = user['user_level']
if level < 0 or level > max_level:
flash('Error: Invalid level.')
elif user_level > max_level:
flash('Error: This user has higher level than you.')
else:
output = {'user_level': level}
db.users.update_one({'username': user['username']}, {'$set': output})
flash('User updated.')
return render_template('admin_users.html', config=get_config(), max_level=max_level, username=username, level=level)
@app.route(basedir + 'api/preview')
@app.cache.cached(timeout=15, query_string=True)
def route_api_preview():
song_id = request.args.get('id', None)
if not song_id or not re.match('^[0-9]{1,9}$', song_id):
abort(400)
song_id = int(song_id)
song = db.songs.find_one({'id': song_id})
if not song:
abort(400)
song_type = song['type']
song_ext = song['music_type'] if song['music_type'] else "mp3"
prev_path = make_preview(song_id, song_type, song_ext, song['preview'])
if not prev_path:
return redirect(get_config()['songs_baseurl'] + '%s/main.%s' % (song_id, song_ext))
return redirect(get_config()['songs_baseurl'] + '%s/preview.mp3' % song_id)
@app.route(basedir + 'api/songs')
@app.cache.cached(timeout=15)
def route_api_songs():
songs = list(db.songs.find({'enabled': True}, {'_id': False, 'enabled': False}))
for song in songs:
if song['maker_id']:
if song['maker_id'] == 0:
song['maker'] = 0
else:
song['maker'] = db.makers.find_one({'id': song['maker_id']}, {'_id': False})
else:
song['maker'] = None
del song['maker_id']
if song['category_id']:
song['category'] = db.categories.find_one({'id': song['category_id']})['title']
else:
song['category'] = None
#del song['category_id']
if song['skin_id']:
song['song_skin'] = db.song_skins.find_one({'id': song['skin_id']}, {'_id': False, 'id': False})
else:
song['song_skin'] = None
del song['skin_id']
return cache_wrap(flask.jsonify(songs), 60)
@app.route(basedir + 'api/categories')
@app.cache.cached(timeout=15)
def route_api_categories():
categories = list(db.categories.find({},{'_id': False}))
return jsonify(categories)
@app.route(basedir + 'api/config')
@app.cache.cached(timeout=15)
def route_api_config():
config = get_config(credentials=True)
return jsonify(config)
@app.route(basedir + 'api/register', methods=['POST'])
def route_api_register():
data = request.get_json()
if not schema.validate(data, schema.register):
return abort(400)
if session.get('username'):
session.clear()
username = data.get('username', '')
if len(username) < 3 or len(username) > 20 or not re.match('^[a-zA-Z0-9_]{3,20}$', username):
return api_error('invalid_username')
if db.users.find_one({'username_lower': username.lower()}):
return api_error('username_in_use')
password = data.get('password', '').encode('utf-8')
if not 6 <= len(password) <= 5000:
return api_error('invalid_password')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password, salt)
don = get_default_don()
session_id = os.urandom(24).hex()
db.users.insert_one({
'username': username,
'username_lower': username.lower(),
'password': hashed,
'display_name': username,
'don': don,
'user_level': 1,
'session_id': session_id
})
session['session_id'] = session_id
session['username'] = username
session.permanent = True
return jsonify({'status': 'ok', 'username': username, 'display_name': username, 'don': don})
@app.route(basedir + 'api/login', methods=['POST'])
def route_api_login():
data = request.get_json()
if not schema.validate(data, schema.login):
return abort(400)
if session.get('username'):
session.clear()
username = data.get('username', '')
result = db.users.find_one({'username_lower': username.lower()})
if not result:
return api_error('invalid_username_password')
password = data.get('password', '').encode('utf-8')
if not bcrypt.checkpw(password, result['password']):
return api_error('invalid_username_password')
don = get_db_don(result)
session['session_id'] = result['session_id']
session['username'] = result['username']
session.permanent = True if data.get('remember') else False
return jsonify({'status': 'ok', 'username': result['username'], 'display_name': result['display_name'], 'don': don})
@app.route(basedir + 'api/logout', methods=['POST'])
@login_required
def route_api_logout():
session.clear()
return jsonify({'status': 'ok'})
@app.route(basedir + 'api/account/display_name', methods=['POST'])
@login_required
def route_api_account_display_name():
data = request.get_json()
if not schema.validate(data, schema.update_display_name):
return abort(400)
display_name = data.get('display_name', '').strip()
if not display_name:
display_name = session.get('username')
elif len(display_name) > 25:
return api_error('invalid_display_name')
db.users.update_one({'username': session.get('username')}, {
'$set': {'display_name': display_name}
})
return jsonify({'status': 'ok', 'display_name': display_name})
@app.route(basedir + 'api/account/don', methods=['POST'])
@login_required
def route_api_account_don():
data = request.get_json()
if not schema.validate(data, schema.update_don):
return abort(400)
don_body_fill = data.get('body_fill', '').strip()
don_face_fill = data.get('face_fill', '').strip()
if len(don_body_fill) != 7 or\
not don_body_fill.startswith("#")\
or not is_hex(don_body_fill[1:])\
or len(don_face_fill) != 7\
or not don_face_fill.startswith("#")\
or not is_hex(don_face_fill[1:]):
return api_error('invalid_don')
db.users.update_one({'username': session.get('username')}, {'$set': {
'don_body_fill': don_body_fill,
'don_face_fill': don_face_fill,
}})
return jsonify({'status': 'ok', 'don': {'body_fill': don_body_fill, 'face_fill': don_face_fill}})
@app.route(basedir + 'api/account/password', methods=['POST'])
@login_required
def route_api_account_password():
data = request.get_json()
if not schema.validate(data, schema.update_password):
return abort(400)
user = db.users.find_one({'username': session.get('username')})
current_password = data.get('current_password', '').encode('utf-8')
if not bcrypt.checkpw(current_password, user['password']):
return api_error('current_password_invalid')
new_password = data.get('new_password', '').encode('utf-8')
if not 6 <= len(new_password) <= 5000:
return api_error('invalid_new_password')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(new_password, salt)
session_id = os.urandom(24).hex()
db.users.update_one({'username': session.get('username')}, {
'$set': {'password': hashed, 'session_id': session_id}
})
session['session_id'] = session_id
return jsonify({'status': 'ok'})
@app.route(basedir + 'api/account/remove', methods=['POST'])
@limiter.limit("1 per day")
@login_required
def route_api_account_remove():
data = request.get_json()
if not schema.validate(data, schema.delete_account):
return abort(400)
user = db.users.find_one({'username': session.get('username')})
password = data.get('password', '').encode('utf-8')
if not bcrypt.checkpw(password, user['password']):
return api_error('verify_password_invalid')
db.scores.delete_many({'username': session.get('username')})
db.users.delete_one({'username': session.get('username')})
session.clear()
return jsonify({'status': 'ok'})
@app.route(basedir + 'api/scores/save', methods=['POST'])
@login_required
def route_api_scores_save():
data = request.get_json()
if not schema.validate(data, schema.scores_save):
return abort(400)
username = session.get('username')
if data.get('is_import'):
db.scores.delete_many({'username': username})
scores = data.get('scores', [])
for score in scores:
db.scores.update_one({'username': username, 'hash': score['hash']},
{'$set': {
'username': username,
'hash': score['hash'],
'score': score['score']
}}, upsert=True)
return jsonify({'status': 'ok'})
@app.route(basedir + 'api/scores/get')
@login_required
def route_api_scores_get():
username = session.get('username')
scores = []
for score in db.scores.find({'username': username}):
scores.append({
'hash': score['hash'],
'score': score['score']
})
user = db.users.find_one({'username': username})
don = get_db_don(user)
return jsonify({'status': 'ok', 'scores': scores, 'username': user['username'], 'display_name': user['display_name'], 'don': don})
@app.route(basedir + 'privacy')
def route_api_privacy():
last_modified = time.strftime('%d %B %Y', time.gmtime(os.path.getmtime('templates/privacy.txt')))
integration = take_config('GOOGLE_CREDENTIALS')['gdrive_enabled'] if take_config('GOOGLE_CREDENTIALS') else False
response = make_response(render_template('privacy.txt', last_modified=last_modified, config=get_config(), integration=integration))
response.headers['Content-type'] = 'text/plain; charset=utf-8'
return response
def make_preview(song_id, song_type, song_ext, preview):
song_path = 'public/songs/%s/main.%s' % (song_id, song_ext)
prev_path = 'public/songs/%s/preview.mp3' % song_id
if os.path.isfile(song_path) and not os.path.isfile(prev_path):
if not preview or preview <= 0:
print('Skipping #%s due to no preview' % song_id)
return False
print('Making preview.mp3 for song #%s' % song_id)
ff = FFmpeg(inputs={song_path: '-ss %s' % preview},
outputs={prev_path: '-codec:a libmp3lame -ar 32000 -b:a 92k -y -loglevel panic'})
ff.run()
return prev_path
error_pages = take_config('ERROR_PAGES') or {}
def create_error_page(code, url):
if url.startswith("http://") or url.startswith("https://"):
resp = requests.get(url)
if resp.status_code == 200:
app.register_error_handler(code, lambda e: (resp.content, code))
else:
if url.startswith(basedir):
url = url[len(basedir):]
path = os.path.normpath(os.path.join("public", url))
if os.path.isfile(path):
app.register_error_handler(code, lambda e: (send_from_directory(".", path), code))
for code in error_pages:
if error_pages[code]:
create_error_page(code, error_pages[code])
def cache_wrap(res_from, secs):
res = flask.make_response(res_from)
res.headers["Cache-Control"] = f"public, max-age={secs}, s-maxage={secs}"
res.headers["CDN-Cache-Control"] = f"max-age={secs}"
return res
@app.route(basedir + "src/<path:ref>")
def send_src(ref):
return cache_wrap(flask.send_from_directory("public/src", ref), 3600)
@app.route(basedir + "assets/<path:ref>")
def send_assets(ref):
return cache_wrap(flask.send_from_directory("public/assets", ref), 3600)
@app.route(basedir + "songs/<path:ref>")
def send_songs(ref):
return cache_wrap(flask.send_from_directory("public/songs", ref), 604800)
@app.route(basedir + "manifest.json")
def send_manifest():
return cache_wrap(flask.send_from_directory("public", "manifest.json"), 3600)
@app.route("/upload/", defaults={"ref": "index.html"})
@app.route("/upload/<path:ref>")
def send_upload(ref):
return cache_wrap(flask.send_from_directory("public/upload", ref), 3600)
@app.route("/api/upload", methods=["POST"])
def upload_file():
try:
# POSTリクエストにファイルの部分がない場合
if 'file_tja' not in flask.request.files or 'file_music' not in flask.request.files:
return flask.jsonify({'error': 'リクエストにファイルの部分がありません'})
file_tja = flask.request.files['file_tja']
file_music = flask.request.files['file_music']
# ファイルが選択されておらず空のファイルを受け取った場合
if file_tja.filename == '' or file_music.filename == '':
return flask.jsonify({'error': 'ファイルが選択されていません'})
# TJAファイルをテキストUTF-8/LFに変換
tja_data = file_tja.read()
# 尝试检测编码并转换为UTF-8
try:
# 首先尝试UTF-8解码
tja_text = tja_data.decode("utf-8")
except UnicodeDecodeError:
# 如果UTF-8失败尝试其他常见编码
for encoding in ['shift_jis', 'euc-jp', 'iso-2022-jp', 'cp932']:
try:
tja_text = tja_data.decode(encoding)
break
except UnicodeDecodeError:
continue
else:
# 如果所有编码都失败,使用错误处理
tja_text = tja_data.decode("utf-8", errors="replace")
# 删除回车符CR只保留换行符LF
tja_text = tja_text.replace('\r', '')
print("TJAのサイズ:",len(tja_text))
# TJAファイルの内容を解析
tja = tjaf.Tja(tja_text)
# TJAファイルのハッシュ値を生成
msg = hashlib.sha256()
msg.update(tja_data)
tja_hash = msg.hexdigest()
print("TJA:",tja_hash)
# 音楽ファイルのハッシュ値を生成
music_data = file_music.read()
msg2 = hashlib.sha256()
msg2.update(music_data)
music_hash = msg2.hexdigest()
print("音楽:",music_hash)
# IDを生成
generated_id = f"{tja_hash}-{music_hash}"
# MongoDBのデータも作成
db_entry = tja.to_mongo(generated_id, time.time_ns())
pprint.pprint(db_entry)
# mongoDBにデータをぶち込む
client['taiko']["songs"].insert_one(db_entry)
# ディレクトリを作成
target_dir = pathlib.Path(os.getenv("TAIKO_WEB_SONGS_DIR", "public/songs")) / generated_id
target_dir.mkdir(parents=True,exist_ok=True)
# TJAを保存
(target_dir / "main.tja").write_bytes(tja_data)
# 曲ファイルも保存
(target_dir / f"main.{db_entry['music_type']}").write_bytes(music_data)
except Exception as e:
error_str = ''.join(traceback.TracebackException.from_exception(e).format())
return flask.jsonify({'error': error_str})
return flask.jsonify({'success': True})
@app.route("/api/delete", methods=["POST"])
@limiter.limit("1 per day")
def delete():
id = flask.request.get_json().get('id')
client["taiko"]["songs"].delete_one({ "id": id })
parent_dir = pathlib.Path(os.getenv("TAIKO_WEB_SONGS_DIR", "public/songs"))
target_dir = parent_dir / id
if not (target_dir.resolve().parents and parent_dir.resolve() in target_dir.resolve().parents):
return flask.jsonify({ "success": False, "reason": "PARENT IS NOT ALLOWED" })
shutil.rmtree(target_dir)
return "成功しました。"
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Run the taiko-web development server.')
parser.add_argument('port', type=int, metavar='PORT', nargs='?', default=34801, help='Port to listen on.')
parser.add_argument('-b', '--bind-address', default='localhost', help='Bind server to address.')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode.')
args = parser.parse_args()
app.run(host=args.bind_address, port=args.port, debug=args.debug)

65
config.example.py Normal file
View File

@@ -0,0 +1,65 @@
# The base URL for Taiko Web, with trailing slash.
BASEDIR = '/'
# The full URL base asset URL, with trailing slash.
ASSETS_BASEURL = '/assets/'
# The full URL base song URL, with trailing slash.
SONGS_BASEURL = '/songs/'
# Multiplayer websocket URL. Defaults to /p2 if blank.
MULTIPLAYER_URL = ''
# Send static files for custom error pages
ERROR_PAGES = {
404: ''
}
# The email address to display in the "About Simulator" menu.
EMAIL = None
# Whether to use the user account system.
ACCOUNTS = True
# Custom JavaScript file to load with the simulator.
CUSTOM_JS = ''
# Default plugins to load with the simulator.
PLUGINS = [{
'url': '',
'start': False,
'hide': False
}]
# Filetype to use for song previews. (mp3/ogg)
PREVIEW_TYPE = 'mp3'
# MongoDB server settings.
MONGO = {
'host': ['127.0.0.1:27017'],
'database': 'taiko'
}
# Redis server settings, used for sessions + cache.
REDIS = {
'CACHE_TYPE': 'redis',
'CACHE_REDIS_HOST': '127.0.0.1',
'CACHE_REDIS_PORT': 6379,
'CACHE_REDIS_PASSWORD': None,
'CACHE_REDIS_DB': None
}
# Secret key used for sessions.
SECRET_KEY = 'change-me'
# Git repository base URL.
URL = 'https://github.com/bui/taiko-web/'
# Google Drive API.
GOOGLE_CREDENTIALS = {
'gdrive_enabled': False,
'api_key': '',
'oauth_client_id': '',
'project_number': '',
'min_level': None
}

65
config.py Normal file
View File

@@ -0,0 +1,65 @@
# The base URL for Taiko Web, with trailing slash.
BASEDIR = '/'
# The full URL base asset URL, with trailing slash.
ASSETS_BASEURL = '/assets/'
# The full URL base song URL, with trailing slash.
SONGS_BASEURL = '/songs/'
# Multiplayer websocket URL. Defaults to /p2 if blank.
MULTIPLAYER_URL = ''
# Send static files for custom error pages
ERROR_PAGES = {
404: ''
}
# The email address to display in the "About Simulator" menu.
EMAIL = None
# Whether to use the user account system.
ACCOUNTS = True
# Custom JavaScript file to load with the simulator.
CUSTOM_JS = ''
# Default plugins to load with the simulator.
PLUGINS = [{
'url': '',
'start': False,
'hide': False
}]
# Filetype to use for song previews. (mp3/ogg)
PREVIEW_TYPE = 'mp3'
# MongoDB server settings.
MONGO = {
'host': ['127.0.0.1:27017'],
'database': 'taiko'
}
# Redis server settings, used for sessions + cache.
REDIS = {
'CACHE_TYPE': 'redis',
'CACHE_REDIS_HOST': '127.0.0.1',
'CACHE_REDIS_PORT': 6379,
'CACHE_REDIS_PASSWORD': None,
'CACHE_REDIS_DB': None
}
# Secret key used for sessions.
SECRET_KEY = 'change-me'
# Git repository base URL.
URL = 'https://github.com/bui/taiko-web/'
# Google Drive API.
GOOGLE_CREDENTIALS = {
'gdrive_enabled': False,
'api_key': '',
'oauth_client_id': '',
'project_number': '',
'min_level': None
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
public/assets/fonts/TnT.ttf Normal file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Some files were not shown because too many files have changed in this diff Show More