LiamKhoaLe committed
Commit 367abb6 · 1 Parent(s): 01ac218

Optimise chats and reports route concurrency

Files changed (2)
  1. routes/chats.py   +84 -43
  2. routes/reports.py +83 -81
routes/chats.py CHANGED
@@ -203,6 +203,7 @@ async def delete_all_chat_history(user_id: str):
 
 # In-memory status tracking for real-time updates
 chat_status_store = {}
+_embedding_cache = {}
 
 @app.get("/chat/status/{session_id}", response_model=StatusUpdateResponse)
 async def get_chat_status(session_id: str):
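Note: the new module-level `_embedding_cache` dict is filled and evicted further down in `_chat_impl` (third hunk below). A compact, standalone sketch of that kind of timestamped, size-capped cache; the names `cached_embed` and `fake_embedder` are illustrative and not from this repo:

```python
import time

# (timestamp, value) entries keyed by a request-derived string.
_cache: dict[str, tuple[float, list[float]]] = {}
MAX_ENTRIES = 128
TTL_SECONDS = 600

def cached_embed(key: str, embed_fn) -> list[float]:
    now = time.time()
    # Crude size cap: drop a batch of the oldest-inserted keys when the dict grows too big.
    if len(_cache) > MAX_ENTRIES:
        for stale_key in list(_cache.keys())[:32]:
            _cache.pop(stale_key, None)
    entry = _cache.get(key)
    if entry and now - entry[0] < TTL_SECONDS:
        return entry[1]  # reuse the cached vector
    value = embed_fn(key)
    _cache[key] = (now, value)
    return value

# Example: the second call within the TTL reuses the cached vector.
fake_embedder = lambda text: [float(len(text))]
print(cached_embed("hello world", fake_embedder))
print(cached_embed("hello world", fake_embedder))
```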
@@ -349,19 +350,23 @@ async def chat(
         rag.db["chat_sessions"].insert_one(session_data)
         logger.info(f"[CHAT] Created session record for {session_id}")
 
-        # If this is the first user message, trigger auto-naming
+        # If this is the first user message, trigger auto-naming (non-blocking)
         if existing_messages == 0:
             try:
                 from helpers.namer import auto_name_session_immediate
-                session_name = await auto_name_session_immediate(
-                    user_id, project_id, session_id, question, nvidia_rotator, rag.db
-                )
-                if session_name:
-                    logger.info(f"[CHAT] Auto-named session {session_id} to '{session_name}'")
-                else:
-                    logger.warning(f"[CHAT] Auto-naming failed for session {session_id}")
+                import asyncio as _asyncio_name
+                async def _do_name():
+                    try:
+                        _name = await auto_name_session_immediate(
+                            user_id, project_id, session_id, question, nvidia_rotator, rag.db
+                        )
+                        if _name:
+                            logger.info(f"[CHAT] Auto-named session {session_id} to '{_name}'")
+                    except Exception as _e:
+                        logger.warning(f"[CHAT] Auto-naming failed: {_e}")
+                _asyncio_name.create_task(_do_name())
             except Exception as e:
-                logger.warning(f"[CHAT] Auto-naming failed: {e}")
+                logger.warning(f"[CHAT] Auto-naming scheduling failed: {e}")
 
         # Get the chat response
         chat_response = await asyncio.wait_for(_chat_impl(user_id, project_id, question, k, use_web=use_web, max_web=max_web, session_id=session_id), timeout=120.0)
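The hunk above takes auto-naming off the request path by wrapping it in a coroutine and scheduling it with `asyncio.create_task` instead of awaiting it; the background coroutine has to catch its own exceptions, since nothing ever awaits it. A minimal sketch of that fire-and-forget pattern; `slow_rename` and `handle_request` are placeholders, not project code:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")

async def slow_rename(session_id: str) -> str:
    # Stand-in for an LLM-backed naming call; sleeps instead of calling a provider.
    await asyncio.sleep(0.5)
    return f"Session {session_id} (auto-named)"

async def handle_request(session_id: str) -> str:
    async def _do_name():
        try:
            name = await slow_rename(session_id)
            logger.info("Auto-named %s to %r", session_id, name)
        except Exception as exc:
            # Nothing awaits this task, so it must log its own failures.
            logger.warning("Auto-naming failed: %s", exc)

    # Schedule naming without awaiting it, so the response is not delayed.
    asyncio.create_task(_do_name())
    return "chat response"

async def main():
    print(await handle_request("abc123"))
    # Give the detached task a moment to finish before the loop closes.
    await asyncio.sleep(1)

asyncio.run(main())
```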
@@ -502,29 +507,55 @@ async def _chat_impl(
     # Use enhanced question for better query variations
     enhanced_queries = await _generate_query_variations(enhanced_question, nvidia_rotator)
     logger.info(f"[CHAT] Generated {len(enhanced_queries)} query variations")
-
+
     # Update status: Planning action (planning search strategy)
     if session_id:
         update_chat_status(session_id, "planning", "Planning action...", 25)
+
+    # Batch-embed all query variants once
+    # Simple per-session embedding cache
+    cache_key = f"{project_id}:{session_id or 'na'}:" + ("|".join(enhanced_queries))[:512]
+    try:
+        now_ts = time.time()
+        # Evict stale entries (older than 10 minutes) or keep cache under 128 items
+        if len(_embedding_cache) > 128:
+            for k in list(_embedding_cache.keys())[:32]:
+                _embedding_cache.pop(k, None)
+        if cache_key in _embedding_cache and (now_ts - _embedding_cache[cache_key][0] < 600):
+            query_vectors = _embedding_cache[cache_key][1]
+        else:
+            query_vectors = embedder.embed(enhanced_queries)
+            _embedding_cache[cache_key] = (now_ts, query_vectors)
+    except Exception as e:
+        logger.warning(f"[CHAT] Batch embedding failed, falling back per-query: {e}")
+        query_vectors = [embedder.embed([q])[0] for q in enhanced_queries]
+
+    # Run vector searches concurrently across strategies and query variants
     all_hits = []
     search_strategies = ["flat", "hybrid", "local"]
+    tasks = []
+    import asyncio as _asyncio
     for strategy in search_strategies:
-        for query_variant in enhanced_queries:
-            q_vec = embedder.embed([query_variant])[0]
-            hits = rag.vector_search(
+        for q_vec in query_vectors:
+            tasks.append(_asyncio.to_thread(
+                rag.vector_search,
                 user_id=user_id,
                 project_id=project_id,
                 query_vector=q_vec,
                 k=k,
                 filenames=relevant_files if relevant_files else None,
                 search_type=strategy
-            )
-            if hits:
-                all_hits.extend(hits)
-                logger.info(f"[CHAT] {strategy} search with '{query_variant[:50]}...' returned {len(hits)} hits")
-                break
-        if all_hits:
-            break
+            ))
+    try:
+        results = await _asyncio.gather(*tasks, return_exceptions=True)
+        for idx, res in enumerate(results):
+            if isinstance(res, Exception):
+                continue
+            if res:
+                all_hits.extend(res)
+        logger.info(f"[CHAT] Parallel search produced {len(all_hits)} raw hits across {len(tasks)} tasks")
+    except Exception as e:
+        logger.warning(f"[CHAT] Parallel search failed: {e}")
     hits = _deduplicate_and_rank_hits(all_hits, question)
     logger.info(f"[CHAT] Final vector search returned {len(hits) if hits else 0} hits")
     if not hits:
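In the hunk above, the synchronous `rag.vector_search` calls are pushed onto worker threads with `asyncio.to_thread` and awaited together via `asyncio.gather(..., return_exceptions=True)`, so one failing search no longer aborts the rest. A self-contained sketch of the same fan-out, with a dummy `blocking_search` standing in for the real vector store:

```python
import asyncio
import time

def blocking_search(query: str, strategy: str) -> list[str]:
    # Stand-in for a synchronous vector-store call.
    time.sleep(0.2)
    return [f"{strategy}:{query}"]

async def search_all(queries: list[str], strategies: list[str]) -> list[str]:
    # One thread-backed task per (strategy, query) pair.
    tasks = [
        asyncio.to_thread(blocking_search, q, s)
        for s in strategies
        for q in queries
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    hits: list[str] = []
    for res in results:
        if isinstance(res, Exception):
            continue  # skip failed searches instead of failing the whole request
        hits.extend(res)
    return hits

hits = asyncio.run(search_all(["a", "b"], ["flat", "hybrid", "local"]))
print(len(hits), "hits")  # 6 hits, produced concurrently rather than sequentially
```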
@@ -682,38 +713,48 @@ async def _chat_impl(
         from memo.history import get_history_manager
         history_manager = get_history_manager(memory)
         qa_sum = await history_manager.summarize_qa_with_nvidia(question, answer, nvidia_rotator)
-
+
         # Use session-specific memory storage
         memory.add_session_memory(user_id, project_id, session_id, question, answer, {
             "relevant_files": relevant_files,
             "sources_count": len(sources_meta),
             "timestamp": time.time()
         })
-
+
         # Also add to global memory for backward compatibility
         memory.add(user_id, qa_sum)
-
-        if memory.is_enhanced_available():
-            await memory.add_conversation_memory(
-                user_id=user_id,
-                question=question,
-                answer=answer,
-                project_id=project_id,
-                session_id=session_id,  # Add session_id to enhanced memory
-                context={
-                    "relevant_files": relevant_files,
-                    "sources_count": len(sources_meta),
-                    "timestamp": time.time()
-                }
-            )
-
-        # Trigger memory consolidation if needed
-        try:
-            consolidation_result = await memory.consolidate_memories(user_id, nvidia_rotator)
-            if consolidation_result.get("consolidated", 0) > 0:
-                logger.info(f"[CHAT] Memory consolidated: {consolidation_result}")
-        except Exception as e:
-            logger.warning(f"[CHAT] Memory consolidation failed: {e}")
+
+        # Enhanced memory writes and consolidation deferred to background
+        async def _write_enhanced_and_consolidate():
+            try:
+                if memory.is_enhanced_available():
+                    await memory.add_conversation_memory(
+                        user_id=user_id,
+                        question=question,
+                        answer=answer,
+                        project_id=project_id,
+                        session_id=session_id,
+                        context={
+                            "relevant_files": relevant_files,
+                            "sources_count": len(sources_meta),
+                            "timestamp": time.time()
+                        }
+                    )
+                try:
+                    consolidation_result = await memory.consolidate_memories(user_id, nvidia_rotator)
+                    if consolidation_result.get("consolidated", 0) > 0:
+                        logger.info(f"[CHAT] Memory consolidated: {consolidation_result}")
+                except Exception as ce:
+                    logger.warning(f"[CHAT] Memory consolidation failed: {ce}")
+            except Exception as we:
+                logger.warning(f"[CHAT] Enhanced memory write failed: {we}")
+
+        try:
+            import asyncio as _asyncio2
+            _asyncio2.create_task(_write_enhanced_and_consolidate())
+        except Exception:
+            # If scheduling fails, fall back to inline write
+            await _write_enhanced_and_consolidate()
     except Exception as e:
         logger.warning(f"QA summarize/store failed: {e}")
     # Merge web sources if any (normalize to filename=url for frontend display)
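Both new `create_task` call sites in this file (auto-naming and the deferred memory write) rely on the scheduled task staying alive; asyncio keeps only a weak reference to tasks, so code that drops the returned reference risks the task being garbage-collected before it finishes. A common guard, shown here as a sketch rather than something this commit adds, is to hold tasks in a set until they complete:

```python
import asyncio

# Strong references keep scheduled tasks alive until they finish.
_background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    # Drop the reference once the task is done.
    task.add_done_callback(_background_tasks.discard)
    return task

async def write_memory() -> None:
    await asyncio.sleep(0.1)
    print("memory written")

async def main():
    spawn(write_memory())
    await asyncio.sleep(0.5)  # keep the loop alive long enough for the task

asyncio.run(main())
```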
routes/reports.py CHANGED
@@ -425,12 +425,14 @@ async def execute_detailed_subtasks(cot_plan: Dict[str, Any], context_text: str,
     subsection_number = 1
     agent_context = {}  # Store context from previous agents for CoT references
 
-    for section in sections:
+    import asyncio as _asyncio
+    semaphore = _asyncio.Semaphore(4)  # limit concurrency to avoid provider rate limits
+
+    async def _process_section(section, section_number_local, agent_context_shared):
+        nonlocal subsection_number
         section_title = section.get("title", "Unknown Section")
         section_priority = section.get("priority", "important")
-
-        # Assign section number (1, 2, 3, etc.)
-        section_id = f"{section_number}"
+        section_id = f"{section_number_local}"
         section_analysis = {
             "section_id": section_id,
             "title": section_title,
@@ -438,94 +440,94 @@ async def execute_detailed_subtasks(cot_plan: Dict[str, Any], context_text: str,
             "priority": section_priority,
             "subtask_results": [],
             "section_synthesis": "",
-            "agent_context": agent_context.copy()  # Pass context from previous agents
+            "agent_context": agent_context_shared.copy()
         }
-
-        # Process each subtask with hierarchical subsection assignment
-        subtask_number = 1
-        for subtask in section.get("subtasks", []):
-            task = subtask.get("task", "")
-            reasoning = subtask.get("reasoning", "")
-            sources_needed = subtask.get("sources_needed", ["local"])
-            depth = subtask.get("depth", "detailed")
-            sub_actions = subtask.get("sub_actions", [])
-            expected_output = subtask.get("expected_output", "")
-            quality_checks = subtask.get("quality_checks", [])
-
-            # Assign subsection number (1.1, 1.2, 2.1, 2.2, etc.)
-            subsection_id = f"{section_number}.{subtask_number}"
-
-            # Generate comprehensive analysis with CoT references
-            subtask_result = await analyze_subtask_with_cot_references(
-                subsection_id, task, reasoning, sources_needed, depth, sub_actions,
-                expected_output, quality_checks, context_text, web_context, filename,
-                agent_context, nvidia_rotator, gemini_rotator
-            )
-
-            # If the subtask implies coding, generate code artifacts and explanations
-            if any(kw in (task.lower() + " " + reasoning.lower()) for kw in ["implement", "code", "function", "class", "api", "script", "module", "endpoint"]):
-                try:
-                    logger.info(f"[REPORT] Triggering code generation for {subsection_id}")
-                    code_markdown = await generate_code_artifacts(
-                        subsection_id=subsection_id,
-                        task=task,
-                        reasoning=reasoning,
-                        context_text=context_text,
-                        web_context=web_context,
-                        gemini_rotator=gemini_rotator,
-                        nvidia_rotator=nvidia_rotator
-                    )
-                    # Append code and explanation beneath the analysis
-                    subtask_result = subtask_result + "\n\n" + code_markdown
-                    # Parse structured code for indexing and downstream usage
+
+        async def _process_subtask(subtask, subtask_index):
+            async with semaphore:
+                task = subtask.get("task", "")
+                reasoning = subtask.get("reasoning", "")
+                sources_needed = subtask.get("sources_needed", ["local"])
+                depth = subtask.get("depth", "detailed")
+                sub_actions = subtask.get("sub_actions", [])
+                expected_output = subtask.get("expected_output", "")
+                quality_checks = subtask.get("quality_checks", [])
+                subsection_id = f"{section_number_local}.{subtask_index}"
+                subtask_result = await analyze_subtask_with_cot_references(
+                    subsection_id, task, reasoning, sources_needed, depth, sub_actions,
+                    expected_output, quality_checks, context_text, web_context, filename,
+                    agent_context_shared, nvidia_rotator, gemini_rotator
+                )
+                code_blocks = None
+                if any(kw in (task.lower() + " " + reasoning.lower()) for kw in ["implement", "code", "function", "class", "api", "script", "module", "endpoint"]):
                     try:
-                        code_blocks = extract_structured_code(code_markdown)
-                    except Exception as pe:
-                        logger.warning(f"[REPORT] Failed to parse structured code for {subsection_id}: {pe}")
-                        code_blocks = []
-                except Exception as ce:
-                    logger.warning(f"[REPORT] Code generation failed for {subsection_id}: {ce}")
-
-            # Store agent context for next agents
-            agent_context[f"{section_id}.{subtask_number}"] = {
-                "subsection_id": subsection_id,
-                "task": task,
-                "key_findings": extract_key_findings(subtask_result),
-                "evidence": extract_evidence(subtask_result),
-                "conclusions": extract_conclusions(subtask_result)
-            }
-
-            section_analysis["subtask_results"].append({
-                "subsection_id": subsection_id,
-                "task": task,
-                "reasoning": reasoning,
-                "depth": depth,
-                "sub_actions": sub_actions,
-                "expected_output": expected_output,
-                "quality_checks": quality_checks,
-                "analysis": subtask_result,
-                **({"code_blocks": code_blocks} if 'code_blocks' in locals() else {}),
-                "agent_context": agent_context.copy()
-            })
-
-            subtask_number += 1
-
-        # Generate section-level synthesis with cross-references
+                        logger.info(f"[REPORT] Triggering code generation for {subsection_id}")
+                        code_markdown = await generate_code_artifacts(
+                            subsection_id=subsection_id,
+                            task=task,
+                            reasoning=reasoning,
+                            context_text=context_text,
+                            web_context=web_context,
+                            gemini_rotator=gemini_rotator,
+                            nvidia_rotator=nvidia_rotator
+                        )
+                        subtask_result = subtask_result + "\n\n" + code_markdown
+                        try:
+                            code_blocks = extract_structured_code(code_markdown)
+                        except Exception as pe:
+                            logger.warning(f"[REPORT] Failed to parse structured code for {subsection_id}: {pe}")
+                            code_blocks = []
+                    except Exception as ce:
+                        logger.warning(f"[REPORT] Code generation failed for {subsection_id}: {ce}")
+                agent_context_shared[f"{section_id}.{subtask_index}"] = {
+                    "subsection_id": subsection_id,
+                    "task": task,
+                    "key_findings": extract_key_findings(subtask_result),
+                    "evidence": extract_evidence(subtask_result),
+                    "conclusions": extract_conclusions(subtask_result)
+                }
+                section_analysis["subtask_results"].append({
+                    "subsection_id": subsection_id,
+                    "task": task,
+                    "reasoning": reasoning,
+                    "depth": depth,
+                    "sub_actions": sub_actions,
+                    "expected_output": expected_output,
+                    "quality_checks": quality_checks,
+                    "analysis": subtask_result,
+                    **({"code_blocks": code_blocks} if code_blocks is not None else {}),
+                    "agent_context": agent_context_shared.copy()
+                })
+
+        subtask_tasks = []
+        subtask_index = 1
+        for subtask in section.get("subtasks", []):
+            subtask_tasks.append(_process_subtask(subtask, subtask_index))
+            subtask_index += 1
+        if subtask_tasks:
+            await _asyncio.gather(*subtask_tasks)
+
         section_synthesis = await synthesize_section_with_cot_references(
-            section_analysis, synthesis_strategy, agent_context, nvidia_rotator, gemini_rotator
+            section_analysis, synthesis_strategy, agent_context_shared, nvidia_rotator, gemini_rotator
         )
         section_analysis["section_synthesis"] = section_synthesis
-
-        # Update agent context with section-level insights
-        agent_context[f"section_{section_id}"] = {
+        agent_context_shared[f"section_{section_id}"] = {
             "section_id": section_id,
             "title": section_title,
             "key_insights": extract_key_insights(section_synthesis),
             "cross_references": extract_cross_references(section_synthesis)
         }
-
-        detailed_analysis[section_title] = section_analysis
+        return section_title, section_analysis
+
+    section_tasks = []
+    for section in sections:
+        section_tasks.append(_process_section(section, section_number, agent_context))
         section_number += 1
+    if section_tasks:
+        results = await _asyncio.gather(*section_tasks)
+        for title, analysis in results:
+            detailed_analysis[title] = analysis
+
 
     logger.info(f"[REPORT] Completed hierarchical analysis for {len(detailed_analysis)} sections with CoT references")
     return detailed_analysis
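The reports route now caps fan-out with `asyncio.Semaphore(4)` so that concurrent subtask analyses do not overrun the LLM providers. A minimal sketch of bounding `gather` with a semaphore; `call_provider` is a placeholder for the real provider call:

```python
import asyncio

async def call_provider(i: int) -> str:
    # Stand-in for an LLM request.
    await asyncio.sleep(0.1)
    return f"result {i}"

async def bounded_call(sem: asyncio.Semaphore, i: int) -> str:
    async with sem:  # waits here while 4 calls are already in flight
        return await call_provider(i)

async def main() -> None:
    sem = asyncio.Semaphore(4)  # at most 4 provider calls at once
    results = await asyncio.gather(*(bounded_call(sem, i) for i in range(10)))
    print(results)

asyncio.run(main())
```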
 
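One consequence of running `_process_subtask` concurrently is that `section_analysis["subtask_results"]` is appended to in completion order rather than strict 1.1, 1.2, ... order. If stable ordering matters, a variant (a sketch, not what this commit does) is to return each result and rely on `asyncio.gather` preserving the order of its inputs:

```python
import asyncio
import random

async def process_subtask(subsection_id: str) -> dict:
    # Simulate variable provider latency.
    await asyncio.sleep(random.random() / 10)
    return {"subsection_id": subsection_id, "analysis": f"analysis for {subsection_id}"}

async def process_section(section_number: int, n_subtasks: int) -> list[dict]:
    tasks = [process_subtask(f"{section_number}.{i}") for i in range(1, n_subtasks + 1)]
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which finished first.
    return list(await asyncio.gather(*tasks))

results = asyncio.run(process_section(1, 4))
print([r["subsection_id"] for r in results])  # ['1.1', '1.2', '1.3', '1.4']
```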