...
Example orchestrating the Collector using Airflow
```python
# Built-in
import os
from datetime import datetime, timedelta

# Airflow plugin helper for uploading files to Azure Blob Storage with a SAS token
from plugins.utils.azure_blob_storage import AzureBlobStorage

# Installed
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = "kada-data"
KADA_STORAGE_ACCOUNT = "kadasaasract"
KADA_BASE_FILE_NAME = "lz/dbt/Astronomer Snowflake/landing"

# dbt artifacts (paths under the dbt project's target/ directory)
DBT_MANIFEST_FILE = "/usr/local/airflow/include/dbt/target/manifest.json"
DBT_CATALOG_FILE = "/usr/local/airflow/include/dbt/target/catalog.json"
DBT_RUN_RESULTS_FILE = "/usr/local/airflow/include/dbt/target/run_results.json"
DBT_DATABASE_NAME = "RACT"
DBT_PROJECT_ID = "5f04dd3ecc691a4c54e71507a357d0c3"
def dbt_metadata_loader_function():
    """Compile the dbt project and upload its artifacts to the KADA landing container."""
    # Regenerate manifest.json and run_results.json.
    # Note: catalog.json is produced by `dbt docs generate`, not `dbt compile`.
    os.system('cd /usr/local/airflow/include/dbt/ && dbt compile')

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_manifest.json',
        local_path=DBT_MANIFEST_FILE,
    )

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_catalog.json',
        local_path=DBT_CATALOG_FILE,
    )

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_run_results.json',
        local_path=DBT_RUN_RESULTS_FILE,
    )
with DAG(
    "kada_dbt_metadata_loader_dag",
    start_date=datetime(2022, 2, 1),
    max_active_runs=3,
    schedule_interval=timedelta(days=1),
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id="dbt_metadata_loader",
        python_callable=dbt_metadata_loader_function,
    )
```
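The `AzureBlobStorage` helper is imported from your Airflow `plugins` folder and is not shown in the example above. Below is a minimal sketch of what an `upload_file_sas_token` method could look like, assuming the `azure-storage-blob` Python package is installed; the class name and keyword arguments mirror the call sites in the DAG, but the implementation itself is illustrative, not KADA's shipped code.

```python
# Hypothetical sketch of plugins/utils/azure_blob_storage.py.
# Assumes the `azure-storage-blob` package; only the call signature is taken
# from the DAG example above -- the body is an illustrative assumption.
from azure.storage.blob import BlobServiceClient


class AzureBlobStorage:

    @staticmethod
    def upload_file_sas_token(client, storage_account, container, blob, local_path):
        """Upload a local file to Azure Blob Storage using a SAS token.

        client          -- SAS token string (e.g. the KADA_SAS_TOKEN env var)
        storage_account -- storage account name, e.g. "kadasaasract"
        container       -- target container, e.g. "kada-data"
        blob            -- destination blob path inside the container
        local_path      -- path of the local file to upload
        """
        service = BlobServiceClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            credential=client,
        )
        blob_client = service.get_blob_client(container=container, blob=blob)
        with open(local_path, "rb") as data:
            # overwrite=True so re-runs of the DAG replace the previous artifact
            blob_client.upload_blob(data, overwrite=True)
```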