Info

This collector is for Informatica versions prior to Informatica Intelligent Cloud Services (IICS)


About Collectors


...

Pre-requisites

  • Informatica 9.1+ with repository hosted in Oracle, or IICS with access to both V2 and V3 APIs

  • Python 3.6 - 3.10

  • Access to K landing directory

  • Access to Informatica Repository (see section below)
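
The Python version prerequisite can be checked before installing the collector. The following is a minimal sketch, assuming the supported range is inclusive at both ends; the helper name `is_supported_python` is hypothetical and not part of the KADA library.

```python
import sys

# Supported range taken from the prerequisites above (assumed inclusive).
MIN_PYTHON = (3, 6)
MAX_PYTHON = (3, 10)


def is_supported_python(version_info=None):
    """Return True if the major.minor version falls within 3.6 - 3.10."""
    if version_info is None:
        version_info = sys.version_info
    major_minor = (version_info[0], version_info[1])
    return MIN_PYTHON <= major_minor <= MAX_PYTHON
```

Calling `is_supported_python()` with no arguments checks the interpreter that is running the script.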

Establish Informatica Repository Access

Create an Oracle user with read access to all tables in the Informatica repository database.

Establish Informatica Server Access

Create a user that has read access to the Informatica Server.


...

Step 1: Create the Source in K

Create an Informatica IICS source in K

  • Go to Settings, select Sources and click Add Source

  • Select the “Load from File” option

  • Give the source a Name - e.g. Informatica Production

  • Add the Host name for the Informatica Server

  • Click Finish Setup

...

info
Code Block
pip install kada_collectors_lib-<version>-none-any.whl

You may require an ODBC package for your OS, as well as an Oracle client library package if you do not already have one; see https://www.oracle.com/au/database/technologies/instant-client.html

...

Step 4

...

In your environment you may be using runtime overrides for parameters in your Informatica jobs. KADA uses the runtime overrides to resolve lineage for parameter-driven jobs.

Use the script below to generate infacmd commands to extract session logs in XML format.

Info

Replace any < > with values for your Informatica environment.

Code Block
languagesql
select 
'call infacmd.bat isp getsessionlog -dn <INFORMATICA_DOMAIN> -hp <HOST>:<PORT> -un <SERVER USERNAME> -pd <SERVER PASSWORD> -is <SERVERNAME> -rs <REPO NAME> -ru <REPO USERNAME> -rp <REPO PASSWORD> -fm xml -fn ' || ws.subject_area || ' -wf ' || ws.workflow_name || ' -ss ' || CASE WHEN hierarchy_structure is null then ws.instance_name ELSE '"' || substr(hierarchy_structure, 2) || '"' END || ' -lo <C:\\output\\path\\for\\logs\\>' || ws.workflow_id || '_' || ws.task_id || '_' || ws.instance_id as cmd
from (
    SELECT ti.instance_name,
        ti.task_id,
        ti.version_number,
        wws.instance_id,
        wf.workflow_id,
        wf.workflow_name,
        wf.workflow_comments,
        wf.server_name,
        wf.subject_area,
        hierarchy_structure,
        path
FROM (
        select path 
        , TO_NUMBER(substr(path, 2, instr(path,'/',1, 2)-2)) as workflow_id
        , TO_NUMBER(substr(path, -instr(reverse(path),'/', 1, 2)+1, instr(reverse(path),'/', 1, 2)-2)) as task_id
        , hierarchy_structure
        , instance_id
        from (SELECT DISTINCT '/' || temp1.task_id AS path
                , temp1.task_name AS hierarchy_structure
                , 0 as instance_id
                FROM opb_task temp1, opb_subject temp2
                WHERE temp1.subject_id = temp2.subj_id
                AND temp1.task_type = 71 -- workflows
                UNION ALL
                SELECT DISTINCT temp1.path
                    , temp1.task_name AS hierarchy_structure
                    , instance_id
                FROM (SELECT opb_task_inst.workflow_id, opb_task_inst.task_id, opb_task_inst.instance_id, LEVEL depth,
                        SYS_CONNECT_BY_PATH(opb_task_inst.workflow_id ,'/') || '/' || opb_task_inst.task_id || '/' path,
                        SYS_CONNECT_BY_PATH(opb_task_inst.instance_name ,'/') task_name
                        FROM opb_task_inst
                        WHERE opb_task_inst.task_type IN (68,70)
                        START WITH workflow_id IN (select distinct w.workflow_id
                                                        from rep_workflows w
                                                        join rep_task_inst ti on w.workflow_id = ti.workflow_id
                                                        where ti.task_type_name = 'Worklet'
                                                        and w.subject_area not in ('<SUBJECT_AREAS_TO_EXCLUDE>')
                                                    )
                        CONNECT BY PRIOR opb_task_inst.task_id = opb_task_inst.workflow_id
                    ) temp1,
                    opb_task temp2,
                    opb_subject temp3
                WHERE temp2.subject_id = temp3.subj_id
                AND temp2.task_id = SUBSTR(temp1.path,2, INSTR(temp1.path,'/', 1, 2) -2 )
                ORDER BY path ASC )
        where instance_id <> 0
) wws
    JOIN rep_task_inst ti on ti.task_id = wws.task_id and ti.task_type = 68
    JOIN REP_WORKFLOWS wf on wws.workflow_id = wf.workflow_id
UNION
SELECT ti.instance_name,
    ti.task_id,
    ti.version_number,
    ti.instance_id,
    wf.workflow_id,
    wf.workflow_name,
    wf.workflow_comments,
    wf.server_name,
    wf.subject_area,
    '' as hierarchy_structure,
    '' as path
FROM REP_WORKFLOWS wf
    JOIN rep_task_inst ti on ti.workflow_id = wf.workflow_id and ti.task_type = 68
where wf.subject_area not in ('<SUBJECT_AREAS_TO_EXCLUDE>')
) ws
    join (select distinct workflow_id as workflow_id from rep_wflow_run) active_wflows on ws.workflow_id = active_wflows.workflow_id 

The commands can be combined in a bat script like the example below to dump out the latest log per session.

Code Block
@echo off
cd /d C:
cd "C:\Informatica\9.1.0\clients\DeveloperClient\infacmd"
echo %cd%
<ADD CALLS from SQL here>
pause
Note

The session logs can take a long time to generate. We recommend that you run this step ad hoc, whenever your Informatica jobs change.
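
If the SQL result is exported as a list of command strings, the bat script can be assembled programmatically instead of pasted by hand. This is a sketch only: the function name `build_bat_script` is hypothetical, and it uses a single `cd /d "<dir>"` in place of the two `cd` lines in the example above.

```python
def build_bat_script(commands, infacmd_dir):
    """Wrap infacmd calls (one per SQL result row) in a bat script.

    commands:    iterable of 'call infacmd.bat ...' strings from the SQL query
    infacmd_dir: directory that contains infacmd.bat
    """
    lines = [
        "@echo off",
        'cd /d "{}"'.format(infacmd_dir),  # switch drive and directory in one step
        "echo %cd%",
    ]
    for command in commands:
        command = command.strip()
        if command:  # skip blank rows from the export
            lines.append(command)
    lines.append("pause")
    # Windows batch files conventionally use CRLF line endings.
    return "\r\n".join(lines) + "\r\n"
```

The returned string can be written to a `.bat` file and run on a machine where infacmd is installed.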

Use kada_informatica_runtime_parser.py to generate a runtime_session_overrides.json which will be used by the Informatica extractor.

kada_informatica_runtime_parser.py

Code Block
languagepy
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_generic_logger
from kada_collectors.extractors.informatica import runtime_parser

get_generic_logger('root') # Set to use the root logger, you can change the context accordingly or define your own logger

_type = 'informatica_runtime_parser'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA Informatica Runtime Parser.')
parser.add_argument('--config', '-c', dest='config', default=filename, help='Location of the configuration json, default is the config json in the same directory as the script.')
args = parser.parse_args()

config_args = load_config(args.config)

runtime_parser(**{"input_path": config_args["input_path"], "output_path": config_args["output_path"]})
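
The script above reads `input_path` and `output_path` from a JSON file named `kada_informatica_runtime_parser_extractor_config.json` in the same directory as the script (the name follows from `_type` in the script). A minimal sketch of generating that file is shown below; the helper name `write_runtime_parser_config` is hypothetical, and it assumes `input_path` is the directory holding the XML session logs.

```python
import json
import os

CONFIG_NAME = "kada_informatica_runtime_parser_extractor_config.json"


def write_runtime_parser_config(directory, input_path, output_path):
    """Write the two keys the runtime parser script reads, return the file path."""
    config = {
        "input_path": input_path,    # assumed: where the session logs were dumped
        "output_path": output_path,  # where the parser output should be written
    }
    filename = os.path.join(directory, CONFIG_NAME)
    with open(filename, "w") as handle:
        json.dump(config, handle, indent=4)
    return filename
```

For example, `write_runtime_parser_config(".", "/tmp/logs", "/tmp/input")` would point the parser output at the directory used as the extractor's `input_path`; both paths here are illustrative.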

...

Step 4: Configure the Collector

The collector requires a set of parameters to connect to and extract metadata from Informatica IICS.

| FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE |
|---|---|---|---|
| username | string | Username to log into Oracle or IICS | “myuser” |
| password | string | Password to log into Oracle or IICS | |
| dsn | string | Datasource Name for Oracle, in one of the following forms: <tnsname> or <host/servicename> | “preprod”, “local.example.com/oraservice” |
| login_url | string | The base URL for your IICS login service; see https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/platform-rest-api-version-2-resources/login.html for more details | https://dm-ap.informaticacloud.com/ma/api/v2/user/login |
| repo_owner | string | The owner of all the tables required by the extractor | “inf” |
| oracle_client_path | string | Full path to the location of the Oracle Client libraries | “/tmp/drivers/lib/oracleinstantclient_11_9” |
| cached | boolean | If set to true, prevents re-extracting data | false |
| input_path | string | Absolute path to the input location where runtime_session_overrides.json is placed | “/tmp/input” |
| days_active | integer | Number of days within which a task must have run to be considered active | 60 |
| timeout | integer | Timeout in seconds for IICS API responses; the IICS server can be slow, so tune this if needed | 20 |
| output_path | string | Absolute path to the output location where files are to be written | “/tmp/output” |
| mask | boolean | Whether to enable masking | true |
| compress | boolean | Whether to gzip the output | true |

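KADA provides an out of the box script that reads a configuration JSON file and runs the extractor. Below are the configuration files for the Informatica (Oracle repository) and IICS variants.

kada_informatica_extractor_config.json

Code Block
languagejson
{
    "username": "",
    "password": "",
    "dsn": "",
    "repo_owner": "",
    "oracle_client_path": "",
    "cached": false,
    "input_path": "/tmp/input",
    "output_path": "/tmp/output",
    "mask": true,
    "compress": true
}

kada_informaticaiics_extractor_config.json

Code Block
languagejson
{
    "username": "",
    "password": "",
    "login_url": "",
    "timeout": 30,
    "active_days": 60,
    "mapping": {},
    "output_path": "/tmp/output",
    "mask": true,
    "compress": true
}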
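
Before running the extractor it can help to sanity-check the configuration JSON. The sketch below checks only the keys that are documented for every variant of the config (`username`, `password`, `output_path`, `mask`, `compress`). The function name `validate_config` is hypothetical, and the KADA library may perform its own validation.

```python
# Expected Python types for the keys documented in the parameter table.
EXPECTED_TYPES = {
    "username": str,
    "password": str,
    "output_path": str,
    "mask": bool,
    "compress": bool,
}


def validate_config(config, expected_types=None):
    """Return a list of human-readable problems; an empty list means no issues found."""
    if expected_types is None:
        expected_types = EXPECTED_TYPES
    problems = []
    for key, expected in expected_types.items():
        if key not in config:
            problems.append("missing key: {}".format(key))
        elif not isinstance(config[key], expected):
            problems.append(
                "{} should be {}, got {}".format(
                    key, expected.__name__, type(config[key]).__name__
                )
            )
    return problems
```

Passing a different `expected_types` mapping lets the same check cover variant-specific keys such as `timeout` or `cached`.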

...

Step 5: Run the Collector

The following code is an example of how to run the extractor. You may need to uplift this code to meet any code standards at your organisation.

...

This is the wrapper script: kada_informaticaiics_extractor.py

Code Block
languagepy
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.informaticaiics import Extractor

get_generic_logger('root') # Set to use the root logger, you can change the context accordingly or define your own logger

_type = 'informaticaiics'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA InformaticaIICS Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename, help='Location of the configuration json, default is the config json in the same directory as the script.')
args = parser.parse_args()

start_hwm, end_hwm = get_hwm(_type)

ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

publish_hwm(_type, end_hwm)
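
One possible uplift is to keep the password out of the configuration JSON and supply it through an environment variable. The sketch below uses plain `json` rather than the library's `load_config`; the helper name and the variable name `KADA_INFORMATICA_PASSWORD` are hypothetical.

```python
import json
import os


def load_config_with_env_password(config_path, env_var="KADA_INFORMATICA_PASSWORD"):
    """Load the config JSON and, if env_var is set, use it as the password."""
    with open(config_path) as handle:
        config = json.load(handle)
    password = os.environ.get(env_var)
    if password:
        # The environment value takes precedence over whatever is in the file.
        config["password"] = password
    return config
```

The resulting dictionary could then be passed to the extractor in the same way as in the wrapper script, for example `Extractor(**load_config_with_env_password(args.config))`.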

...

Code Block
languagepy
# Informatica (Oracle repository) extractor
class Extractor(username: str = None, password: str = None, dsn: str = None, \
    repo_owner: str = None, oracle_client_path: str = None, \
    cached: bool = False, input_path: str = './input', \
    output_path: str = './output', mask: bool = False, compress: bool = False) -> None

# IICS extractor
class Extractor(username: str = None, password: str = None, login_url: str = None, \
    active_days: int = 60, mapping: dict = {}, timeout: int = 30, \
    output_path: str = './output', mask: bool = False, compress: bool = False) -> None

username: username to sign into server
password: password to sign into server
dsn: Oracle datasource name
login_url: IICS login address
repo_owner: Oracle table owner
oracle_client_path: library path for the Oracle Instant Client
cached: set to prevent re-extracting data
input_path: full or relative path to the directory containing the input files
active_days: assessment window in days to consider tasks to be active
timeout: timeout in seconds for API responses
output_path: full or relative path to where the outputs should go
mask: to mask files or not
compress: to gzip output files or not

The runtime parser can also be called in isolation

Code Block
languagepy
from kada_collectors.extractors.informatica import runtime_parser

kwargs = {"input_path": "./input", "output_path": "./output"}  # However you choose to construct your args

runtime_parser(**kwargs)
Code Block
languagepy
def runtime_parser(input_path: str = './input', output_path: str = './output') -> None

input_path: full or relative path to the directory containing the input files
output_path: full or relative path to where the outputs should go

To edit the internal SQL being run refer to https://kadaai.atlassian.net/wiki/spaces/KSL/pages/1902411777/Additional+Notes#Adding-Custom-SQL

...

compress: To gzip output files or not

...

Step 6: Check the Collector Outputs

K Extracts

A set of files (e.g. metadata, databaselog, linkages, events) will be generated. These files will appear in the output_path directory you set in the configuration details.
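
A quick way to check the outputs is to list what landed in the output_path directory. This is a rough sketch: the function name `summarise_outputs` is hypothetical, and it assumes gzipped outputs carry a `.gz` suffix, which the source does not state.

```python
import os


def summarise_outputs(output_path):
    """Split the files in output_path into gzipped and uncompressed names."""
    gzipped, plain = [], []
    for name in sorted(os.listdir(output_path)):
        if not os.path.isfile(os.path.join(output_path, name)):
            continue  # ignore sub-directories
        if name.endswith(".gz"):  # assumption: compress=true yields .gz files
            gzipped.append(name)
        else:
            plain.append(name)
    return gzipped, plain
```

If `compress` is set to true in the configuration and the second list is not empty, it may be worth checking whether those files came from an earlier run.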

...

The collector produces files according to the configuration JSON. A high water mark file called informaticaiics_hwm.txt is created in the same directory as the execution. This file is only produced if you call the publish_hwm method.

...