File size: 4,665 Bytes
2db5c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Utility helpers for Teleagriculture kits API

Provides:
- BASE_URL, HEADERS (with optional Bearer from KIT_API_KEY env)
- get_kit_info(kit_id)
- get_kit_measurements_df(kit_id, sensors=None, page_size=100)
"""
from __future__ import annotations

import os
from typing import Any, Iterable, Optional

import pandas as pd
import requests

# API configuration
BASE_URL = os.getenv("KITS_API_BASE", "https://kits.teleagriculture.org/api")
KIT_API_KEY = os.getenv("KIT_API_KEY")

HEADERS: dict[str, str] = {
    "Accept": "application/json",
}
if KIT_API_KEY:
    HEADERS["Authorization"] = f"Bearer {KIT_API_KEY}"


def get_kit_info(kit_id: int) -> Optional[dict]:
    """Fetch metadata for a kit (board).

    Returns the JSON 'data' object or None if not found / error.
    """
    url = f"{BASE_URL}/kits/{kit_id}"
    try:
        r = requests.get(url, headers=HEADERS, timeout=30)
        if r.status_code == 200:
            body = r.json()
            return body.get("data")
        return None
    except requests.RequestException:
        return None


def _paginate(
    url: str,
    *,
    params: Optional[dict] = None,
    headers: Optional[dict] = None,
    page_size: int = 100,
    max_pages: int = 500,
):
    """Cursor pagination helper yielding lists of items from {'data': [...]} pages.

    Stops when no next_cursor is provided or on any non-200/parse error.
    """
    q = dict(params or {})
    q["page[size]"] = str(page_size)
    cursor = None
    pages = 0
    while pages < max_pages:
        if cursor:
            q["page[cursor]"] = cursor
        try:
            r = requests.get(url, headers=headers, params=q, timeout=30)
        except requests.RequestException:
            break
        if r.status_code != 200:
            break
        try:
            payload = r.json()
        except Exception:
            break
        data = payload.get("data")
        meta = payload.get("meta", {})
        yield data if isinstance(data, list) else []
        cursor = meta.get("next_cursor")
        pages += 1
        if not cursor:
            break


def get_kit_measurements_df(
    kit_id: int,
    sensors: Optional[Iterable[str]] = None,
    *,
    page_size: int = 100,
) -> pd.DataFrame:
    """Fetch all measurements for the given kit across its sensors as a DataFrame.

    - If sensors is None, discover sensors via get_kit_info(kit_id).
    - Returns columns: kit_id, sensor, timestamp, value, unit, _raw
      (depending on API, some fields may be None/NaT)
    """
    # Determine sensor list
    if sensors is None:
        kit = get_kit_info(kit_id)
        if not kit:
            return pd.DataFrame(columns=["kit_id", "sensor", "timestamp", "value", "unit", "_raw"])
        sensor_list = [
            s.get("name")
            for s in (kit.get("sensors") or [])
            if isinstance(s, dict) and s.get("name")
        ]
    else:
        sensor_list = [s for s in sensors if s]

    rows: list[dict[str, Any]] = []

    for sname in sensor_list:
        endpoint = f"{BASE_URL}/kits/{kit_id}/{sname}/measurements"
        for page in _paginate(endpoint, headers=HEADERS, page_size=page_size):
            for item in page:
                if not isinstance(item, dict):
                    continue
                
                # Some APIs nest details under 'attributes'
                rec = item.get("attributes", {})
                rec.update({k: v for k, v in item.items() if k != "attributes"})

                ts = rec.get("timestamp") or rec.get("time") or rec.get("created_at") or rec.get("datetime")
                val = rec.get("value") or rec.get("reading") or rec.get("measurement") or rec.get("val")
                unit = rec.get("unit") or rec.get("units")
                rows.append(
                    {
                        "kit_id": kit_id,
                        "sensor": sname,
                        "timestamp": ts,
                        "value": val,
                        "unit": unit,
                        "_raw": item,  # preserve original
                    }
                )

    df = pd.DataFrame(rows)
    if not df.empty and "timestamp" in df.columns:
        try:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce", utc=True)
            df = df.sort_values(["sensor", "timestamp"], kind="stable")
        except Exception:
            pass
    return df


def fetch_kit_dataframe(kit_id: int) -> pd.DataFrame:
    """Simplest API: return all measurements for the given kit as a DataFrame.

    Equivalent to get_kit_measurements_df(kit_id) with sensible defaults.
    """
    return get_kit_measurements_df(kit_id)