File size: 4,665 Bytes
2db5c7d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
"""
Utility helpers for Teleagriculture kits API
Provides:
- BASE_URL, HEADERS (with optional Bearer from KIT_API_KEY env)
- get_kit_info(kit_id)
- get_kit_measurements_df(kit_id, sensors=None, page_size=100)
"""
from __future__ import annotations
import os
from typing import Any, Iterable, Optional
import pandas as pd
import requests
# API configuration
BASE_URL = os.getenv("KITS_API_BASE", "https://kits.teleagriculture.org/api")
KIT_API_KEY = os.getenv("KIT_API_KEY")
HEADERS: dict[str, str] = {
"Accept": "application/json",
}
if KIT_API_KEY:
HEADERS["Authorization"] = f"Bearer {KIT_API_KEY}"
def get_kit_info(kit_id: int) -> Optional[dict]:
"""Fetch metadata for a kit (board).
Returns the JSON 'data' object or None if not found / error.
"""
url = f"{BASE_URL}/kits/{kit_id}"
try:
r = requests.get(url, headers=HEADERS, timeout=30)
if r.status_code == 200:
body = r.json()
return body.get("data")
return None
except requests.RequestException:
return None
def _paginate(
url: str,
*,
params: Optional[dict] = None,
headers: Optional[dict] = None,
page_size: int = 100,
max_pages: int = 500,
):
"""Cursor pagination helper yielding lists of items from {'data': [...]} pages.
Stops when no next_cursor is provided or on any non-200/parse error.
"""
q = dict(params or {})
q["page[size]"] = str(page_size)
cursor = None
pages = 0
while pages < max_pages:
if cursor:
q["page[cursor]"] = cursor
try:
r = requests.get(url, headers=headers, params=q, timeout=30)
except requests.RequestException:
break
if r.status_code != 200:
break
try:
payload = r.json()
except Exception:
break
data = payload.get("data")
meta = payload.get("meta", {})
yield data if isinstance(data, list) else []
cursor = meta.get("next_cursor")
pages += 1
if not cursor:
break
def get_kit_measurements_df(
kit_id: int,
sensors: Optional[Iterable[str]] = None,
*,
page_size: int = 100,
) -> pd.DataFrame:
"""Fetch all measurements for the given kit across its sensors as a DataFrame.
- If sensors is None, discover sensors via get_kit_info(kit_id).
- Returns columns: kit_id, sensor, timestamp, value, unit, _raw
(depending on API, some fields may be None/NaT)
"""
# Determine sensor list
if sensors is None:
kit = get_kit_info(kit_id)
if not kit:
return pd.DataFrame(columns=["kit_id", "sensor", "timestamp", "value", "unit", "_raw"])
sensor_list = [
s.get("name")
for s in (kit.get("sensors") or [])
if isinstance(s, dict) and s.get("name")
]
else:
sensor_list = [s for s in sensors if s]
rows: list[dict[str, Any]] = []
for sname in sensor_list:
endpoint = f"{BASE_URL}/kits/{kit_id}/{sname}/measurements"
for page in _paginate(endpoint, headers=HEADERS, page_size=page_size):
for item in page:
if not isinstance(item, dict):
continue
# Some APIs nest details under 'attributes'
rec = item.get("attributes", {})
rec.update({k: v for k, v in item.items() if k != "attributes"})
ts = rec.get("timestamp") or rec.get("time") or rec.get("created_at") or rec.get("datetime")
val = rec.get("value") or rec.get("reading") or rec.get("measurement") or rec.get("val")
unit = rec.get("unit") or rec.get("units")
rows.append(
{
"kit_id": kit_id,
"sensor": sname,
"timestamp": ts,
"value": val,
"unit": unit,
"_raw": item, # preserve original
}
)
df = pd.DataFrame(rows)
if not df.empty and "timestamp" in df.columns:
try:
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce", utc=True)
df = df.sort_values(["sensor", "timestamp"], kind="stable")
except Exception:
pass
return df
def fetch_kit_dataframe(kit_id: int) -> pd.DataFrame:
"""Simplest API: return all measurements for the given kit as a DataFrame.
Equivalent to get_kit_measurements_df(kit_id) with sensible defaults.
"""
return get_kit_measurements_df(kit_id)
|