Lin / backend /api /posts.py
Zelyanoth's picture
fix: Update JWT handling in cookies to avoid modifying immutable headers
995f94b
raw
history blame
24.6 kB
import os
import codecs
import uuid
import base64
from flask import Blueprint, request, jsonify, current_app, send_file
from flask_jwt_extended import jwt_required, get_jwt_identity
from backend.services.content_service import ContentService
from backend.services.linkedin_service import LinkedInService
posts_bp = Blueprint('posts', __name__)
def safe_log_message(message):
"""Safely log messages containing Unicode characters."""
try:
# Try to encode as UTF-8 first, then decode with error handling
if isinstance(message, str):
# For strings, try to encode and decode safely
encoded = message.encode('utf-8', errors='replace')
safe_message = encoded.decode('utf-8', errors='replace')
else:
# For non-strings, convert to string first
safe_message = str(message)
# Log to app logger instead of print
current_app.logger.debug(safe_message)
except Exception as e:
# Ultimate fallback - log the error
current_app.logger.error(f"Failed to log message: {str(e)}")
@posts_bp.route('/', methods=['OPTIONS'])
@posts_bp.route('', methods=['OPTIONS'])
def handle_options():
"""Handle OPTIONS requests for preflight CORS checks."""
return '', 200
@posts_bp.route('/', methods=['GET'])
@posts_bp.route('', methods=['GET'])
@jwt_required()
def get_posts():
"""
Get all posts for the current user.
Query Parameters:
published (bool): Filter by published status
Returns:
JSON: List of posts
"""
try:
user_id = get_jwt_identity()
published = request.args.get('published', type=bool)
# Check if Supabase client is initialized
if not hasattr(current_app, 'supabase') or current_app.supabase is None:
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': 'Database connection not initialized'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
# Build query
query = (
current_app.supabase
.table("Post_content")
.select("*, Social_network(id_utilisateur)")
)
# Apply published filter if specified
if published is not None:
query = query.eq("is_published", published)
response = query.execute()
# Filter posts for the current user
user_posts = [
post for post in response.data
if post.get('Social_network', {}).get('id_utilisateur') == user_id
] if response.data else []
# Add CORS headers explicitly
response_data = jsonify({
'success': True,
'posts': user_posts
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 200
except Exception as e:
error_message = str(e)
safe_log_message(f"Get posts error: {error_message}")
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': 'An error occurred while fetching posts'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
def _generate_post_task(user_id, job_id, job_store, hugging_key):
"""
Background task to generate post content.
Args:
user_id (str): User ID for personalization
job_id (str): Job ID to update status in job store
job_store (dict): Job store dictionary
hugging_key (str): Hugging Face API key
"""
try:
# Update job status to processing
job_store[job_id] = {
'status': 'processing',
'result': None,
'error': None
}
# Generate content using content service
# Pass the Hugging Face key directly to the service
content_service = ContentService(hugging_key=hugging_key)
generated_result = content_service.generate_post_content(user_id)
# Handle the case where generated_result might be a tuple (content, image_data)
# image_data could be bytes (from base64) or a string (URL)
if isinstance(generated_result, tuple):
generated_content, image_data = generated_result
else:
generated_content = generated_result
image_data = None
# Update job status to completed with result
job_store[job_id] = {
'status': 'completed',
'result': {
'content': generated_content,
'image_data': image_data # This could be bytes or a URL string
},
'error': None
}
except Exception as e:
error_message = str(e)
safe_log_message(f"Generate post background task error: {error_message}")
# Update job status to failed with error
job_store[job_id] = {
'status': 'failed',
'result': None,
'error': error_message
}
@posts_bp.route('/generate', methods=['POST'])
@jwt_required()
def generate_post():
"""
Generate a new post using AI asynchronously.
Request Body:
user_id (str): User ID (optional, defaults to current user)
Returns:
JSON: Job ID for polling
"""
try:
current_user_id = get_jwt_identity()
data = request.get_json()
# Use provided user_id or default to current user
user_id = data.get('user_id', current_user_id)
# Verify user authorization (can only generate for self unless admin)
if user_id != current_user_id:
return jsonify({
'success': False,
'message': 'Unauthorized to generate posts for other users'
}), 403
# Create a job ID
job_id = str(uuid.uuid4())
# Initialize job status
current_app.job_store[job_id] = {
'status': 'pending',
'result': None,
'error': None
}
# Get Hugging Face key
hugging_key = current_app.config['HUGGING_KEY']
# Submit the background task, passing all necessary data
current_app.executor.submit(_generate_post_task, user_id, job_id, current_app.job_store, hugging_key)
# Return job ID immediately
return jsonify({
'success': True,
'job_id': job_id,
'message': 'Post generation started'
}), 202 # 202 Accepted
except Exception as e:
error_message = str(e)
safe_log_message(f"Generate post error: {error_message}")
return jsonify({
'success': False,
'message': f'An error occurred while starting post generation: {error_message}'
}), 500
@posts_bp.route('/jobs/<job_id>', methods=['GET'])
@jwt_required()
def get_job_status(job_id):
"""
Get the status of a post generation job.
Path Parameters:
job_id (str): Job ID
Returns:
JSON: Job status and result if completed
"""
try:
# Get job from store
job = current_app.job_store.get(job_id)
if not job:
return jsonify({
'success': False,
'message': 'Job not found'
}), 404
# Prepare response
response_data = {
'success': True,
'job_id': job_id,
'status': job['status']
}
# Include result or error if available
if job['status'] == 'completed':
# Handle the new structure of the result
if isinstance(job['result'], dict) and 'content' in job['result']:
response_data['content'] = job['result']['content']
# Handle image_data which could be bytes or a URL string
image_data = job['result'].get('image_data')
if isinstance(image_data, bytes):
# Convert bytes to base64 for sending in JSON
try:
# Encode bytes to base64 string
base64_image = base64.b64encode(image_data).decode('utf-8')
# Create a data URL for the image
response_data['image_url'] = f"data:image/png;base64,{base64_image}"
response_data['has_image_data'] = True
# Also include the raw image_data for the frontend
response_data['image_data'] = response_data['image_url']
except Exception as e:
current_app.logger.error(f"Error encoding image to base64: {str(e)}")
response_data['image_url'] = None
response_data['has_image_data'] = True
elif isinstance(image_data, dict):
# Handle the case where image_data is a dict from Gradio API
# The dict may contain 'path', 'url', or other keys
if image_data.get('url'):
response_data['image_url'] = image_data['url']
response_data['has_image_data'] = True
response_data['image_data'] = image_data['url']
elif image_data.get('path'):
# If we have a path, we might need to serve it or convert it to a URL
# For now, we'll just indicate that image data exists
response_data['image_url'] = None
response_data['has_image_data'] = True
response_data['image_data'] = image_data['path']
else:
response_data['image_url'] = None
response_data['has_image_data'] = image_data is not None
response_data['image_data'] = image_data
elif isinstance(image_data, str):
# Check if it's a local file path
if os.path.exists(image_data):
# Store the file path in job store for later retrieval
job['image_file_path'] = image_data
response_data['image_url'] = f"/api/posts/image/{job_id}" # API endpoint to serve the image
response_data['has_image_data'] = True
response_data['image_data'] = image_data
else:
# If it's a URL or base64 data, use it directly
response_data['image_url'] = image_data
response_data['has_image_data'] = True
response_data['image_data'] = image_data
else:
# If it's None or other type
response_data['image_url'] = None
response_data['has_image_data'] = image_data is not None
response_data['image_data'] = image_data
else:
response_data['content'] = job['result']
response_data['image_url'] = None
response_data['has_image_data'] = False
elif job['status'] == 'failed':
response_data['error'] = job['error']
return jsonify(response_data), 200
except Exception as e:
error_message = str(e)
safe_log_message(f"Get job status error: {error_message}")
return jsonify({
'success': False,
'message': f'An error occurred while fetching job status: {error_message}'
}), 500
@posts_bp.route('/image/<job_id>', methods=['GET'])
def get_job_image(job_id):
"""
Serve image file for a completed job.
Path Parameters:
job_id (str): Job ID
Returns:
Image file
"""
try:
# Get job from store
job = current_app.job_store.get(job_id)
if not job:
return jsonify({
'success': False,
'message': 'Job not found'
}), 404
# Check if job has an image file path
image_file_path = job.get('image_file_path')
if not image_file_path or not os.path.exists(image_file_path):
return jsonify({
'success': False,
'message': 'Image not found'
}), 404
# Serve the image file
return send_file(image_file_path)
except Exception as e:
error_message = str(e)
safe_log_message(f"Get job image error: {error_message}")
return jsonify({
'success': False,
'message': f'An error occurred while fetching image: {error_message}'
}), 500
@posts_bp.route('/publish-direct', methods=['OPTIONS'])
def handle_publish_direct_options():
"""Handle OPTIONS requests for preflight CORS checks for publish direct route."""
return '', 200
@posts_bp.route('/publish-direct', methods=['POST'])
@jwt_required()
def publish_post_direct():
"""
Publish a post directly to social media and save to database.
Request Body:
social_account_id (str): Social account ID
text_content (str): Post text content
image_content_url (str, optional): Image URL
scheduled_at (str, optional): Scheduled time in ISO format
Returns:
JSON: Publish post result
"""
try:
user_id = get_jwt_identity()
data = request.get_json()
# Validate required fields
social_account_id = data.get('social_account_id')
text_content = data.get('text_content')
if not social_account_id or not text_content:
return jsonify({
'success': False,
'message': 'social_account_id and text_content are required'
}), 400
# Verify the social account belongs to the user
account_response = (
current_app.supabase
.table("Social_network")
.select("id_utilisateur, token, sub")
.eq("id", social_account_id)
.execute()
)
if not account_response.data:
return jsonify({
'success': False,
'message': 'Social account not found'
}), 404
account = account_response.data[0]
if account.get('id_utilisateur') != user_id:
return jsonify({
'success': False,
'message': 'Unauthorized to use this social account'
}), 403
# Get account details
access_token = account.get('token')
user_sub = account.get('sub')
if not access_token or not user_sub:
return jsonify({
'success': False,
'message': 'Social account not properly configured'
}), 400
# Get optional fields
image_data = data.get('image_content_url') # This could be bytes or a URL string
# Handle image data - if it's bytes, we need to convert it for LinkedIn
image_url_for_linkedin = None
if image_data:
if isinstance(image_data, bytes):
# If it's bytes, we can't directly send it to LinkedIn
# For now, we'll skip sending the image to LinkedIn
# In a future update, we might save it to storage and get a URL
current_app.logger.warning("Image data is in bytes format, skipping LinkedIn upload for now")
else:
# If it's a string, assume it's a URL
image_url_for_linkedin = image_data
# Publish to LinkedIn
linkedin_service = LinkedInService()
publish_response = linkedin_service.publish_post(
access_token, user_sub, text_content, image_url_for_linkedin
)
# Save to database as published
post_data = {
'id_social': social_account_id,
'Text_content': text_content,
'is_published': True
}
# Add optional fields if provided
if image_data:
post_data['image_content_url'] = image_data
if 'scheduled_at' in data:
post_data['scheduled_at'] = data['scheduled_at']
# Insert post into database
response = (
current_app.supabase
.table("Post_content")
.insert(post_data)
.execute()
)
if response.data:
# Add CORS headers explicitly
response_data = jsonify({
'success': True,
'message': 'Post published and saved successfully',
'post': response.data[0],
'linkedin_response': publish_response
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 201
else:
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': 'Failed to save post to database'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
except Exception as e:
error_message = str(e)
safe_log_message(f"[Post] Publish post directly error: {error_message}")
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': f'An error occurred while publishing post: {error_message}'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
@posts_bp.route('/<post_id>', methods=['OPTIONS'])
def handle_post_options(post_id):
"""Handle OPTIONS requests for preflight CORS checks for specific post."""
return '', 200
@posts_bp.route('/', methods=['POST'])
@posts_bp.route('', methods=['POST'])
@jwt_required()
def create_post():
"""
Create a new post.
Request Body:
social_account_id (str): Social account ID
text_content (str): Post text content
image_content_url (str, optional): Image URL
scheduled_at (str, optional): Scheduled time in ISO format
is_published (bool, optional): Whether the post is published (defaults to True)
Returns:
JSON: Created post data
"""
try:
user_id = get_jwt_identity()
data = request.get_json()
# Validate required fields
social_account_id = data.get('social_account_id')
text_content = data.get('text_content')
if not social_account_id or not text_content:
return jsonify({
'success': False,
'message': 'social_account_id and text_content are required'
}), 400
# Verify the social account belongs to the user
account_response = (
current_app.supabase
.table("Social_network")
.select("id_utilisateur")
.eq("id", social_account_id)
.execute()
)
if not account_response.data:
return jsonify({
'success': False,
'message': 'Social account not found'
}), 404
if account_response.data[0].get('id_utilisateur') != user_id:
return jsonify({
'success': False,
'message': 'Unauthorized to use this social account'
}), 403
# Prepare post data - always mark as published
post_data = {
'id_social': social_account_id,
'Text_content': text_content,
'is_published': data.get('is_published', True) # Default to True
}
# Handle image data - could be bytes or a URL string
image_data = data.get('image_content_url')
# Add optional fields if provided
if image_data is not None:
post_data['image_content_url'] = image_data
if 'scheduled_at' in data:
post_data['scheduled_at'] = data['scheduled_at']
# Insert post into database
response = (
current_app.supabase
.table("Post_content")
.insert(post_data)
.execute()
)
if response.data:
# Add CORS headers explicitly
response_data = jsonify({
'success': True,
'post': response.data[0]
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 201
else:
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': 'Failed to create post'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
except Exception as e:
error_message = str(e)
safe_log_message(f"[Post] Create post error: {error_message}")
# Add CORS headers to error response
response_data = jsonify({
'success': False,
'message': f'An error occurred while creating post: {error_message}'
})
response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
response_data.headers.add('Access-Control-Allow-Credentials', 'true')
return response_data, 500
@posts_bp.route('/<post_id>', methods=['DELETE'])
@jwt_required()
def delete_post(post_id):
"""
Delete a post.
Path Parameters:
post_id (str): Post ID
Returns:
JSON: Delete post result
"""
try:
user_id = get_jwt_identity()
# Verify the post belongs to the user
response = (
current_app.supabase
.table("Post_content")
.select("Social_network(id_utilisateur)")
.eq("id", post_id)
.execute()
)
if not response.data:
return jsonify({
'success': False,
'message': 'Post not found'
}), 404
post = response.data[0]
if post.get('Social_network', {}).get('id_utilisateur') != user_id:
return jsonify({
'success': False,
'message': 'Unauthorized to delete this post'
}), 403
# Delete post from Supabase
delete_response = (
current_app.supabase
.table("Post_content")
.delete()
.eq("id", post_id)
.execute()
)
if delete_response.data:
return jsonify({
'success': True,
'message': 'Post deleted successfully'
}), 200
else:
return jsonify({
'success': False,
'message': 'Failed to delete post'
}), 500
except Exception as e:
error_message = str(e)
safe_log_message(f"Delete post error: {error_message}")
return jsonify({
'success': False,
'message': 'An error occurred while deleting post'
}), 500