Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

About Collectors

...

Collectors are extractors that are developed and managed by you (A customer of K).

KADA provides python libraries that customers can use to quickly deploy a Collector.

Why you should use a Collector

There are several reasons why you may use a collector vs the direct connect extractor:

  1. You are using the KADA SaaS offering and it cannot connect to your sources due to firewall restrictions

  2. You want to push metadata to KADA rather than allow it pull data for Security reasons

  3. You want to inspect the metadata before pushing it to K

Using a collector requires you to manage

  1. Deploying and orchestrating the extract code

  2. Managing a high water mark so the extract only pull the latest metadata

  3. Storing and pushing the extracts to your K instance.

...

Pre-requisites

  • Python 3.6 - 3.9

  • Access to the KADA Collector repository

    • The repository is currently hosted in KADA’s Azure Blob Storage. You will be given a SAS token to access the repository. Reach out to KADA Support (support@kada.ai) if you do not have access.

    • Download the Redshift whl (e.g. kada_collectors_extractors_redshift-#.#.#-py3-none-any.whl)

  • Access to Redshift (see section below)

...

Example: Using Airflow to orchestrate the Extract and Push to K

...

Code Block

...

language

...

py
# built-in
import os

# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# To be configed by the customer.
# Note variables may change if using a different object store.
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = ""
KADA_STORAGE_ACCOUNT = ""
KADA_LANDING_PATH = "lz/tableau/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}

# To be implemented by the customer. 
# Upload to your landing zone storage.
def upload():
  output = KADA_EXTRACTOR_CONFIG['output_path']
  for filename in os.listdir(output):
      if filename.endswith('.csv'):
        file_to_upload_path = os.path.join(output, filename)

        AzureBlobStorage.upload_file_sas_token(
            client=KADA_SAS_TOKEN,
            storage_account=KADA_STORAGE_ACCOUNT,
            container=KADA_CONTAINER, 
            blob=f'{KADA_LANDING_PATH}/{filename}', 
            local_path=file_to_upload_path
        )

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
  
    # To be implemented by the customer.
    # Retrieve the timestamp from the prior run
    start_hwm = 'YYYY-MM-DD HH:mm:SS'
    end_hwm = 'YYYY-MM-DD HH:mm:SS' # timestamp now
    
    ext = Extractor(**KADA_EXTRACTOR_CONFIG)
    
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run, 
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )
        
        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload, 
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer. 
        # Timestamp needs to be saved for next run
        task_3 = DummyOperator(task_id='save_hwm') 

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end