...

Code Block
# built-in
import os
from datetime import datetime, timedelta
# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

# Installed
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = "kada-data"
KADA_STORAGE_ACCOUNT = "kadasaasract"
KADA_LANDING_PATH = "lz/dbt/Astronomer Snowflake/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}


def upload():
    output = KADA_EXTRACTOR_CONFIG['output_path']
    for filename in os.listdir(output):
        if filename.endswith('.csv'):
            file_to_upload_path = os.path.join(output, filename)

            AzureBlobStorage.upload_file_sas_token(
                client=KADA_SAS_TOKEN,
                storage_account=KADA_STORAGE_ACCOUNT,
                container=KADA_CONTAINER,
                blob=f'{KADA_LANDING_PATH}/{filename}',
                local_path=file_to_upload_path
            )

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:

    # To be configured by the customer depending on where you store the timestamp
    start_hwm = 'YYYY-MM-DD HH:mm:SS' # Source the timestamp from the prior run.
    end_hwm = 'YYYY-MM-DD HH:mm:SS' # Timestamp now.

    ext = Extractor(**KADA_EXTRACTOR_CONFIG)

    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run,
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )

        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload,
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer: the end timestamp needs to be saved
        # so the next run can start from it (one possible approach is sketched below).
        task_3 = DummyOperator(task_id='save_hwm')

        task_1 >> task_2 >> task_3

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end
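
The save_hwm task above is left as a placeholder. One way to persist the high water mark between runs is an Airflow Variable. The sketch below is a minimal illustration of that approach; the Variable key and the helper function names are assumptions, not part of the collector.

Code Block
# Minimal sketch: persisting the high water mark in an Airflow Variable.
# The Variable key and function names here are illustrative assumptions.
from datetime import datetime

from airflow.models import Variable

HWM_VARIABLE_KEY = "kada_tableau_hwm"  # hypothetical Variable name

def read_start_hwm():
    # First run falls back to a fixed initial timestamp.
    return Variable.get(HWM_VARIABLE_KEY, default_var="2022-01-01 00:00:00")

def save_hwm_function(end_hwm):
    # Persist the end timestamp so the next run starts from it.
    Variable.set(HWM_VARIABLE_KEY, end_hwm)

# In the DAG, the placeholders could then become:
#   start_hwm = read_start_hwm()
#   end_hwm = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
#   task_3 = PythonOperator(task_id='save_hwm',
#                           python_callable=save_hwm_function,
#                           op_kwargs={'end_hwm': end_hwm})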
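
The plugins.utils.azure_blob_storage helper imported above is customer-side code that is not shown on this page. Assuming it is a thin wrapper over the azure-storage-blob SDK, an equivalent upload_file_sas_token could look like the following sketch; the parameter names mirror the call sites above, and everything else is an assumption.

Code Block
# Minimal sketch of an upload_file_sas_token equivalent using the
# azure-storage-blob SDK. This is an assumption about the helper's
# behaviour, not the actual plugins.utils.azure_blob_storage code.
from azure.storage.blob import BlobServiceClient

class AzureBlobStorage:

    @staticmethod
    def upload_file_sas_token(client, storage_account, container, blob, local_path):
        # 'client' is the SAS token string, matching the call sites above.
        service = BlobServiceClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            credential=client,
        )
        blob_client = service.get_blob_client(container=container, blob=blob)
        with open(local_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)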