Commit · 06a4ed9
Parent(s): 2bf48c4

fix: add job app id and split the incoming msg

public-prediction/kafka_consumer.py (CHANGED)
@@ -14,7 +14,7 @@ def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
     return data
 
 
-def process_batch(batch: List[dict[str, any]], batch_size: int):
+def process_batch(batch: List[dict[str, any]], batch_size: int, job_application_id: int):
     with ThreadPoolExecutor(max_workers=batch_size) as executor:
         gpt_helper = GetGPTAnswer()
         futures = [executor.submit(
@@ -39,7 +39,8 @@ def consume_messages():
 
     for message in consumer:
         try:
-            full_batch = json.loads(message.value.decode("utf-8"))
+            incoming_message = json.loads(message.value.decode("utf-8"))
+            full_batch = incoming_message["data"]
         except json.JSONDecodeError:
             print("Failed to decode JSON from message:", message.value)
             print("Continuing...")
@@ -47,4 +48,5 @@ def consume_messages():
 
         for i in range(0, len(full_batch), BATCH_SIZE):
             batch = full_batch[i:i+BATCH_SIZE]
-            process_batch(batch, BATCH_SIZE)
+            process_batch(batch, BATCH_SIZE,
+                          incoming_message["job_application_id"])