Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Collectors send metadata and logs to K. Use this collector if you cannot connect K to your source via a direct connect extractor.

Collectors are deployed and managed by you.

Using the Collector

Pre-requisites

  • Python 3.6+

  • Tableau Server Version [2018.1] and above.

  • Enable the Tableau Metadata API for Tableau Server

  • Record your Tableau server host

  • Create an API user for the Tableau Metadata API.

    • Record the Credentials (Username & Password)

    • The user must be Site Administrator Creator , Server Administrator or Site Administrator

  • Record Tableau Postgres Database host

  • Create a DB user for the Tableau Postgres Database

    • Record the Credentials (Username & Password)

    • Ben what does the user need access to?

Install the Collector

Run the following commands to install the collector

pip install pipenv
pipenv install

Run the Collector

Run the following command to run the collector

python kada-tableau-extractor.py --server http://example.com --username <YOUR ADMIN USER> --password <YOUR PASSWORD> --db_password <YOUR PASSWORD> --db_host=example.com

Push the files to the KADA Landing Directory

Create the source in K

Record the Landing Directory

Push the files that are generated from the collector to the Landing Directory.

Dean Nguyen Should add how the Admin can find the landing path.

Example orchestrating the Collector using Airflow

# built-in
import os

# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# TODO: configure connection
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = ""
KADA_STORAGE_ACCOUNT = ""
KADA_LANDING_PATH = "lz/dbt/Astronomer Snowflake/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}


def upload():
  output = KADA_EXTRACTOR_CONFIG['output_path']
  for filename in os.listdir(output):
      if filename.endswith('.csv'):
        file_to_upload_path = os.path.join(output, filename)

        AzureBlobStorage.upload_file_sas_token(
            client=KADA_SAS_TOKEN,
            storage_account=KADA_STORAGE_ACCOUNT,
            container=KADA_CONTAINER, 
            blob=f'{KADA_LANDING_PATH}/{filename}', 
            local_path=file_to_upload_path
        )

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
  
    # To be configure by customer depending on where you store the timestamp
    start_hwm = 'YYYY-MM-DD HH:mm:SS' # Source the timestamp from the prior run.
    end_hwm = 'YYYY-MM-DD HH:mm:SS' # timestamp now
    
    ext = Extractor(**KADA_EXTRACTOR_CONFIG)
    
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run, 
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )
        
        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload, 
            op_kwargs={},
            provide_context=True,
        )

        task_3 = DummyOperator(task_id='save_hwm') 
          # To be implemented by the customer. Timestamp needs to be saved.

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end

  • No labels