GUARDLABS
GuardLabs · Technical note

Syncing Data Between Two APIs in Python (Idempotent with Retries)

Syncing data between a source API and a destination API requires handling two major failure modes: network interruptions and duplicate data delivery. To build a production-grade sync engine, you must implement automatic retries with exponential backoff and idempotent write operations.

1. The Architecture of a Safe Sync

  • Idempotency: Ensures that making the same API call multiple times has the same effect as making it once. We achieve this by using HTTP PUT requests to a specific resource URI (e.g., /items/{id}) or by passing a unique Idempotency-Key header.
  • Retries with Backoff: Keeps transient failures such as network drops, rate limits (HTTP 429), and temporary server errors (HTTP 500, 502, 503, 504) from aborting the sync by retrying the request after an increasing delay.
  • State Tracking: Keeps track of the last successfully synced record (using a cursor like updated_at) to avoid reprocessing the entire dataset.
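
The script in the next section does not persist a cursor, so here is a minimal sketch of the state-tracking idea. The file layout, the helper names (load_cursor, save_cursor, records_after), and the assumption that every record carries an ISO 8601 UTC updated_at string in one consistent format are all illustrative and not part of the original script.

```python
import json
from pathlib import Path
from typing import Optional


def load_cursor(path: Path) -> Optional[str]:
    """Return the last synced updated_at value, or None on the first run."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8")).get("updated_at")


def save_cursor(path: Path, updated_at: str) -> None:
    """Write the cursor to a temp file, then rename it over the old one."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"updated_at": updated_at}), encoding="utf-8")
    tmp.replace(path)


def records_after(records: list, cursor: Optional[str]) -> list:
    """Keep records at or after the cursor, oldest first.

    Assumes ISO 8601 UTC strings in one format, which sort lexicographically.
    Using >= re-sends the boundary record; that is harmless because the
    destination write is idempotent, and it avoids skipping records that
    share the cursor's timestamp.
    """
    pending = [r for r in records if cursor is None or r["updated_at"] >= cursor]
    return sorted(pending, key=lambda r: r["updated_at"])
```

A sync loop would call save_cursor only after a record has been written successfully, so a crash mid-run resumes from the last confirmed record rather than from the start.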

2. Implementation Code

This script fetches records from a source API and upserts them to a destination API. It uses a requests session whose HTTPAdapter is configured with urllib3's Retry class, so transient network and server failures are retried automatically.

import logging
import requests
from urllib3.util import Retry
from requests.adapters import HTTPAdapter

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# 1. Configure a resilient HTTP session with exponential backoff
def get_resilient_session() -> requests.Session:
    session = requests.Session()
    retries = Retry(
        total=5,                          # Total number of retries
        backoff_factor=1,                 # No delay before the first retry, then 2s, 4s, 8s, 16s
        status_forcelist=[429, 500, 502, 503, 504], # Retry on rate limits and server errors
        raise_on_status=False             # Return the last response instead of raising when status retries run out
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# 2. Sync execution function
def sync_records(source_url: str, dest_url: str, auth_token: str):
    session = get_resilient_session()
    headers = {
        "Authorization": f"Bearer {auth_token}",
        "Content-Type": "application/json"
    }

    # Fetch data from Source API
    try:
        logger.info("Fetching data from source API...")
        response = session.get(source_url, headers=headers, timeout=10)
        response.raise_for_status()
        records = response.json().get("data", [])
    except requests.RequestException as e:
        logger.error(f"Failed to fetch source data: {e}")
        return

    logger.info(f"Found {len(records)} records to sync.")

    # Push data to Destination API (Idempotent Upsert)
    for record in records:
        record_id = record.get("id")
        if not record_id:
            logger.warning("Skipping record: Missing unique identifier.")
            continue

        # Use PUT to a specific resource path to guarantee idempotency.
        # If the resource exists, it updates; if not, it creates.
        target_uri = f"{dest_url}/items/{record_id}"
        
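        # Alternative: use POST with an Idempotency-Key header. Build a per-request
        # copy of the headers so the key does not leak into later requests, and note
        # that urllib3's Retry does not retry POST unless allowed_methods includes it.
        # post_headers = {**headers, "Idempotency-Key": f"sync-key-{record_id}"}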

        try:
            logger.info(f"Syncing record {record_id}...")
            dest_response = session.put(target_uri, json=record, headers=headers, timeout=10)
            
            if dest_response.status_code in (200, 201, 204):  # 204: success with no response body
                logger.info(f"Successfully synced record {record_id}")
            else:
                logger.error(f"Failed to sync {record_id}: Status {dest_response.status_code} - {dest_response.text}")
                
        except requests.RequestException as e:
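            # Reached when connection errors or timeouts persist after all retries.
            # Exhausted status-code retries land in the else branch above instead,
            # because raise_on_status=False returns the final response.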
            logger.critical(f"Hard failure syncing record {record_id}: {e}")

if __name__ == "__main__":
    SOURCE_API = "https://api.source.com/v1/products"
    DEST_API = "https://api.destination.com/v1"
    API_TOKEN = "your_secure_token_here"

    sync_records(SOURCE_API, DEST_API, API_TOKEN)
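
The Idempotency-Key alternative mentioned in the comments above can be sketched as follows. This assumes the destination API actually honors an Idempotency-Key header, which is not universal, so check its documentation first. The helper names idempotency_key and post_with_key are hypothetical, and session stands for any object with a requests-style post method.

```python
import hashlib
import json


def idempotency_key(record: dict) -> str:
    """Derive a stable key from the record's id and content.

    The same record always yields the same key, so a retried POST can be
    recognized by the server; a changed record yields a new key.
    """
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"sync-{record['id']}-{digest[:16]}"


def post_with_key(session, url: str, record: dict, base_headers: dict):
    """POST one record with its own Idempotency-Key header."""
    # Copy the shared headers so the key never carries over to the next record.
    headers = {**base_headers, "Idempotency-Key": idempotency_key(record)}
    return session.post(url, json=record, headers=headers, timeout=10)
```

One caveat: urllib3's Retry skips POST by default because POST is not idempotent on its own. For automatic retries on this path, the Retry object would need POST added to allowed_methods, and that is only safe once the key is being sent.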

3. Key Design Details

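  • Timeout Enforcement: The timeout=10 parameter limits both the connection attempt and each wait for data from the server to 10 seconds, so the script cannot hang indefinitely if a socket opens but stops sending data. It is not a cap on the total duration of a request.
  • Backoff Factor: urllib3 computes the sleep before a retry as {backoff factor} * (2 ** (n - 1)), where n is the number of consecutive failed attempts so far, except that the first retry is sent without any delay. With backoff_factor=1 and total=5, the delays are 0s, 2s, 4s, 8s, and 16s. When the server sends a Retry-After header with a 429 or 503 response, urllib3 honors that value by default.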
  • PUT vs POST: By targeting /items/{record_id} with a PUT request, the destination API can naturally deduplicate incoming requests. If a network drop occurs after the destination processes the write but before sending the response, the subsequent retry will safely overwrite the record with identical data instead of creating a duplicate.
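
To make the backoff arithmetic concrete, the sketch below reproduces the delay schedule as plain Python. It mirrors urllib3's documented formula as I understand it rather than calling the library, and it assumes consecutive failures, no Retry-After header, no jitter, and the default 120-second cap. The function name backoff_schedule is hypothetical.

```python
def backoff_schedule(backoff_factor: float, total: int, backoff_max: float = 120.0) -> list:
    """Return the sleep (in seconds) before each of `total` retries."""
    delays = []
    for attempt in range(1, total + 1):
        if attempt == 1:
            # The first retry is sent immediately.
            delays.append(0.0)
        else:
            # Doubles on every further failure, capped at backoff_max.
            delays.append(float(min(backoff_max, backoff_factor * 2 ** (attempt - 1))))
    return delays


print(backoff_schedule(1, 5))  # [0.0, 2.0, 4.0, 8.0, 16.0]
```

With the settings used in the script, a request that keeps failing spends about 30 seconds sleeping in total, plus up to 10 seconds of timeout per attempt, before it is reported as a failure.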


Published 2026-06-22