202 lines
6.0 KiB
Python
202 lines
6.0 KiB
Python
|
|
import time
|
|
import json
|
|
import base64
|
|
|
|
# Mock DB and objects
|
|
class MockCollection:
    """In-memory stand-in for a MongoDB collection, used by the standalone tests.

    Only exact-equality queries on top-level keys are supported: a document
    matches when, for every key in the query, ``doc.get(key)`` equals the
    queried value (so a missing key compares as ``None``).
    """

    def __init__(self, name):
        self.name = name  # label printed as the prefix of the insert log line
        self.data = []    # stored documents, in insertion order

    @staticmethod
    def _matches(item, query):
        """Return True when *item* equals *query* on every queried key."""
        return all(item.get(k) == v for k, v in query.items())

    def find_one(self, query):
        """Return the first stored document matching *query*, or None."""
        return next(
            (item for item in self.data if self._matches(item, query)),
            None,
        )

    def find(self, query, projection=None):
        """Return a MockCursor over every document matching *query*.

        When *projection* is given (and non-empty), each result is a new dict
        holding only the keys whose projection value is truthy. ``_id`` is
        additionally set (from ``doc.get('_id')``) when the projection does
        not mention it, maps it to None, or maps it to True. Without a
        projection the stored document objects themselves are returned.
        """
        results = []
        for item in self.data:
            if not self._matches(item, query):
                continue
            if projection:
                filtered = {k: v for k, v in item.items() if projection.get(k, False)}
                if projection.get('_id') is None or projection.get('_id') is True:
                    filtered['_id'] = item.get('_id')
                results.append(filtered)
            else:
                results.append(item)
        return MockCursor(results)

    def insert_one(self, doc):
        """Assign a sequential ``_id`` to *doc* (in place), log it, and store it."""
        doc['_id'] = len(self.data) + 1
        print(f"[{self.name}] Inserted: {doc}")
        self.data.append(doc)

    def count_documents(self, query):
        """Return the number of stored documents matching *query*.

        An empty (or None) query counts every document.
        """
        if not query:
            return len(self.data)
        # Previously the query was ignored and the total size was returned.
        return sum(1 for item in self.data if self._matches(item, query))
|
|
|
|
class MockCursor:
    """Minimal cursor over a list of result documents.

    Supports the ``.sort(...).limit(...)`` chain used by the test logic and
    can also be iterated directly, so a result set that is never passed
    through ``limit`` is still consumable.
    """

    def __init__(self, data):
        self.data = data  # list of result dicts; sorted in place by sort()

    def __iter__(self):
        """Iterate over the current result documents."""
        return iter(self.data)

    def sort(self, key, direction):
        """Sort the results in place by *key* and return this cursor.

        Documents lacking *key* sort as if its value were 0. A negative
        *direction* sorts descending; anything else sorts ascending.
        """
        self.data.sort(key=lambda x: x.get(key, 0), reverse=(direction < 0))
        return self

    def limit(self, n):
        """Keep only the first *n* results and return them as a plain list.

        Note that this returns the list itself rather than the cursor, so it
        must be the last call in a chain.
        """
        self.data = self.data[:n]
        return self.data
|
|
|
|
class MockDB:
    """Fake database exposing the ``users`` and ``leaderboards`` collections."""

    def __init__(self):
        # Each collection is stored under an attribute matching its own name.
        for collection_name in ('users', 'leaderboards'):
            setattr(self, collection_name, MockCollection(collection_name))
|
|
|
|
# Module-level mock database shared by logic_submit and logic_get.
db = MockDB()
# Seed the single account that the test cases submit scores as.
db.users.insert_one(dict(username='testuser', display_name='Test User'))
|
|
|
|
# --- Logic from app.py rewritten for standalone test ---
|
|
|
|
def _parse_score_value(score_data):
    """Extract the integer score from a score payload.

    *score_data* is either a dict or a JSON string encoding one. The
    ``'score'`` key is preferred; ``'points'`` is used when ``'score'`` is
    absent or None. A missing or falsy value yields 0.
    """
    score_obj = json.loads(score_data) if isinstance(score_data, str) else score_data
    score_val = score_obj.get('score')
    if score_val is None:
        score_val = score_obj.get('points')
    return int(score_val or 0)


