Refactor: Modularize memo.py to 3-layer architecture, Fix tag extraction regex, and Enhance infinite scroll for large screens

This commit is contained in:
leeyj
2026-04-20 11:04:00 +09:00
parent 6fca65eef1
commit ac58e14c8c
10 changed files with 476 additions and 372 deletions
+210
View File
@@ -0,0 +1,210 @@
import datetime
from ..database import get_db
class MemoRepository:
@staticmethod
def get_all(filters, limit=20, offset=0):
"""필터 조건에 따른 메모 목록 조회 및 연관 데이터(태그, 링크 등) 통합 조회"""
conn = get_db()
c = conn.cursor()
where_clauses = []
params = []
group = filters.get('group', 'all')
query = filters.get('query', '')
date = filters.get('date', '')
category = filters.get('category', '')
if group == 'done':
where_clauses.append("status = 'done'")
elif group.startswith('tag:'):
tag_name = group.split(':')[-1]
where_clauses.append("status != 'done'")
where_clauses.append("id IN (SELECT memo_id FROM tags WHERE name = ?)")
params.append(tag_name)
elif group != 'all':
where_clauses.append("status != 'done'")
where_clauses.append("group_name = ?")
params.append(group)
else:
where_clauses.append("status != 'done'")
if query:
where_clauses.append("(title LIKE ? OR content LIKE ?)")
params.extend([f"%{query}%", f"%{query}%"])
if date:
where_clauses.append("created_at LIKE ?")
params.append(f"{date}%")
if category:
where_clauses.append("category = ?")
params.append(category)
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
query_sql = f"SELECT * FROM memos WHERE {where_sql} ORDER BY is_pinned DESC, updated_at DESC LIMIT ? OFFSET ?"
c.execute(query_sql, params + [limit, offset])
memo_rows = c.fetchall()
if not memo_rows:
conn.close()
return []
memos = [dict(r) for r in memo_rows]
memo_ids = [m['id'] for m in memos]
placeholders = ','.join(['?'] * len(memo_ids))
# Bulk Fetch Tags
c.execute(f'SELECT memo_id, name, source FROM tags WHERE memo_id IN ({placeholders})', memo_ids)
tags_map = {}
for t in c.fetchall():
tags_map.setdefault(t['memo_id'], []).append(dict(t))
# Bulk Fetch Attachments
c.execute(f'SELECT id, memo_id, filename, original_name, file_type, size FROM attachments WHERE memo_id IN ({placeholders})', memo_ids)
attachments_map = {}
for a in c.fetchall():
attachments_map.setdefault(a['memo_id'], []).append(dict(a))
# Bulk Fetch Backlinks
c.execute(f'''
SELECT ml.target_id, m.id as source_id, m.title
FROM memo_links ml
JOIN memos m ON ml.source_id = m.id
WHERE ml.target_id IN ({placeholders})
''', memo_ids)
backlinks_map = {}
for l in c.fetchall():
backlinks_map.setdefault(l['target_id'], []).append(dict(l))
# Bulk Fetch Forward Links
c.execute(f'''
SELECT ml.source_id, m.id as target_id, m.title
FROM memo_links ml
JOIN memos m ON ml.target_id = m.id
WHERE ml.source_id IN ({placeholders})
''', memo_ids)
links_map = {}
for l in c.fetchall():
links_map.setdefault(l['source_id'], []).append(dict(l))
for m in memos:
m['tags'] = tags_map.get(m['id'], [])
m['attachments'] = attachments_map.get(m['id'], [])
m['backlinks'] = backlinks_map.get(m['id'], [])
m['links'] = links_map.get(m['id'], [])
conn.close()
return memos
@staticmethod
def get_by_id(memo_id):
conn = get_db()
c = conn.cursor()
c.execute('SELECT * FROM memos WHERE id = ?', (memo_id,))
row = c.fetchone()
if not row:
conn.close()
return None
memo = dict(row)
c.execute('SELECT name, source FROM tags WHERE memo_id = ?', (memo_id,))
memo['tags'] = [dict(r) for r in c.fetchall()]
c.execute('SELECT id, filename, original_name, file_type, size FROM attachments WHERE memo_id = ?', (memo_id,))
memo['attachments'] = [dict(r) for r in c.fetchall()]
conn.close()
return memo
@staticmethod
def create(data, tags=[], links=[], attachment_filenames=[]):
conn = get_db()
c = conn.cursor()
try:
placeholders = ', '.join(['?'] * len(data))
columns = ', '.join(data.keys())
c.execute(f'INSERT INTO memos ({columns}) VALUES ({placeholders})', list(data.values()))
memo_id = c.lastrowid
for tag in tags:
if tag.strip():
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
for target_id in links:
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
for fname in set(attachment_filenames):
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
conn.commit()
return memo_id
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
@staticmethod
def update(memo_id, updates, tags=None, links=None, attachment_filenames=None):
conn = get_db()
c = conn.cursor()
try:
if updates:
sql = "UPDATE memos SET " + ", ".join([f"{k} = ?" for k in updates.keys()]) + " WHERE id = ?"
c.execute(sql, list(updates.values()) + [memo_id])
if tags is not None:
c.execute("DELETE FROM tags WHERE memo_id = ?", (memo_id,))
for tag in tags:
if tag.strip():
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
if links is not None:
c.execute("DELETE FROM memo_links WHERE source_id = ?", (memo_id,))
for target_id in links:
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
if attachment_filenames is not None:
c.execute('UPDATE attachments SET memo_id = NULL WHERE memo_id = ?', (memo_id,))
for fname in set(attachment_filenames):
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
conn.commit()
return True
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
@staticmethod
def delete(memo_id):
conn = get_db()
c = conn.cursor()
try:
# Physical file lookup is done in service layer
c.execute('DELETE FROM attachments WHERE memo_id = ?', (memo_id,))
c.execute('DELETE FROM memos WHERE id = ?', (memo_id,))
conn.commit()
return True
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
@staticmethod
def get_heatmap(days=365):
conn = get_db()
c = conn.cursor()
start_date = (datetime.datetime.now() - datetime.timedelta(days=days)).isoformat()
c.execute('''
SELECT strftime('%Y-%m-%d', created_at) as date, COUNT(*) as count
FROM memos
WHERE created_at >= ?
GROUP BY date
''', (start_date,))
stats = [dict(s) for s in c.fetchall()]
conn.close()
return stats
+2
View File
@@ -7,6 +7,7 @@ def register_blueprints(app):
from .file import file_bp
from .ai import ai_bp
from .settings import settings_bp
from .stats import stats_bp # [Added]
app.register_blueprint(main_bp)
app.register_blueprint(auth_bp)
@@ -14,3 +15,4 @@ def register_blueprints(app):
app.register_blueprint(file_bp)
app.register_blueprint(ai_bp)
app.register_blueprint(settings_bp)
app.register_blueprint(stats_bp) # [Added]
+32 -368
View File
@@ -1,241 +1,43 @@
import datetime
from flask import Blueprint, request, jsonify, current_app # type: ignore
from ..database import get_db
from ..auth import login_required
from ..constants import GROUP_DONE, GROUP_DEFAULT
from ..services.memo_service import MemoService
from ..utils.i18n import _t
from ..utils import extract_links, parse_metadata, parse_and_clean_metadata, generate_auto_title
from ..security import encrypt_content, decrypt_content
memo_bp = Blueprint('memo', __name__)
@memo_bp.route('/api/memos', methods=['GET'])
@login_required
def get_memos():
filters = {
'group': request.args.get('group', 'all'),
'query': request.args.get('query', ''),
'date': request.args.get('date', '').replace('null', '').replace('undefined', ''),
'category': request.args.get('category', '').replace('null', '').replace('undefined', '')
}
limit = request.args.get('limit', 20, type=int)
offset = request.args.get('offset', 0, type=int)
group = request.args.get('group', 'all')
query = request.args.get('query', '')
date = request.args.get('date', '')
category = request.args.get('category')
if date in ('null', 'undefined'):
date = ''
if category in ('null', 'undefined'):
category = ''
conn = get_db()
c = conn.cursor()
where_clauses = []
params = []
# 1. 그룹/태그 필터링
if group == GROUP_DONE:
where_clauses.append("status = 'done'")
elif group.startswith('tag:'):
tag_name = group.split(':')[-1]
where_clauses.append("status != 'done'")
where_clauses.append("id IN (SELECT memo_id FROM tags WHERE name = ?)")
params.append(tag_name)
elif group != 'all':
where_clauses.append("status != 'done'")
where_clauses.append("group_name = ?")
params.append(group)
else:
where_clauses.append("status != 'done'")
# 2. 검색어 필터링
if query:
where_clauses.append("(title LIKE ? OR content LIKE ?)")
params.append(f"%{query}%")
params.append(f"%{query}%")
# 3. 날짜 필터링 (캘린더 선택)
if date:
where_clauses.append("created_at LIKE ?")
params.append(f"{date}%")
# 4. 카테고리 필터링
if category:
where_clauses.append("category = ?")
params.append(category)
# 5. 초기 로드 시 최근 5일 강조 (필터가 없는 경우에만 적용)
if offset == 0 and group == 'all' and not query and not date and not category:
start_date = (datetime.datetime.now() - datetime.timedelta(days=5)).isoformat()
where_clauses.append("(updated_at >= ? OR is_pinned = 1)")
params.append(start_date)
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
query_sql = f"SELECT * FROM memos WHERE {where_sql} ORDER BY is_pinned DESC, updated_at DESC LIMIT ? OFFSET ?"
c.execute(query_sql, params + [limit, offset])
memo_rows = c.fetchall()
if not memo_rows:
conn.close()
return jsonify([])
memos = [dict(r) for r in memo_rows]
memo_ids = [m['id'] for m in memos]
placeholders = ','.join(['?'] * len(memo_ids))
# --- 🚀 Bulk Fetch: N+1 문제 해결 ---
# 태그 한꺼번에 가져오기
c.execute(f'SELECT memo_id, name, source FROM tags WHERE memo_id IN ({placeholders})', memo_ids)
tags_rows = c.fetchall()
tags_map = {}
for t in tags_rows:
tags_map.setdefault(t['memo_id'], []).append(dict(t))
# 첨부파일 한꺼번에 가져오기
c.execute(f'SELECT id, memo_id, filename, original_name, file_type, size FROM attachments WHERE memo_id IN ({placeholders})', memo_ids)
attachments_rows = c.fetchall()
attachments_map = {}
for a in attachments_rows:
attachments_map.setdefault(a['memo_id'], []).append(dict(a))
# 백링크 한꺼번에 가져오기
c.execute(f'''
SELECT ml.target_id, m.id as source_id, m.title
FROM memo_links ml
JOIN memos m ON ml.source_id = m.id
WHERE ml.target_id IN ({placeholders})
''', memo_ids)
backlinks_rows = c.fetchall()
backlinks_map = {}
for l in backlinks_rows:
backlinks_map.setdefault(l['target_id'], []).append(dict(l))
# 전방 링크(Forward Links) 한꺼번에 가져오기
c.execute(f'''
SELECT ml.source_id, m.id as target_id, m.title
FROM memo_links ml
JOIN memos m ON ml.target_id = m.id
WHERE ml.source_id IN ({placeholders})
''', memo_ids)
links_rows = c.fetchall()
links_map = {}
for l in links_rows:
links_map.setdefault(l['source_id'], []).append(dict(l))
# 데이터 가공 및 병합
for m in memos:
m['tags'] = tags_map.get(m['id'], [])
m['attachments'] = attachments_map.get(m['id'], [])
m['backlinks'] = backlinks_map.get(m['id'], [])
m['links'] = links_map.get(m['id'], [])
conn.close()
memos = MemoService.get_all_memos(filters, limit, offset)
return jsonify(memos)
@memo_bp.route('/api/memos/<int:memo_id>', methods=['GET'])
@login_required
def get_memo(memo_id):
conn = get_db()
c = conn.cursor()
c.execute('SELECT * FROM memos WHERE id = ?', (memo_id,))
row = c.fetchone()
if not row:
conn.close()
memo = MemoService.get_memo_by_id(memo_id)
if not memo:
return jsonify({'error': 'Memo not found'}), 404
memo = dict(row)
# 태그 가져오기
c.execute('SELECT name, source FROM tags WHERE memo_id = ?', (memo_id,))
memo['tags'] = [dict(r) for r in c.fetchall()]
# 첨부파일 가져오기
c.execute('SELECT id, filename, original_name, file_type, size FROM attachments WHERE memo_id = ?', (memo_id,))
memo['attachments'] = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify(memo)
@memo_bp.route('/api/stats/heatmap', methods=['GET'])
@login_required
def get_heatmap_stats():
days = request.args.get('days', 365, type=int)
conn = get_db()
c = conn.cursor()
# 파라미터로 받은 일수만큼 데이터 조회
start_date = (datetime.datetime.now() - datetime.timedelta(days=days)).isoformat()
c.execute('''
SELECT strftime('%Y-%m-%d', created_at) as date, COUNT(*) as count
FROM memos
WHERE created_at >= ?
GROUP BY date
''', (start_date,))
stats = c.fetchall()
conn.close()
return jsonify([dict(s) for s in stats])
@memo_bp.route('/api/memos', methods=['POST'])
@login_required
def create_memo():
try:
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
color = data.get('color', '#2c3e50')
is_pinned = 1 if data.get('is_pinned') else 0
status = data.get('status', 'active').strip()
group_name = data.get('group_name', GROUP_DEFAULT).strip()
user_tags = data.get('tags', [])
is_encrypted = 1 if data.get('is_encrypted') else 0
password = data.get('password', '').strip()
category = data.get('category')
# 본문 기반 메타데이터 통합 및 정리 ($그룹, #태그 하단 이동)
new_content, final_group, final_tags = parse_and_clean_metadata(content, ui_group=group_name, ui_tags=user_tags)
content = new_content
group_name = final_group
user_tags = final_tags
# 제목 자동 생성 (비어있을 경우)
if not title:
title = generate_auto_title(content)
if is_encrypted and password:
content = encrypt_content(content, password)
elif is_encrypted and not password:
return jsonify({'error': 'Password required for encryption'}), 400
now = datetime.datetime.now().isoformat()
if not title and not content:
return jsonify({'error': 'Title or content required'}), 400
conn = get_db()
c = conn.cursor()
c.execute('''
INSERT INTO memos (title, content, color, is_pinned, status, group_name, category, is_encrypted, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (title, content, color, is_pinned, status, group_name, category, is_encrypted, now, now))
memo_id = c.lastrowid
for tag in user_tags:
if tag.strip():
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
links = extract_links(content)
for target_id in links:
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
attachment_filenames = data.get('attachment_filenames', [])
for fname in set(attachment_filenames):
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
conn.commit()
conn.close()
current_app.logger.info(f"Memo Created: ID {memo_id}, Title: '{title}', Encrypted: {is_encrypted}")
memo_id = MemoService.create_memo(data)
return jsonify({'id': memo_id, 'message': 'Memo created'}), 201
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
if 'conn' in locals() and conn:
conn.rollback()
conn.close()
current_app.logger.error(f"CREATE_MEMO FAILED: {str(e)}")
return jsonify({'error': str(e)}), 500
@@ -244,176 +46,38 @@ def create_memo():
def update_memo(memo_id):
try:
data = request.json
title = data.get('title')
content = data.get('content')
color = data.get('color')
is_pinned = data.get('is_pinned')
status = data.get('status')
group_name = data.get('group_name')
user_tags = data.get('tags')
is_encrypted = data.get('is_encrypted')
password = data.get('password', '').strip()
category = data.get('category')
success, message = MemoService.update_memo(memo_id, data)
if not success:
if message in ["msg_encrypted_locked", "msg_auth_failed"]:
return jsonify({'error': _t(message)}), 403
return jsonify({'error': message}), 404
now = datetime.datetime.now().isoformat()
conn = get_db()
c = conn.cursor()
# 보안: 암호화된 메모 수정 시 비밀번호 검증
c.execute('SELECT content, is_encrypted, group_name FROM memos WHERE id = ?', (memo_id,))
memo = c.fetchone()
if memo and memo['is_encrypted']:
if not password:
conn.close()
return jsonify({'error': _t('msg_encrypted_locked')}), 403
if decrypt_content(memo['content'], password) is None:
conn.close()
return jsonify({'error': _t('msg_auth_failed')}), 403
# 본문 기반 메타데이터 통합 및 정리 ($그룹, #태그 하단 이동)
if content is not None:
new_content, final_group, final_tags = parse_and_clean_metadata(
content,
ui_group=(group_name or memo['group_name']),
ui_tags=(user_tags if user_tags is not None else [])
)
content = new_content
group_name = final_group
user_tags = final_tags
# 제목 자동 생성 (비어있을 경우)
if title == "":
title = generate_auto_title(content or "")
updates = ['updated_at = ?']
params = [now]
# 암호화 처리 로직: 암호화가 활성화된 경우(또는 새로 설정하는 경우) 본문 암호화
final_content = content.strip() if content is not None else None
if (is_encrypted or (is_encrypted is None and memo['is_encrypted'])) and password:
if final_content is not None:
final_content = encrypt_content(final_content, password)
if title is not None:
updates.append('title = ?'); params.append(title.strip())
if final_content is not None:
updates.append('content = ?'); params.append(final_content)
if color is not None:
updates.append('color = ?'); params.append(color)
if is_pinned is not None:
updates.append('is_pinned = ?'); params.append(1 if is_pinned else 0)
if status is not None:
updates.append('status = ?'); params.append(status.strip())
if group_name is not None:
updates.append('group_name = ?'); params.append(group_name.strip() or GROUP_DEFAULT)
if is_encrypted is not None:
updates.append('is_encrypted = ?'); params.append(1 if is_encrypted else 0)
if category is not None:
updates.append('category = ?'); params.append(category)
params.append(memo_id)
c.execute(f"UPDATE memos SET {', '.join(updates)} WHERE id = ?", params)
if user_tags is not None:
c.execute("DELETE FROM tags WHERE memo_id = ? AND source = 'user'", (memo_id,))
for tag in user_tags:
if tag.strip():
c.execute('INSERT INTO tags (memo_id, name, source) VALUES (?, ?, ?)', (memo_id, tag.strip(), 'user'))
if content is not None:
c.execute("DELETE FROM memo_links WHERE source_id = ?", (memo_id,))
links = extract_links(content)
for target_id in links:
c.execute('INSERT INTO memo_links (source_id, target_id) VALUES (?, ?)', (memo_id, target_id))
# [Bug Fix] 첨부파일 링크는 본문 수정 여부와 상관없이 항상 갱신
attachment_filenames = data.get('attachment_filenames')
if attachment_filenames is not None:
c.execute('UPDATE attachments SET memo_id = NULL WHERE memo_id = ?', (memo_id,))
for fname in set(attachment_filenames):
c.execute('UPDATE attachments SET memo_id = ? WHERE filename = ?', (memo_id, fname))
if is_encrypted is not None:
if is_encrypted and password and content:
enc_content = encrypt_content(content, password)
c.execute("UPDATE memos SET is_encrypted = 1, content = ? WHERE id = ?", (enc_content, memo_id))
elif is_encrypted == 0:
c.execute("UPDATE memos SET is_encrypted = 0 WHERE id = ?", (memo_id,))
conn.commit()
conn.close()
current_app.logger.info(f"Memo Updated: ID {memo_id}, Fields: {list(data.keys())}")
return jsonify({'message': 'Updated'})
except Exception as e:
if 'conn' in locals() and conn:
conn.rollback()
conn.close()
current_app.logger.error(f"UPDATE_MEMO FAILED: {str(e)}")
return jsonify({'error': str(e)}), 500
@memo_bp.route('/api/memos/<int:memo_id>', methods=['DELETE'])
@login_required
def delete_memo(memo_id):
import os
from flask import current_app
conn = get_db()
c = conn.cursor()
# 1. 암호화 여부 확인 및 파일 목록 가져오기
c.execute('SELECT is_encrypted FROM memos WHERE id = ?', (memo_id,))
memo = c.fetchone()
if not memo:
conn.close()
return jsonify({'error': 'Memo not found'}), 404
if memo['is_encrypted']:
conn.close()
return jsonify({'error': _t('msg_encrypted_locked')}), 403
# 2. 물리적 파일 삭제 준비
c.execute('SELECT filename FROM attachments WHERE memo_id = ?', (memo_id,))
files = c.fetchall()
upload_folder = current_app.config['UPLOAD_FOLDER']
try:
# 3. 물리 파일 삭제 루프
for f in files:
filepath = os.path.join(upload_folder, f['filename'])
if os.path.exists(filepath):
os.remove(filepath)
current_app.logger.info(f"Physical file deleted on memo removal: {f['filename']}")
# 4. 메모 삭제 (외래 키 제약 조건에 의해 tags 등은 자동 삭제되거나 처리됨)
# Note: attachments 테이블의 memo_id는 SET NULL 설정이므로 수동으로 레코드도 삭제해줍니다.
c.execute('DELETE FROM attachments WHERE memo_id = ?', (memo_id,))
c.execute('DELETE FROM memos WHERE id = ?', (memo_id,))
conn.commit()
current_app.logger.info(f"Memo and its {len(files)} files deleted: ID {memo_id}")
return jsonify({'message': 'Deleted memo and all associated files'})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
conn.close()
success, message = MemoService.delete_memo(memo_id)
if not success:
if message == "msg_encrypted_locked":
return jsonify({'error': _t(message)}), 403
return jsonify({'error': message}), 404
return jsonify({'message': 'Deleted memo and all associated files'})
@memo_bp.route('/api/memos/<int:memo_id>/decrypt', methods=['POST'])
@login_required
def decrypt_memo_route(memo_id):
data = request.json
password = data.get('password')
if not password: return jsonify({'error': 'Password required'}), 400
conn = get_db(); c = conn.cursor()
c.execute('SELECT content, is_encrypted FROM memos WHERE id = ?', (memo_id,))
memo = c.fetchone(); conn.close()
if not memo: return jsonify({'error': 'Memo not found'}), 404
if not memo['is_encrypted']: return jsonify({'content': memo['content']})
decrypted = decrypt_content(memo['content'], password)
if decrypted is None:
current_app.logger.warning(f"Decryption FAILED: ID {memo_id}")
return jsonify({'error': 'Invalid password'}), 403
if not password:
return jsonify({'error': 'Password required'}), 400
current_app.logger.info(f"Decryption SUCCESS: ID {memo_id}")
return jsonify({'content': decrypted})
content, error_msg = MemoService.decrypt_memo(memo_id, password)
if error_msg:
code = 404 if error_msg == "Memo not found" else 403
return jsonify({'error': error_msg}), code
return jsonify({'content': content})
+12
View File
@@ -0,0 +1,12 @@
from flask import Blueprint, request, jsonify # type: ignore
from ..auth import login_required
from ..services.memo_service import MemoService
stats_bp = Blueprint('stats', __name__)
@stats_bp.route('/api/stats/heatmap', methods=['GET'])
@login_required
def get_heatmap_stats():
days = request.args.get('days', 365, type=int)
stats = MemoService.get_heatmap_stats(days)
return jsonify(stats)
+152
View File
@@ -0,0 +1,152 @@
import os
import datetime
from flask import current_app
from ..models.memo_repo import MemoRepository
from ..utils import extract_links, parse_and_clean_metadata, generate_auto_title
from ..security import encrypt_content, decrypt_content
from ..constants import GROUP_DEFAULT
class MemoService:
@staticmethod
def get_all_memos(filters, limit=20, offset=0):
return MemoRepository.get_all(filters, limit, offset)
@staticmethod
def get_memo_by_id(memo_id):
return MemoRepository.get_by_id(memo_id)
@staticmethod
def create_memo(data):
content = data.get('content', '').strip()
group_name = data.get('group_name', GROUP_DEFAULT).strip()
user_tags = data.get('tags', [])
is_encrypted = 1 if data.get('is_encrypted') else 0
password = data.get('password', '').strip()
# 1. 메타데이터 파싱 및 본문 정리
new_content, final_group, final_tags = parse_and_clean_metadata(content, ui_group=group_name, ui_tags=user_tags)
content = new_content
group_name = final_group
user_tags = final_tags
# 2. 제목 자동 생성
title = data.get('title', '').strip()
if not title:
title = generate_auto_title(content)
# 3. 암호화 처리
if is_encrypted and password:
content = encrypt_content(content, password)
elif is_encrypted and not password:
raise ValueError('Password required for encryption')
now = datetime.datetime.now().isoformat()
repo_data = {
'title': title,
'content': content,
'color': data.get('color', '#2c3e50'),
'is_pinned': 1 if data.get('is_pinned') else 0,
'status': data.get('status', 'active').strip(),
'group_name': group_name,
'category': data.get('category'),
'is_encrypted': is_encrypted,
'created_at': now,
'updated_at': now
}
links = extract_links(content)
attachment_filenames = data.get('attachment_filenames', [])
return MemoRepository.create(repo_data, tags=user_tags, links=links, attachment_filenames=attachment_filenames)
@staticmethod
def update_memo(memo_id, data):
password = data.get('password', '').strip()
# 1. 기존 메모 정보 조회 (암호화 검증용)
memo = MemoRepository.get_by_id(memo_id)
if not memo:
return None, "Memo not found"
if memo['is_encrypted']:
if not password:
return None, "msg_encrypted_locked"
if decrypt_content(memo['content'], password) is None:
return None, "msg_auth_failed"
# 2. 데이터 가공
content = data.get('content')
group_name = data.get('group_name')
user_tags = data.get('tags')
if content is not None:
new_content, final_group, final_tags = parse_and_clean_metadata(
content,
ui_group=(group_name or memo['group_name']),
ui_tags=(user_tags if user_tags is not None else [])
)
content = new_content
group_name = final_group
user_tags = final_tags
title = data.get('title')
if title == "": # 빈 문자열일 때만 자동 생성
title = generate_auto_title(content or "")
now = datetime.datetime.now().isoformat()
updates = {'updated_at': now}
# 암호화 처리
is_encrypted = data.get('is_encrypted')
final_content = content.strip() if content is not None else None
# 기존 암호화 상태 유지 혹은 신규 설정 시 암호화 적용
if (is_encrypted or (is_encrypted is None and memo['is_encrypted'])) and password:
if final_content is not None:
final_content = encrypt_content(final_content, password)
if title is not None: updates['title'] = title.strip()
if final_content is not None: updates['content'] = final_content
if data.get('color') is not None: updates['color'] = data.get('color')
if data.get('is_pinned') is not None: updates['is_pinned'] = 1 if data.get('is_pinned') else 0
if data.get('status') is not None: updates['status'] = data.get('status').strip()
if group_name is not None: updates['group_name'] = group_name.strip()
if is_encrypted is not None: updates['is_encrypted'] = 1 if is_encrypted else 0
if data.get('category') is not None: updates['category'] = data.get('category')
links = extract_links(content) if content is not None else None
attachment_filenames = data.get('attachment_filenames')
MemoRepository.update(memo_id, updates, tags=user_tags, links=links, attachment_filenames=attachment_filenames)
return True, "Updated"
@staticmethod
def delete_memo(memo_id):
memo = MemoRepository.get_by_id(memo_id)
if not memo: return False, "Memo not found"
if memo['is_encrypted']: return False, "msg_encrypted_locked"
# 물리 파일 삭제
upload_folder = current_app.config['UPLOAD_FOLDER']
for f in memo['attachments']:
filepath = os.path.join(upload_folder, f['filename'])
if os.path.exists(filepath):
os.remove(filepath)
MemoRepository.delete(memo_id)
return True, "Deleted"
@staticmethod
def decrypt_memo(memo_id, password):
memo = MemoRepository.get_by_id(memo_id)
if not memo: return None, "Memo not found"
if not memo['is_encrypted']: return memo['content'], None
decrypted = decrypt_content(memo['content'], password)
if decrypted is None: return None, "Invalid password"
return decrypted, None
@staticmethod
def get_heatmap_stats(days=365):
return MemoRepository.get_heatmap(days)
+3 -3
View File
@@ -17,9 +17,9 @@ def parse_metadata(text, default_group=GROUP_DEFAULT):
if group_match:
group_name = group_match.group(1)
# #태그 추출 (마크다운 헤더 # , ## 및 내부 링크[[#ID]] 방지)
# 태그는 반드시 # 바로 뒤에 영문/숫자/한글이 붙어 있어야 하며, 앞에 다른 문자가 없어야 함
tag_matches = re.finditer(r'(?<!#)(?<!\[\[)(?<!\w)#([^\s\#\d\W][^\s\#]*)', text)
# 태그 추출 (공백이나 줄 시작 뒤의 #만 인정하며, HTML 속성(: 또는 =) 뒤의 #은 무시)
tag_regex = r'(?<![:=])(?<![:=]\s)(?:(?<=\s)|(?<=^))#([^\s\#\d\W][\w가-힣-]*)'
tag_matches = re.finditer(tag_regex, text)
for match in tag_matches:
tags.append(match.group(1))