Introduction

Many teams leveraging the open-source edition of Airbyte to manage their data pipelines face a common obstacle when integrating it with Apache Airflow: the Airbyte Operator provided by Airflow expects authentication credentials (a client ID and secret). Unfortunately, this authentication isn’t available in the Airbyte OSS (Open Source Software) edition. As a result, using the official Airbyte Operator directly is impossible unless you enable authentication (which is impractical) or wait for the operator to be updated (feel free to collaborate!).

In this article, we’ll explain clearly how we’ve overcome this limitation here at Astrafy by using Airflow’s native Python operators. By following our guide, you’ll be able to trigger and monitor your Airbyte sync jobs reliably without modifying Airbyte itself.

Understanding the Problem

Currently, the official Airbyte Operator provided by Airflow requires:

  • Client ID

  • Client Secret

However, Airbyte OSS does not provide these credentials since authentication is not enabled by default. As a result, many users face errors or blocked workflows.

Screenshot of the Airflow documentation page describing the Airbyte connection parameters. It details fields such as "Host," "Token URL," "Client ID," and "Client Secret," showing required and optional configuration settings for integrating Airflow with Airbyte.

This limitation has been documented extensively in the Airbyte and Airflow communities.

Security overview

In the community version of Airbyte, authentication is typically handled by Keycloak, an identity and access management solution bundled with Airbyte by default. In our setup, however, we’ve explicitly disabled the bundled Keycloak service.

Instead, we rely on Google’s Identity-Aware Proxy (IAP). IAP ensures that only authenticated users and services within our organization can reach Airbyte. Once a request passes IAP authentication, Airbyte’s API accepts it automatically, since there is no internal authorization layer.

Moreover, because both Airflow and Airbyte reside within the same Kubernetes cluster (specifically, on our Google Kubernetes Engine (GKE)), we don’t have additional restrictions or network policies blocking internal cluster traffic. This allows Airflow to communicate directly with Airbyte’s API via the local DNS name without traversing external networks, thus enhancing both security and network efficiency.

In practice, Airflow makes requests directly to Airbyte’s internal DNS endpoint, such as:

http://airbyte-airbyte-webapp-svc.airbyte.svc.cluster.local/api/public/v1/
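Because there is no authentication layer in between, any pod in the cluster can reach this API with a plain HTTP request. The minimal sanity check below is a sketch only: it assumes the default Helm chart service name shown above and Airbyte’s public API jobs listing endpoint.

import requests

# Internal service URL of the Airbyte API (default Helm chart naming assumed)
AIRBYTE_API = "http://airbyte-airbyte-webapp-svc.airbyte.svc.cluster.local/api/public/v1"

# Listing jobs is enough to confirm that Airflow can reach Airbyte without credentials
response = requests.get(f"{AIRBYTE_API}/jobs", headers={"accept": "application/json"})
print(response.status_code)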

Diagram of a cloud architecture with users accessing services through an Identity-Aware Proxy and Global HTTPS Load Balancer, connected to a Kubernetes (GKE) cluster protected by Istio. The cluster hosts Airbyte and Airflow namespaces, deployed via Helm charts. Infrastructure is provisioned with Terraform and GitLab, overseen by an illustrated octopus at the bottom.

Our Solution: Airflow Python Operators

Instead of relying on the official Airbyte Operator, we propose using Airflow’s built-in Python operators to interact directly with Airbyte’s REST API through the requests library. We’ll create custom functions that bypass the need for authentication entirely while providing full control and reliability.

The main API endpoints you’ll use (from Airbyte’s public API, served under /api/public/v1/) are:

  • Trigger Job: POST /api/public/v1/jobs

  • Job Status: GET /api/public/v1/jobs/{jobId}

It works by:

  1. Triggering the Airbyte sync jobs, retrying when another sync is already running so that two syncs never overlap.

  2. Polling the triggered jobs until they complete.

A minimal sketch of these two API calls is shown below; the complete functions follow in the step-by-step guide.
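The sketch assumes the internal service URL shown earlier and a placeholder connection ID, and it omits the retry and error handling that the full implementation adds:

import requests

BASE_URL = "http://airbyte-airbyte-webapp-svc.airbyte.svc.cluster.local/api/public/v1"
CONNECTION_ID = "<your-connection-id>"  # placeholder, see Step 1

# 1. Trigger a sync job for the connection
trigger = requests.post(
    f"{BASE_URL}/jobs",
    json={"jobType": "sync", "connectionId": CONNECTION_ID},
    headers={"accept": "application/json", "content-type": "application/json"},
)
job_id = trigger.json().get("jobId")

# 2. Poll the status of that job
status = requests.get(f"{BASE_URL}/jobs/{job_id}", headers={"accept": "application/json"})
print(status.json().get("status"))  # e.g. "pending", "running", "succeeded"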

Step-by-Step Implementation Guide

Step 1: Setup Airflow variables

The connection ID is a unique identifier for each synchronization task you set up in Airbyte. To find it, open the connection in the Airbyte UI and look at your browser’s URL, which will include the connection ID, typically formatted as shown:

https://airbyte.your-domain.com/workspaces/{workspace_id}/connections/{connection_id}.

Extract the {connection_id} from this URL and store it securely as an Airflow Variable.

The Airbyte API endpoint path for jobs is what we’ll use to interact with Airbyte’s jobs (triggering, checking statuses, etc.). The URL is typically formatted as shown:

http://airbyte-airbyte-webapp-svc.airbyte.svc.cluster.local/api/public/v1/jobs/

Screenshot of the Airflow UI on the Variables management page, showing how to navigate from the "Admin" menu to "Variables." The page lists Airflow variables including entries like "AIRBYTE_API_URL" and "AIRBYTE_{NAME}_CONN_ID," each marked as encrypted.
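Once stored, these values are read in the DAG file with Airflow’s Variable API. Here is a minimal sketch; the variable names mirror the ones used in the code below, so adapt them to your own naming convention.

from airflow.models import Variable

# Base URL of the Airbyte jobs endpoint, stored under Admin -> Variables
airbyte_api_url = Variable.get("AIRBYTE_API_URL")

# One variable per Airbyte connection you want to orchestrate
first_connection_id = Variable.get("AIRBYTE_FIRST_CONN_ID")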


Step 2: Triggering a Sync Job

Implement a trigger_airbyte_syncs() function in your DAG file to initiate the Airbyte sync jobs (in this example we use two different connections to demonstrate that you can orchestrate as many as you want):

import time

import requests
from airflow.models import Variable


def trigger_airbyte_syncs(**context):
    """Trigger both ECI and SAAS Airbyte syncs and return their job IDs"""
    
    url = Variable.get("AIRBYTE_API_URL")
    eci_connection_id = Variable.get("AIRBYTE_FIRST_CONN_ID")
    saas_connection_id = Variable.get("AIRBYTE_SAAS_CONN_ID")
    
    headers = {
        "accept": "application/json",
        "content-type": "application/json"
    }
    
    job_ids = {}
    
    # Trigger ECI sync
    eci_payload = {
        "jobType": "sync",
        "connectionId": eci_connection_id
    }
    
    eci_retry_count = 0
    eci_max_retries = 3
    eci_success = False
    
    while not eci_success and eci_retry_count < eci_max_retries:
        eci_response = requests.post(url, json=eci_payload, headers=headers)
        
        if eci_response.status_code == 409:
            eci_retry_count += 1
            print(f"ECI Sync conflict (409): A sync is already running! Retry attempt {eci_retry_count}/{eci_max_retries}")
            
            if eci_retry_count < eci_max_retries:
                print(f"Waiting 90 seconds before retrying ECI sync...")
                time.sleep(90)
            else:
                print("Maximum retries reached for ECI sync. Moving on.")
        elif eci_response.status_code in [200, 201]:
            eci_success = True
            try:
                eci_json = eci_response.json()
                eci_job_id = eci_json.get('jobId')
                if eci_job_id:
                    print(f"Successfully created ECI job with ID: {eci_job_id}")
                    job_ids['eci_job_id'] = eci_job_id
            except Exception as e:
                print(f"Error parsing ECI response: {e}")
        else:
            print(f"ECI Sync Status: {eci_response.status_code} - {eci_response.text}")
            break
    
    # Trigger SAAS sync
    saas_payload = {
        "jobType": "sync",
        "connectionId": saas_connection_id
    }
    
    saas_retry_count = 0
    saas_max_retries = 3
    saas_success = False
    
    while not saas_success and saas_retry_count < saas_max_retries:
        saas_response = requests.post(url, json=saas_payload, headers=headers)
        
        if saas_response.status_code == 409:
            saas_retry_count += 1
            print(f"SAAS Sync conflict (409): A sync is already running! Retry attempt {saas_retry_count}/{saas_max_retries}")
            
            if saas_retry_count < saas_max_retries:
                print(f"Waiting 60 seconds before retrying SAAS sync...")
                time.sleep(90)
            else:
                print("Maximum retries reached for SAAS sync. Moving on.")
        elif saas_response.status_code in [200, 201]:
            saas_success = True
            try:
                saas_json = saas_response.json()
                saas_job_id = saas_json.get('jobId')
                if saas_job_id:
                    print(f"Successfully created SAAS job with ID: {saas_job_id}")
                    job_ids['saas_job_id'] = saas_job_id
            except Exception as e:
                print(f"Error parsing SAAS response: {e}")
        else:
            print(f"SAAS Sync Status: {saas_response.status_code} - {saas_response.text}")
            break  # Exit the retry loop for non-409 errors
    
    context['ti'].xcom_push(key='airbyte_job_ids', value=job_ids)
    
    if 'eci_job_id' in job_ids and 'saas_job_id' in job_ids:
        return job_ids
    else:
        if not job_ids:
            raise Exception("Failed to create any Airbyte sync jobs")
        else:
            print(f"Warning: Only created some jobs: {job_ids}")
            return job_ids


Step 3: Polling Job Status

Implement a check_airbyte_sync_status() function in your DAG to poll the status of the triggered jobs until they complete (the base jobs URL is read from a separate AIRBYTE_SYNC_STATUS_URL variable, which points to the same jobs endpoint described in Step 1):

def check_airbyte_sync_status(**context):
    """Check status of Airbyte sync jobs and wait until they're complete"""
    
    url_base = Variable.get("AIRBYTE_SYNC_STATUS_URL") 
    
    headers = {
        "accept": "application/json"
    }
    
    job_ids = context['ti'].xcom_pull(task_ids='trigger_airbyte_syncs', key='airbyte_job_ids')
    
    if not job_ids:
        raise Exception("No job IDs found from sync task")
    
    max_checks = 5
    check_interval = 300 
    
    for job_type, job_id in job_ids.items():
        job_complete = False
        check_count = 0
        
        print(f"Checking status for {job_type} job: {job_id}")
        
        while not job_complete and check_count < max_checks:
            check_count += 1
        
            job_url = f"{url_base}/{job_id}"
            response = requests.get(job_url, headers=headers)

            if response.status_code == 200:
                try:
                    job_info = response.json()
                    status = job_info.get('status')

                    print(f"Job {job_id} status: {status} (check {check_count})")

                    if status == "succeeded":
                        job_complete = True
                        print(f"Job {job_id} completed successfully")
                    elif status in ["failed", "cancelled", "error"]:
                        raise Exception(f"Job {job_id} {status}: {job_info}")
                    else:
                        time.sleep(check_interval)
                except Exception as e:
                    print(f"Error parsing job status response: {e}")
                    raise
            else:
                print(f"Failed to get job status: {response.status_code} - {response.text}")
                raise Exception(f"Failed to get job status for {job_id}")
        
        if not job_complete:
            raise Exception(f"Job {job_id} did not complete after {max_checks} status checks")

    return True


Step 4: DAG

With all the above steps implemented, the DAG should look like this:

import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="medium_article",
    start_date=datetime.datetime(2025, 1, 1),
    schedule="@daily",
):

    # Task 1: Trigger Airbyte syncs
    sync_task = PythonOperator(
        task_id='trigger_airbyte_syncs',
        python_callable=trigger_airbyte_syncs,
    )

    # Task 2: Wait for sync completion
    wait_for_sync = PythonOperator(
        task_id='wait_for_airbyte_syncs',
        python_callable=check_airbyte_sync_status,
    )

    # Task 3: Whatever other task you may have
    skip_vertex_ai = DummyOperator(task_id="skip_vertex_ai")

    # Task dependencies
    sync_task >> wait_for_sync >> skip_vertex_ai


Handling Errors and Retries

We implemented robust error handling on top of Airflow’s built-in retries:

  • Set the retries and retry_delay parameters on your operators (see the sketch below).

  • Check HTTP response codes and wrap response parsing in try/except blocks, raising appropriate exceptions for pipeline visibility.
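As an illustration, task-level retries could be configured like this (the values below are illustrative, not taken from our production setup):

from datetime import timedelta

wait_for_sync = PythonOperator(
    task_id='wait_for_airbyte_syncs',
    python_callable=check_airbyte_sync_status,
    retries=3,                          # re-run the polling task up to 3 times
    retry_delay=timedelta(minutes=5),   # wait 5 minutes between attempts
)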

Conclusion & Next Steps

By implementing Airflow’s Python operators, you gain a flexible, reliable integration between Airbyte OSS and Airflow without the overhead or limitations of authentication. This approach:

  • Eliminates the need for Airbyte modifications.

  • Maintains security by limiting access via network controls rather than API credentials.

  • Provides transparency and control through Airflow’s robust orchestration tools.

We encourage you to explore this straightforward yet powerful integration approach in your own pipelines.


Thank you

If you enjoyed reading this article, stay tuned as we regularly publish technical articles on Airbyte, Airflow, and Modern Data Stack solutions.

👉 Follow Astrafy on LinkedIn or Medium to be notified of the next article.

👉 Looking for support with a Modern Data Stack approach or Google Cloud solutions? Reach out to us at sales@astrafy.io.