본문 바로가기

ETL Tools/Oracle Data Integrator

로그로 스케쥴링관리ODI ETL JOB LOG - v1.3


ODI ETL JOB LOG  - v1.3

 

1. 목적 :

1.     ETL Batch Job의 재처리 용이성,

2.     로그 테이블 관리를 통해 보다 쉽고 정확히 관리하기 위해 개발

 

2. 기능 :

1.     적재 시작 시간과 종료 시간저장을 통한 데이터 누수 확인의 용이성

2.     재처리 시, 기존의 배치주기를 유지하여 대용량을 한꺼번에 처리하지 않도록 구현

3.     재처리가 완료되었을 경우 바로 기존의 스케줄링 형태로 돌아가도록 구현

4.     재처리 작업 중, 스케쥴링에 의한 배치가 실행되었을 경우 하나만 실행되도록 구현

 

 

CREATE TABLE ETL_JOB_LOG

(

           SESS_NO      NUMBER,

           SESS_NAME    VARCHAR2 (30),

           ETL_START_TIME   VARCHAR2 (14),

           ETL_END_TIME     VARCHAR2 (14),

           SESS_START_TIME     TIMESTAMP(6),

SESS_END_TIME     TIMESTAMP(6),

           STATUS       CHAR(1),

           PRIMARY KEY ( SESS_NAME, ETL_START_TIME, ETL_END_TIME)

)

** Log 저장 테이블

 

 

 

 

 

 

 

 

 

 

 

 

 

3. 구성 요소 :

l  변수

n  V_START_TIME데이터 추출 시작 일시

u  새로고침

l  테이블에 같은 Session name중 가장 최근까지 적재된 시간을 추출 -> 시작 시간

SELECT  NVL(     MAX(ETL_END_TIME),

                      TO_CHAR(TRUNC(SYSDATE,'HH24'),

                                'YYYYMMDDHH24MISS')

           ) START_TIME

FROM ETL_JOB_LOG

WHERE  (1=1)

AND     SESS_NAME = '<%=snpRef.getSession("SESS_NAME")%>'

 

 

 

 

 

 

 

 


n  V_END_TIME데이터 추출 종료 일시

u  새로고침

l  추출주기에 맞도록 예) 시배치

SELECT

TO_CHAR(TRUNC(SYSDATE,'HH24'),'YYYYMMDDHH24MISS')

FROM DUAL

 

 

 

 


n  V_LIMIT_TIME재처리시 데이터 종료 시작 일시

u  새로고침

l  지연 한계시간? ) 추출된 시작시간 + 주기 * 2

SELECT

TO_CHAR(TO_DATE('#V_START_TIME',

'YYYYMMDDHH24MISS') + 1/12 ,

'YYYYMMDDHH24MISS')

from DUAL

 

 

 

 

 


u  재처리 필요여부 결정

n  V_STATUS현재 진행 여부

u  새로고침

l  마지막 적재 로그의 상태 정보 확인

SELECT  NVL(MAX(STATUS),'N') STATUS

FROM   ETL_JOB_LOG

WHERE  (1=1)

AND     SESS_NAME = '<%=snpRef.getSession("SESS_NAME")%>'

AND     ETL_END_TIME  =

(

        SELECT   MAX(ETL_END_TIME) FROM ETL_JOB_LOG

        WHERE   SESS_NAME =

'<%=snpRef.getSession("SESS_NAME")%>'

)

 

 

 

 

 

 

 

 

 

 

 


u  재처리 & 스케쥴링 중복 실행 예방

 

 

l  프로시저

n  PROC_ETL_LOG_INSERT

u  실행 시간 : 배치잡 실행 전

u  상태정보 변경 ( 실행 중 ) & 시작 일시와 함께 저장.

INSERT INTO ETL_JOB_LOG

(

           SESS_NO,

           SESS_NAME,

           ETL_START_TIME,

           ETL_END_TIME,

           SESS_START_TIME,

           STATUS

)

VALUES (

           <%=snpRef.getSession("SESS_NO")%>,

           '<%=snpRef.getSession("SESS_NAME")%>',

           '<%=odiRef.getOption ("ETL_START_TIME")%>', -- Option 추가

           '<%=odiRef.getOption ("ETL_END_TIME")%>', -- Option 추가

 

           SYSTIMESTAMP,

    'S'

)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


n  PROC_ETL_LOG_UPDATE

u  실행 시간 : 배치잡 정상 종료 후

u  상태정보 변경 ( 정상 종료 ) & 종료시간 저장

 

SELECT  NVL(MAX(STATUS),'N') STATUS

FROM   ETL_JOB_LOG

WHERE  (1=1)

AND     SESS_NAME = '<%=snpRef.getSession("SESS_NAME")%>'

AND     ETL_END_TIME  =

(

        SELECT   MAX(ETL_END_TIME) FROM ETL_JOB_LOG

        WHERE   SESS_NAME =

'<%=snpRef.getSession("SESS_NAME")%>'

)

 

 

 

 

 

 

 

 

 

 

 


n  PROC_ETL_LOG_DELETE

u  실행 시간 : 배치잡 실행 중 Or 로그 남길 때 비정상 종료 발생시

u  해당 로그 삭제 후 종료

DELETE FROM ETL_JOB_LOG

WHERE (1=1)   

AND     SESS_NAME = '<%=snpRef.getSession("SESS_NAME")%>'     

AND     ETL_START_TIME = '<%=odiRef.getOption ("ETL_START_TIME")%>'

AND     ETL_END_TIME = '<%=odiRef.getOption ("ETL_END_TIME")%>'


 





 




4. 
로직 해석 (Transform to Java Language) :

pkg_Etl_Job_Log()

{

             String v_start_time, v_end_time, v_limit_time, v_status;

 

            v_start_time.refresh();// 1.변수 설정

             v_end_time.refresh();// 1.변수 설정

             v_limit_time.refresh();// 1.변수 설정

 

             if(v_limit_time > v_end_time) // 2. 재처리 여부 확인       

{/*************  정상 **************/

                           v_status.refresh();

                           if(v_status.getStatus() <> “실행중”) //3. 작업중 여부 확인

                           {

                                        try//4. 로그 및 배치잡 실행

                                        {

                                                     insertLog();           

                                                     batchJob();

                                                     updateLog();

                                        }

                                        catch (Exception e)

                                        {

                                                     deleteLog();

                                        }//4. 로그 및 배치 잡 실행

                           }

             }/**********  정상 **************/

             else {/**********  재처리 ***********/

                           v_end_time = v_limit_time; //3. 추출 시간 재정의

                           v_status.refresh();

                           if(v_status.getStatus() <> “실행중”) //3. 작업중 여부 확인

                           {

                                        try//4. 로그 및 배치 잡 실행

                                        {

                                                     insertLog();           

                                                     batchJob();

                                                     updateLog();

                                        }

                                        catch (Exception e)

                                        {

                                                     deleteLog();

                                        }//4. 로그 및 배치 잡 실행

                           }

                           this.pkgEtlJobLog();//5. 재실행

             }/**********  재처리 ***********/

)