Syncing Data Between Two APIs in Python (Idempotent with Retries)
Syncing data between a source API and a destination API requires handling two major failure modes: network interruptions and duplicate data delivery. To build a production-grade sync engine, you must implement automatic retries with exponential backoff and idempotent write operations.
1. The Architecture of a Safe Sync
- Idempotency: Ensures that making the same API call multiple times has the same effect as making it once. We achieve this by using HTTP PUT requests to a specific resource URI (e.g., /items/{id}) or by passing a unique Idempotency-Key header.
- Retries with Backoff: Prevents sync failures due to transient network drops, rate limits (HTTP 429), or server errors (HTTP 500, 502, 503, 504) by retrying the request after an increasing delay.
- State Tracking: Keeps track of the last successfully synced record (using a cursor like updated_at) to avoid reprocessing the entire dataset.
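The state-tracking idea can be sketched as follows. This is a minimal illustration, not part of the sync script itself: the file name sync_state.json and the helpers load_cursor, save_cursor, and records_after are hypothetical, and the sketch assumes every record carries an updated_at value as an ISO 8601 UTC string in one fixed format, so that plain string comparison matches chronological order.

```python
import json
from pathlib import Path
from typing import Iterable, Optional

# Hypothetical location for the persisted cursor; any durable store would do.
STATE_FILE = Path("sync_state.json")


def load_cursor(path: Path = STATE_FILE) -> Optional[str]:
    """Return the last synced updated_at value, or None on the first run."""
    if not path.exists():
        return None
    return json.loads(path.read_text()).get("updated_at")


def save_cursor(updated_at: str, path: Path = STATE_FILE) -> None:
    """Persist the cursor by writing a temp file and renaming it over the old one."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"updated_at": updated_at}))
    tmp.replace(path)


def records_after(records: Iterable[dict], cursor: Optional[str]) -> list:
    """Keep only records newer than the cursor, oldest first.

    Assumption: updated_at strings share one fixed ISO 8601 UTC format,
    so comparing them as strings orders them chronologically.
    """
    fresh = [r for r in records if cursor is None or r["updated_at"] > cursor]
    return sorted(fresh, key=lambda r: r["updated_at"])
```

A sync loop would call save_cursor only after the destination confirms a write. Because the writes are idempotent, re-sending a record that sits exactly on the cursor boundary after a crash is harmless, so erring toward reprocessing is the safer choice.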
2. Implementation Code
This script fetches records from a source API and upserts them to a destination API. It uses requests with urllib3's Retry policy, mounted through an HTTPAdapter, so transient failures are retried automatically.
```python
import logging

import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# 1. Configure a resilient HTTP session with exponential backoff
def get_resilient_session() -> requests.Session:
    session = requests.Session()
    retries = Retry(
        total=5,  # Maximum number of retries per request
        backoff_factor=1,  # Delay roughly doubles on each consecutive retry
        status_forcelist=[429, 500, 502, 503, 504],  # Retry on rate limits and server errors
        raise_on_status=False,  # Return the last response instead of raising once retries run out
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


# 2. Sync execution function
def sync_records(source_url: str, dest_url: str, auth_token: str) -> None:
    session = get_resilient_session()
    headers = {
        "Authorization": f"Bearer {auth_token}",
        "Content-Type": "application/json",
    }

    # Fetch data from Source API
    try:
        logger.info("Fetching data from source API...")
        response = session.get(source_url, headers=headers, timeout=10)
        response.raise_for_status()
        records = response.json().get("data", [])
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older requests versions
        logger.error(f"Failed to fetch source data: {e}")
        return

    logger.info(f"Found {len(records)} records to sync.")

    # Push data to Destination API (Idempotent Upsert)
    for record in records:
        record_id = record.get("id")
        if record_id in (None, ""):  # An ID of 0 is still a valid identifier
            logger.warning("Skipping record: Missing unique identifier.")
            continue

        # Use PUT to a specific resource path to guarantee idempotency.
        # If the resource exists, it updates; if not, it creates.
        target_uri = f"{dest_url}/items/{record_id}"

        # Alternative: Use POST with an Idempotency-Key header. Build a
        # per-request copy so the key does not leak into later requests:
        # request_headers = {**headers, "Idempotency-Key": f"sync-key-{record_id}"}

        try:
            logger.info(f"Syncing record {record_id}...")
            dest_response = session.put(target_uri, json=record, headers=headers, timeout=10)
            # 204 No Content is also a common success response for PUT
            if dest_response.status_code in (200, 201, 204):
                logger.info(f"Successfully synced record {record_id}")
            else:
                # Includes retryable statuses that were still failing after the last retry
                logger.error(f"Failed to sync {record_id}: Status {dest_response.status_code} - {dest_response.text}")
        except requests.RequestException as e:
            # Reached on connection errors or timeouts once the adapter's retries are exhausted
            logger.critical(f"Hard failure syncing record {record_id}: {e}")


if __name__ == "__main__":
    SOURCE_API = "https://api.source.com/v1/products"
    DEST_API = "https://api.destination.com/v1"
    API_TOKEN = "your_secure_token_here"
    sync_records(SOURCE_API, DEST_API, API_TOKEN)
```
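For the POST alternative mentioned in the script, the key has to be stable across retries of the same write. One way to get that, sketched here under assumptions, is to derive it from the record itself. The helper idempotency_key, the sync prefix, and the sample record are hypothetical, and the sketch presumes the destination API actually honors an Idempotency-Key header, which not every API does.

```python
import hashlib
import json


def idempotency_key(record: dict, prefix: str = "sync") -> str:
    """Build a deterministic key from the record ID and a hash of its content."""
    # sort_keys makes the serialization independent of dict key order.
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{record['id']}-{digest}"


# Hypothetical record, used only to show how the header would be built.
record = {"id": 42, "name": "Widget", "price": 9.99}
request_headers = {
    "Content-Type": "application/json",
    "Idempotency-Key": idempotency_key(record),
}
```

Hashing the content means a retry of the same payload reuses the key, while a genuinely changed record gets a new one. Note that urllib3's Retry only retries idempotent methods by default, so POST requests would be retried only if allowed_methods were extended to include POST.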
3. Key Design Details
- Timeout Enforcement: The timeout=10 parameter prevents the script from hanging indefinitely if an API socket opens but stops sending data.
- Backoff Factor: With backoff_factor=1, the sleep time is calculated as {backoff factor} * (2 ** ({number of total retries} - 1)), so the delay doubles with each consecutive retry and spaces requests out to respect rate limits.
- PUT vs POST: By targeting /items/{record_id} with a PUT request, the destination API can naturally deduplicate incoming requests. If a network drop occurs after the destination processes the write but before sending the response, the subsequent retry will safely overwrite the record with identical data instead of creating a duplicate.
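To make the backoff formula concrete, the sketch below evaluates it for each retry. It is plain arithmetic on the formula as quoted above, not a call into urllib3: the helper backoff_schedule is hypothetical, the 120-second cap is an assumption about the library's default maximum, and depending on the urllib3 version the first retry may be sent without any delay.

```python
def backoff_schedule(backoff_factor: float, retries: int, backoff_max: float = 120.0) -> list:
    """Delays from backoff_factor * 2 ** (n - 1) for n = 1..retries, each capped at backoff_max."""
    return [min(backoff_max, backoff_factor * 2 ** (n - 1)) for n in range(1, retries + 1)]


delays = backoff_schedule(backoff_factor=1, retries=5)
print(delays)       # [1, 2, 4, 8, 16]
print(sum(delays))  # 31
```

Under these assumptions, a request that fails on every attempt spends about half a minute sleeping, on top of up to 10 seconds of timeout per attempt, which is worth keeping in mind when sizing a sync run over many records.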