...
The example code below defines two DAGs. The first compiles the dbt project and uploads the generated `manifest.json`, `catalog.json` and `run_results.json` artifacts to the KADA landing zone in Azure Blob Storage. The second runs the KADA Tableau extractor inside a TaskGroup and uploads the extracted CSV files to the same storage account.

dbt metadata loader DAG:

```python
# built-in
import os
from datetime import datetime, timedelta

# Installed
from airflow.models.dag import DAG
from airflow.operators.python_operator import PythonOperator
from plugins.utils.azure_blob_storage import AzureBlobStorage

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = "kada-data"
KADA_STORAGE_ACCOUNT = "kadasaasract"
KADA_BASE_FILE_NAME = "lz/dbt/Astronomer Snowflake/landing"
DBT_MANIFEST_FILE = "/usr/local/airflow/include/dbt/target/manifest.json"
DBT_CATALOG_FILE = "/usr/local/airflow/include/dbt/target/catalog.json"
DBT_RUN_RESULTS_FILE = "/usr/local/airflow/include/dbt/target/run_results.json"
DBT_DATABASE_NAME = "RACT"
DBT_PROJECT_ID = "5f04dd3ecc691a4c54e71507a357d0c3"


def dbt_metadata_loader_function():
    # Compile the dbt project so the target/ artifacts are up to date.
    os.system('cd /usr/local/airflow/include/dbt/ && dbt compile')

    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_manifest.json',
        local_path=DBT_MANIFEST_FILE,
    )
    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_catalog.json',
        local_path=DBT_CATALOG_FILE,
    )
    AzureBlobStorage.upload_file_sas_token(
        client=KADA_SAS_TOKEN,
        storage_account=KADA_STORAGE_ACCOUNT,
        container=KADA_CONTAINER,
        blob=f'{KADA_BASE_FILE_NAME}/{DBT_PROJECT_ID}_run_results.json',
        local_path=DBT_RUN_RESULTS_FILE,
    )


with DAG(
    "kada_dbt_metadata_loader_dag",
    start_date=datetime(2022, 2, 1),
    max_active_runs=3,
    schedule_interval=timedelta(days=1),
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
) as dag:
    task_2 = PythonOperator(
        task_id="dbt_metadata_loader",
        python_callable=dbt_metadata_loader_function,
        op_kwargs={},
        provide_context=True,
    )
```

Tableau extract and upload DAG:

```python
# built-in
import os

# Installed
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = "kada-data"
KADA_STORAGE_ACCOUNT = "kadasaasract"
KADA_LANDING_PATH = "lz/tableau/landing"  # landing path for the Tableau source
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}


def upload():
    # Push every CSV produced by the extractor to the KADA landing zone.
    output = KADA_EXTRACTOR_CONFIG['output_path']
    for filename in os.listdir(output):
        if filename.endswith('.csv'):
            file_to_upload_path = os.path.join(output, filename)
            AzureBlobStorage.upload_file_sas_token(
                client=KADA_SAS_TOKEN,
                storage_account=KADA_STORAGE_ACCOUNT,
                container=KADA_CONTAINER,
                blob=f'{KADA_LANDING_PATH}/{filename}',
                local_path=file_to_upload_path,
            )


with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:

    # To be configured by the customer depending on where the timestamp is stored.
    start_hwm = 'YYYY-MM-DD HH:mm:SS'  # Source the timestamp from the prior run.
    end_hwm = 'YYYY-MM-DD HH:mm:SS'    # Timestamp now.

    ext = Extractor(**KADA_EXTRACTOR_CONFIG)

    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run,
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )

        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload,
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer. Timestamp needs to be saved.
        task_3 = DummyOperator(task_id='save_hwm')

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end
```
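Both DAGs call `AzureBlobStorage.upload_file_sas_token` from the customer-side `plugins/utils/azure_blob_storage.py` plugin, whose implementation is not shown on this page. A minimal sketch of such a helper, assuming the standard `azure-storage-blob` SDK and a SAS token passed in as `client`, might look like:

```python
# plugins/utils/azure_blob_storage.py (illustrative sketch only; the real plugin may differ)
from azure.storage.blob import BlobClient


class AzureBlobStorage:

    @staticmethod
    def upload_file_sas_token(client: str, storage_account: str, container: str,
                              blob: str, local_path: str) -> None:
        """Upload a local file to <storage_account>/<container>/<blob> using a SAS token."""
        blob_client = BlobClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            container_name=container,
            blob_name=blob,
            credential=client,  # SAS token, e.g. sourced from the KADA_SAS_TOKEN env var
        )
        with open(local_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
```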
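The `start_hwm`/`end_hwm` placeholders and the `save_hwm` task are left for the customer to implement, and the imported `get_hwm`/`publish_hwm` helpers from `kada_collectors` may already cover this. If they are not used, one possible approach (an assumption, not part of the collector) is to persist the high water mark in an Airflow Variable:

```python
# Hypothetical high-water-mark handling using an Airflow Variable (all names are illustrative).
from datetime import datetime, timezone

from airflow.models import Variable

HWM_VARIABLE_KEY = "kada_tableau_hwm"  # hypothetical Variable name


def get_hwm_window():
    # Start from the timestamp saved by the previous run, or an initial-load default.
    start_hwm = Variable.get(HWM_VARIABLE_KEY, default_var="2022-01-01 00:00:00")
    end_hwm = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return start_hwm, end_hwm


def save_hwm_function(end_hwm):
    # Persist the end of this run's window so the next run picks up from here.
    Variable.set(HWM_VARIABLE_KEY, end_hwm)
```

With something like this in place, `task_3` would become a `PythonOperator` calling `save_hwm_function` with `op_kwargs={"end_hwm": end_hwm}` rather than a `DummyOperator`.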