Data Contracts Execution Layer

Introduction

In data processing, ensuring the integrity and quality of data is of paramount importance. One way to address this is through data contracts, which verify and validate data before it undergoes transformations. This article delves deeper into the concept of data contracts and explores their role in guaranteeing data quality. Specifically, it focuses on schema validation as a crucial aspect of data contracts, highlighting how it ensures the presence of required fields in a table without restricting the addition of supplementary columns. The article also covers the technical implementation of data contract checks within a standard data processing stack: dbt on BigQuery, scheduled by Airflow. Additionally, it discusses using Great Expectations, an extensible tool, to wrap the schema validation process, along with Docker containers and the Kubernetes Pod Operator in the execution workflow.

This article is the third part of a series. See Part 1 for a high-level overview of the Data Contracts subsystems and a gentle introduction to all of the technologies we use in the implementation. Take a look at Part 2 for the implementation of the Data Contract repository and how Data Contracts are versioned and stored.

Schema validation

Schema validation depends heavily on the schema language you choose. If your implementation uses Avro, consider libraries like Python’s fastavro so you don’t reinvent the wheel. We chose a custom but simple schema representation, so we needed to provide our own way of validating it.

Remember that the goal here is to check whether all the contracted fields are present in the actual table we’re referring to, not to make sure that both schemas are identical. This way Data Producers can freely expand their tables without the burden of changing data contracts every time they do so.

Our schema representation uses the same concepts as the BigQuery schema. This means that it allows repeated fields (arrays), but repeated fields cannot be nested directly (arrays of arrays). To accomplish that, you need to create an array of structs where the struct contains an array field. With that said, here is the code that does the validation:

import logging
from typing import List, Tuple

from google.cloud.bigquery import SchemaField


def _compare_yaml_and_bq_schemas(expected_fields: dict, actual_schema: List[SchemaField]) -> Tuple[bool, List[str]]:
    """
    Expected fields should be the YAML parsed dict.
    See `schema.yaml` for example.
    Actual schema should be the List of SchemaField, from google-cloud-bigquery.
    """
    actual_fields = {field.name: field for field in actual_schema}
    all_messages = []
    aggregated_result = True
    for field in expected_fields['fields']:
        result, messages = _is_field_valid(field, actual_fields)
        all_messages.extend(messages)
        if not result:
            aggregated_result = False

    return aggregated_result, all_messages


def _is_field_valid(expected_field: dict, actual_fields: dict) -> Tuple[bool, List[str]]:
    # check if column exists by name
    if expected_field['name'] not in actual_fields:
        message = f"Field {expected_field['name']} not found.\nLooked in {actual_fields}"
        logging.error(message)
        return False, [message]
    actual_field = actual_fields[expected_field['name']]

    # check if array
    if "repeated" in expected_field and expected_field["repeated"] == "yes":
        if actual_field.mode != "REPEATED":
            message = f"Field {expected_field['name']} expected to be array but {actual_field} doesn't seem to be one."
            logging.error(message)
            return False, [message]

    # check type
    match expected_field['type']:
        case dict():
            return _compare_yaml_and_bq_schemas(expected_field['type'], actual_field.fields)
        case _:
            if expected_field['type'].upper() != actual_field.field_type.upper():
                message = f"Field {expected_field['name']} expected to be of type {expected_field['type']}. Field: {actual_field} does not match that expectation."
                logging.error(message)
                return False, [message]
    return True, []

The function accepts expected_fields as a dictionary obtained by parsing the contract schema with PyYAML, and actual_schema as returned by the google-cloud-bigquery Python library. With these two inputs you can validate whether all fields present in the contract are also present in the table. The code returns a bool with the overall result and a list of string messages containing the errors if validation was not successful.
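To make the two inputs concrete, here is a minimal, hypothetical example. The schema.yaml layout below (including the struct-with-nested-array pattern mentioned earlier) and the table id are illustrative assumptions rather than the exact format from our repository:

# Minimal sketch of how the validation helper could be invoked.
# The YAML layout and the table id are illustrative assumptions.
import yaml
from google.cloud import bigquery

contract_yaml = """
fields:
  - name: order_id
    type: STRING
  - name: items            # array of structs, each struct containing an array
    repeated: "yes"        # quoted so PyYAML keeps the string "yes", which the check above compares against
    type:
      fields:
        - name: sku
          type: STRING
        - name: tags
          repeated: "yes"
          type: STRING
"""

expected_fields = yaml.safe_load(contract_yaml)

client = bigquery.Client()
# Hypothetical fully qualified table id.
actual_schema = client.get_table("my-project.my_dataset.orders").schema

ok, messages = _compare_yaml_and_bq_schemas(expected_fields, actual_schema)
if not ok:
    for message in messages:
        print(message)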

Before you can actually validate the schema, you will need to retrieve contracts from Google Cloud Storage, map the contracts to tables, and get the schemas of those tables from BigQuery.

Retrieving data contracts

Here, we’re getting the latest version of the contracts. For a more mature implementation, you will want the ability to specify the version directly (and pass it as an argument through Airflow) in case a faulty contract is released.

from typing import List, Optional, Tuple

from google.cloud import bigquery, storage


def validate_table(data_product: str, table_id: str, env: str) -> Tuple[bool, Optional[List[str]]]:
    """
    data_product: str - Needed to know where to look for the table (which directory in contracts bucket)
    table_id: str - BigQuery fully qualified table id
    env: str - Choosing 'dev' or 'prd' contracts bucket
    """

    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
    actual_schema = bigquery_client.get_table(table_id).schema

    bucket = f"astrafy-data-contracts-{env}"
    blobs = list(storage_client.list_blobs(bucket))

    if not blobs:
        raise Exception(f"No blobs were found in bucket {bucket}")

    latest_version = _get_latest_version(blobs)

    table_name = table_id.split(".")[-1].strip()
    path_part = f"{latest_version}/{data_product}/{table_name}"
    blobs_filtered = [b for b in blobs if path_part in b.id]

    if len(blobs_filtered) == 0:
        # If no schema is found, it means the current model does not have a contract.
        # We can change this behaviour if we enforce all models to have a contract.
        logging.warning(f"No blobs matching path {path_part} found in bucket {bucket}")
        return True, None
    elif len(blobs_filtered) > 1:
        raise Exception(
            f"More than one blob of path {path_part} found in {blobs_filtered}")

    return _validate_schema_for_blob(blobs_filtered[0], actual_schema)

You can also see that we do some direct string manipulation on the path with no error checking. Since the paths are created by a tested and automated procedure, the risk of building wrong paths is reduced, but you might want to opt for more precautionary measures.
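The helpers _get_latest_version and _validate_schema_for_blob are not shown above. As a rough idea of what they do, here is a minimal sketch, assuming contract blobs live under a top-level semantic-version prefix (e.g. 1.2.0/<data_product>/<table>/schema.yaml) and that the blob content is the YAML schema consumed by _compare_yaml_and_bq_schemas; the exact bucket layout may differ in your setup.

from typing import List, Tuple

import yaml


def _get_latest_version(blobs) -> str:
    """Sketch: pick the highest semantic version among the blobs' top-level prefixes."""
    versions = {blob.name.split("/")[0] for blob in blobs}
    # Compare "major.minor.patch" strings numerically, not lexicographically.
    return max(versions, key=lambda version: tuple(int(part) for part in version.split(".")))


def _validate_schema_for_blob(blob, actual_schema) -> Tuple[bool, List[str]]:
    """Sketch: download the contract, parse its YAML schema and compare it with the table schema."""
    expected_fields = yaml.safe_load(blob.download_as_text())
    return _compare_yaml_and_bq_schemas(expected_fields, actual_schema)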

Mapping data product to actual tables

In our case, data contracts are being run per data product. Each data product may contain multiple tables, but only some of them are contracted. This means that we need to have a way to communicate which of the tables should be checked. There are two general approaches that you can choose from.

The first approach is to assume that it is the data contract’s job to define all parameters: the data product it belongs to, the fully qualified table id of the table it refers to, and so on. This approach scales well as your data contract capabilities grow; however, it complicates the contract itself, which now needs not only the schema but also additional parameters defining what should happen with it.

The second approach is to define this logic somewhere else. It can be governed centrally, or by data producer teams in a distributed fashion, but it lives outside of the data contracts themselves. We chose this approach because it keeps contracts simpler, which we believe is key to fostering their adoption.

This required us to create some sort of mapping between the contracted tables and their respective data products. We chose a simple BigQuery table to store this information. It can be updated automatically based on a convention, or manually for starters.

As far as conventions go, dbt 1.5 introduces model access levels, so you can mark a model as public. This can be picked up by the CI pipeline (either by parsing your dbt files or the manifest.json artifact), which then updates the mapping table when deploying the code to an environment.

# Read the mapping of data products to their public (contracted) models.
mappings_table_id = f"astrafy-gke.data_product_mappings_{self.env}.data_product_to_public_models"
query_job = self.bigquery_client.query(f"SELECT data_product, public_models FROM {mappings_table_id}")
# Build a dict of {data_product: [public_model, ...]}.
mappings = {r["data_product"]: r["public_models"] for r in query_job}
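For the convention-based update described above, a CI step could derive the list of public models from dbt’s manifest.json after compilation. The sketch below is a rough illustration under a couple of assumptions: that model nodes in the manifest carry an access attribute set to public (dbt >= 1.5) and that the first folder under the models directory identifies the data product. How you resolve the data product and load the result into the mapping table will depend on your project layout.

import json
from collections import defaultdict


def extract_public_models(manifest_path: str) -> dict:
    """Sketch: map data products to their public models based on dbt's manifest.json.

    Assumes dbt >= 1.5 (which records the model's access level) and that the
    first folder of the model's path identifies the data product; adapt as needed.
    """
    with open(manifest_path) as f:
        manifest = json.load(f)

    mappings = defaultdict(list)
    for node in manifest["nodes"].values():
        if node["resource_type"] != "model":
            continue
        if node.get("access") != "public":
            continue
        # e.g. "sales/orders.sql" -> data product "sales"
        data_product = node["path"].split("/")[0]
        mappings[data_product].append(node["name"])
    return dict(mappings)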

Custom Great Expectation

As mentioned, we already use Great Expectations and we believe it is a great tool for the data stack at any company. Great Expectations allows us to define tests on the data itself. In this article series we focus on schema validation, and since we are using our custom schema, we wrap our validation in a Custom Expectation. For this, we create an Expectation Suite dynamically and add it to the other suites. For more information about Great Expectations and Data Contracts, see our previous post.

from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.expectation_suite import ExpectationSuite


def _build_data_contract_suites(self, mappings):
    data_contract_expectation_suites = []
    for data_product, tables in mappings.items():
        for table in tables:
            suite = ExpectationSuite(
                expectation_suite_name=f"{data_product}.data_contract_schema.{table}",
                meta={
                    "great_expectations_version": "0.16.11",
                    "critical": True
                },
                expectations=[
                    ExpectationConfiguration(
                        expectation_type="expect_table_schema_to_match_data_contract_schema",
                        kwargs={
                            "table_name": table,
                            "data_product": data_product,
                            "env": self.env,
                            "result_format": "COMPLETE"
                        }
                    )
                ]
            )
            data_contract_expectation_suites.append(suite)
    return data_contract_expectation_suites

Great Expectations allows you to define Expectation Suites both statically, as files, and dynamically, as part of the Python code. We add several custom arguments, such as ‘critical’, to tell us whether the failure of this check should fail the whole validation task in Airflow. (And since we really care about data contracts, we set it to True.)
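How the flag is consumed is up to the runner. As a minimal sketch (assuming you end up with a list of (suite, success) pairs after running the validations; the exact result objects depend on how you execute the suites), the task could fail only when a critical suite is unsuccessful:

import logging


def raise_on_critical_failures(results) -> None:
    """Sketch: fail the Airflow task only if a suite marked as critical did not pass.

    `results` is assumed to be an iterable of (ExpectationSuite, bool) pairs.
    """
    failed_critical = [
        suite.expectation_suite_name
        for suite, success in results
        if not success and suite.meta.get("critical", False)
    ]
    failed_non_critical = [
        suite.expectation_suite_name
        for suite, success in results
        if not success and not suite.meta.get("critical", False)
    ]
    if failed_non_critical:
        logging.warning(f"Non-critical data contract suites failed: {failed_non_critical}")
    if failed_critical:
        # Raising makes the Airflow task (and therefore the run) turn red.
        raise RuntimeError(f"Critical data contract suites failed: {failed_critical}")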

We create one suite for each table we found a mapping for. This is also a design decision you may want to revisit. For example, you might create data contract suites for every contract found in the bucket, or keep a separate place where you record which tables should have their schema validated.

Alerting

If any of the ‘critical’ suites fails, the Airflow task turns red and a message is posted to a Slack channel. If you use an on-call tool, such as PagerDuty, it makes sense to integrate it as well. Depending on your preference, you might want to send the alert from the Python layer that ran the expectations or from the Airflow layer, for example as an SLA miss callback.
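For reference, a minimal sketch of the Airflow-side variant could look like the following, using an incoming-webhook URL stored in a hypothetical SLACK_WEBHOOK_URL environment variable; the exact wiring (webhook vs. the Slack provider, and which task it is attached to) will depend on your Airflow setup.

import os

import requests


def notify_slack_on_failure(context) -> None:
    """Sketch: post a short failure message to Slack from an Airflow on_failure_callback."""
    task_instance = context["task_instance"]
    message = (
        ":red_circle: Data contract validation failed: "
        f"dag={task_instance.dag_id}, task={task_instance.task_id}, "
        f"run_id={context['run_id']}"
    )
    # SLACK_WEBHOOK_URL is a hypothetical environment variable holding an incoming-webhook URL.
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": message}, timeout=10)

The callback can then be attached via on_failure_callback to whichever operator runs the validation (a Kubernetes Pod Operator in our stack).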

The key takeaway is that Producers are the group that should be interested in receiving these alerts. However, introducing such contracts may be seen as an unnecessary hurdle, and Producers might be unwilling to receive them. In that case, we recommend starting by notifying Consumers through the alerting, so that Consumers can reach out to Producers. This is not ideal, but it is often the compromise that allows data contracts to be introduced into the company. Later, once they have proven successful, contracts may be deemed valuable enough to limit the freedom of Producers with the pesky alerts.

Conclusion

In conclusion, data contracts and schema validation are essential for maintaining data integrity and quality in data processing. By using data contracts, we can verify and validate data before transformations, ensuring the presence of required fields while allowing the flexibility to add supplementary columns. The technical implementation involves mapping contracts to tables, using tools like Great Expectations for schema validation, and setting up alerting mechanisms to promptly address validation failures. Incorporating data contracts into the data processing workflow fosters trust in the data and enables organizations to make more informed decisions based on reliable and consistent data.

If you are looking for support on Data Stack or Google Cloud solutions, feel free to reach out to us at sales@astrafy.io.