def logic_submit(data, username):
    """Record a score submission for *username* in ``db.leaderboards``.

    Args:
        data: mapping with ``'song_id'``, ``'difficulty'`` and ``'score'``
            (a dict, or a JSON string encoding one, carrying ``'score'`` or
            ``'points'``).
        username: name looked up in ``db.users``.

    Returns:
        ``{'status': 'ok'}`` on success, ``{'error': 'user_not_found'}`` when
        the user does not exist, or ``{'error': 'invalid_score_format'}`` when
        the score payload cannot be parsed.

    A new record is inserted only when the user has no record for the same
    song, difficulty and current UTC month; otherwise the update is merely
    simulated (logged, nothing written).
    """
    print(f"\n--- Submitting {data.get('song_id')} ---")
    user = db.users.find_one({'username': username})
    if not user:
        return {'error': 'user_not_found'}

    song_id = data.get('song_id')
    # logic_get converts numeric-looking string IDs to int before querying,
    # so a record stored under the string "123" could never be read back.
    # Store the same normalized form here.
    # NOTE(review): if this mirrors app.py, apply the same normalization there.
    if isinstance(song_id, str):
        try:
            song_id = int(song_id)
        except ValueError:
            pass  # non-numeric IDs (e.g. hashes) stay as strings

    difficulty = data.get('difficulty')
    score_data = data.get('score')

    current_month = time.strftime('%Y-%m', time.gmtime())

    existing = db.leaderboards.find_one({
        'song_id': song_id,
        'difficulty': difficulty,
        'username': username,
        'month': current_month,
    })

    try:
        score_value = _parse_score_value(score_data)
    except (AttributeError, TypeError, ValueError, OverflowError, RecursionError) as e:
        # AttributeError: payload is not a mapping (None, list, number, ...).
        # ValueError covers malformed JSON and non-numeric score strings.
        # OverflowError: non-finite floats such as Infinity.
        print(f"Error parsing score: {e}")
        return {'error': 'invalid_score_format'}
    print(f" Parsed Score Value: {score_value}")

    if existing:
        print(" Existing record found (simulated update).")
    else:
        print(" Inserting new record.")
        db.leaderboards.insert_one({
            'song_id': song_id,
            'difficulty': difficulty,
            'username': username,
            'display_name': user['display_name'],
            'score': score_data,
            'score_value': score_value,
            'submitted_at': time.time(),
            'month': current_month,
        })
    return {'status': 'ok'}
|
|
|
|
def logic_get(args):
    """Fetch this month's leaderboard for a song/difficulty pair.

    Args:
        args: mapping with ``'song_id'`` and ``'difficulty'``.

    Returns:
        ``{'error': 'missing args'}`` when either argument is missing or
        falsy; otherwise ``{'status': 'ok', 'leaderboard': [...]}`` holding up
        to 50 entries (``username`` and ``score_value`` only), highest
        ``score_value`` first.
    """
    print(f"\n--- Getting {args.get('song_id')} ---")
    song_id = args.get('song_id')
    difficulty = args.get('difficulty')

    if not (song_id and difficulty):
        return {'error': 'missing args'}

    # Numeric-looking IDs are queried as ints; anything else is used as given.
    try:
        song_id = int(song_id)
    except (ValueError, TypeError):
        pass

    month_key = time.strftime('%Y-%m', time.gmtime())

    query = {
        'song_id': song_id,
        'difficulty': difficulty,
        'month': month_key,
    }
    fields = {
        '_id': False,
        'username': True,
        'score_value': True,
    }
    cursor = db.leaderboards.find(query, fields)
    top_entries = cursor.sort('score_value', -1).limit(50)

    return {'status': 'ok', 'leaderboard': list(top_entries)}
|
|
|
|
# --- Test Cases ---
|
|
|
|
# Case 1: Hash ID
# A non-numeric (hash-style) song ID must round-trip through submit and get.
print("=== TEST CASE 1: Hash ID ===")
logic_submit({
    'song_id': "hash123",
    'difficulty': 'oni',
    'score': {'score': 1000000}
}, 'testuser')
res1 = logic_get({'song_id': "hash123", 'difficulty': 'oni'})
# .get() keeps an error response from raising KeyError; it counts as a failure.
case1_ok = len(res1.get('leaderboard', [])) > 0
if case1_ok:
    print("✅ Success")
else:
    print("❌ Failed")

# Case 2: Numeric ID
# Submitted as an int, requested as a string; logic_get coerces it back to int.
print("\n=== TEST CASE 2: Numeric ID ===")
logic_submit({
    'song_id': 123,
    'difficulty': 'oni',
    'score': {'score': 2000000}
}, 'testuser')
res2 = logic_get({'song_id': "123", 'difficulty': 'oni'})
case2_ok = len(res2.get('leaderboard', [])) > 0
if case2_ok:
    print("✅ Success")
else:
    print("❌ Failed")

# Case 3: Points Only (Frontend Payload Simulation)
print("\n=== TEST CASE 3: Points Only (Frontend Payload) ===")
# This simulates what the JS 'this.resultsObj' looks like (it has 'points', not 'score')
logic_submit({
    'song_id': "points_song",
    'difficulty': 'hard',
    'score': {'points': 345678, 'bad': 0, 'good': 100}  # No 'score' key
}, 'testuser')

res3 = logic_get({'song_id': "points_song", 'difficulty': 'hard'})
case3_ok = False
if len(res3.get('leaderboard', [])) > 0:
    val = res3['leaderboard'][0].get('score_value')
    if val == 345678:
        print(f"✅ SUCCESS: Correctly parsed 'points' -> {val}")
        case3_ok = True
    else:
        print(f"❌ FAILED: Score value mismatch. Expected 345678, got {val}")
else:
    print("❌ FAILED: No record found")

# The overall verdict covers every case; previously only case 3 was counted,
# so failures in cases 1 and 2 still reported "ALL LOCAL TESTS PASSED".
passed = case1_ok and case2_ok and case3_ok

if passed:
    print("\n---------------------------------------------------")
    print("🎉 ALL LOCAL TESTS PASSED. BACKEND LOGIC VERIFIED.")
    print("---------------------------------------------------")
else:
    print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    print("🛑 TESTS FAILED. CHECK LOGS.")
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
|