Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Excerpt
namelanding

When using a Collector you will push metadata to a K landing directory.

To find your landing directory you will need to

  1. Go to Platform Settings - Settings. Note down the value of this setting

    1. If using Azure: storage_azure_storage_account

    2. if using AWS:

      1. storage_root_folder - the AWS s3 bucket

      2. storage_aws_region - the region where the AWS s3 bucket is hosted

  2. Go to Sources - Edit the Source you have configured. Note down the landing directory in the About this Source section

To connect to the landing directory you will need

  • If using Azure: a SAS token to push data to the landing directory. Request this from KADA Support (support@kada.ai)

  • if using AWS:

    • an Access key and Secret. Request this from KADA Support (support@kada.ai)

    • OR provide your IAM role to KADA Support to provision access.

Excerpt

namesample
Airflow Example

Excerpt
nameAirflow

The following example is how you can orchestrate the Tableau collector using Airflow and push the files to K hosted on Azure. The code is not expected to be used as-is but as a template for your own DAG.

Code Block
languagepy
# built-in
import os

# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# To be configed by the customer.
# Note variables may change if using a different object store.
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = ""
KADA_STORAGE_ACCOUNT = ""
KADA_LANDING_PATH = "lz/tableau/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}

# To be implemented by the customer. 
# Upload to your landing zone storage.
def upload():
  output = KADA_EXTRACTOR_CONFIG['output_path']
  for filename in os.listdir(output):
      if filename.endswith('.csv'):
        file_to_upload_path = os.path.join(output, filename)

        AzureBlobStorage.upload_file_sas_token(
            client=KADA_SAS_TOKEN,
            storage_account=KADA_STORAGE_ACCOUNT,
            container=KADA_CONTAINER, 
            blob=f'{KADA_LANDING_PATH}/{filename}', 
            local_path=file_to_upload_path
        )

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
  
    # To be implemented by the customer.
    # Retrieve the timestamp from the prior run
    start_hwm = 'YYYY-MM-DD HH:mm:SS'
    end_hwm = 'YYYY-MM-DD HH:mm:SS' # timestamp now
    
    ext = Extractor(**KADA_EXTRACTOR_CONFIG)
    
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run, 
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )
        
        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload, 
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer. 
        # Timestamp needs to be saved for next run
        task_3 = DummyOperator(task_id='save_hwm') 

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end