Info |
---|
This collector is for Informatica versions prior to Informatica Intelligent Cloud Services (IICS) |
About Collectors
...
Pre-requisites
Informatica 9.1+ with repository hosted in Oracle.
IICS with access to both V2 and V3 APIs
Python 3.6 - 3.10
Access to K landing directory
Access to Informatica Repository (see section below)
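The Python version requirement listed above can be checked up front, before installing the collector. The following is a minimal sketch, assuming the supported range is 3.6 to 3.10 inclusive as listed; the helper name `is_supported_python` is hypothetical and is not part of the KADA collectors library.

```python
import sys

# Supported range taken from the pre-requisites list (3.6 - 3.10 inclusive).
MIN_VERSION = (3, 6)
MAX_VERSION = (3, 10)


def is_supported_python(version=None):
    """Return True if the (major, minor) version is within the supported range.

    Hypothetical helper for illustration only; not part of kada_collectors.
    """
    if version is None:
        version = sys.version_info
    major_minor = (version[0], version[1])
    return MIN_VERSION <= major_minor <= MAX_VERSION
```

Calling `is_supported_python()` with no argument checks the interpreter that is running the script.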
Establish Informatica Repository IICS Access
Create an Oracle user with read access to all tables in the Informatica repository database.
Establish Informatica Server Access
Create a user that has read access to the Informatica Server.
...
Step 1: Create the Source in K
Create an Informatica IICS source in K
Go to Settings, select Sources and click Add Source
Select the “Load from File” option
Give the source a Name - e.g. Informatica Production
Add the Host name for the Informatica Server
Click Finish Setup
...
Code Block |
---|
pip install kada_collectors_lib-<version>-none-any.whl |
You may also need to install an ODBC package for your OS and an Oracle client library package, if you do not already have one; see https://www.oracle.com/au/database/technologies/instant-client.html |
...
Step 4
...
In your environment you may be using runtime overrides for parameters in your Informatica jobs. KADA uses the runtime overrides to resolve lineage for parameter-driven jobs.
Use the script below to generate infacmd commands to extract session logs in XML format.
Info |
---|
Replace any |
Code Block | ||
---|---|---|
| ||
select
'call infacmd.bat isp getsessionlog -dn <INFORMATICA_DOMAIN> -hp <HOST>:<PORT> -un <SERVER USERNAME> -pd <SERVER PASSWORD> -is <SERVERNAME> -rs <REPO NAME> -ru <REPO USERNAME> -rp <REPO PASSWORD> -fm xml -fn ' || ws.subject_area || ' -wf ' || ws.workflow_name || ' -ss ' || CASE WHEN hierarchy_structure is null then ws.instance_name ELSE '"' || substr(hierarchy_structure, 2) || '"' END || ' -lo <C:\\output\\path\\for\\logs\\>' || ws.workflow_id || '_' || ws.task_id || '_' || ws.instance_id as cmd
from (
SELECT ti.instance_name,
ti.task_id,
ti.version_number,
wws.instance_id,
wf.workflow_id,
wf.workflow_name,
wf.workflow_comments,
wf.server_name,
wf.subject_area,
hierarchy_structure,
path
FROM (
select path
, TO_NUMBER(substr(path, 2, instr(path,'/',1, 2)-2)) as workflow_id
, TO_NUMBER(substr(path, -instr(reverse(path),'/', 1, 2)+1, instr(reverse(path),'/', 1, 2)-2)) as task_id
, hierarchy_structure
, instance_id
from (SELECT DISTINCT '/' || temp1.task_id AS path
, temp1.task_name AS hierarchy_structure
, 0 as instance_id
FROM opb_task temp1, opb_subject temp2
WHERE temp1.subject_id = temp2.subj_id
AND temp1.task_type = 71 -- workflows
UNION ALL
SELECT DISTINCT temp1.path
, temp1.task_name AS hierarchy_structure
, instance_id
FROM (SELECT opb_task_inst.workflow_id, opb_task_inst.task_id, opb_task_inst.instance_id, LEVEL depth,
SYS_CONNECT_BY_PATH(opb_task_inst.workflow_id ,'/') || '/' || opb_task_inst.task_id || '/' path,
SYS_CONNECT_BY_PATH(opb_task_inst.instance_name ,'/') task_name
FROM opb_task_inst
WHERE opb_task_inst.task_type IN (68,70)
START WITH workflow_id IN (select distinct w.workflow_id
from rep_workflows w
join rep_task_inst ti on w.workflow_id = ti.workflow_id
where ti.task_type_name = 'Worklet'
and w.subject_area not in ('<SUBJECT_AREAS_TO_EXCLUDE>')
)
CONNECT BY PRIOR opb_task_inst.task_id = opb_task_inst.workflow_id
) temp1,
opb_task temp2,
opb_subject temp3
WHERE temp2.subject_id = temp3.subj_id
AND temp2.task_id = SUBSTR(temp1.path,2, INSTR(temp1.path,'/', 1, 2) -2 )
ORDER BY path ASC )
where instance_id <> 0
) wws
JOIN rep_task_inst ti on ti.task_id = wws.task_id and ti.task_type = 68
JOIN REP_WORKFLOWS wf on wws.workflow_id = wf.workflow_id
UNION
SELECT ti.instance_name,
ti.task_id,
ti.version_number,
ti.instance_id,
wf.workflow_id,
wf.workflow_name,
wf.workflow_comments,
wf.server_name,
wf.subject_area,
'' as hierarchy_structure,
'' as path
FROM REP_WORKFLOWS wf
JOIN rep_task_inst ti on ti.workflow_id = wf.workflow_id and ti.task_type = 68
where wf.subject_area not in ('<SUBJECT_AREAS_TO_EXCLUDE>')
) ws
join (select distinct workflow_id as workflow_id from rep_wflow_run) active_wflows on ws.workflow_id = active_wflows.workflow_id |
The commands can be combined in a bat script, like the example below, to dump out the latest log per session.
Code Block |
---|
@echo off
cd /d C:
cd "C:\Informatica\9.1.0\clients\DeveloperClient\infacmd"
echo %cd%
<ADD CALLS from SQL here>
pause |
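One way to assemble the batch file is to export the `cmd` column produced by the SQL above to a text file, one command per line, and wrap those lines in the template shown. The following is a minimal sketch under that assumption; the function names `build_bat_script` and `write_bat_script` are hypothetical, and the default infacmd directory is simply the one used in the example above.

```python
# Directory taken from the example bat script; change it to match your install.
DEFAULT_INFACMD_DIR = r"C:\Informatica\9.1.0\clients\DeveloperClient\infacmd"


def build_bat_script(commands, infacmd_dir=DEFAULT_INFACMD_DIR):
    """Wrap generated 'call infacmd.bat ...' lines in the batch template.

    Hypothetical helper: blank lines are dropped and surrounding whitespace
    is stripped from each command.
    """
    calls = [c.strip() for c in commands if c.strip()]
    lines = ["@echo off", "cd /d C:", 'cd "{}"'.format(infacmd_dir), "echo %cd%"]
    lines.extend(calls)
    lines.append("pause")
    # Windows batch files conventionally use CRLF line endings.
    return "\r\n".join(lines) + "\r\n"


def write_bat_script(commands_path, bat_path):
    """Read one command per line from commands_path and write the bat file."""
    with open(commands_path, "r") as src:
        script = build_bat_script(src.readlines())
    # newline="" stops Python from translating the explicit CRLF endings.
    with open(bat_path, "w", newline="") as dst:
        dst.write(script)
```

For example, `write_bat_script("commands.txt", "extract_session_logs.bat")` would produce a script ready to run on the Informatica client machine; both file names are illustrative.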
Note |
---|
The session logs can take a long time to generate. We recommend that you run this step on an ad hoc basis, whenever your Informatica jobs change. |
Use kada_informatica_runtime_parser.py to generate a runtime_session_overrides.json file, which will be used by the Informatica extractor.
kada_informatica_runtime_parser.py
Code Block | ||
---|---|---|
| ||
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_generic_logger
from kada_collectors.extractors.informatica import runtime_parser
get_generic_logger('root') # Set to use the root logger, you can change the context accordingly or define your own logger
_type = 'informatica_runtime_parser'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))
parser = argparse.ArgumentParser(description='KADA Informatica Runtime Parser.')
parser.add_argument('--config', '-c', dest='config', default=filename, help='Location of the configuration json, default is the config json in the same directory as the script.')
args = parser.parse_args()
config_args = load_config(args.config)
runtime_parser(**{"input_path": config_args["input_path"], "output_path": config_args["output_path"]}) |
...
: Configure the Collector
The collector requires a set of parameters to connect to and extract metadata from Informatica IICS.
FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE | |
---|---|---|---|---|
username | string | Username to log into Oracle / IICS | “myuser” | |
password | string | Password to log into Oracle / IICS | ||
dsn | string | Datasource Name for Oracle; this can be one of the following forms: <tnsname> | “preprod” | |
login_url | string | The base URL for your IICS login service, see https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/platform-rest-api-version-2-resources/login.html for more details | ||
repo_owner | string | The owner of all the tables required by the extractor | “inf” | |
oracle_client_path | string | Full path to the location of the Oracle Client libraries | “/tmp/drivers/lib/oracleinstantclient_11_9” | |
cached | boolean | If set to true, it will prevent re-extracting data | false | |
input_path | string | Absolute path to the directory containing the input files | “/tmp/input” | |
active_days | integer | Number of days within which a task must have run to be considered active | 60 | |
timeout | integer | Timeout in seconds for IICS API responses; the IICS server can sometimes be slow, so tune this if needed | 20 | |
output_path | string | Absolute path to the output location where files are to be written | “/tmp/output” | |
mask | boolean | To enable masking or not | true | |
compress | boolean | To gzip the output or not | true |
KADA provides an out of the box script that reads a configuration JSON file and runs the extractor. Below is the configuration file.
kada_informaticaiics_extractor_config.json
Code Block | ||
---|---|---|
| ||
{
    "username": "",
    "password": "",
    "dsn": "",
    "login_url": "",
    "repo_owner": "",
    "timeout": 30,
    "oracle_client_path": "",
    "active_days": 60,
    "cached": false,
    "mapping": {},
    "input_path": "/tmp/input",
    "output_path": "/tmp/output",
    "mask": true,
    "compress": true
} |
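Before running the extractor it can help to confirm that the configuration file parses as JSON and that the fields you rely on are not blank. The following is a minimal sketch; `check_config` and its default `required` list are hypothetical and not part of the KADA collectors library, so adjust the list to the fields from the table above that apply to your setup.

```python
import json


def check_config(path, required=("username", "password", "output_path")):
    """Load a config JSON and report required keys that are missing or empty.

    Hypothetical helper for illustration only. Returns a tuple of
    (config_dict, list_of_problem_keys).
    """
    with open(path, "r") as handle:
        # json.load raises json.JSONDecodeError (a ValueError) on malformed JSON.
        config = json.load(handle)
    problems = [key for key in required if config.get(key) in (None, "")]
    return config, problems
```

An empty `problems` list means every required key is present and non-empty; it says nothing about whether the values are valid for the extractor.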
...
Step 5: Run the Collector
The following code is an example of how to run the extractor. You may need to adapt this code to meet your organisation's coding standards.
...
This is the wrapper script: kada_informaticaiics_extractor.py
Code Block | ||
---|---|---|
| ||
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.informaticaiics import Extractor

get_generic_logger('root') # Set to use the root logger, you can change the context accordingly or define your own logger

_type = 'informaticaiics'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA InformaticaIICS Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename, help='Location of the configuration json, default is the config json in the same directory as the script.')
args = parser.parse_args()

start_hwm, end_hwm = get_hwm(_type)

ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

publish_hwm(_type, end_hwm) |
...
Code Block | ||
---|---|---|
| ||
class Extractor(username: str = None, password: str = None, dsn: str = None, login_url: str = None, \
    repo_owner: str = None, active_days: int = 60, oracle_client_path: str = None, mapping: dict = {}, \
    cached: bool = False, input_path: str = './input', timeout: int = 30, \
    output_path: str = './output', mask: bool = False, compress: bool = False) -> None |
username: username to sign into server
password: password to sign into server
dsn: server
login_url: IICS login address
repo_owner: Oracle table owner
oracle_client_path: library path for the Oracle Instant Client
cached: set to prevent re-extracting data
input_path: full or relative path to the directory containing the input files
active_days: assessment window in days to consider tasks to be active
timeout: timeout in seconds for API responses
output_path: full or relative path to where the outputs should go
mask: to mask files or not
compress: to gzip output files or not
The runtime parser can also be called in isolation
Code Block | ||
---|---|---|
| ||
from kada_collectors.extractors.informatica import runtime_parser
kwargs = {"input_path": "./input", "output_path": "./output"} # However you choose to construct your args
runtime_parser(**kwargs) |
Code Block | ||
---|---|---|
| ||
def runtime_parser(input_path: str = './input', output_path: str = './output') -> None |
input_path: full or relative path to the directory containing the input files
output_path: full or relative path to where the outputs should go
To edit the internal SQL being run refer to https://kadaai.atlassian.net/wiki/spaces/KSL/pages/1902411777/Additional+Notes#Adding-Custom-SQL
...
...
Step 6: Check the Collector Outputs
K Extracts
A set of files (e.g. metadata, databaselog, linkages, events, etc.) will be generated. These files will appear in the output_path directory you set in the configuration details.
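A quick way to confirm that a run produced output is to list the files under `output_path`. The following is a minimal sketch, assuming only that the outputs are regular files in that directory and that gzip-compressed files carry a `.gz` suffix; the exact file names are not specified here, and `list_outputs` is a hypothetical helper rather than part of the KADA collectors library.

```python
import os


def list_outputs(output_path):
    """Return (name, size_in_bytes, looks_gzipped) for each file in output_path.

    Hypothetical helper for illustration only; subdirectories are ignored.
    """
    results = []
    for name in sorted(os.listdir(output_path)):
        full = os.path.join(output_path, name)
        if os.path.isfile(full):
            # Assumption: gzip-compressed outputs carry a .gz suffix.
            results.append((name, os.path.getsize(full), name.endswith(".gz")))
    return results
```

An empty result, or files of zero bytes, is a prompt to check the extractor logs before loading anything into K.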
...
A high water mark file called informaticaiics_hwm.txt is created in the directory where the script is executed, and output files are produced according to the configuration JSON. The high water mark file is only produced if you call the publish_hwm method.
...