vikramvasudevan committed
Commit 73a6587 · verified · 1 Parent(s): bb80c5f

Upload folder using huggingface_hub

config.py CHANGED
@@ -754,6 +754,13 @@ class SanatanConfig:
             if scripture["collection_name"] == collection_name
         ][0]
 
+    def get_scripture_by_name(self, scripture_name: str):
+        return [
+            scripture
+            for scripture in self.scriptures
+            if scripture["name"] == scripture_name
+        ][0]
+
     def is_metadata_field_allowed(
         self, collection_name: str, metadata_where_clause: MetadataWhereClause
     ):
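
Note: like get_scripture_by_collection_name above it, the new get_scripture_by_name indexes [0] into a filtered list, so an unknown name raises IndexError rather than returning None. A minimal caller sketch (not part of this commit; it assumes SanatanConfig().scriptures is a list of dicts with "name" and "collection_name" keys):

from config import SanatanConfig

config = SanatanConfig()
try:
    scripture = config.get_scripture_by_name("yt_metadata")
    print(scripture["collection_name"])
except IndexError:
    # no scripture with that name exists in config.scriptures
    print("unknown scripture name")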
db.py CHANGED
@@ -7,9 +7,8 @@ import re, unicodedata
 from config import SanatanConfig
 from embeddings import get_embedding
 import logging
-from pydantic import BaseModel
 
-from metadata import MetadataFilter, MetadataWhereClause
+from metadata import MetadataWhereClause
 from modules.db.relevance import validate_relevance_queryresult
 from tqdm import tqdm
 
@@ -59,7 +58,7 @@ class SanatanDatabase:
             metadata_where_clause.to_chroma_where()
             if metadata_where_clause is not None
             else None
-            )
+            ),
         )
         docs = data["documents"]  # list of all verse texts
         ids = data["ids"]
@@ -79,9 +78,7 @@ class SanatanDatabase:
         )
 
     def fetch_first_match(
-        self,
-        collection_name: str,
-        metadata_where_clause: MetadataWhereClause = None
+        self, collection_name: str, metadata_where_clause: MetadataWhereClause = None
     ):
         """This version is created to support the browse module"""
         logger.info(
@@ -96,14 +93,14 @@ class SanatanDatabase:
             metadata_where_clause.to_chroma_where()
             if metadata_where_clause is not None
             else None
-            )
+            ),
         )
 
         if data["metadatas"]:
             # find index of record with lowest _global_index
             min_index = min(
                 range(len(data["metadatas"])),
-                key=lambda i: data["metadatas"][i].get("_global_index", float("inf"))
+                key=lambda i: data["metadatas"][i].get("_global_index", float("inf")),
             )
 
             # shrink data to keep same structure but only one record
@@ -521,151 +518,149 @@ class SanatanDatabase:
 
         return sorted(list(values))
 
-    def build_global_index_for_all_scriptures(self, force: bool = False):
-        import pandas as pd
-        import numpy as np
-
-        logger.info("build_global_index_for_all_scriptures: started")
-        config = SanatanConfig()
-
-        for scripture in config.scriptures:
-            scripture_name = scripture["name"]
-            chapter_order = scripture.get("chapter_order", None)
-            # if scripture_name != "vishnu_sahasranamam":
-            #     continue
-            logger.info(
-                "build_global_index_for_all_scriptures:%s: Processing", scripture_name
-            )
-            collection_name = scripture["collection_name"]
-            collection = self.chroma_client.get_or_create_collection(
-                name=collection_name
-            )
-            metadata_fields = scripture.get("metadata_fields", [])
-
-            # Get metadata field names marked as unique
-            unique_fields = [f["name"] for f in metadata_fields if f.get("is_unique")]
-            if not unique_fields:
-                if metadata_fields:
-                    unique_fields = [metadata_fields[0]["name"]]
-                else:
-                    logger.warning(
-                        f"No metadata fields defined for {collection_name}, skipping"
-                    )
-                    continue
-
-            logger.info(
-                "build_global_index_for_all_scriptures:%s:unique fields: %s",
-                scripture_name,
-                unique_fields,
-            )
-
-            # Build chapter_order mapping if defined
-            chapter_order_mapping = {}
-            for field in metadata_fields:
-                if callable(chapter_order):
-                    chapter_order_mapping = chapter_order()
-                    logger.info(
-                        "build_global_index_for_all_scriptures:%s:chapter_order_mapping: %s",
-                        scripture_name,
-                        chapter_order_mapping,
-                    )
-
-            # Fetch all records (keep embeddings for upsert)
-            try:
-                results = collection.get(
-                    include=["metadatas", "documents", "embeddings"]
-                )
-            except Exception as e:
-                logger.error(
-                    "build_global_index_for_all_scriptures:%s Error getting data from chromadb",
-                    scripture_name,
-                    exc_info=True,
-                )
-                continue
-
-            ids = results["ids"]
-            metadatas = results["metadatas"]
-            documents = results["documents"]
-            embeddings = results.get("embeddings", [None] * len(ids))
-
-            if not force and metadatas and "_global_index" in metadatas[0]:
-                logger.warning(
-                    "build_global_index_for_all_scriptures:%s: global index already available. skipping collection",
-                    scripture_name,
-                )
-                continue
-
-            # Create a DataFrame for metadata sorting
-            df = pd.DataFrame(metadatas)
-            df["_id"] = ids
-            df["_doc"] = documents
-
-            # Add sortable columns for each unique field
-            for field_name in unique_fields:
-                if field_name.lower() == "chapter" and chapter_order_mapping:
-                    # Map chapter names to their defined order
-                    df["_sort_" + field_name] = (
-                        df[field_name].map(chapter_order_mapping).fillna(np.inf)
-                    )
-                else:
-                    # Try numeric, fallback to string lowercase
-                    def parse_val(v):
-                        if v is None:
-                            return float("inf")
-                        if isinstance(v, int):
-                            return v
-                        if isinstance(v, str):
-                            v = v.strip()
-                            return int(v) if v.isdigit() else v.lower()
-                        return str(v)
-
-                    df["_sort_" + field_name] = df[field_name].apply(parse_val)
-
-            sort_cols = ["_sort_" + f for f in unique_fields]
-            df = df.sort_values(by=sort_cols, kind="stable").reset_index(drop=True)
-
-            # Assign global index
-            df["_global_index"] = range(1, len(df) + 1)
-
-            logger.info(
-                "build_global_index_for_all_scriptures:%s: updating database",
-                scripture_name,
-            )
-
-            # Batch upsert
-            BATCH_SIZE = 5000  # safely below max batch size
-            for i in range(0, len(df), BATCH_SIZE):
-                batch_df = df.iloc[i : i + BATCH_SIZE]
-                batch_ids = batch_df["_id"].tolist()
-                batch_docs = batch_df["_doc"].tolist()
-                batch_metas = [
-                    {k: record[k] for k in metadatas[0].keys() if k in record}
-                    | {"_global_index": record["_global_index"]}
-                    for record in batch_df.to_dict(orient="records")
-                ]
-                # Use original metadata keys for upsert
-                batch_metas = [
-                    {k: record[k] for k in metadatas[0].keys() if k in record}
-                    | {"_global_index": record["_global_index"]}
-                    for record in batch_df.to_dict(orient="records")
-                ]
-                batch_embeds = [embeddings[idx] for idx in batch_df.index]
-
-                collection.update(
-                    ids=batch_ids,
-                    # documents=batch_docs,
-                    metadatas=batch_metas,
-                    # embeddings=batch_embeds,
-                )
-
-            logger.info(
-                "build_global_index_for_all_scriptures:%s: Updated with %d records",
-                scripture_name,
-                len(df),
-            )
+    def build_global_index_for_scripture(self, scripture: dict, force: bool = False):
+        import pandas as pd
+        import numpy as np
+
+        scripture_name = scripture["name"]
+        chapter_order = scripture.get("chapter_order", None)
+        logger.info(
+            "build_global_index_for_all_scriptures:%s: Processing", scripture_name
+        )
+        collection_name = scripture["collection_name"]
+        collection = self.chroma_client.get_or_create_collection(name=collection_name)
+        metadata_fields = scripture.get("metadata_fields", [])
+
+        # Get metadata field names marked as unique
+        unique_fields = [f["name"] for f in metadata_fields if f.get("is_unique")]
+        if not unique_fields:
+            if metadata_fields:
+                unique_fields = [metadata_fields[0]["name"]]
+            else:
+                logger.warning(
+                    f"No metadata fields defined for {collection_name}, skipping"
+                )
+                return
+
+        logger.info(
+            "build_global_index_for_all_scriptures:%s:unique fields: %s",
+            scripture_name,
+            unique_fields,
+        )
+
+        # Build chapter_order mapping if defined
+        chapter_order_mapping = {}
+        for field in metadata_fields:
+            if callable(chapter_order):
+                chapter_order_mapping = chapter_order()
+                logger.info(
+                    "build_global_index_for_all_scriptures:%s:chapter_order_mapping: %s",
+                    scripture_name,
+                    chapter_order_mapping,
+                )
+
+        # Fetch all records (keep embeddings for upsert)
+        try:
+            results = collection.get(include=["metadatas", "documents", "embeddings"])
+        except Exception as e:
+            logger.error(
+                "build_global_index_for_all_scriptures:%s Error getting data from chromadb",
+                scripture_name,
+                exc_info=True,
+            )
+            return
+
+        ids = results["ids"]
+        metadatas = results["metadatas"]
+        documents = results["documents"]
+        embeddings = results.get("embeddings", [None] * len(ids))
+
+        if not force and metadatas and "_global_index" in metadatas[0]:
+            logger.warning(
+                "build_global_index_for_all_scriptures:%s: global index already available. skipping collection",
+                scripture_name,
+            )
+            return
+
+        # Create a DataFrame for metadata sorting
+        df = pd.DataFrame(metadatas)
+        df["_id"] = ids
+        df["_doc"] = documents
+
+        # Add sortable columns for each unique field
+        for field_name in unique_fields:
+            if field_name.lower() == "chapter" and chapter_order_mapping:
+                # Map chapter names to their defined order
+                df["_sort_" + field_name] = (
+                    df[field_name].map(chapter_order_mapping).fillna(np.inf)
+                )
+            else:
+                # Try numeric, fallback to string lowercase
+                def parse_val(v):
+                    if v is None:
+                        return float("inf")
+                    if isinstance(v, int):
+                        return v
+                    if isinstance(v, str):
+                        v = v.strip()
+                        return int(v) if v.isdigit() else v.lower()
+                    return str(v)
+
+                df["_sort_" + field_name] = df[field_name].apply(parse_val)
+
+        sort_cols = ["_sort_" + f for f in unique_fields]
+        df = df.sort_values(by=sort_cols, kind="stable").reset_index(drop=True)
+
+        # Assign global index
+        df["_global_index"] = range(1, len(df) + 1)
+
+        logger.info(
+            "build_global_index_for_all_scriptures:%s: updating database",
+            scripture_name,
+        )
+
+        # Batch upsert
+        BATCH_SIZE = 5000  # safely below max batch size
+        for i in range(0, len(df), BATCH_SIZE):
+            batch_df = df.iloc[i : i + BATCH_SIZE]
+            batch_ids = batch_df["_id"].tolist()
+            batch_docs = batch_df["_doc"].tolist()
+            # Use original metadata keys for upsert
+            batch_metas = [
+                {k: record[k] for k in metadatas[0].keys() if k in record}
+                | {"_global_index": record["_global_index"]}
+                for record in batch_df.to_dict(orient="records")
+            ]
+            batch_embeds = [embeddings[idx] for idx in batch_df.index]
+
+            collection.update(
+                ids=batch_ids,
+                # documents=batch_docs,
+                metadatas=batch_metas,
+                # embeddings=batch_embeds,
+            )
+
+        logger.info(
+            "build_global_index_for_all_scriptures:%s: ✅ Updated with %d records",
+            scripture_name,
+            len(df),
+        )
+
+    def build_global_index_for_all_scriptures(self, force: bool = False):
+        logger.info("build_global_index_for_all_scriptures: started")
+        config = SanatanConfig()
+
+        for scripture in config.scriptures:
+            self.build_global_index_for_scripture(scripture=scripture, force=force)
+
     def fix_taniyans_in_divya_prabandham(self):
-        nalayiram_helper.reorder_taniyan(self.chroma_client.get_collection("divya_prabandham"))
+        nalayiram_helper.reorder_taniyan(
+            self.chroma_client.get_collection("divya_prabandham")
+        )
 
     def delete_taniyans_in_divya_prabandham(self):
-        nalayiram_helper.delete_taniyan(self.chroma_client.get_collection("divya_prabandham"))
+        nalayiram_helper.delete_taniyan(
+            self.chroma_client.get_collection("divya_prabandham")
+        )
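
The refactor extracts the per-scripture loop body into build_global_index_for_scripture, so a single collection can be reindexed without touching the others; each continue becomes a return, and the old entry point survives as a thin wrapper. A usage sketch (assuming the scripture dicts defined in config.py; force=True rebuilds _global_index even when the collection already has one):

from config import SanatanConfig
from db import SanatanDatabase

db = SanatanDatabase()

# Rebuild the ordering index for one scripture only ...
db.build_global_index_for_scripture(
    scripture=SanatanConfig().get_scripture_by_name("yt_metadata"),
    force=True,
)

# ... or for every configured scripture; collections that already carry
# a _global_index are skipped unless force=True.
db.build_global_index_for_all_scriptures()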
modules/youtube_metadata/answerer.py CHANGED
@@ -4,6 +4,7 @@
 from typing import List
 from pydantic import BaseModel
 from openai import OpenAI
+from modules.youtube_metadata.db import get_youtube_metadata_collection
 from modules.youtube_metadata.retriever import retrieve_videos
 
 
@@ -26,12 +27,13 @@ class LLMAnswer(BaseModel):
 # Main Function
 # -------------------------------
 def answer_query(
-    query: str, collection, top_k: int = 5, channel_id: str = None
+    query: str, top_k: int = 5, channel_id: str = None
 ) -> LLMAnswer:
     """
     Answer a user query using YouTube video metadata.
     Returns an LLMAnswer object with textual answer + list of videos.
     """
+    collection = get_youtube_metadata_collection()
     results = retrieve_videos(query, collection, top_k=top_k, channel_id=channel_id)
 
     if not results:
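
answer_query now resolves the Chroma collection itself through get_youtube_metadata_collection(), so callers pass only the query and options. A minimal sketch of the new call shape (the query string and channel id are illustrative, not from this commit):

from modules.youtube_metadata.answerer import answer_query

# channel_id=None searches across all indexed channels
answer = answer_query("thiruppavai upanyasam", top_k=10, channel_id=None)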
modules/youtube_metadata/app.py CHANGED
@@ -1,14 +1,13 @@
-import asyncio
 import os
 import re
-import threading
 import gradio as gr
 from gradio_modal import Modal
+from config import SanatanConfig
+from db import SanatanDatabase
 from modules.youtube_metadata.downloader import export_channel_json
 from modules.youtube_metadata.channel_utils import fetch_channel_dataframe
 from modules.youtube_metadata.db import (
     delete_channel_from_collection,
-    get_collection,
     get_indexed_channels,
 )
 from modules.youtube_metadata.answerer import answer_query
@@ -16,6 +15,11 @@ from dotenv import load_dotenv
 
 from modules.youtube_metadata.youtube_poller import start_poll
 from modules.youtube_metadata.youtube_sync import sync_channels_from_youtube
+import logging
+
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
 
 load_dotenv()
 
@@ -89,9 +93,9 @@ def index_channels(channel_urls: str):
 def youtube_metadata_init(progress: gr.Progress = None):
     channels = (
         "https://www.youtube.com/@onedayonepasuram6126,"
-        "https://www.youtube.com/@srisookthi,"
-        "https://www.youtube.com/@learn-aksharam,"
-        "https://www.youtube.com/@SriYadugiriYathirajaMutt,"
+        # "https://www.youtube.com/@srisookthi,"
+        # "https://www.youtube.com/@learn-aksharam,"
+        # "https://www.youtube.com/@SriYadugiriYathirajaMutt,"
         "https://www.youtube.com/@akivasudev,"
         "https://www.youtube.com/@Arulicheyal_Amutham"
     )
@@ -102,7 +106,7 @@ def youtube_metadata_init(progress: gr.Progress = None):
 
 def refresh_all_channels():
     yt_api_key = os.environ["YOUTUBE_API_KEY"]
-    channels = get_indexed_channels(get_collection())
+    channels = get_indexed_channels()
 
     if not channels:
         return "⚠️ No channels available to refresh.", refresh_channel_list()
@@ -127,7 +131,7 @@ def refresh_all_channels():
 # Channel selection as radio
 # -------------------------------
 def list_channels_radio():
-    channels = get_indexed_channels(get_collection())
+    channels = get_indexed_channels()
     choices = []
     for key, val in channels.items():
         if isinstance(val, dict):
@@ -155,7 +159,7 @@ def delete_channel(channel_url: str):
 # -------------------------------
 def handle_query(query: str, search_channel_id: str):
     answer_text, video_html = answer_query(
-        query, get_collection(), channel_id=search_channel_id, top_k=10
+        query, channel_id=search_channel_id, top_k=10
     )
     if not answer_text:
         answer_text = "No answer available."
@@ -480,15 +484,13 @@ with gr.Blocks(title="Sanatana AI - Youtube Metadata Surfer") as youtube_metadat
 
 
 def initialize_youtube_metadata_and_poll():
-    # Step 1: Initialize metadata
     for msg in youtube_metadata_init():
-        print(msg)
+        logger.info("initialize_youtube_metadata_and_poll: %s", msg)
+    SanatanDatabase().build_global_index_for_scripture(
+        scripture=SanatanConfig().get_scripture_by_name("yt_metadata"), force=True
+    )
+    start_poll()
 
-    # Step 2: Start polling after init
-    start_poll()  # run in the same thread
-    # OR if you want it in a separate daemon thread:
-    # poll_thread = threading.Thread(target=start_poll, daemon=True)
-    # poll_thread.start()
 
 if __name__ == "__main__":
     initialize_youtube_metadata_and_poll()
modules/youtube_metadata/channel_utils.py CHANGED
@@ -1,4 +1,4 @@
-from modules.youtube_metadata.db import get_collection
+from modules.youtube_metadata.db import get_youtube_metadata_collection
 import pandas as pd
 
 page_size = 10  # change if you like
@@ -8,7 +8,7 @@ page_size = 10  # change if you like
 # Fetch channel videos as HTML table with pagination
 # -------------------------------
 def fetch_channel_html(channel_id: str, page: int = 1, page_size: int = 10):
-    collection = get_collection()
+    collection = get_youtube_metadata_collection()
     offset = (page - 1) * page_size
 
     all_results = collection.get(
@@ -73,7 +73,7 @@ def fetch_channel_html(channel_id: str, page: int = 1, page_size: int = 10):
 # Fetch channel videos as HTML table with pagination
 # -------------------------------
 def fetch_channel_dataframe(channel_id: str):
-    collection = get_collection()
+    collection = get_youtube_metadata_collection()
 
     results = collection.get(
         where={"channel_id": channel_id}, include=["documents", "metadatas"]
modules/youtube_metadata/db.py CHANGED
@@ -1,38 +1,19 @@
 import chromadb
 
 from config import SanatanConfig
+from db import SanatanDatabase
 
 config = SanatanConfig()
YT_METADATA_COLLECTION_NAME = config.get_collection_name(scripture_name="yt_metadata")
+db = SanatanDatabase()
 
-def get_client():
-    client = chromadb.PersistentClient(path=config.dbStorePath)
-    return client
+def get_youtube_metadata_collection():
+    client = db.chroma_client
+    return client.get_or_create_collection(YT_METADATA_COLLECTION_NAME)
 
 
-def get_collection():
-    client = get_client()
-
-    # Ensure fresh collection with correct dimension
-    try:
-        collection = client.get_collection(YT_METADATA_COLLECTION_NAME)
-    except Exception:
-        collection = client.create_collection(YT_METADATA_COLLECTION_NAME)
-
-    # # Check dimension mismatch
-    # try:
-    #     # quick test query
-    #     collection.query(query_embeddings=[[0.0] * 1536], n_results=1)
-    # except Exception:
-    #     # Delete and recreate with fresh schema
-    #     client.delete_collection("yt_metadata")
-    #     collection = client.create_collection("yt_metadata")
-
-    return collection
-
-
-# modules/db.py
-def get_indexed_channels(collection=get_collection()):
+def get_indexed_channels():
+    collection = get_youtube_metadata_collection()
     results = collection.get(include=["metadatas"])
     channels = {}
 
@@ -55,11 +36,11 @@ def delete_channel_from_collection(channel_id: str):
     # print("Deleting channel", channel_id)
 
     # print("data = ", data)
-    get_collection().delete(where={"channel_id": channel_id})
+    get_youtube_metadata_collection().delete(where={"channel_id": channel_id})
 
 
 def fetch_channel_data(channel_id: str):
-    data = get_collection().get(
+    data = get_youtube_metadata_collection().get(
         where={"channel_id": channel_id}, include=["embeddings", "metadatas", "documents"]
     )
     return data
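
Design note: the module previously built its own chromadb.PersistentClient in get_client() while SanatanDatabase held a second client on the same store; the rewrite shares one client through db.chroma_client. It also drops the get_indexed_channels(collection=get_collection()) default argument, which Python evaluated once at import time; the collection is now resolved on every call. A small sketch of the resulting API (assuming the module-level SanatanDatabase() instance shown above):

from modules.youtube_metadata.db import (
    get_youtube_metadata_collection,
    get_indexed_channels,
)

collection = get_youtube_metadata_collection()
print(collection.count())      # number of indexed videos
print(get_indexed_channels())  # mapping of channel metadata per indexed channel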
modules/youtube_metadata/indexer.py CHANGED
@@ -1,6 +1,6 @@
 # modules/indexer.py
 from typing import Dict, List
-from openai import OpenAI
+from modules.youtube_metadata.db import get_youtube_metadata_collection
 from modules.youtube_metadata.embeddings import get_embedding
 import logging
 
@@ -10,10 +10,9 @@ logger.setLevel(logging.INFO)
 
 
 def index_videos(
-    videos: List[Dict], collection, channel_url: str, batch_size: int = 50
+    videos: List[Dict], channel_url: str, batch_size: int = 50
 ):
-    client = OpenAI()
-
+    collection = get_youtube_metadata_collection()
     total = len(videos)
     logger.info(
         f"index_videos: [INDEX] Starting indexing for {total} videos (channel={channel_url})"
modules/youtube_metadata/youtube_poller.py CHANGED
@@ -1,6 +1,6 @@
 from chromadb import Collection
 import feedparser
-from modules.youtube_metadata.db import get_collection, get_indexed_channels
+from modules.youtube_metadata.db import get_youtube_metadata_collection, get_indexed_channels
 from modules.youtube_metadata.embeddings import get_embedding
 import logging
 
@@ -65,22 +65,20 @@ def filter_new_videos(videos, existing_ids):
 def add_to_chroma(collection: Collection, new_videos):
     if not new_videos:
         return
+    count = collection.count()
     collection.add(
-        documents=[v["title"] for v in new_videos],
+        documents=[f"{v['title']} - {v['description']}" for v in new_videos],
         embeddings=[get_embedding(v["title"]) for v in new_videos],
         metadatas=[
-            {
-                "video_id": v["video_id"],
-                "channel_id": v["channel_id"],
-                "link": v["link"],
-            }
-            for v in new_videos
+            {**v, "_global_index": i}
+            for i, v in enumerate(new_videos, start=count)
         ],
         ids=[v["video_id"] for v in new_videos],
     )
 
 
-def incremental_update(collection, channel_id):
+def incremental_update(channel_id):
+    collection = get_youtube_metadata_collection()
     existing_ids = get_existing_video_ids(collection, channel_id)
     latest_videos = fetch_channel_videos_rss(channel_id)
     new_videos = filter_new_videos(latest_videos, existing_ids)
@@ -88,10 +86,10 @@ def incremental_update(collection, channel_id):
     if new_videos:
         add_to_chroma(collection, new_videos)
         logger.info(
-            f"incremental_update: Added {len(new_videos)} new videos from {channel_id}"
+            f"youtube_poller: incremental_update: Added {len(new_videos)} new videos from {channel_id}"
         )
     else:
-        logger.info(f"incremental_uddate: No new videos for {channel_id}")
+        logger.info(f"youtube_poller: incremental_update: No new videos for {channel_id}")
 
 
 def start_poll():
@@ -101,5 +99,6 @@ def start_poll():
 
     while True:
         for channel_id in configured_channels:
-            incremental_update(get_collection(), channel_id)
+            incremental_update(channel_id)
+        logger.info("youtube_poller: Sleeping for 10 minutes")
         time.sleep(600)  # 10 minutes
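
The poller now stores the whole video dict as metadata and appends a _global_index derived from the current collection size, so newly polled videos sort after everything already indexed; note the documents entry assumes each RSS item carries a description field. A worked sketch of just the enumeration (dummy data, not from this commit; Chroma metadata values must stay scalar, which holds as long as each video dict contains only strings):

# Suppose the collection currently holds 120 records and two new videos arrive.
count = 120
new_videos = [
    {"video_id": "a1", "channel_id": "c1", "link": "https://example.com/a1", "title": "t1"},
    {"video_id": "b2", "channel_id": "c1", "link": "https://example.com/b2", "title": "t2"},
]
metadatas = [{**v, "_global_index": i} for i, v in enumerate(new_videos, start=count)]
# _global_index becomes 120 and 121, placing the records after the existing ones
print([m["_global_index"] for m in metadatas])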
modules/youtube_metadata/youtube_sync.py CHANGED
@@ -3,7 +3,6 @@ import gradio as gr
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from modules.youtube_metadata.collector import fetch_all_channel_videos
-from modules.youtube_metadata.db import get_collection
 from modules.youtube_metadata.indexer import index_videos
 
 # global stop signal
@@ -51,7 +50,7 @@ def _refresh_single_channel(api_key, channel_url, progress):
 
     with ThreadPoolExecutor(max_workers=4) as executor:
         futures = [
-            executor.submit(index_videos, batch, get_collection(), channel_url=channel_url)
+            executor.submit(index_videos, batch, channel_url=channel_url)
             for _, batch in fetched_batches
         ]
 