Zelyanoth committed on
Commit
40a3caf
·
1 Parent(s): 4d24b11

feat: Enhance LinkedIn image publishing by supporting bytes data and implementing temporary file handling

LINKEDIN_IMAGE_PUBLISHING_SOLUTION.md ADDED
@@ -0,0 +1,250 @@
1
+ # LinkedIn Image Publishing Solution
2
+
3
+ ## Problem Statement
4
+
5
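+ The current implementation stores images as bytes in the database, but `LinkedInService.publish_post` only uploads an image when it is given a URL. LinkedIn's API expects the image to be uploaded to a temporary upload URL obtained by registering the upload, so posts whose image is stored as bytes cannot currently be published with that image.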
6
+
7
+ ## Current Implementation Analysis
8
+
9
+ ### Image Storage
10
+ Images are currently stored in the database as bytes using the `ensure_bytes_format` utility function in `backend/utils/image_utils.py`. This function handles:
11
+ - Converting base64 encoded strings to bytes
12
+ - Storing URLs as strings
13
+ - Keeping bytes data as-is
14
+
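The three cases above can be illustrated with a minimal sketch. It is not the actual `ensure_bytes_format` from `backend/utils/image_utils.py`, whose source is not part of this commit; the name `normalize_image_value` and the rule used to recognise URLs and data URLs are assumptions made for illustration.

```python
import base64
import binascii
from typing import Union


def normalize_image_value(value: Union[str, bytes, None]) -> Union[str, bytes, None]:
    """Hypothetical stand-in for ensure_bytes_format (assumed behavior)."""
    if value is None or isinstance(value, bytes):
        # Bytes (and missing values) are kept as-is.
        return value
    if value.startswith(("http://", "https://")):
        # URLs stay strings so they can be downloaded at publish time.
        return value
    # Otherwise treat the string as base64, optionally wrapped in a data URL
    # such as "data:image/png;base64,<payload>".
    if value.startswith("data:"):
        _, _, payload = value.partition(",")
    else:
        payload = value
    try:
        return base64.b64decode(payload, validate=True)
    except (binascii.Error, ValueError):
        # Not valid base64: leave the value untouched rather than guess.
        return value
```

With this shape, the publishing code only ever sees `bytes`, a URL string, or `None`, which is the distinction `publish_post` branches on.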
15
+ ### LinkedIn API Process
16
+ The current `LinkedInService.publish_post` method in `backend/services/linkedin_service.py` handles an image only when `image_url` is a string (URL). In that case it:
+ 1. Registers the upload with the LinkedIn API using `registerUploadRequest`
+ 2. Reads the temporary upload URL and asset URN from the response
+ 3. Downloads the image from `image_url` and uploads it to the temporary upload URL
+ 4. Creates the post with the asset URN
21
+
22
+ However, when image data is stored as bytes, the `isinstance(image_url, str)` check fails, so the current implementation skips the image upload entirely and publishes a text-only post.
23
+
24
+ ## Solution Design
25
+
26
+ ### 1. Converting Stored Image Bytes to LinkedIn-Compatible Format
27
+
28
+ We need to modify the `publish_post` method to handle bytes data by:
29
+ 1. Creating a temporary file from the bytes data
30
+ 2. Uploading this file to LinkedIn using the existing register/upload mechanism
31
+ 3. Cleaning up the temporary file after upload
32
+
33
+ ### 2. Temporary File Storage Mechanism
34
+
35
+ We'll implement a temporary file storage solution using Python's `tempfile` module:
36
+ - Create temporary files with secure random names
37
+ - Store files in the system's temporary directory
38
+ - Use appropriate file extensions based on image type
39
+ - Implement automatic cleanup after use
40
+
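The bullets above can be sketched as a small context manager. This is one possible shape, not the code in this commit: it uses `tempfile.mkstemp` rather than `NamedTemporaryFile`, and the names `guess_image_suffix` and `temp_image_file`, the signature table, and the `.img` fallback are assumptions.

```python
import contextlib
import os
import tempfile
from typing import Iterator

# Leading "magic" bytes for a few common formats (assumed list; extend as needed).
_SIGNATURES = (
    (b"\xff\xd8\xff", ".jpg"),
    (b"\x89PNG\r\n\x1a\n", ".png"),
    (b"GIF87a", ".gif"),
    (b"GIF89a", ".gif"),
)


def guess_image_suffix(image_bytes: bytes, default: str = ".img") -> str:
    """Pick a file extension from the leading bytes of the image."""
    for signature, suffix in _SIGNATURES:
        if image_bytes.startswith(signature):
            return suffix
    return default


@contextlib.contextmanager
def temp_image_file(image_bytes: bytes) -> Iterator[str]:
    """Write the bytes to a randomly named temp file and remove it on exit."""
    fd, path = tempfile.mkstemp(suffix=guess_image_suffix(image_bytes))
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(image_bytes)
        yield path
    finally:
        # Runs whether the body succeeded or raised.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)
```

A caller would write `with temp_image_file(data) as path: ...` and never handle cleanup explicitly, which keeps the "automatic cleanup after use" guarantee in one place.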
41
+ ### 3. Integration with Existing LinkedInService
42
+
43
+ The solution will be integrated into the `LinkedInService.publish_post` method:
44
+ - Add a new code path to handle bytes data
45
+ - Maintain backward compatibility with URL-based images
46
+ - Use the same register/upload mechanism for consistency
47
+
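One way to keep both image sources on a single register/upload path is to normalise the input to bytes first and branch only on where the bytes come from. The sketch below is an assumption about how that could look, not what this commit does (the commit keeps a separate branch per input type); `load_image_bytes` and its `download` parameter are hypothetical names, and the default downloader uses the standard library instead of `requests` so the example stays self-contained.

```python
from typing import Callable, Optional, Union
from urllib.request import urlopen


def _download(url: str) -> bytes:
    """Default downloader (standard library only)."""
    with urlopen(url, timeout=30) as response:
        return response.read()


def load_image_bytes(
    image: Union[str, bytes, bytearray, None],
    download: Callable[[str], bytes] = _download,
) -> Optional[bytes]:
    """Return raw image bytes for either supported input type."""
    if not image:
        # No image: the caller should create a text-only post.
        return None
    if isinstance(image, (bytes, bytearray)):
        # Bytes stored in the database are used directly.
        return bytes(image)
    if isinstance(image, str):
        # URL-based images keep working: fetch them, then treat as bytes.
        return download(image)
    raise TypeError(f"unsupported image type: {type(image).__name__}")
```

After this step the register, upload, and post-creation code would only need to exist once, since both paths end up holding bytes.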
48
+ ### 4. Security Considerations
49
+
50
+ - Create temporary files through `tempfile` rather than from caller-supplied names, which avoids predictable file names and path traversal
51
+ - Validate image data before creating temporary files
52
+ - Set appropriate file permissions on temporary files
53
+ - Implement cleanup mechanisms to prevent disk space exhaustion
54
+
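The "validate image data" item could be implemented as a cheap sanity check before anything touches the disk. The sketch below is an assumption: the function name, the accepted formats, and the 10 MiB limit are illustrative and are not taken from this commit or from LinkedIn's documentation. A signature check does not prove the file decodes correctly; it only rejects obviously wrong input.

```python
# Assumed size cap for illustration; not a documented LinkedIn limit.
MAX_IMAGE_BYTES = 10 * 1024 * 1024

# Leading bytes of JPEG, PNG, and GIF files.
_ALLOWED_SIGNATURES = (b"\xff\xd8\xff", b"\x89PNG\r\n\x1a\n", b"GIF87a", b"GIF89a")


def validate_image_bytes(image_bytes: bytes, max_size: int = MAX_IMAGE_BYTES) -> None:
    """Raise if the data is clearly not an acceptable image; return None otherwise."""
    if not isinstance(image_bytes, (bytes, bytearray)):
        raise TypeError("image data must be bytes")
    if not image_bytes:
        raise ValueError("image data is empty")
    if len(image_bytes) > max_size:
        raise ValueError(f"image data exceeds {max_size} bytes")
    if not bytes(image_bytes).startswith(_ALLOWED_SIGNATURES):
        raise ValueError("unrecognized image format")
```

Calling this at the top of the bytes branch would mean a malformed payload fails fast, before a temporary file is created or an upload is registered.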
55
+ ### 5. Cleanup Mechanism
56
+
57
+ - Immediate cleanup after successful upload
58
+ - Error handling to ensure cleanup even if upload fails
59
+ - Periodic cleanup of orphaned temporary files (if any)
60
+
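The periodic cleanup of orphaned files is not implemented in this commit. A minimal sketch of what it could look like follows; it assumes the temporary files are created with a recognisable name prefix (for example by passing `prefix="linkedin_img_"` to `tempfile.NamedTemporaryFile`, which the committed helper does not do), and the function name and parameters are hypothetical.

```python
import os
import tempfile
import time
from typing import List, Optional


def sweep_orphaned_temp_files(
    prefix: str,
    max_age_seconds: float,
    directory: Optional[str] = None,
) -> List[str]:
    """Delete regular files named `prefix*` that are older than the cutoff."""
    directory = directory or tempfile.gettempdir()
    cutoff = time.time() - max_age_seconds
    removed: List[str] = []
    with os.scandir(directory) as entries:
        for entry in entries:
            # Skip directories, symlinks, and files we did not create.
            if not entry.is_file(follow_symlinks=False):
                continue
            if not entry.name.startswith(prefix):
                continue
            try:
                if entry.stat(follow_symlinks=False).st_mtime < cutoff:
                    os.unlink(entry.path)
                    removed.append(entry.path)
            except OSError:
                # The file vanished or is not removable; leave it for the next sweep.
                continue
    return removed
```

Such a function could be run from the existing scheduler at a low frequency; the age threshold should be comfortably longer than any single upload is expected to take.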
61
+ ## Implementation Plan
62
+
63
+ ### Step 1: Modify LinkedInService
64
+
65
+ Update `backend/services/linkedin_service.py` to handle bytes data:
66
+
+ ```python
+ def publish_post(self, access_token: str, user_id: str, text_content: str, image_url: str = None) -> dict:
+     # ... existing code ...
+
+     if image_url:
+         if isinstance(image_url, bytes):
+             # Handle bytes data - create temporary file and upload
+             temp_file_path = self._create_temp_image_file(image_url)
+             try:
+                 # Register upload
+                 register_body = {
+                     "registerUploadRequest": {
+                         "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
+                         "owner": f"urn:li:person:{user_id}",
+                         "serviceRelationships": [{
+                             "relationshipType": "OWNER",
+                             "identifier": "urn:li:userGeneratedContent"
+                         }]
+                     }
+                 }
+
+                 r = requests.post(
+                     "https://api.linkedin.com/v2/assets?action=registerUpload",
+                     headers=headers,
+                     json=register_body
+                 )
+
+                 if r.status_code not in (200, 201):
+                     raise Exception(f"Failed to register upload: {r.status_code} {r.text}")
+
+                 datar = r.json()["value"]
+                 upload_url = datar["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
+                 asset_urn = datar["asset"]
+
+                 # Upload image from temporary file
+                 upload_headers = {
+                     "Authorization": f"Bearer {access_token}",
+                     "X-Restli-Protocol-Version": "2.0.0",
+                     "Content-Type": "application/octet-stream"
+                 }
+
+                 with open(temp_file_path, 'rb') as f:
+                     image_data = f.read()
+                     upload_response = requests.put(upload_url, headers=upload_headers, data=image_data)
+                     if upload_response.status_code not in (200, 201):
+                         raise Exception(f"Failed to upload image: {upload_response.status_code} {upload_response.text}")
+
+                 # Create post with image
+                 post_body = {
+                     "author": f"urn:li:person:{user_id}",
+                     "lifecycleState": "PUBLISHED",
+                     "specificContent": {
+                         "com.linkedin.ugc.ShareContent": {
+                             "shareCommentary": {"text": text_content},
+                             "shareMediaCategory": "IMAGE",
+                             "media": [{
+                                 "status": "READY",
+                                 "media": asset_urn,
+                                 "description": {"text": "Post image"},
+                                 "title": {"text": "Post image"}
+                             }]
+                         }
+                     },
+                     "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"}
+                 }
+             finally:
+                 # Clean up temporary file
+                 self._cleanup_temp_file(temp_file_path)
+         # ... rest of existing code for URL handling ...
+ ```
137
+
138
+ ### Step 2: Add Helper Methods
139
+
140
+ Add the following helper methods to `LinkedInService`:
141
+
+ ```python
+ import logging
+ import os
+ import tempfile
+
+ def _create_temp_image_file(self, image_bytes: bytes) -> str:
+     """
+     Create a temporary file from image bytes.
+
+     Args:
+         image_bytes: Image data as bytes
+
+     Returns:
+         Path to the temporary file
+     """
+     # Create a temporary file; delete=False keeps it on disk after close()
+     temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
+     temp_file_path = temp_file.name
+
+     try:
+         # Write image bytes to the temporary file
+         temp_file.write(image_bytes)
+         temp_file.flush()
+     finally:
+         temp_file.close()
+
+     return temp_file_path
+
+ def _cleanup_temp_file(self, file_path: str) -> None:
+     """
+     Safely remove a temporary file.
+
+     Args:
+         file_path: Path to the temporary file to remove
+     """
+     try:
+         if file_path and os.path.exists(file_path):
+             os.unlink(file_path)
+     except Exception as e:
+         # Log the error but don't fail the operation
+         logging.error(f"Failed to cleanup temporary file {file_path}: {str(e)}")
+ ```
185
+
186
+ ### Step 3: Update Error Handling
187
+
188
+ Enhance error handling to ensure cleanup happens even if the upload fails:
189
+
+ ```python
+ def publish_post(self, access_token: str, user_id: str, text_content: str, image_url: str = None) -> dict:
+     temp_file_path = None
+     try:
+         # ... existing implementation; sets temp_file_path when image bytes are written to disk ...
+         pass
+     finally:
+         # Runs on success and on failure, so the temporary file is always removed
+         if temp_file_path:
+             self._cleanup_temp_file(temp_file_path)
+ ```
201
+
202
+ ## Security Considerations
203
+
204
+ 1. **Secure Temporary File Creation**: `tempfile.NamedTemporaryFile` creates files with unpredictable random names in the system's temporary directory, preventing predictable file name attacks. Passing `delete=False` keeps the file on disk after it is closed, so the caller is responsible for removing it.
205
+
206
+ 2. **File Permissions**: `tempfile.NamedTemporaryFile` creates the file under the same rules as `tempfile.mkstemp`, so on POSIX systems it is readable and writable only by the creating user.
207
+
208
+ 3. **Input Validation**: Before creating temporary files, we should validate that the data is indeed image data and not malicious content.
209
+
210
+ 4. **Resource Management**: Implementing proper cleanup mechanisms prevents disk space exhaustion from orphaned temporary files.
211
+
212
+ ## Alternative Approaches
213
+
214
+ ### In-Memory Upload
215
+ Instead of creating temporary files, we could upload directly from memory. However, this approach:
216
+ - Requires modifying the existing LinkedIn upload mechanism
217
+ - May have memory limitations for large images
218
+ - Is more complex to implement safely
219
+
220
+ ### Streaming Upload
221
+ Streaming the bytes directly to LinkedIn's upload endpoint:
222
+ - More memory efficient
223
+ - More complex implementation
224
+ - Requires careful handling of HTTP streaming
225
+
226
+ The temporary file approach is chosen for its simplicity and reliability while maintaining compatibility with the existing upload mechanism.
227
+
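For comparison, the in-memory alternative could look roughly like the sketch below. It reuses the endpoint, request body, and response keys that appear in `publish_post`; the function name and the injected `http` parameter are assumptions introduced so the example can run without network access. In production `http` would be the `requests` module or a `requests.Session`. The sketch also assumes the registration request carries the same `X-Restli-Protocol-Version` header as the upload request.

```python
from typing import Any

REGISTER_URL = "https://api.linkedin.com/v2/assets?action=registerUpload"
UPLOAD_KEY = "com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"


def upload_image_from_memory(http: Any, access_token: str, user_id: str, image_bytes: bytes) -> str:
    """Register an upload, PUT the bytes directly, and return the asset URN.

    `http` is any object with requests-style post() and put() methods.
    """
    auth = {
        "Authorization": f"Bearer {access_token}",
        "X-Restli-Protocol-Version": "2.0.0",
    }
    register_body = {
        "registerUploadRequest": {
            "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
            "owner": f"urn:li:person:{user_id}",
            "serviceRelationships": [{
                "relationshipType": "OWNER",
                "identifier": "urn:li:userGeneratedContent",
            }],
        }
    }
    r = http.post(REGISTER_URL, headers={**auth, "Content-Type": "application/json"}, json=register_body)
    if r.status_code not in (200, 201):
        raise RuntimeError(f"Failed to register upload: {r.status_code} {r.text}")

    value = r.json()["value"]
    upload_url = value["uploadMechanism"][UPLOAD_KEY]["uploadUrl"]

    # The bytes go straight into the request body; no file is written.
    upload = http.put(
        upload_url,
        headers={**auth, "Content-Type": "application/octet-stream"},
        data=image_bytes,
    )
    if upload.status_code not in (200, 201):
        raise RuntimeError(f"Failed to upload image: {upload.status_code} {upload.text}")
    return value["asset"]
```

The trade-off against the temporary-file approach is that nothing needs cleaning up, while the whole image stays resident in memory for the duration of the request.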
228
+ ## Testing Plan
229
+
230
+ 1. **Unit Tests**: Create tests for the new helper methods
231
+ 2. **Integration Tests**: Test the complete flow with bytes data
232
+ 3. **Error Handling Tests**: Verify cleanup happens even when uploads fail
233
+ 4. **Security Tests**: Validate temporary files are created securely
234
+
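As an illustration of the first and third items, unit tests for the helpers might be structured as below. To keep the example self-contained, it tests module-level copies of the two helpers (`create_temp_image_file` and `cleanup_temp_file`, hypothetical names mirroring `_create_temp_image_file` and `_cleanup_temp_file`) instead of importing `LinkedInService`.

```python
import os
import tempfile
import unittest


def create_temp_image_file(image_bytes: bytes) -> str:
    """Standalone copy of the helper: write bytes to a temp file, return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
        temp_file.write(image_bytes)
        return temp_file.name


def cleanup_temp_file(file_path) -> None:
    """Standalone copy of the helper: remove the file if it exists."""
    if file_path and os.path.exists(file_path):
        os.unlink(file_path)


class TempImageHelperTests(unittest.TestCase):
    def test_file_contains_the_bytes(self):
        path = create_temp_image_file(b"fake image bytes")
        self.addCleanup(cleanup_temp_file, path)
        with open(path, "rb") as handle:
            self.assertEqual(handle.read(), b"fake image bytes")

    def test_cleanup_removes_file(self):
        path = create_temp_image_file(b"x")
        cleanup_temp_file(path)
        self.assertFalse(os.path.exists(path))

    def test_cleanup_tolerates_missing_file(self):
        path = create_temp_image_file(b"x")
        cleanup_temp_file(path)
        # A second call on an already-removed path, or on None, must not raise.
        cleanup_temp_file(path)
        cleanup_temp_file(None)
```

Saved as a test module, this can be run with `python -m unittest`. A test for the failure path would additionally patch `requests.put` to return an error status and assert that the temporary file no longer exists afterwards.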
235
+ ## Deployment Considerations
236
+
237
+ 1. **File System Permissions**: Ensure the application has write access to the temporary directory
238
+ 2. **Disk Space Monitoring**: Monitor temporary directory for space issues
239
+ 3. **Cleanup Verification**: Verify that temporary files are properly cleaned up in all scenarios
240
+
241
+ ## Conclusion
242
+
243
+ This solution addresses the LinkedIn image publishing issue by:
244
+ 1. Converting stored image bytes to a format compatible with LinkedIn's upload API through temporary files
245
+ 2. Implementing a secure temporary file storage mechanism
246
+ 3. Integrating seamlessly with the existing LinkedInService
247
+ 4. Addressing security considerations for temporary file storage
248
+ 5. Implementing proper cleanup mechanisms
249
+
250
+ The approach maintains backward compatibility while adding the needed functionality to handle bytes-based image data.
backend/services/linkedin_service.py CHANGED
@@ -2,6 +2,9 @@ from flask import current_app
2
  import requests
3
  from requests_oauthlib import OAuth2Session
4
  from urllib.parse import urlencode
5
 
6
  class LinkedInService:
7
  """Service for LinkedIn API integration."""
@@ -122,6 +125,43 @@ class LinkedInService:
122
  logger.error(f"🔗 [LinkedIn] Error type: {type(e)}")
123
  raise e
124
 
125
  def publish_post(self, access_token: str, user_id: str, text_content: str, image_url: str = None) -> dict:
126
  """
127
  Publish a post to LinkedIn.
@@ -130,11 +170,12 @@ class LinkedInService:
130
  access_token (str): LinkedIn access token
131
  user_id (str): LinkedIn user ID
132
  text_content (str): Post content
133
- image_url (str, optional): Image URL (must be a URL, not bytes)
134
 
135
  Returns:
136
  dict: Publish response
137
  """
 
138
  url = "https://api.linkedin.com/v2/ugcPosts"
139
  headers = {
140
  "Authorization": f"Bearer {access_token}",
@@ -142,83 +183,150 @@ class LinkedInService:
142
  "Content-Type": "application/json"
143
  }
144
 
145
- # Skip image handling if image_url is not a string (i.e., if it's bytes)
146
- if image_url and isinstance(image_url, str):
147
- # Handle image upload
148
- register_body = {
149
- "registerUploadRequest": {
150
- "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
151
- "owner": f"urn:li:person:{user_id}",
152
- "serviceRelationships": [{
153
- "relationshipType": "OWNER",
154
- "identifier": "urn:li:userGeneratedContent"
155
- }]
156
  }
157
- }
158
-
159
- r = requests.post(
160
- "https://api.linkedin.com/v2/assets?action=registerUpload",
161
- headers=headers,
162
- json=register_body
163
- )
164
-
165
- if r.status_code not in (200, 201):
166
- raise Exception(f"Failed to register upload: {r.status_code} {r.text}")
167
-
168
- datar = r.json()["value"]
169
- upload_url = datar["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
170
- asset_urn = datar["asset"]
171
-
172
- # Upload image
173
- upload_headers = {
174
- "Authorization": f"Bearer {access_token}",
175
- "X-Restli-Protocol-Version": "2.0.0",
176
- "Content-Type": "application/octet-stream"
177
- }
178
-
179
- # Download image and upload to LinkedIn
180
- image_response = requests.get(image_url)
181
- if image_response.status_code == 200:
182
- upload_response = requests.put(upload_url, headers=upload_headers, data=image_response.content)
183
- if upload_response.status_code not in (200, 201):
184
- raise Exception(f"Failed to upload image: {upload_response.status_code} {upload_response.text}")
185
-
186
- # Create post with image
187
- post_body = {
188
- "author": f"urn:li:person:{user_id}",
189
- "lifecycleState": "PUBLISHED",
190
- "specificContent": {
191
- "com.linkedin.ugc.ShareContent": {
192
- "shareCommentary": {"text": text_content},
193
- "shareMediaCategory": "IMAGE",
194
- "media": [{
195
- "status": "READY",
196
- "media": asset_urn,
197
- "description": {"text": "Post image"},
198
- "title": {"text": "Post image"}
199
  }]
200
  }
201
- },
202
- "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"}
203
- }
204
- else:
205
- # Create text-only post
206
- post_body = {
207
- "author": f"urn:li:person:{user_id}",
208
- "lifecycleState": "PUBLISHED",
209
- "specificContent": {
210
- "com.linkedin.ugc.ShareContent": {
211
- "shareCommentary": {
212
- "text": text_content
213
- },
214
- "shareMediaCategory": "NONE"
215
  }
216
- },
217
- "visibility": {
218
- "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
219
  }
220
- }
221
-
222
- response = requests.post(url, headers=headers, json=post_body)
223
- response.raise_for_status()
224
- return response.json()
2
  import requests
3
  from requests_oauthlib import OAuth2Session
4
  from urllib.parse import urlencode
5
+ import tempfile
6
+ import os
7
+ import logging
8
 
9
  class LinkedInService:
10
  """Service for LinkedIn API integration."""
 
125
  logger.error(f"🔗 [LinkedIn] Error type: {type(e)}")
126
  raise e
127
 
128
+ def _create_temp_image_file(self, image_bytes: bytes) -> str:
129
+ """
130
+ Create a temporary file from image bytes.
131
+
132
+ Args:
133
+ image_bytes: Image data as bytes
134
+
135
+ Returns:
136
+ Path to the temporary file
137
+ """
138
+ # Create a temporary file
139
+ temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
140
+ temp_file_path = temp_file.name
141
+
142
+ try:
143
+ # Write image bytes to the temporary file
144
+ temp_file.write(image_bytes)
145
+ temp_file.flush()
146
+ finally:
147
+ temp_file.close()
148
+
149
+ return temp_file_path
150
+
151
+ def _cleanup_temp_file(self, file_path: str) -> None:
152
+ """
153
+ Safely remove a temporary file.
154
+
155
+ Args:
156
+ file_path: Path to the temporary file to remove
157
+ """
158
+ try:
159
+ if file_path and os.path.exists(file_path):
160
+ os.unlink(file_path)
161
+ except Exception as e:
162
+ # Log the error but don't fail the operation
163
+ logging.error(f"Failed to cleanup temporary file {file_path}: {str(e)}")
164
+
165
  def publish_post(self, access_token: str, user_id: str, text_content: str, image_url: str = None) -> dict:
166
  """
167
  Publish a post to LinkedIn.
 
170
  access_token (str): LinkedIn access token
171
  user_id (str): LinkedIn user ID
172
  text_content (str): Post content
173
+ image_url (str or bytes, optional): Image URL or image bytes
174
 
175
  Returns:
176
  dict: Publish response
177
  """
178
+ temp_file_path = None
179
  url = "https://api.linkedin.com/v2/ugcPosts"
180
  headers = {
181
  "Authorization": f"Bearer {access_token}",
 
183
  "Content-Type": "application/json"
184
  }
185
 
186
+ try:
187
+ if image_url and isinstance(image_url, bytes):
188
+ # Handle bytes data - create temporary file and upload
189
+ temp_file_path = self._create_temp_image_file(image_url)
190
+
191
+ # Register upload
192
+ register_body = {
193
+ "registerUploadRequest": {
194
+ "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
195
+ "owner": f"urn:li:person:{user_id}",
196
+ "serviceRelationships": [{
197
+ "relationshipType": "OWNER",
198
+ "identifier": "urn:li:userGeneratedContent"
199
+ }]
200
+ }
201
  }
202
+
203
+ r = requests.post(
204
+ "https://api.linkedin.com/v2/assets?action=registerUpload",
205
+ headers=headers,
206
+ json=register_body
207
+ )
208
+
209
+ if r.status_code not in (200, 201):
210
+ raise Exception(f"Failed to register upload: {r.status_code} {r.text}")
211
+
212
+ datar = r.json()["value"]
213
+ upload_url = datar["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
214
+ asset_urn = datar["asset"]
215
+
216
+ # Upload image from temporary file
217
+ upload_headers = {
218
+ "Authorization": f"Bearer {access_token}",
219
+ "X-Restli-Protocol-Version": "2.0.0",
220
+ "Content-Type": "application/octet-stream"
221
+ }
222
+
223
+ with open(temp_file_path, 'rb') as f:
224
+ image_data = f.read()
225
+ upload_response = requests.put(upload_url, headers=upload_headers, data=image_data)
226
+ if upload_response.status_code not in (200, 201):
227
+ raise Exception(f"Failed to upload image: {upload_response.status_code} {upload_response.text}")
228
+
229
+ # Create post with image
230
+ post_body = {
231
+ "author": f"urn:li:person:{user_id}",
232
+ "lifecycleState": "PUBLISHED",
233
+ "specificContent": {
234
+ "com.linkedin.ugc.ShareContent": {
235
+ "shareCommentary": {"text": text_content},
236
+ "shareMediaCategory": "IMAGE",
237
+ "media": [{
238
+ "status": "READY",
239
+ "media": asset_urn,
240
+ "description": {"text": "Post image"},
241
+ "title": {"text": "Post image"}
242
+ }]
243
+ }
244
+ },
245
+ "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"}
246
+ }
247
+ elif image_url and isinstance(image_url, str):
248
+ # Handle image upload for URL-based images
249
+ register_body = {
250
+ "registerUploadRequest": {
251
+ "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
252
+ "owner": f"urn:li:person:{user_id}",
253
+ "serviceRelationships": [{
254
+ "relationshipType": "OWNER",
255
+ "identifier": "urn:li:userGeneratedContent"
256
  }]
257
  }
258
+ }
259
+
260
+ r = requests.post(
261
+ "https://api.linkedin.com/v2/assets?action=registerUpload",
262
+ headers=headers,
263
+ json=register_body
264
+ )
265
+
266
+ if r.status_code not in (200, 201):
267
+ raise Exception(f"Failed to register upload: {r.status_code} {r.text}")
268
+
269
+ datar = r.json()["value"]
270
+ upload_url = datar["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"]
271
+ asset_urn = datar["asset"]
272
+
273
+ # Upload image
274
+ upload_headers = {
275
+ "Authorization": f"Bearer {access_token}",
276
+ "X-Restli-Protocol-Version": "2.0.0",
277
+ "Content-Type": "application/octet-stream"
278
+ }
279
+
280
+ # Download image and upload to LinkedIn
281
+ image_response = requests.get(image_url)
282
+ if image_response.status_code == 200:
283
+ upload_response = requests.put(upload_url, headers=upload_headers, data=image_response.content)
284
+ if upload_response.status_code not in (200, 201):
285
+ raise Exception(f"Failed to upload image: {upload_response.status_code} {upload_response.text}")
286
+
287
+ # Create post with image
288
+ post_body = {
289
+ "author": f"urn:li:person:{user_id}",
290
+ "lifecycleState": "PUBLISHED",
291
+ "specificContent": {
292
+ "com.linkedin.ugc.ShareContent": {
293
+ "shareCommentary": {"text": text_content},
294
+ "shareMediaCategory": "IMAGE",
295
+ "media": [{
296
+ "status": "READY",
297
+ "media": asset_urn,
298
+ "description": {"text": "Post image"},
299
+ "title": {"text": "Post image"}
300
+ }]
301
+ }
302
+ },
303
+ "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"}
304
+ }
305
+ else:
306
+ # Create text-only post
307
+ post_body = {
308
+ "author": f"urn:li:person:{user_id}",
309
+ "lifecycleState": "PUBLISHED",
310
+ "specificContent": {
311
+ "com.linkedin.ugc.ShareContent": {
312
+ "shareCommentary": {
313
+ "text": text_content
314
+ },
315
+ "shareMediaCategory": "NONE"
316
+ }
317
+ },
318
+ "visibility": {
319
+ "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
320
  }
 
 
 
321
  }
322
+
323
+ response = requests.post(url, headers=headers, json=post_body)
324
+ response.raise_for_status()
325
+ return response.json()
326
+ except Exception as e:
327
+ # Re-raise the exception to maintain existing behavior
328
+ raise e
329
+ finally:
330
+ # Clean up temporary file if it was created
331
+ if temp_file_path:
332
+ self._cleanup_temp_file(temp_file_path)
backend/tests/test_scheduler_image_integration.py ADDED
@@ -0,0 +1,310 @@
1
+ #!/usr/bin/env python3
2
+ """
3
+ Integration test for scheduling system with image handling.
4
+ Tests the end-to-end workflow from scheduling through content generation to publishing,
5
+ specifically for posts with images in both bytes and URL formats.
6
+ """
7
+
8
+ import sys
9
+ import os
10
+ import unittest
11
+ import uuid
12
+ from unittest.mock import patch, MagicMock
13
+ from datetime import datetime, timedelta
14
+
15
+ # Add the repository root (the parent of backend/) to the path so that `backend.*` imports resolve
16
+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
17
+
18
+ from backend.scheduler.apscheduler_service import APSchedulerService
19
+ from backend.services.content_service import ContentService
20
+ from backend.utils.image_utils import ensure_bytes_format
21
+
22
+
23
+ class TestSchedulerImageIntegration(unittest.TestCase):
24
+ """Test cases for scheduler integration with image handling."""
25
+
26
+ def setUp(self):
27
+ """Set up test fixtures."""
28
+ self.user_id = "test_user_123"
29
+ self.schedule_id = str(uuid.uuid4())
30
+ self.social_account_id = "test_social_456"
31
+
32
+ # Mock Flask app
33
+ self.mock_app = MagicMock()
34
+ self.mock_app.config = {
35
+ 'SUPABASE_URL': 'test_url',
36
+ 'SUPABASE_KEY': 'test_key'
37
+ }
38
+
39
+ # Initialize scheduler service with mock app
40
+ self.scheduler_service = APSchedulerService(self.mock_app)
41
+
42
+ # Mock Supabase client
43
+ self.mock_supabase = MagicMock()
44
+ self.scheduler_service.supabase_client = self.mock_supabase
45
+
46
+ # Mock scheduler
47
+ self.mock_scheduler = MagicMock()
48
+ self.scheduler_service.scheduler = self.mock_scheduler
49
+
50
+ def test_image_bytes_processing_in_content_generation(self):
51
+ """Test that image bytes are properly processed and stored during content generation."""
52
+ # Mock content service to return content with image bytes
53
+ test_content = "This is a test post with an image"
54
+ test_image_bytes = b"fake image bytes data"
55
+
56
+ with patch('backend.scheduler.apscheduler_service.ContentService') as mock_content_service_class:
57
+ mock_content_service = MagicMock()
58
+ mock_content_service.generate_post_content.return_value = (test_content, test_image_bytes)
59
+ mock_content_service_class.return_value = mock_content_service
60
+
61
+ # Mock database response for schedule lookup
62
+ self.mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = [
63
+ {'id_social': self.social_account_id}
64
+ ]
65
+
66
+ # Mock database response for content storage
67
+ mock_insert_response = MagicMock()
68
+ mock_insert_response.data = [{'id': 'test_post_789'}]
69
+ self.mock_supabase.table.return_value.insert.return_value.execute.return_value = mock_insert_response
70
+
71
+ # Execute the content generation task
72
+ self.scheduler_service.generate_content_task(self.user_id, self.schedule_id)
73
+
74
+ # Verify content service was called
75
+ mock_content_service.generate_post_content.assert_called_once_with(self.user_id)
76
+
77
+ # Verify database insert was called with correct parameters
78
+ self.mock_supabase.table.assert_called_with("Post_content")
79
+ self.mock_supabase.table.return_value.insert.assert_called_once()
80
+
81
+ # Get the actual call arguments to verify image handling
82
+ call_args = self.mock_supabase.table.return_value.insert.call_args
83
+ inserted_data = call_args[0][0] # First positional argument
84
+
85
+ # Verify text content is stored correctly
86
+ self.assertEqual(inserted_data['Text_content'], test_content)
87
+
88
+ # Verify image data is stored correctly (should be bytes)
89
+ self.assertEqual(inserted_data['image_content_url'], test_image_bytes)
90
+ self.assertEqual(inserted_data['is_published'], False)
91
+ self.assertEqual(inserted_data['sched'], self.schedule_id)
92
+
93
+ def test_image_url_processing_in_content_generation(self):
94
+ """Test that image URLs are properly stored during content generation."""
95
+ # Mock content service to return content with image URL
96
+ test_content = "This is a test post with an image URL"
97
+ test_image_url = "https://example.com/test-image.jpg"
98
+
99
+ with patch('backend.scheduler.apscheduler_service.ContentService') as mock_content_service_class:
100
+ mock_content_service = MagicMock()
101
+ mock_content_service.generate_post_content.return_value = (test_content, test_image_url)
102
+ mock_content_service_class.return_value = mock_content_service
103
+
104
+ # Mock database response for schedule lookup
105
+ self.mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = [
106
+ {'id_social': self.social_account_id}
107
+ ]
108
+
109
+ # Mock database response for content storage
110
+ mock_insert_response = MagicMock()
111
+ mock_insert_response.data = [{'id': 'test_post_789'}]
112
+ self.mock_supabase.table.return_value.insert.return_value.execute.return_value = mock_insert_response
113
+
114
+ # Execute the content generation task
115
+ self.scheduler_service.generate_content_task(self.user_id, self.schedule_id)
116
+
117
+ # Verify content service was called
118
+ mock_content_service.generate_post_content.assert_called_once_with(self.user_id)
119
+
120
+ # Verify database insert was called with correct parameters
121
+ self.mock_supabase.table.assert_called_with("Post_content")
122
+ self.mock_supabase.table.return_value.insert.assert_called_once()
123
+
124
+ # Get the actual call arguments to verify image handling
125
+ call_args = self.mock_supabase.table.return_value.insert.call_args
126
+ inserted_data = call_args[0][0] # First positional argument
127
+
128
+ # Verify text content is stored correctly
129
+ self.assertEqual(inserted_data['Text_content'], test_content)
130
+
131
+ # Verify image URL is stored correctly
132
+ self.assertEqual(inserted_data['image_content_url'], test_image_url)
133
+ self.assertEqual(inserted_data['is_published'], False)
134
+ self.assertEqual(inserted_data['sched'], self.schedule_id)
135
+
136
+ def test_image_base64_processing_in_content_generation(self):
137
+ """Test that base64 encoded images are properly converted to bytes during content generation."""
138
+ # Mock content service to return content with base64 image
139
+ test_content = "This is a test post with a base64 image"
140
+ test_base64_image = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
141
+ expected_bytes = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\x0cIDAT\x08\xd7c\xfc\xff\x9f\xa1\x1e\x00\x07\x82\x02\x7f=\xc8H\xef\x00\x00\x00\x00IEND\xaeB`\x82'
142
+
143
+ with patch('backend.scheduler.apscheduler_service.ContentService') as mock_content_service_class:
144
+ mock_content_service = MagicMock()
145
+ mock_content_service.generate_post_content.return_value = (test_content, test_base64_image)
146
+ mock_content_service_class.return_value = mock_content_service
147
+
148
+ # Mock database response for schedule lookup
149
+ self.mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = [
150
+ {'id_social': self.social_account_id}
151
+ ]
152
+
153
+ # Mock database response for content storage
154
+ mock_insert_response = MagicMock()
155
+ mock_insert_response.data = [{'id': 'test_post_789'}]
156
+ self.mock_supabase.table.return_value.insert.return_value.execute.return_value = mock_insert_response
157
+
158
+ # Execute the content generation task
159
+ self.scheduler_service.generate_content_task(self.user_id, self.schedule_id)
160
+
161
+ # Verify content service was called
162
+ mock_content_service.generate_post_content.assert_called_once_with(self.user_id)
163
+
164
+ # Verify database insert was called with correct parameters
165
+ self.mock_supabase.table.assert_called_with("Post_content")
166
+ self.mock_supabase.table.return_value.insert.assert_called_once()
167
+
168
+ # Get the actual call arguments to verify image handling
169
+ call_args = self.mock_supabase.table.return_value.insert.call_args
170
+ inserted_data = call_args[0][0] # First positional argument
171
+
172
+ # Verify text content is stored correctly
173
+ self.assertEqual(inserted_data['Text_content'], test_content)
174
+
175
+ # Verify base64 image was converted to bytes
176
+ self.assertEqual(inserted_data['image_content_url'], expected_bytes)
177
+ self.assertEqual(inserted_data['is_published'], False)
178
+ self.assertEqual(inserted_data['sched'], self.schedule_id)
179
+
180
+    def test_publishing_with_image_bytes(self):
+        """Test that posts with image bytes are properly published."""
+        # Test data
+        test_post_id = "test_post_123"
+        test_content = "This is a test post for publishing"
+        test_image_bytes = b"fake image bytes data for publishing"
+
+        # Mock database response for unpublished post lookup
+        self.mock_supabase.table.return_value.select.return_value.eq.return_value.eq.return_value.order.return_value.limit.return_value.execute.return_value.data = [
+            {
+                'id': test_post_id,
+                'Text_content': test_content,
+                'image_content_url': test_image_bytes,
+                'is_published': False
+            }
+        ]
+
+        # Mock database response for schedule lookup
+        self.mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = [
+            {
+                'Social_network': {
+                    'token': 'test_access_token',
+                    'sub': 'test_user_sub'
+                }
+            }
+        ]
+
+        # Mock LinkedIn service
+        with patch('backend.scheduler.apscheduler_service.LinkedInService') as mock_linkedin_service_class:
+            mock_linkedin_service = MagicMock()
+            mock_linkedin_service.publish_post.return_value = {'success': True}
+            mock_linkedin_service_class.return_value = mock_linkedin_service
+
+            # Mock database response for post update
+            mock_update_response = MagicMock()
+            mock_update_response.data = [{'id': test_post_id}]
+            self.mock_supabase.table.return_value.update.return_value.eq.return_value.execute.return_value = mock_update_response
+
+            # Execute the publishing task
+            self.scheduler_service.publish_post_task(self.schedule_id)
+
+            # Verify LinkedIn service was called with correct parameters
+            mock_linkedin_service.publish_post.assert_called_once_with(
+                'test_access_token',
+                'test_user_sub',
+                test_content,
+                test_image_bytes  # Bytes should be passed directly
+            )
+
+            # Verify post status was updated in database
+            self.mock_supabase.table.return_value.update.assert_called_once_with({"is_published": True})
+            self.mock_supabase.table.return_value.update.return_value.eq.assert_called_once_with("id", test_post_id)
+
+    def test_publishing_with_image_url(self):
+        """Test that posts with image URLs are properly published."""
+        # Test data
+        test_post_id = "test_post_456"
+        test_content = "This is a test post with URL for publishing"
+        test_image_url = "https://example.com/publish-test-image.jpg"
+
+        # Mock database response for unpublished post lookup
+        self.mock_supabase.table.return_value.select.return_value.eq.return_value.eq.return_value.order.return_value.limit.return_value.execute.return_value.data = [
+            {
+                'id': test_post_id,
+                'Text_content': test_content,
+                'image_content_url': test_image_url,
+                'is_published': False
+            }
+        ]
+
+        # Mock database response for schedule lookup
+        self.mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = [
+            {
+                'Social_network': {
+                    'token': 'test_access_token',
+                    'sub': 'test_user_sub'
+                }
+            }
+        ]
+
+        # Mock LinkedIn service
+        with patch('backend.scheduler.apscheduler_service.LinkedInService') as mock_linkedin_service_class:
+            mock_linkedin_service = MagicMock()
+            mock_linkedin_service.publish_post.return_value = {'success': True}
+            mock_linkedin_service_class.return_value = mock_linkedin_service
+
+            # Mock database response for post update
+            mock_update_response = MagicMock()
+            mock_update_response.data = [{'id': test_post_id}]
+            self.mock_supabase.table.return_value.update.return_value.eq.return_value.execute.return_value = mock_update_response
+
+            # Execute the publishing task
+            self.scheduler_service.publish_post_task(self.schedule_id)
+
+            # Verify LinkedIn service was called with correct parameters
+            mock_linkedin_service.publish_post.assert_called_once_with(
+                'test_access_token',
+                'test_user_sub',
+                test_content,
+                test_image_url  # URL should be passed directly
+            )
+
+            # Verify post status was updated in database
+            self.mock_supabase.table.return_value.update.assert_called_once_with({"is_published": True})
+            self.mock_supabase.table.return_value.update.return_value.eq.assert_called_once_with("id", test_post_id)
+
+    def test_ensure_bytes_format_utility(self):
+        """Test the ensure_bytes_format utility function with different input types."""
+        # Test with bytes input: bytes are returned unchanged
+        test_bytes = b"test bytes data"
+        result = ensure_bytes_format(test_bytes)
+        self.assertEqual(result, test_bytes)
+
+        # Test with base64 string input (data URL of a 1x1 PNG)
+        test_base64 = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
+        # Expected value is the exact base64-decoded payload of test_base64
+        expected_bytes = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\xdac\xfc\xff\x9f\xa1\x1e\x00\x07\x82\x02\x7f=\xc8H\xef\x00\x00\x00\x00IEND\xaeB`\x82'
+        result = ensure_bytes_format(test_base64)
+        self.assertEqual(result, expected_bytes)
+
+        # Test with URL string input: URLs are kept as strings
+        test_url = "https://example.com/image.jpg"
+        result = ensure_bytes_format(test_url)
+        self.assertEqual(result, test_url)
+
+        # Test with None input
+        result = ensure_bytes_format(None)
+        self.assertIsNone(result)
+
+
+if __name__ == '__main__':
+    unittest.main()