...

Example orchestrating the Collector using Airflow
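
The DAG below generates the dbt artifacts (manifest.json, catalog.json and run_results.json) and uploads them from the dbt target directory to the KADA landing path in Azure Blob Storage. The values in the # TODO: configure connection block (SAS token, container, storage account, landing path, dbt file paths, database name and project ID) are specific to the example environment and must be replaced with your own.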

Code Block
# Built-in
import os
from datetime import datetime, timedelta

# Installed
from airflow import DAG
from airflow.operators.python import PythonOperator  # renamed from airflow.operators.python_operator in Airflow 2

# Local Airflow plugin shipped alongside this DAG
from plugins.utils.azure_blob_storage import AzureBlobStorage

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = "kada-data"
KADA_STORAGE_ACCOUNT = "kadasaasract"
KADA_BASE_FILE_NAME = "lz/dbt/Astronomer Snowflake/landing"
DBT_MANIFEST_FILE = "/usr/local/airflow/include/dbt/target/manifest.json"
DBT_CATALOG_FILE = "/usr/local/airflow/include/dbt/target/catalog.json"
DBT_RUN_RESULTS_FILE = "/usr/local/airflow/include/dbt/target/run_results.json"
DBT_DATABASE_NAME = "RACT"
DBT_PROJECT_ID = "5f04dd3ecc691a4c54e71507a357d0c3"

def dbt_metadata_loader_function():
    # `dbt docs generate` compiles the project and writes manifest.json,
    # catalog.json and run_results.json to target/ (plain `dbt compile`
    # does not produce catalog.json). os.system returns the shell exit
    # status, so fail the task instead of uploading stale artifacts.
    if os.system('cd /usr/local/airflow/include/dbt/ && dbt docs generate') != 0:
        raise RuntimeError('dbt artifact generation failed')

    # Upload each artifact to the KADA landing area, prefixed with the project ID.
    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_manifest.json',
        local_path=DBT_MANIFEST_FILE
    )

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_catalog.json',
        local_path=DBT_CATALOG_FILE
    )

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_run_results.json',
        local_path=DBT_RUN_RESULTS_FILE
    )
    
with DAG(
    "kada_dbt_metadata_loader_dag",
    start_date=datetime(2022, 2, 1),
    max_active_runs=3,
    schedule_interval=timedelta(days=1),
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
) as dag:

    # provide_context is deprecated in Airflow 2 and the callable takes no
    # arguments, so it is omitted here.
    task = PythonOperator(
        task_id="dbt_metadata_loader",
        python_callable=dbt_metadata_loader_function,
    )
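
The AzureBlobStorage helper imported from plugins.utils.azure_blob_storage is a local plugin and is not shown here. As a rough sketch of what it might look like, the upload can be done with the azure-storage-blob (v12) SDK; the class name and the signature (including the client parameter carrying the SAS token) are taken from the call sites above, while the body is an assumption rather than the actual KADA utility.

Code Block
# Hypothetical sketch of the AzureBlobStorage helper used by the DAG above.
# Only the method signature mirrors the real utility; the body is an assumption.
from azure.storage.blob import BlobClient


class AzureBlobStorage:

    @staticmethod
    def upload_file_sas_token(client, storage_account, container, blob, local_path):
        # `client` is the SAS token string, used as the BlobClient credential.
        blob_client = BlobClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            container_name=container,
            blob_name=blob,
            credential=client,
        )
        with open(local_path, "rb") as data:
            # overwrite=True lets daily DAG runs replace the previous artifacts.
            blob_client.upload_blob(data, overwrite=True)

With a helper along these lines, the SAS token only needs write access to the landing container, so no storage account keys have to be stored in Airflow.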