Advanced Usage
Advanced patterns for building production applications with polar-flow.
Incremental Sync
Fetch only new data since last sync using the since= parameter.
from datetime import date
from polar_flow import PolarFlow, load_token_from_file
async def sync_sleep_data(last_sync_date: str):
"""Sync sleep data since last sync date."""
token = load_token_from_file()
async with PolarFlow(access_token=token) as client:
# Fetch only data since last sync
new_sleep_data = await client.sleep.list(since=last_sync_date)
print(f"Fetched {len(new_sleep_data)} new sleep records")
for sleep in new_sleep_data:
await store_sleep_data(sleep)
return str(date.today())
# Run sync
last_sync = "2026-01-01"
new_last_sync = await sync_sleep_data(last_sync)
Bulk Data Fetching
Fetch all data types efficiently using parallel requests:
import asyncio
from polar_flow import PolarFlow
async def fetch_all_data(client: PolarFlow):
"""Fetch all available data in one session."""
# Fetch in parallel using asyncio.gather
results = await asyncio.gather(
client.sleep.list(days=28),
client.recharge.list(days=28),
client.activity.list(days=28),
client.exercises.list(),
client.cardio_load.list(days=28),
return_exceptions=True # Don't fail if one endpoint fails
)
sleep, recharge, activity, exercises, cardio = results
# Handle potential errors
return {
"sleep": sleep if not isinstance(sleep, Exception) else [],
"recharge": recharge if not isinstance(recharge, Exception) else [],
"activity": activity if not isinstance(activity, Exception) else [],
"exercises": exercises if not isinstance(exercises, Exception) else [],
"cardio_load": cardio if not isinstance(cardio, Exception) else [],
}
Rate Limit Handling
Implement sophisticated rate limit handling for production:
import asyncio
from polar_flow import PolarFlow, RateLimitError
class RateLimitedClient:
def __init__(self, client: PolarFlow):
self.client = client
self.max_retries = 3
async def fetch_with_backoff(self, coro):
"""Execute coroutine with exponential backoff on rate limits."""
for attempt in range(self.max_retries):
try:
return await coro
except RateLimitError as e:
if attempt == self.max_retries - 1:
raise
wait_time = e.retry_after * (2 ** attempt)
print(f"Rate limited. Waiting {wait_time}s...")
await asyncio.sleep(wait_time)
# Use it
async with PolarFlow(access_token=token) as client:
rl_client = RateLimitedClient(client)
sleep_data = await rl_client.fetch_with_backoff(
client.sleep.list(days=28)
)
Biosensing Data
Fetch and process biosensing data (requires compatible device):
from polar_flow import PolarFlow
async def fetch_biosensing_data(client: PolarFlow):
"""Fetch all biosensing metrics."""
# SpO2 - Blood oxygen
spo2_data = await client.biosensing.get_spo2()
for s in spo2_data:
print(f"SpO2: {s.blood_oxygen_percent}% at {s.test_time}")
print(f" Class: {s.spo2_class}")
print(f" HR: {s.average_heart_rate_bpm} bpm")
print(f" HRV: {s.heart_rate_variability_ms} ms")
# ECG - Electrocardiogram
ecg_data = await client.biosensing.get_ecg()
for e in ecg_data:
print(f"ECG at {e.test_time}")
print(f" HR: {e.average_heart_rate_bpm} bpm")
print(f" HRV: {e.heart_rate_variability_ms} ms ({e.heart_rate_variability_level})")
print(f" RRI: {e.rri_ms} ms")
if e.samples:
print(f" Samples: {len(e.samples)} waveform points")
# Body temperature
body_temp = await client.biosensing.get_body_temperature()
for t in body_temp:
print(f"Body temp: {t.start_time} - {t.end_time}")
print(f" Avg: {t.avg_temperature:.1f}°C")
print(f" Min: {t.min_temperature:.1f}°C")
print(f" Max: {t.max_temperature:.1f}°C")
# Skin temperature (nightly)
skin_temp = await client.biosensing.get_skin_temperature()
for t in skin_temp:
print(f"Skin temp ({t.sleep_date}): {t.sleep_time_skin_temperature_celsius:.1f}°C")
if t.deviation_from_baseline_celsius:
print(f" Deviation: {t.deviation_from_baseline_celsius:+.1f}°C")
Data Export Patterns
Export to JSON
import json
from pathlib import Path
from polar_flow import PolarFlow
async def export_to_json(client: PolarFlow, output_path: Path):
"""Export all data to JSON file."""
sleep_data = await client.sleep.list(days=28)
# Convert Pydantic models to dicts
data = [sleep.model_dump() for sleep in sleep_data]
output_path.write_text(json.dumps(data, indent=2, default=str))
print(f"Exported {len(data)} records to {output_path}")
Export to CSV
import csv
from pathlib import Path
from polar_flow import PolarFlow
async def export_exercises_to_csv(client: PolarFlow, output_path: Path):
"""Export exercises to CSV file."""
exercises = await client.exercises.list()
with output_path.open("w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"Date", "Sport", "Duration (min)",
"Calories", "Distance (km)", "Avg HR"
])
for ex in exercises:
writer.writerow([
ex.start_time.date() if ex.start_time else "",
ex.sport,
ex.duration_minutes or 0,
ex.calories or 0,
ex.distance_km or 0,
ex.average_heart_rate or 0,
])
print(f"Exported {len(exercises)} exercises to {output_path}")
Scheduled Sync Jobs
Run periodic syncs with APScheduler:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from polar_flow import PolarFlow, load_token_from_file
import asyncio
async def sync_job():
"""Sync all data from Polar API."""
token = load_token_from_file()
async with PolarFlow(access_token=token) as client:
sleep = await client.sleep.list(days=7)
print(f"Synced {len(sleep)} sleep records")
recharge = await client.recharge.list(days=7)
print(f"Synced {len(recharge)} recharge records")
cardio = await client.cardio_load.list(days=7)
print(f"Synced {len(cardio)} cardio load records")
# Setup scheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_job, "interval", hours=1)
scheduler.start()
# Keep running
await asyncio.Event().wait()
Database Integration
Store data in SQLite:
import aiosqlite
from polar_flow import PolarFlow, load_token_from_file
async def store_sleep_data(db_path: str):
"""Fetch and store sleep data in SQLite."""
token = load_token_from_file()
async with aiosqlite.connect(db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS sleep (
date TEXT PRIMARY KEY,
sleep_score INTEGER,
total_sleep_seconds INTEGER,
deep_sleep_seconds INTEGER,
rem_sleep_seconds INTEGER,
hrv_avg REAL
)
""")
async with PolarFlow(access_token=token) as client:
sleep_data = await client.sleep.list(days=28)
for sleep in sleep_data:
await db.execute("""
INSERT OR REPLACE INTO sleep VALUES (?, ?, ?, ?, ?, ?)
""", (
sleep.date,
sleep.sleep_score,
sleep.total_sleep_seconds,
sleep.deep_sleep_seconds,
sleep.rem_sleep_seconds,
getattr(sleep, 'hrv_avg', None),
))
await db.commit()
print(f"Stored {len(sleep_data)} sleep records")
Error Recovery
Implement robust error recovery:
from polar_flow import PolarFlow, PolarFlowError, NotFoundError
import logging
logger = logging.getLogger(__name__)
async def sync_with_recovery(client: PolarFlow, dates: list[str]):
"""Sync data with error recovery."""
successful = []
failed = []
for date in dates:
try:
sleep = await client.sleep.get(user_id="self", date=date)
successful.append(date)
await store_sleep(sleep)
except NotFoundError:
logger.info(f"No data for {date}")
# Not an error - just no data
except PolarFlowError as e:
logger.error(f"Failed to fetch {date}: {e}")
failed.append(date)
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful, failed
Testing
Mock the Polar API for testing:
import pytest
from polar_flow import PolarFlow
@pytest.mark.asyncio
async def test_sleep_fetch(httpx_mock):
"""Test sleep data fetching."""
httpx_mock.add_response(
url="https://www.polaraccesslink.com/v3/users/sleep",
json={
"nights": [{
"date": "2026-01-09",
"sleep_score": 85,
"total_sleep_seconds": 28800,
}]
}
)
async with PolarFlow(access_token="test_token") as client:
sleep = await client.sleep.list(days=1)
assert len(sleep) == 1
assert sleep[0].sleep_score == 85
Performance Tips
- Use
since=parameter for incremental syncs instead of fetching all data - Batch operations with
asyncio.gather()when fetching multiple endpoints - Handle rate limits proactively - don't retry immediately
- Cache token in memory if making multiple client instances
- Use connection pooling - the async context manager handles this automatically
Security Best Practices
- Never hardcode tokens - use environment variables or secure vaults
- Rotate tokens regularly - re-authenticate periodically
- Store tokens securely - use proper file permissions (600)
- Don't log tokens - redact sensitive data from logs
- Use HTTPS only - the client enforces this by default
Device Compatibility
Not all Polar devices support all endpoints:
| Feature | Vantage V3 | Vantage V2 | Grit X Pro | Ignite 3 | Pacer Pro |
|---|---|---|---|---|---|
| Sleep | ✅ | ✅ | ✅ | ✅ | ✅ |
| Recharge | ✅ | ✅ | ✅ | ✅ | ✅ |
| Cardio Load | ✅ | ✅ | ✅ | ✅ | ✅ |
| SpO2 | ✅ | ❌ | ❌ | ❌ | ❌ |
| ECG | ✅ | ❌ | ❌ | ❌ | ❌ |
| Temperature | ✅ | ❌ | ❌ | ❌ | ❌ |
Biosensing features (SpO2, ECG, Temperature) require the Elixir sensor platform found in Vantage V3.