본문 바로가기
3. 데이터관리

OCI Data Integration 활용 - Data Integration Task 작업 모니터링 하기

by 에너자이죠 2024. 6. 19.

이번 Post는 앞서 구성했던 Data Integration  Task의 수행 결과를 모니터링 하는 방법에 대해 정리하겠습니다. 

수행 결과에 대한 모니터링으로는 OCI Notification & Event Service를 통해 전달 받거나 Oracle Database 내에 결과 테일블에 Data Integration Task 수행 결과를 저장할 수 있습니다. 

 

1. OCI Notification Service와 Event Service를 이용하여 Data Integration task 수행 결과을 전달 받을 수 있습니다. 

OCI Notification은 특정 Topic에 사전에 등록된 메일이나 Slack, SMS에 설정된 Event 결과를 전달 받을 수 있습니다.

상세한 OCI Notification & Alarm Service에 대해서는 이전에 작성된 다음 post에서 확인하실 수 있습니다. 

 

Notification, Alarm, Health Check 서비스 활용하기

1. Notification 설정하기 https://docs.oracle.com/en-us/iaas/Content/Notification/home.htm Topic는 Subscription의 묶음이고 알람을 설정하는 단위이다. 그리고 Topic이름은 Tenancy전체에서 유일해야 한다. Mail Subscription을

oracle-cloud.tistory.com

 

위의 Post에서 설명된 것과 같이 OCI Data Integration Notification을 위한 Topic과 구독을 위한 subscription이 생성되었다고 하면 해당 Topic에 Alarm을 전달할 Event를 설정하여야 합니다.  OCI console에서 Observability & Management의 Event Service로 이동하여 Create Rule을 통해 Event를 생성합니다. 

 

Event Rule은 사용자의 Alarm에 대한 요구 조건에 따라 설정할 수 있습니다. 아래 예는 Task 수행에 대한 Event Rule 설정과 Task 수행 시 오류 발생시 Event rule 설정, 그리고 Execute Time Exceed 에 대한 Event rule 설정에 대한 예 입니다.  

Notification과 Event 가 설정되었다면 아래와 같이 Data Integration Task 수행에 따른 공지를 받을 수 있습니다. 

 

2. OCI Data Integration의 Pipeline Task에 SQL Task를 이용하여 작업 수행 결과 테이블에 Data Integration 작업 결과를 저장할 수 있습니다.  

우선 대상 ADW에 작업 결과를 저장할 테이블과 Result에 따라 결과를 insert 하는 procedure를 만들어 줍니다. 

 

--------------------------------------------------------
--  Create Table DWH_LOAD_STATS (used for the SQL task)
--------------------------------------------------------
CREATE TABLE  DWH_LOAD_STATS 
   ( DWH_LOAD_ID NUMBER(5,0) PRIMARY KEY, 
DWH_LOAD_STATUS VARCHAR2(100), 
    PIPELINE_NAME_TASK_RUN VARCHAR2(100),
DWH_LOAD_DATE DATE);
    
    
--------------------------------------------------------
--  Create Sequence DWH_PK (used by the procedure below to populate the PK in table DWH_LOAD_STATS)
-------------------------------------------------------- 
CREATE SEQUENCE  DWH_PK  INCREMENT BY 1 START WITH 1 NOCACHE  NOORDER  NOCYCLE  NOKEEP  NOSCALE  GLOBAL ;
    
    
--------------------------------------------------------
--  Create Procedure OCIDI_RESULT (used for the SQL task)
--------------------------------------------------------     
    CREATE OR REPLACE PROCEDURE OCIDI_RESULT (IN_DI_RESULT VARCHAR2, PIPELINE_NAME_TASK_RUN VARCHAR2 )
IS
 BEGIN
     IF IN_DI_RESULT='SUCCESS' THEN 
    INSERT INTO DWH_LOAD_STATS VALUES (DWH_PK.NEXTVAL , 'OCI Data Integration pipeline was executed successfully.' , PIPELINE_NAME_TASK_RUN, to_date(sysdate,'DD-MM-YYYY'));

     ELSIF IN_DI_RESULT='ERROR' THEN
    INSERT INTO DWH_LOAD_STATS VALUES (DWH_PK.NEXTVAL , 'OCI Data Integration pipeline was not executed. Please check the errors in OCI DI.' , PIPELINE_NAME_TASK_RUN, to_date(sysdate,'DD-MM-YYYY'));

     END IF;

END;

 

Data Integration 결과를 저장하기 위한 테이블, 프로시져가 생성되면 OCI Data Integration에서 생성된 Procedure를 이용하여 SQL Task를 생성합니다. SQL 항목에서 앞서 생성한 프로시져를 선택합니다. 

 

Procedure 까지 선택한 이후 추가된 Configure Parameters를 통해 input parameter 값을 설정해 줍니다. 

 

Stored Procedure와 Parameter에 대한 설정이 완료되면 validate task를 수행하고 Create 합니다.  Create 이후 생성한 SQL Task를 Application에 publish 해 줍니다. 

 

이제 이전에 생성한 Pipeline에 Data Integration 수행 결과를 저장하는 SQL Task를 추가 구성해보겠습니다. 이전 Post에서 생성한 Pipeline에 SQL Task를 새로 추가하고 연결 합니다. 해당 SQL Task에 앞서 생성/배포한 SQL task를 선택하고 incomming link condition을 Ruun on success of previous operator로 선택합니다. 

 

 

Configuration 항목으로 이동하여 parameter에 대한 설정을 해줍니다. Stored Procedure의 input parameter에 대해 'Success' 그리고 Task name을 지정합니다. 

 

Task 실패시 처리를 위한 SQL Task를 아래와 같이 추가 하고 이번 SQL Task에는 'Run on failure of previous operator'로 구성하고 Parameter에 IN_DI_RESULT에 ERROR로 구성합니다.  

 

위와 같이 변경된 Pipeline을 Task로 구성하여 Publish 이후 수행합니다.  수행 결과가 DWH_LOAD_STATS table에 저장된 것을 확인할 수 있습니다. 

 

 

 

작성자: 조용훈  
개인 시간을 투자하여 작성된 글로서, 글의 내용에 오류가 있을 수 있으며, 글 속의 의견은 개인적인 의견입니다.

댓글