mirror of
https://github.com/sotam0316/brain_dogfood.git
synced 2026-04-24 19:48:35 +09:00
Initial Global Release v1.0 (Localization & Security Hardening)
This commit is contained in:
+108
@@ -0,0 +1,108 @@
|
||||
import os
|
||||
import json
|
||||
from flask import Flask, request, abort # type: ignore
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
def create_app():
|
||||
# Set folders to parent directory since app logic is now in a subfolder
|
||||
template_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'templates'))
|
||||
static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'static'))
|
||||
|
||||
app = Flask(__name__, template_folder=template_dir, static_folder=static_dir)
|
||||
app.secret_key = os.getenv('SECRET_KEY', 'dev_key')
|
||||
|
||||
# --- 🛡️ 보안 실드 & 로깅 설정 ---
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
log_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'logs'))
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
|
||||
file_handler = RotatingFileHandler(
|
||||
os.path.join(log_dir, 'app.log'),
|
||||
maxBytes=10*1024*1024, # 10MB
|
||||
backupCount=5,
|
||||
encoding='utf-8'
|
||||
)
|
||||
file_handler.setFormatter(logging.Formatter(
|
||||
'[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
|
||||
))
|
||||
app.logger.addHandler(file_handler)
|
||||
app.logger.setLevel(logging.INFO)
|
||||
app.logger.info('🚀 뇌사료 서버 기동 - 로깅 시스템 가동')
|
||||
|
||||
@app.errorhandler(403)
|
||||
def forbidden(e):
|
||||
return "Forbidden: Suspicious activity detected. Your IP has been logged.", 403
|
||||
|
||||
@app.before_request
|
||||
def unified_logger():
|
||||
# 클라이언트 IP (Cloudflare 등을 거칠 경우 X-Forwarded-For 확인)
|
||||
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
||||
path = request.path
|
||||
method = request.method
|
||||
params = request.query_string.decode('utf-8')
|
||||
|
||||
# 1. 보안 실드: 메인/로그인 페이지에 파라미터가 붙은 경우 즉시 차단
|
||||
if path.rstrip('/') in ['', '/login'] and params:
|
||||
log_msg = f"[SHIELD] Blocked: [{ip}] {method} {path}?{params}"
|
||||
app.logger.warning(log_msg)
|
||||
abort(403)
|
||||
|
||||
# 2. 트래픽 로깅 (정적 파일 제외)
|
||||
if not path.startswith('/static/'):
|
||||
log_msg = f"ACCESS: [{ip}] {method} {path}"
|
||||
if params:
|
||||
log_msg += f"?{params}"
|
||||
app.logger.info(log_msg)
|
||||
|
||||
upload_folder = os.path.abspath(os.path.join(static_dir, 'uploads'))
|
||||
os.makedirs(upload_folder, exist_ok=True)
|
||||
app.config['UPLOAD_FOLDER'] = upload_folder
|
||||
|
||||
# Load config.json
|
||||
config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'config.json'))
|
||||
if os.path.exists(config_path):
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
cfg = json.load(f)
|
||||
app.config['UPLOAD_SECURITY'] = cfg.get('upload_security', {})
|
||||
else:
|
||||
app.config['UPLOAD_SECURITY'] = {'allowed_extensions': [], 'blocked_extensions': []}
|
||||
|
||||
# Initialize DB schema
|
||||
from .database import init_db
|
||||
init_db()
|
||||
|
||||
# Session and Security configurations
|
||||
app.config.update(
|
||||
SESSION_COOKIE_HTTPONLY=True,
|
||||
SESSION_COOKIE_SAMESITE='Lax',
|
||||
SESSION_COOKIE_SECURE=False, # Set to True in production with HTTPS
|
||||
PERMANENT_SESSION_LIFETIME=3600 # 60 minutes (1 hour) session
|
||||
)
|
||||
|
||||
@app.after_request
|
||||
def add_security_headers(response):
|
||||
"""보안 강화를 위한 HTTP 헤더 추가"""
|
||||
response.headers['X-Content-Type-Options'] = 'nosniff'
|
||||
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
|
||||
response.headers['X-XSS-Protection'] = '1; mode=block'
|
||||
# Content Security Policy (Toast UI 및 외부 CDN 허용)
|
||||
# 운영 환경에 맞춰 점진적으로 강화 가능
|
||||
csp = (
|
||||
"default-src 'self'; "
|
||||
"script-src 'self' 'unsafe-inline' https://uicdn.toast.com https://cdn.jsdelivr.net https://cdnjs.cloudflare.com https://static.cloudflareinsights.com https://d3js.org; "
|
||||
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://uicdn.toast.com; "
|
||||
"font-src 'self' https://fonts.gstatic.com; "
|
||||
"img-src 'self' data: blob:; "
|
||||
"connect-src 'self' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com https://cloudflareinsights.com https://d3js.org;"
|
||||
)
|
||||
response.headers['Content-Security-Policy'] = csp
|
||||
return response
|
||||
|
||||
# Register modular blueprints
|
||||
from .routes import register_blueprints
|
||||
register_blueprints(app)
|
||||
|
||||
return app
|
||||
@@ -0,0 +1,95 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from google import genai
|
||||
from .utils.i18n import _t
|
||||
|
||||
# 로거 설정
|
||||
logger = logging.getLogger('ai')
|
||||
|
||||
def analyze_memo(title, content, lang='en'):
|
||||
"""
|
||||
최신 google-genai SDK를 사용하여 메모 본문을 요약하고 태그를 추출합니다.
|
||||
"""
|
||||
api_key = os.getenv('GEMINI_API_KEY')
|
||||
primary_model = os.getenv('GEMINI_MODEL', 'gemini-2.0-flash')
|
||||
|
||||
# 예비용 모델 리스트
|
||||
fallbacks = [primary_model, 'gemini-1.5-flash-latest', 'gemini-1.5-flash']
|
||||
models_to_try = list(dict.fromkeys(fallbacks))
|
||||
|
||||
if not api_key:
|
||||
error_msg = "Gemini API key is required." if lang == 'en' else "AI 분석을 위해 Gemini API 키가 필요합니다."
|
||||
return error_msg, []
|
||||
|
||||
client = genai.Client(api_key=api_key)
|
||||
|
||||
if lang == 'ko':
|
||||
prompt = f"""
|
||||
당신은 메모 분석 전문가입니다. 아래 메모의 제목과 내용을 읽고 다음 작업을 수행하세요:
|
||||
1. 내용을 1~2문장으로 아주 간결하게 요약할 것.
|
||||
2. 내용과 관련된 핵심 키워드를 태그 형태로 3~5개 추출할 것.
|
||||
|
||||
[제목] {title}
|
||||
[내용] {content}
|
||||
|
||||
출력 형식(JSON):
|
||||
{{
|
||||
"summary": "요약 내용",
|
||||
"tags": ["태그1", "태그2", "태그3"]
|
||||
}}
|
||||
"""
|
||||
else:
|
||||
prompt = f"""
|
||||
You are a memo analysis expert. Read the title and content below and perform the following:
|
||||
1. Summarize the content very concisely in 1-2 sentences.
|
||||
2. Extract 3-5 key keywords as tags.
|
||||
|
||||
[Title] {title}
|
||||
[Content] {content}
|
||||
|
||||
Output Format (JSON):
|
||||
{{
|
||||
"summary": "Summary text",
|
||||
"tags": ["Tag1", "Tag2", "Tag3"]
|
||||
}}
|
||||
"""
|
||||
|
||||
last_error = None
|
||||
for model_name in models_to_try:
|
||||
try:
|
||||
logger.info(f"[AI] Attempting analysis with model: {model_name} (lang={lang})")
|
||||
response = client.models.generate_content(
|
||||
model=model_name,
|
||||
contents=prompt
|
||||
)
|
||||
|
||||
res_text = response.text.strip()
|
||||
if '```json' in res_text:
|
||||
res_text = res_text.split('```json')[1].split('```')[0].strip()
|
||||
elif '```' in res_text:
|
||||
res_text = res_text.split('```')[1].strip()
|
||||
|
||||
data = json.loads(res_text)
|
||||
logger.info(f"[AI] Analysis successful using {model_name}")
|
||||
return data.get('summary', ''), data.get('tags', [])
|
||||
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
logger.warning(f"[AI] Model {model_name} failed: {str(e)}")
|
||||
if "404" in str(e):
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
# 모든 시도가 실패한 경우
|
||||
error_header = "AI Analysis Error: " if lang == 'en' else "AI 분석 중 오류 발생: "
|
||||
error_msg = f"{error_header}{str(last_error)}"
|
||||
|
||||
if "404" in str(last_error):
|
||||
if lang == 'en':
|
||||
error_msg += "\n(Note: The selected model might be expired. Check GEMINI_MODEL in .env)"
|
||||
else:
|
||||
error_msg += "\n(알림: 선택한 AI 모델이 만료되었을 수 있습니다. .env의 GEMINI_MODEL 설정을 확인해주세요.)"
|
||||
|
||||
return error_msg, []
|
||||
+20
@@ -0,0 +1,20 @@
|
||||
import os
|
||||
import functools
|
||||
from flask import session, redirect, url_for, request, current_app # type: ignore
|
||||
|
||||
def check_auth(username, password):
|
||||
"""
|
||||
환경 변수에 설정된 관리자 계정 정보와 일치하는지 확인합니다.
|
||||
"""
|
||||
admin_user = os.getenv('ADMIN_USERNAME', 'admin')
|
||||
admin_password = os.getenv('ADMIN_PASSWORD', 'admin')
|
||||
return username == admin_user and password == admin_password
|
||||
|
||||
def login_required(view):
|
||||
@functools.wraps(view)
|
||||
def wrapped_view(**kwargs):
|
||||
# app/routes/auth.py의 세션 키와 일치시킴 (logged_in)
|
||||
if session.get('logged_in') is None:
|
||||
return redirect(url_for('main.login_page'))
|
||||
return view(**kwargs)
|
||||
return wrapped_view
|
||||
@@ -0,0 +1,8 @@
|
||||
# System-wide Core Constants (Globalization-ready)
|
||||
|
||||
# System Reserved Groups
|
||||
GROUP_DEFAULT = "default" # 기본
|
||||
GROUP_FILES = "files" # 파일모음
|
||||
GROUP_DONE = "done" # 완료모음
|
||||
|
||||
SYSTEM_GROUPS = [GROUP_DEFAULT, GROUP_FILES, GROUP_DONE]
|
||||
@@ -0,0 +1,83 @@
|
||||
import os
|
||||
import sqlite3
|
||||
|
||||
# Data directory relative to this file
|
||||
DB_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data'))
|
||||
os.makedirs(DB_DIR, exist_ok=True)
|
||||
DB_PATH = os.path.join(DB_DIR, 'memos.db')
|
||||
|
||||
def init_db():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
c = conn.cursor()
|
||||
|
||||
# 1. Memos Table
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS memos (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
title TEXT,
|
||||
content TEXT,
|
||||
summary TEXT,
|
||||
color TEXT DEFAULT '#2c3e50',
|
||||
is_pinned BOOLEAN DEFAULT 0,
|
||||
status TEXT DEFAULT 'active', -- 'active', 'done', 'archived'
|
||||
group_name TEXT DEFAULT 'default',
|
||||
is_encrypted BOOLEAN DEFAULT 0,
|
||||
created_at TIMESTAMP,
|
||||
updated_at TIMESTAMP
|
||||
)
|
||||
''')
|
||||
|
||||
try:
|
||||
c.execute("ALTER TABLE memos ADD COLUMN status TEXT DEFAULT 'active'")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
try:
|
||||
c.execute("ALTER TABLE memos ADD COLUMN is_encrypted BOOLEAN DEFAULT 0")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
|
||||
# 2. Separate Tags Table (Normalized)
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS tags (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
memo_id INTEGER,
|
||||
name TEXT,
|
||||
source TEXT, -- 'user' or 'ai'
|
||||
FOREIGN KEY (memo_id) REFERENCES memos (id) ON DELETE CASCADE
|
||||
)
|
||||
''')
|
||||
|
||||
# 3. Attachments Table (Enhanced Asset Tracking)
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS attachments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
memo_id INTEGER,
|
||||
filename TEXT,
|
||||
original_name TEXT,
|
||||
file_type TEXT,
|
||||
size INTEGER,
|
||||
created_at TIMESTAMP,
|
||||
FOREIGN KEY (memo_id) REFERENCES memos (id) ON DELETE SET NULL
|
||||
)
|
||||
''')
|
||||
|
||||
# 4. Memo Links Table (Backlinks)
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS memo_links (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
source_id INTEGER,
|
||||
target_id INTEGER,
|
||||
FOREIGN KEY (source_id) REFERENCES memos (id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (target_id) REFERENCES memos (id) ON DELETE CASCADE
|
||||
)
|
||||
''')
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def get_db():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
@@ -0,0 +1,16 @@
|
||||
from flask import Blueprint # type: ignore
|
||||
|
||||
def register_blueprints(app):
|
||||
from .main import main_bp
|
||||
from .auth import auth_bp
|
||||
from .memo import memo_bp
|
||||
from .file import file_bp
|
||||
from .ai import ai_bp
|
||||
from .settings import settings_bp
|
||||
|
||||
app.register_blueprint(main_bp)
|
||||
app.register_blueprint(auth_bp)
|
||||
app.register_blueprint(memo_bp)
|
||||
app.register_blueprint(file_bp)
|
||||
app.register_blueprint(ai_bp)
|
||||
app.register_blueprint(settings_bp)
|
||||
@@ -0,0 +1,46 @@
|
||||
import datetime
|
||||
from flask import Blueprint, jsonify, current_app # type: ignore
|
||||
from ..database import get_db
|
||||
from ..auth import login_required
|
||||
from ..ai import analyze_memo
|
||||
from ..utils.i18n import _t
|
||||
|
||||
ai_bp = Blueprint('ai', __name__)
|
||||
|
||||
@ai_bp.route('/api/memos/<int:memo_id>/analyze', methods=['POST'])
|
||||
@login_required
|
||||
def analyze_memo_route(memo_id):
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
c.execute('SELECT title, content, is_encrypted FROM memos WHERE id = ?', (memo_id,))
|
||||
memo = c.fetchone()
|
||||
|
||||
if not memo:
|
||||
return jsonify({'error': _t('label_no_results')}), 404
|
||||
|
||||
if memo['is_encrypted']:
|
||||
return jsonify({'error': _t('msg_encrypted_locked')}), 403
|
||||
|
||||
current_app.logger.info(f"AI Analysis Started: ID {memo_id}, Title: '{memo['title']}'")
|
||||
|
||||
lang = current_app.config.get('lang', 'en')
|
||||
summary, ai_tags = analyze_memo(memo['title'], memo['content'], lang=lang)
|
||||
|
||||
try:
|
||||
c.execute('UPDATE memos SET summary = ?, updated_at = ? WHERE id = ?',
|
||||
(summary, datetime.datetime.now().isoformat(), memo_id))
|
||||
|
||||
c.execute("DELETE FROM tags WHERE memo_id = ? AND source = 'ai'", (memo_id,))
|
||||
for tag in ai_tags:
|
||||
if tag.strip():
|
||||
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)',
|
||||
(memo_id, tag.strip(), 'ai'))
|
||||
|
||||
conn.commit()
|
||||
current_app.logger.info(f"AI Analysis SUCCESS: ID {memo_id}, Tags extracted: {len(ai_tags)}")
|
||||
return jsonify({'summary': summary, 'tags': ai_tags})
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
@@ -0,0 +1,22 @@
|
||||
from flask import Blueprint, request, jsonify, session, redirect, url_for # type: ignore
|
||||
from ..auth import check_auth
|
||||
from ..utils.i18n import _t
|
||||
|
||||
auth_bp = Blueprint('auth', __name__)
|
||||
|
||||
@auth_bp.route('/login', methods=['POST'])
|
||||
def login():
|
||||
data = request.json
|
||||
username = data.get('username')
|
||||
password = data.get('password')
|
||||
|
||||
if check_auth(username, password):
|
||||
session.permanent = True # Enable permanent session to use LIFETIME config
|
||||
session['logged_in'] = True
|
||||
return jsonify({'message': 'Logged in successfully'})
|
||||
return jsonify({'error': _t('msg_auth_failed')}), 401
|
||||
|
||||
@auth_bp.route('/logout')
|
||||
def logout():
|
||||
session.pop('logged_in', None)
|
||||
return redirect(url_for('main.login_page'))
|
||||
@@ -0,0 +1,162 @@
|
||||
import os
|
||||
import uuid
|
||||
import datetime
|
||||
import mimetypes
|
||||
from flask import Blueprint, request, jsonify, current_app, Response, send_from_directory, session # type: ignore
|
||||
from werkzeug.utils import secure_filename # type: ignore
|
||||
from urllib.parse import quote # type: ignore
|
||||
from ..database import get_db
|
||||
from ..auth import login_required
|
||||
from ..security import encrypt_file, decrypt_file
|
||||
|
||||
file_bp = Blueprint('file', __name__)
|
||||
|
||||
@file_bp.route('/api/upload', methods=['POST'])
|
||||
@login_required
|
||||
def upload_file():
|
||||
if 'image' not in request.files and 'file' not in request.files:
|
||||
return jsonify({'error': 'No file provided'}), 400
|
||||
|
||||
file = request.files.get('image') or request.files.get('file')
|
||||
if not file or file.filename == '':
|
||||
return jsonify({'error': 'No selected file'}), 400
|
||||
|
||||
ext = os.path.splitext(file.filename)[1].lower().replace('.', '')
|
||||
sec_conf = current_app.config.get('UPLOAD_SECURITY', {})
|
||||
blocked = sec_conf.get('blocked_extensions', [])
|
||||
allowed = sec_conf.get('allowed_extensions', [])
|
||||
|
||||
if ext in blocked:
|
||||
return jsonify({'error': f'Extension .{ext} is blocked for security reasons.'}), 403
|
||||
if allowed and ext not in allowed:
|
||||
return jsonify({'error': f'Extension .{ext} is not in the allowed list.'}), 403
|
||||
|
||||
unique_filename = f"{uuid.uuid4()}.{ext}"
|
||||
filename = secure_filename(unique_filename)
|
||||
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
|
||||
|
||||
# Encrypt and save
|
||||
file_bytes = file.read()
|
||||
encrypted_bytes = encrypt_file(file_bytes)
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(encrypted_bytes)
|
||||
|
||||
# Record attachment in DB
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
c.execute('''
|
||||
INSERT INTO attachments (filename, original_name, file_type, size, created_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
''', (filename, file.filename, ext, os.path.getsize(filepath), datetime.datetime.now().isoformat()))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({
|
||||
'url': f"/api/download/{filename}",
|
||||
'name': file.filename,
|
||||
'ext': ext
|
||||
})
|
||||
|
||||
@file_bp.route('/api/download/<filename>')
|
||||
def download_file_route(filename):
|
||||
filename = secure_filename(filename)
|
||||
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
|
||||
|
||||
if not os.path.exists(filepath):
|
||||
return jsonify({'error': 'File not found'}), 404
|
||||
|
||||
# Check security status of parent memo
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
c.execute('''
|
||||
SELECT a.original_name, m.is_encrypted
|
||||
FROM attachments a
|
||||
LEFT JOIN memos m ON a.memo_id = m.id
|
||||
WHERE a.filename = ?
|
||||
''', (filename,))
|
||||
row = c.fetchone()
|
||||
conn.close()
|
||||
|
||||
# 로그인된 상태라면 암호화된 메모의 파일이라도 본인 확인이 된 것으로 간주하고 허용
|
||||
is_logged_in = session.get('logged_in') is True
|
||||
|
||||
# 만약 메모가 암호화되어 있고, 로그인도 되어 있지 않다면 차단
|
||||
if row and row['is_encrypted'] and not is_logged_in:
|
||||
current_app.logger.warning(f"Access Denied: Unauthenticated access to encrypted file {filename}")
|
||||
return jsonify({'error': 'Access denied. Please login to view this attachment.'}), 403
|
||||
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
|
||||
decrypted = decrypt_file(data)
|
||||
|
||||
orig_name = row['original_name'] if row else filename
|
||||
# 원본 파일명 기반으로 정확한 마임타입 추측
|
||||
mime_type, _ = mimetypes.guess_type(orig_name)
|
||||
if not mime_type: mime_type = 'application/octet-stream'
|
||||
|
||||
# 이미지인 경우 'inline'으로 설정하여 브라우저 본문 내 렌더링 허용, 그 외는 'attachment'
|
||||
is_image = mime_type.startswith('image/')
|
||||
disposition = 'inline' if is_image else 'attachment'
|
||||
|
||||
headers = {
|
||||
'Content-Disposition': f"{disposition}; filename*=UTF-8''{quote(orig_name)}"
|
||||
}
|
||||
|
||||
content_data = decrypted if decrypted is not None else data
|
||||
return Response(content_data, mimetype=mime_type, headers=headers)
|
||||
|
||||
@file_bp.route('/api/assets', methods=['GET'])
|
||||
@login_required
|
||||
def get_assets():
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
# Filter out files belonging to encrypted memos
|
||||
c.execute('''
|
||||
SELECT a.*, m.title as memo_title
|
||||
FROM attachments a
|
||||
LEFT JOIN memos m ON a.memo_id = m.id
|
||||
WHERE m.is_encrypted = 0 OR m.is_encrypted IS NULL
|
||||
ORDER BY a.created_at DESC
|
||||
''')
|
||||
assets = [dict(r) for r in c.fetchall()]
|
||||
conn.close()
|
||||
return jsonify(assets)
|
||||
@file_bp.route('/api/attachments/<filename>', methods=['DELETE'])
|
||||
@login_required
|
||||
def delete_attachment_route(filename):
|
||||
filename = secure_filename(filename)
|
||||
filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
# 파일 정보 확인
|
||||
c.execute('SELECT id, memo_id FROM attachments WHERE filename = ?', (filename,))
|
||||
row = c.fetchone()
|
||||
|
||||
if not row:
|
||||
conn.close()
|
||||
return jsonify({'error': 'File not found in database'}), 404
|
||||
|
||||
# 보안: 메모에 이미 연결된 파일은 삭제하지 않음 (취소 시에는 아직 연결되지 않은 파일만 삭제)
|
||||
# 만약 연결된 파일을 삭제하고 싶다면 별도의 로직 필요
|
||||
if row['memo_id'] is not None:
|
||||
conn.close()
|
||||
return jsonify({'error': 'Cannot delete file already linked to a memo'}), 403
|
||||
|
||||
try:
|
||||
# 1. DB 삭제
|
||||
c.execute('DELETE FROM attachments WHERE filename = ?', (filename,))
|
||||
conn.commit()
|
||||
|
||||
# 2. 물리 파일 삭제
|
||||
if os.path.exists(filepath):
|
||||
os.remove(filepath)
|
||||
|
||||
current_app.logger.info(f"Attachment Deleted: {filename}")
|
||||
return jsonify({'message': 'File deleted successfully'})
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
@@ -0,0 +1,27 @@
|
||||
from flask import Blueprint, render_template, redirect, url_for, session, current_app # type: ignore
|
||||
from ..auth import login_required
|
||||
import os
|
||||
import json
|
||||
|
||||
main_bp = Blueprint('main', __name__)
|
||||
|
||||
@main_bp.route('/')
|
||||
@login_required
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
@main_bp.route('/login', methods=['GET'])
|
||||
def login_page():
|
||||
if 'logged_in' in session:
|
||||
return redirect(url_for('main.index'))
|
||||
|
||||
# i18n 지원을 위해 기본 언어 전달
|
||||
config_path = os.path.join(os.getcwd(), 'config.json')
|
||||
lang = 'ko'
|
||||
if os.path.exists(config_path):
|
||||
try:
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
lang = json.load(f).get('lang', 'ko')
|
||||
except: pass
|
||||
|
||||
return render_template('login.html', lang=lang)
|
||||
@@ -0,0 +1,356 @@
|
||||
import datetime
|
||||
from flask import Blueprint, request, jsonify, current_app # type: ignore
|
||||
from ..database import get_db
|
||||
from ..auth import login_required
|
||||
from ..constants import GROUP_DONE, GROUP_DEFAULT
|
||||
from ..utils.i18n import _t
|
||||
from ..utils import extract_links
|
||||
from ..security import encrypt_content, decrypt_content
|
||||
|
||||
memo_bp = Blueprint('memo', __name__)
|
||||
|
||||
@memo_bp.route('/api/memos', methods=['GET'])
|
||||
@login_required
|
||||
def get_memos():
|
||||
limit = request.args.get('limit', 20, type=int)
|
||||
offset = request.args.get('offset', 0, type=int)
|
||||
group = request.args.get('group', 'all')
|
||||
query = request.args.get('query', '')
|
||||
date = request.args.get('date', '')
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
|
||||
where_clauses = []
|
||||
params = []
|
||||
|
||||
# 1. 그룹/태그 필터링
|
||||
if group == GROUP_DONE:
|
||||
where_clauses.append("status = 'done'")
|
||||
elif group.startswith('tag:'):
|
||||
tag_name = group.split(':')[-1]
|
||||
where_clauses.append("status != 'done'")
|
||||
where_clauses.append("id IN (SELECT memo_id FROM tags WHERE name = ?)")
|
||||
params.append(tag_name)
|
||||
elif group != 'all':
|
||||
where_clauses.append("status != 'done'")
|
||||
where_clauses.append("group_name = ?")
|
||||
params.append(group)
|
||||
else:
|
||||
where_clauses.append("status != 'done'")
|
||||
|
||||
# 2. 검색어 필터링
|
||||
if query:
|
||||
where_clauses.append("(title LIKE ? OR content LIKE ?)")
|
||||
params.append(f"%{query}%")
|
||||
params.append(f"%{query}%")
|
||||
|
||||
# 3. 날짜 필터링 (캘린더 선택)
|
||||
if date:
|
||||
where_clauses.append("created_at LIKE ?")
|
||||
params.append(f"{date}%")
|
||||
|
||||
# 4. 초기 로드 시 최근 5일 강조 (필터가 없는 경우에만 적용)
|
||||
if offset == 0 and group == 'all' and not query and not date:
|
||||
start_date = (datetime.datetime.now() - datetime.timedelta(days=5)).isoformat()
|
||||
where_clauses.append("(updated_at >= ? OR is_pinned = 1)")
|
||||
params.append(start_date)
|
||||
|
||||
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
|
||||
|
||||
query_sql = f"SELECT * FROM memos WHERE {where_sql} ORDER BY is_pinned DESC, updated_at DESC LIMIT ? OFFSET ?"
|
||||
c.execute(query_sql, params + [limit, offset])
|
||||
memo_rows = c.fetchall()
|
||||
|
||||
if not memo_rows:
|
||||
conn.close()
|
||||
return jsonify([])
|
||||
|
||||
memos = [dict(r) for r in memo_rows]
|
||||
memo_ids = [m['id'] for m in memos]
|
||||
placeholders = ','.join(['?'] * len(memo_ids))
|
||||
|
||||
# --- 🚀 Bulk Fetch: N+1 문제 해결 ---
|
||||
|
||||
# 태그 한꺼번에 가져오기
|
||||
c.execute(f'SELECT memo_id, name, source FROM tags WHERE memo_id IN ({placeholders})', memo_ids)
|
||||
tags_rows = c.fetchall()
|
||||
tags_map = {}
|
||||
for t in tags_rows:
|
||||
tags_map.setdefault(t['memo_id'], []).append(dict(t))
|
||||
|
||||
# 첨부파일 한꺼번에 가져오기
|
||||
c.execute(f'SELECT id, memo_id, filename, original_name, file_type, size FROM attachments WHERE memo_id IN ({placeholders})', memo_ids)
|
||||
attachments_rows = c.fetchall()
|
||||
attachments_map = {}
|
||||
for a in attachments_rows:
|
||||
attachments_map.setdefault(a['memo_id'], []).append(dict(a))
|
||||
|
||||
# 백링크 한꺼번에 가져오기
|
||||
c.execute(f'''
|
||||
SELECT ml.target_id, m.id as source_id, m.title
|
||||
FROM memo_links ml
|
||||
JOIN memos m ON ml.source_id = m.id
|
||||
WHERE ml.target_id IN ({placeholders})
|
||||
''', memo_ids)
|
||||
backlinks_rows = c.fetchall()
|
||||
backlinks_map = {}
|
||||
for l in backlinks_rows:
|
||||
backlinks_map.setdefault(l['target_id'], []).append(dict(l))
|
||||
|
||||
# 전방 링크(Forward Links) 한꺼번에 가져오기
|
||||
c.execute(f'''
|
||||
SELECT ml.source_id, m.id as target_id, m.title
|
||||
FROM memo_links ml
|
||||
JOIN memos m ON ml.target_id = m.id
|
||||
WHERE ml.source_id IN ({placeholders})
|
||||
''', memo_ids)
|
||||
links_rows = c.fetchall()
|
||||
links_map = {}
|
||||
for l in links_rows:
|
||||
links_map.setdefault(l['source_id'], []).append(dict(l))
|
||||
|
||||
# 데이터 가공 및 병합
|
||||
for m in memos:
|
||||
m['tags'] = tags_map.get(m['id'], [])
|
||||
m['attachments'] = attachments_map.get(m['id'], [])
|
||||
m['backlinks'] = backlinks_map.get(m['id'], [])
|
||||
m['links'] = links_map.get(m['id'], [])
|
||||
|
||||
conn.close()
|
||||
return jsonify(memos)
|
||||
|
||||
@memo_bp.route('/api/stats/heatmap', methods=['GET'])
|
||||
@login_required
|
||||
def get_heatmap_stats():
|
||||
days = request.args.get('days', 365, type=int)
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
# 파라미터로 받은 일수만큼 데이터 조회
|
||||
start_date = (datetime.datetime.now() - datetime.timedelta(days=days)).isoformat()
|
||||
|
||||
c.execute('''
|
||||
SELECT strftime('%Y-%m-%d', created_at) as date, COUNT(*) as count
|
||||
FROM memos
|
||||
WHERE created_at >= ?
|
||||
GROUP BY date
|
||||
''', (start_date,))
|
||||
|
||||
stats = c.fetchall()
|
||||
conn.close()
|
||||
return jsonify([dict(s) for s in stats])
|
||||
|
||||
@memo_bp.route('/api/memos', methods=['POST'])
|
||||
@login_required
|
||||
def create_memo():
|
||||
data = request.json
|
||||
title = data.get('title', '').strip()
|
||||
content = data.get('content', '').strip()
|
||||
color = data.get('color', '#2c3e50')
|
||||
is_pinned = 1 if data.get('is_pinned') else 0
|
||||
status = data.get('status', 'active').strip()
|
||||
group_name = data.get('group_name', GROUP_DEFAULT).strip()
|
||||
user_tags = data.get('tags', [])
|
||||
is_encrypted = 1 if data.get('is_encrypted') else 0
|
||||
password = data.get('password', '').strip()
|
||||
|
||||
if is_encrypted and password:
|
||||
content = encrypt_content(content, password)
|
||||
elif is_encrypted and not password:
|
||||
return jsonify({'error': 'Password required for encryption'}), 400
|
||||
|
||||
now = datetime.datetime.now().isoformat()
|
||||
if not title and not content:
|
||||
return jsonify({'error': 'Title or content required'}), 400
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
try:
|
||||
c.execute('''
|
||||
INSERT INTO memos (title, content, color, is_pinned, status, group_name, is_encrypted, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
''', (title, content, color, is_pinned, status, group_name, is_encrypted, now, now))
|
||||
memo_id = c.lastrowid
|
||||
|
||||
for tag in user_tags:
|
||||
if tag.strip():
|
||||
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
|
||||
|
||||
links = extract_links(content)
|
||||
for target_id in links:
|
||||
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
|
||||
|
||||
attachment_filenames = data.get('attachment_filenames', [])
|
||||
for fname in set(attachment_filenames):
|
||||
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
|
||||
|
||||
conn.commit()
|
||||
current_app.logger.info(f"Memo Created: ID {memo_id}, Title: '{title}', Encrypted: {is_encrypted}")
|
||||
return jsonify({'id': memo_id, 'message': 'Memo created'}), 201
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
@memo_bp.route('/api/memos/<int:memo_id>', methods=['PUT'])
|
||||
@login_required
|
||||
def update_memo(memo_id):
|
||||
data = request.json
|
||||
title = data.get('title')
|
||||
content = data.get('content')
|
||||
color = data.get('color')
|
||||
is_pinned = data.get('is_pinned')
|
||||
status = data.get('status')
|
||||
group_name = data.get('group_name')
|
||||
user_tags = data.get('tags')
|
||||
is_encrypted = data.get('is_encrypted')
|
||||
password = data.get('password', '').strip()
|
||||
|
||||
now = datetime.datetime.now().isoformat()
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
|
||||
# 보안: 암호화된 메모 수정 시 비밀번호 검증
|
||||
c.execute('SELECT content, is_encrypted FROM memos WHERE id = ?', (memo_id,))
|
||||
memo = c.fetchone()
|
||||
if memo and memo['is_encrypted']:
|
||||
# 암호화된 메모지만 '암호화 해제(is_encrypted=0)' 요청이 온 경우는
|
||||
# 비밀번호 없이도 수정을 시도할 수 있어야 할까? (아니오, 인증이 필요함)
|
||||
if not password:
|
||||
conn.close()
|
||||
return jsonify({'error': _t('msg_encrypted_locked')}), 403
|
||||
|
||||
# 비밀번호가 맞는지 검증 (복호화 시도)
|
||||
if decrypt_content(memo['content'], password) is None:
|
||||
conn.close()
|
||||
return jsonify({'error': _t('msg_auth_failed')}), 403
|
||||
|
||||
try:
|
||||
updates = ['updated_at = ?']
|
||||
params = [now]
|
||||
|
||||
# 암호화 처리 로직: 암호화가 활성화된 경우(또는 새로 설정하는 경우) 본문 암호화
|
||||
final_content = content.strip() if content is not None else None
|
||||
if (is_encrypted or (is_encrypted is None and memo['is_encrypted'])) and password:
|
||||
if final_content is not None:
|
||||
final_content = encrypt_content(final_content, password)
|
||||
|
||||
if title is not None:
|
||||
updates.append('title = ?'); params.append(title.strip())
|
||||
if final_content is not None:
|
||||
updates.append('content = ?'); params.append(final_content)
|
||||
if color is not None:
|
||||
updates.append('color = ?'); params.append(color)
|
||||
if is_pinned is not None:
|
||||
updates.append('is_pinned = ?'); params.append(1 if is_pinned else 0)
|
||||
if status is not None:
|
||||
updates.append('status = ?'); params.append(status.strip())
|
||||
if group_name is not None:
|
||||
updates.append('group_name = ?'); params.append(group_name.strip() or GROUP_DEFAULT)
|
||||
if is_encrypted is not None:
|
||||
updates.append('is_encrypted = ?'); params.append(1 if is_encrypted else 0)
|
||||
|
||||
params.append(memo_id)
|
||||
c.execute(f"UPDATE memos SET {', '.join(updates)} WHERE id = ?", params)
|
||||
|
||||
if user_tags is not None:
|
||||
c.execute("DELETE FROM tags WHERE memo_id = ? AND source = 'user'", (memo_id,))
|
||||
for tag in user_tags:
|
||||
if tag.strip():
|
||||
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
|
||||
|
||||
if content is not None:
|
||||
c.execute("DELETE FROM memo_links WHERE source_id = ?", (memo_id,))
|
||||
links = extract_links(content)
|
||||
for target_id in links:
|
||||
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
|
||||
|
||||
# [Bug Fix] 첨부파일 링크는 본문 수정 여부와 상관없이 항상 갱신
|
||||
attachment_filenames = data.get('attachment_filenames')
|
||||
if attachment_filenames is not None:
|
||||
c.execute('UPDATE attachments SET memo_id = NULL WHERE memo_id = ?', (memo_id,))
|
||||
for fname in set(attachment_filenames):
|
||||
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
|
||||
|
||||
if is_encrypted is not None:
|
||||
if is_encrypted and password and content:
|
||||
enc_content = encrypt_content(content, password)
|
||||
c.execute("UPDATE memos SET is_encrypted = 1, content = ? WHERE id = ?", (enc_content, memo_id))
|
||||
elif is_encrypted == 0:
|
||||
c.execute("UPDATE memos SET is_encrypted = 0 WHERE id = ?", (memo_id,))
|
||||
|
||||
conn.commit()
|
||||
current_app.logger.info(f"Memo Updated: ID {memo_id}, Fields: {list(data.keys())}")
|
||||
return jsonify({'message': 'Updated'})
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
@memo_bp.route('/api/memos/<int:memo_id>', methods=['DELETE'])
|
||||
@login_required
|
||||
def delete_memo(memo_id):
|
||||
import os
|
||||
from flask import current_app
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
|
||||
# 1. 암호화 여부 확인 및 파일 목록 가져오기
|
||||
c.execute('SELECT is_encrypted FROM memos WHERE id = ?', (memo_id,))
|
||||
memo = c.fetchone()
|
||||
if not memo:
|
||||
conn.close()
|
||||
return jsonify({'error': 'Memo not found'}), 404
|
||||
|
||||
if memo['is_encrypted']:
|
||||
conn.close()
|
||||
return jsonify({'error': _t('msg_encrypted_locked')}), 403
|
||||
|
||||
# 2. 물리적 파일 삭제 준비
|
||||
c.execute('SELECT filename FROM attachments WHERE memo_id = ?', (memo_id,))
|
||||
files = c.fetchall()
|
||||
upload_folder = current_app.config['UPLOAD_FOLDER']
|
||||
|
||||
try:
|
||||
# 3. 물리 파일 삭제 루프
|
||||
for f in files:
|
||||
filepath = os.path.join(upload_folder, f['filename'])
|
||||
if os.path.exists(filepath):
|
||||
os.remove(filepath)
|
||||
current_app.logger.info(f"Physical file deleted on memo removal: {f['filename']}")
|
||||
|
||||
# 4. 메모 삭제 (외래 키 제약 조건에 의해 tags 등은 자동 삭제되거나 처리됨)
|
||||
# Note: attachments 테이블의 memo_id는 SET NULL 설정이므로 수동으로 레코드도 삭제해줍니다.
|
||||
c.execute('DELETE FROM attachments WHERE memo_id = ?', (memo_id,))
|
||||
c.execute('DELETE FROM memos WHERE id = ?', (memo_id,))
|
||||
|
||||
conn.commit()
|
||||
current_app.logger.info(f"Memo and its {len(files)} files deleted: ID {memo_id}")
|
||||
return jsonify({'message': 'Deleted memo and all associated files'})
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
@memo_bp.route('/api/memos/<int:memo_id>/decrypt', methods=['POST'])
|
||||
@login_required
|
||||
def decrypt_memo_route(memo_id):
|
||||
data = request.json
|
||||
password = data.get('password')
|
||||
if not password: return jsonify({'error': 'Password required'}), 400
|
||||
conn = get_db(); c = conn.cursor()
|
||||
c.execute('SELECT content, is_encrypted FROM memos WHERE id = ?', (memo_id,))
|
||||
memo = c.fetchone(); conn.close()
|
||||
if not memo: return jsonify({'error': 'Memo not found'}), 404
|
||||
if not memo['is_encrypted']: return jsonify({'content': memo['content']})
|
||||
decrypted = decrypt_content(memo['content'], password)
|
||||
if decrypted is None:
|
||||
current_app.logger.warning(f"Decryption FAILED: ID {memo_id}")
|
||||
return jsonify({'error': 'Invalid password'}), 403
|
||||
|
||||
current_app.logger.info(f"Decryption SUCCESS: ID {memo_id}")
|
||||
return jsonify({'content': decrypted})
|
||||
@@ -0,0 +1,58 @@
|
||||
import os
|
||||
import json
|
||||
from flask import Blueprint, request, jsonify, current_app # type: ignore
|
||||
from ..auth import login_required
|
||||
|
||||
settings_bp = Blueprint('settings', __name__)
|
||||
|
||||
CONFIG_PATH = os.path.join(os.getcwd(), 'config.json')
|
||||
|
||||
# 기본 테마 및 시스템 설정
|
||||
DEFAULT_SETTINGS = {
|
||||
"bg_color": "#0f172a",
|
||||
"sidebar_color": "rgba(30, 41, 59, 0.7)",
|
||||
"card_color": "rgba(30, 41, 59, 0.85)",
|
||||
"encrypted_border": "#00f3ff",
|
||||
"ai_accent": "#8b5cf6",
|
||||
"enable_ai": True,
|
||||
"lang": "ko"
|
||||
}
|
||||
|
||||
@settings_bp.route('/api/settings', methods=['GET'])
|
||||
@login_required
|
||||
def get_settings():
|
||||
if not os.path.exists(CONFIG_PATH):
|
||||
return jsonify(DEFAULT_SETTINGS)
|
||||
|
||||
try:
|
||||
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
# 기본값과 병합하여 신규 필드 등 누락 방지
|
||||
full_data = {**DEFAULT_SETTINGS, **data}
|
||||
return jsonify(full_data)
|
||||
except Exception as e:
|
||||
return jsonify(DEFAULT_SETTINGS)
|
||||
|
||||
@settings_bp.route('/api/settings', methods=['POST'])
|
||||
@login_required
|
||||
def save_settings():
|
||||
data = request.json
|
||||
if not data:
|
||||
return jsonify({'error': 'No data provided'}), 400
|
||||
|
||||
try:
|
||||
# 기존 데이터 로드 후 병합
|
||||
current_data = {}
|
||||
if os.path.exists(CONFIG_PATH):
|
||||
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
|
||||
current_data = json.load(f)
|
||||
|
||||
updated_data = {**current_data, **data}
|
||||
|
||||
with open(CONFIG_PATH, 'w', encoding='utf-8') as f:
|
||||
json.dump(updated_data, f, indent=4, ensure_ascii=False)
|
||||
|
||||
current_app.logger.info(f"System Settings Updated: {list(data.keys())}")
|
||||
return jsonify({'message': 'Settings saved successfully'})
|
||||
except Exception as e:
|
||||
return jsonify({'error': str(e)}), 500
|
||||
@@ -0,0 +1,58 @@
|
||||
import os
|
||||
import base64
|
||||
from cryptography.fernet import Fernet # type: ignore
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # type: ignore
|
||||
|
||||
def derive_key(password: str):
|
||||
"""
|
||||
.env에 설정된 ENCRYPTION_SEED를 솔트로 사용하여 사용자 비밀번호로부터 암호화 키를 파생합니다.
|
||||
"""
|
||||
seed = os.getenv('ENCRYPTION_SEED', 'default_secret_seed_123').encode()
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=seed,
|
||||
iterations=100000,
|
||||
)
|
||||
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
|
||||
return key
|
||||
|
||||
def encrypt_content(content: str, password: str):
|
||||
"""지정된 비밀번호로 내용을 암호화합니다."""
|
||||
key = derive_key(password)
|
||||
f = Fernet(key)
|
||||
encrypted_data = f.encrypt(content.encode()).decode()
|
||||
return encrypted_data
|
||||
|
||||
def decrypt_content(encrypted_data: str, password: str):
|
||||
"""지정된 비밀번호로 암호화된 내용을 복호화합니다."""
|
||||
try:
|
||||
key = derive_key(password)
|
||||
f = Fernet(key)
|
||||
decrypted_data = f.decrypt(encrypted_data.encode()).decode()
|
||||
return decrypted_data
|
||||
except Exception:
|
||||
# 비밀번호가 틀리거나 데이터가 손상된 경우
|
||||
return None
|
||||
|
||||
def get_master_key():
|
||||
"""파일 암호화에 사용될 시스템 마스터 키를 생성합니다."""
|
||||
seed = os.getenv('ENCRYPTION_SEED', 'master_default_seed_777')
|
||||
return derive_key(seed)
|
||||
|
||||
def encrypt_file(data: bytes):
|
||||
"""데이터를 시스템 마스터 키로 암호화합니다."""
|
||||
key = get_master_key()
|
||||
f = Fernet(key)
|
||||
return f.encrypt(data)
|
||||
|
||||
def decrypt_file(data: bytes):
|
||||
"""데이터를 시스템 마스터 키로 복호화합니다."""
|
||||
try:
|
||||
key = get_master_key()
|
||||
f = Fernet(key)
|
||||
return f.decrypt(data)
|
||||
except Exception:
|
||||
# 복호화 실패 (암호화되지 않은 파일이거나 키가 다름)
|
||||
return None
|
||||
@@ -0,0 +1,33 @@
|
||||
import re
|
||||
from ..constants import GROUP_DEFAULT
|
||||
|
||||
def parse_metadata(text):
|
||||
"""
|
||||
텍스트에서 ##그룹명 과 #태그 추출 유틸리티.
|
||||
"""
|
||||
group_name = GROUP_DEFAULT
|
||||
tags = []
|
||||
|
||||
if not text:
|
||||
return group_name, tags
|
||||
|
||||
group_match = re.search(r'##(\S+)', text)
|
||||
if group_match:
|
||||
group_name = group_match.group(1)
|
||||
|
||||
tag_matches = re.finditer(r'(?<!#)#(\S+)', text)
|
||||
for match in tag_matches:
|
||||
tags.append(match.group(1))
|
||||
|
||||
return group_name, list(set(tags))
|
||||
|
||||
def extract_links(text):
|
||||
"""
|
||||
텍스트에서 [[#ID]] 형태의 내부 링크를 찾아 ID 목록(정수)을 반환합니다.
|
||||
"""
|
||||
if not text:
|
||||
return []
|
||||
|
||||
# [[#123]] 패턴 매칭
|
||||
links = re.findall(r'\[\[#(\d+)\]\]', text)
|
||||
return list(set([int(link_id) for link_id in links]))
|
||||
@@ -0,0 +1,41 @@
|
||||
import json
|
||||
import os
|
||||
from flask import current_app # type: ignore
|
||||
|
||||
class I18n:
|
||||
_locales = {}
|
||||
_default_lang = 'en'
|
||||
|
||||
@classmethod
|
||||
def load_locales(cls):
|
||||
locales_dir = os.path.join(current_app.static_folder, 'locales')
|
||||
for filename in os.listdir(locales_dir):
|
||||
if filename.endswith('.json'):
|
||||
lang = filename.split('.')[0]
|
||||
with open(os.path.join(locales_dir, filename), 'r', encoding='utf-8') as f:
|
||||
cls._locales[lang] = json.load(f)
|
||||
|
||||
@classmethod
|
||||
def t(cls, key, lang=None):
|
||||
if not lang:
|
||||
# 기본적으로 config에서 언어를 가져오거나 'en' 사용
|
||||
lang = current_app.config.get('lang', cls._default_lang)
|
||||
|
||||
if not cls._locales:
|
||||
cls.load_locales()
|
||||
|
||||
data = cls._locales.get(lang, cls._locales.get(cls._default_lang, {}))
|
||||
|
||||
# 중첩 키 지원 (예: "groups.done")
|
||||
parts = key.split('.')
|
||||
for p in parts:
|
||||
if isinstance(data, dict):
|
||||
data = data.get(p)
|
||||
else:
|
||||
return key
|
||||
|
||||
return data or key
|
||||
|
||||
# 숏컷 함수
|
||||
def _t(key, lang=None):
|
||||
return I18n.t(key, lang)
|
||||
Reference in New Issue
Block a user