-- 본문 바로가기

-- Oracle

-- [Oracle] Pipelined table function


-- Pipelined table function
/*** FORMAT :
 *** CREATE OR REPLACE FUNCTION <schema_name>.<function_name>
 *** (<argument> [IN | OUT | IN OUT] [NOCOPY] <data type>)
 *** RETURN <data type>
 *** [AUTHID <CURRENT_USER | DEFINER>]
 *** [<AGGREGATE | PIPELINED>]
 *** [PARALLEL_ENABLE [(PARTITION <argument> BY <ANY | <HASH | RANGE> (<column_list>)>)]] IS
 ***/
-- Demo table: one row per ticker with its opening and closing price.
CREATE TABLE stocktable (
    ticker      VARCHAR2(4),
    open_price  NUMBER(10),
    close_price NUMBER(10)
);

-- Column lists are spelled out so the inserts do not depend on the
-- physical column order of the table.
INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('orcl', 13, 16);
INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('msft', 35, 29);
INSERT INTO stocktable (ticker, open_price, close_price) VALUES ('sunw', 7, 11);

COMMIT;

-- Object type for one output row of stockpivot: the ticker, a
-- one-character price kind ('O' or 'C') and the price itself.
CREATE OR REPLACE TYPE tickertype AS OBJECT (
    ticker    VARCHAR2(4),
    pricetype VARCHAR2(1),
    price     NUMBER(10)
);
/

-- Nested table of tickertype; the return type of stockpivot.
CREATE OR REPLACE TYPE tickertypeset
    AS TABLE OF tickertype;
/

-- Package that only declares the strongly typed REF CURSOR used as the
-- input parameter of stockpivot: every row has the shape of stocktable.
CREATE OR REPLACE PACKAGE refcur_pkg
IS
    TYPE refcur_t IS REF CURSOR
        RETURN stocktable%ROWTYPE;
END refcur_pkg;
/

-- Pipelined table function: turns each stocktable-shaped row fetched from p
-- into two tickertype rows, one for the opening price ('O') and one for the
-- closing price ('C').
--   p       : open REF CURSOR of type refcur_pkg.refcur_t; closed by this function.
--   returns : tickertypeset, piped row by row.
CREATE OR REPLACE FUNCTION stockpivot
(p refcur_pkg.refcur_t)
RETURN tickertypeset
PIPELINED IS
    out_rec tickertype := tickertype(NULL, NULL, NULL);
    in_rec  p%ROWTYPE;
BEGIN
    LOOP
        FETCH p INTO in_rec;
        EXIT WHEN p%NOTFOUND;

        -- opening-price row
        out_rec.ticker    := in_rec.ticker;
        out_rec.pricetype := 'O';
        out_rec.price     := in_rec.open_price;
        PIPE ROW(out_rec);

        -- closing-price row (ticker is unchanged)
        out_rec.pricetype := 'C';
        out_rec.price     := in_rec.close_price;
        PIPE ROW(out_rec);
    END LOOP;
    CLOSE p;
    RETURN;
EXCEPTION
    -- PIPE ROW raises NO_DATA_NEEDED when the consuming query stops fetching
    -- early; without this handler the CLOSE above would be skipped.
    WHEN NO_DATA_NEEDED THEN
        IF p%ISOPEN THEN
            CLOSE p;
        END IF;
        RETURN;
END stockpivot;
/

-- Feed every stocktable row through stockpivot via a CURSOR expression.
SELECT *
FROM TABLE(
    stockpivot(
        CURSOR(
            SELECT *
            FROM stocktable
        )
    )
);

-- pipelined table function Example. 2
-- Row shape returned by table_func: a pair of integers.
CREATE OR REPLACE TYPE obj_type AS OBJECT (
    c1 INT,
    c2 INT
);
/

-- Nested table of obj_type; the return type of table_func.
CREATE OR REPLACE TYPE table_type
    AS TABLE OF obj_type;
/

-- Pipelined table function returning one obj_type row per integer in
-- p_start .. p_end: c1 is the integer, c2 is twice that value.
-- No rows are produced when p_start > p_end.
CREATE OR REPLACE FUNCTION table_func(p_start INT, p_end INT)
    RETURN table_type
PIPELINED
IS
BEGIN
    FOR i IN p_start .. p_end
    LOOP
        PIPE ROW(obj_type(i, i * 2));
    END LOOP;
    RETURN;
END;
/

-- Rows for 4 through 28, each paired with its double.
SELECT *
FROM TABLE(table_func(4, 28));
 
-- pipelined table function Example. 3

-- NOTE(review): this replaces the refcur_pkg created earlier in the script,
-- whose refcur_t returned stocktable%ROWTYPE and is used by stockpivot;
-- confirm that invalidating stockpivot here is intended.
CREATE OR REPLACE PACKAGE refcur_pkg
IS
    -- Strongly typed cursor: every row has the shape of employees.
    TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE;

    -- Record describing one output row.
    TYPE outrec_typ IS RECORD (
        var_num   employees.employee_id%TYPE,
        var_char1 VARCHAR2(30),
        var_char2 VARCHAR2(30)
    );

    -- Collection of the record above; used as the function's return type.
    TYPE outrecset IS TABLE OF outrec_typ;

    -- Takes a cursor and returns its rows transformed into outrecset.
    -- PIPELINED must be stated here as well as in the package body.
    FUNCTION f_trans(p refcur_t)
        RETURN outrecset PIPELINED;
END refcur_pkg;
/

CREATE OR REPLACE PACKAGE BODY refcur_pkg IS

    -- Reads employees rows from p and pipes two outrec_typ rows per input row:
    --   1) employee_id, first_name, last_name
    --   2) employee_id, email, phone_number
    -- p is closed before the function returns, including when the consumer
    -- stops fetching early.
    FUNCTION f_trans(p refcur_t)
    RETURN outrecset PIPELINED IS

        out_rec outrec_typ;   -- record type declared in the package specification
        in_rec  p%ROWTYPE;    -- one fetched row holding every column of cursor p

    BEGIN
        LOOP
            FETCH p INTO in_rec;
            EXIT WHEN p%NOTFOUND;

            -- first row: returned to the caller immediately
            out_rec.var_num   := in_rec.employee_id;
            out_rec.var_char1 := in_rec.first_name;
            out_rec.var_char2 := in_rec.last_name;
            PIPE ROW(out_rec);

            -- second row: var_num still holds employee_id
            out_rec.var_char1 := in_rec.email;
            out_rec.var_char2 := in_rec.phone_number;
            PIPE ROW(out_rec);
        END LOOP;

        CLOSE p;

        -- No value is returned here; every row was already piped in the loop.
        RETURN;
    EXCEPTION
        -- PIPE ROW raises NO_DATA_NEEDED when the consuming query stops
        -- fetching early; without this handler the CLOSE above is skipped.
        WHEN NO_DATA_NEEDED THEN
            IF p%ISOPEN THEN
                CLOSE p;
            END IF;
            RETURN;
    END f_trans;
END refcur_pkg;
/

-- pipelined table function Example. 4
-- Run f_trans over the employees of department 60; each employee yields two rows.
select *
from table(
    refcur_pkg.f_trans(
        cursor(select * from employees where department_id = 60)
    )
);


-- Row shape returned by my_pkg.f: an integer, a date and a short label.
CREATE OR REPLACE TYPE myScalarType AS OBJECT (
    a INT,
    b DATE,
    c VARCHAR2(25)
);
/
-- Nested table of myScalarType; the return type of my_pkg.f.
CREATE OR REPLACE TYPE myTableType
    AS TABLE OF myScalarType;
/
-- Specification: exposes a single parameterless pipelined function.
CREATE OR REPLACE PACKAGE my_pkg
AS
    FUNCTION f RETURN myTableType PIPELINED;
END;
/
CREATE OR REPLACE PACKAGE BODY my_pkg
AS
    -- Pipes five rows; row n carries n, the date n days after SYSDATE,
    -- and the label 'row n'.
    FUNCTION f RETURN myTableType PIPELINED
    IS
    BEGIN
        FOR row_no IN 1 .. 5
        LOOP
            PIPE ROW(myScalarType(row_no, SYSDATE + row_no, 'row ' || row_no));
        END LOOP;
        RETURN;
    END;
END;
/
-- Query the packaged pipelined function like a table.
SELECT *
FROM TABLE(my_pkg.f());

-- pipelined table function Example. 5
-- values list from date to date

-- Nested table of DATE values; the return type of date_table.
CREATE OR REPLACE TYPE date_array
    AS TABLE OF DATE;
/

-- Pipelined function returning one DATE per day from sdate through edate.
--   sdate : first date returned; later rows are sdate + 1, sdate + 2, ...
--   edate : upper bound; no returned date is later than edate.
-- FLOOR is applied because FOR-loop bounds are rounded to the nearest
-- integer: a fractional day difference of .5 or more would otherwise emit
-- a date past edate, and a slightly negative one would still emit sdate.
CREATE OR REPLACE FUNCTION date_table(sdate DATE, edate DATE)
RETURN date_array PIPELINED AS
BEGIN
    FOR i IN 0 .. FLOOR(edate - sdate) LOOP
        PIPE ROW(sdate + i);
    END LOOP;
    RETURN;
END date_table;
/

-- List the current user's stored subprograms with their PIPELINED flag
-- and AUTHID setting.
SELECT
    object_name,
    pipelined,
    authid
FROM user_procedures;

-- Every day from 30 days ago through today, as midnight values.
SELECT *
FROM TABLE(
    date_table(TRUNC(SYSDATE - 30), TRUNC(SYSDATE))
);


-- pipelined table function Example. 6
-- join with another table

-- Sparse sample data: one value every five days, keyed by a midnight date.
CREATE TABLE testdata (
    datecol DATE,
    someval NUMBER
);

-- Column lists are spelled out so the inserts do not depend on the
-- physical column order of the table.
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE - 25), 25);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE - 20), 20);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE - 15), 15);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE - 10), 10);
INSERT INTO testdata (datecol, someval) VALUES (TRUNC(SYSDATE - 5), 5);

COMMIT;

SELECT * FROM testdata;

-- Calendar of the last 31 days left-joined to testdata: days without a
-- testdata row are kept, with a NULL someval.
SELECT
    da.column_value AS datacol,
    td.someval
FROM TABLE(date_table(TRUNC(SYSDATE - 30), TRUNC(SYSDATE))) da
LEFT JOIN testdata td
    ON td.datecol = da.column_value;

--Note: A SQL alternative would be:
-- Pure-SQL calendar: one midnight date per day from 30 days ago through
-- today (31 rows), left-joined to testdata.
WITH dates AS (
    -- inclusive bounds of the calendar
    SELECT
        TRUNC(SYSDATE) - 30 AS dt_start,
        TRUNC(SYSDATE)      AS dt_end
    FROM dual
),
calendar AS (
    -- "+ 1" makes the end date itself part of the result
    SELECT dt_start + LEVEL - 1 AS datecol
    FROM dates
    CONNECT BY LEVEL <= dt_end - dt_start + 1
)
SELECT
    cal.datecol,
    td.someval
FROM calendar cal
LEFT JOIN testdata td
    ON TRUNC(td.datecol) = cal.datecol
ORDER BY cal.datecol;

-- pipelined table function Example. 7
-- Tom Kyte's demo

-- Nested table of NUMBER; the return type of virtual_table.
CREATE OR REPLACE TYPE virtual_table_type
    AS TABLE OF NUMBER;
/

-- Pipes the integers 1 .. p_num_rows, writing a DBMS_OUTPUT line before
-- and after each PIPE ROW.
CREATE OR REPLACE FUNCTION virtual_table(p_num_rows IN NUMBER)
    RETURN virtual_table_type
    PIPELINED
IS
BEGIN
    FOR row_no IN 1 .. p_num_rows
    LOOP
        DBMS_OUTPUT.PUT_LINE('going to pipe');
        PIPE ROW(row_no);
        DBMS_OUTPUT.PUT_LINE('done pipeing');
    END LOOP;
    RETURN;
END virtual_table;
/

-- Generate 5 rows, then 10 rows, straight from SQL.
SELECT *
FROM TABLE(virtual_table(5));

SELECT *
FROM TABLE(virtual_table(10));

-- Consume virtual_table from PL/SQL, printing each fetched value.
BEGIN
    FOR rec IN (SELECT * FROM TABLE(virtual_table(10)))
    LOOP
        DBMS_OUTPUT.PUT_LINE('Fetching.... ' || rec.column_value);
    END LOOP;
END;
/

-- Row shape returned by pivot: seven short text columns.
-- FORCE is needed because this name is already used earlier in the script
-- for a different object type that myTableType is built on; replacing a
-- type that has type dependents without FORCE fails with ORA-02303.
-- NOTE(review): the replacement invalidates myTableType and my_pkg;
-- confirm that is acceptable, or give this type its own name.
CREATE OR REPLACE TYPE myscalartype FORCE AS OBJECT (
    c1 VARCHAR2(9),
    c2 VARCHAR2(9),
    c3 VARCHAR2(9),
    c4 VARCHAR2(9),
    c5 VARCHAR2(9),
    c6 VARCHAR2(9),
    c7 VARCHAR2(9)
);
/
-- Drop any previous myarraytype. ORA-04043 (object does not exist) is
-- ignored so the script also runs cleanly the first time.
BEGIN
    EXECUTE IMMEDIATE 'DROP TYPE myarraytype';
EXCEPTION
    WHEN OTHERS THEN
        IF SQLCODE <> -4043 THEN
            RAISE;
        END IF;
END;
/

-- Nested table of myscalartype; the return type of pivot.
CREATE OR REPLACE TYPE myarraytype AS TABLE OF myscalartype;
/

-- Pipelined pivot: reads (key, value) pairs from p_cur and emits one
-- myScalarType row per run of consecutive equal keys, with c1 = key and
-- c2 .. c7 = the values of that run in fetch order.
--   p_cur   : open cursor returning two columns; rows with the same key must
--             be adjacent, because a group ends when the key changes.
--   returns : myArrayType, piped one group at a time.
--   raises  : PROGRAM_ERROR when a key has more than six values.
-- NOTE(review): a NULL key is not handled specially by the comparison
-- below; confirm keys are never NULL.
CREATE OR REPLACE FUNCTION pivot(p_cur IN sys_refcursor)
RETURN myArrayType PIPELINED IS

    l_c1   VARCHAR2(4000);   -- key of the row just fetched
    l_c2   VARCHAR2(4000);   -- value of the row just fetched
    l_last VARCHAR2(4000);   -- key of the group currently being built
    l_cnt  NUMBER;           -- number (3 .. 7) of the next attribute to fill
    l_data myScalarType;     -- output row being built; NULL until the first fetch
BEGIN
    LOOP
        FETCH p_cur INTO l_c1, l_c2;
        EXIT WHEN p_cur%NOTFOUND;

        IF (l_last IS NULL OR l_c1 <> l_last) THEN
            -- Key changed: emit the finished group, then start a new one.
            IF (l_data IS NOT NULL) THEN
                PIPE ROW(l_data);
            END IF;

            l_data := myScalarType(l_c1, l_c2, NULL, NULL, NULL, NULL, NULL);
            l_cnt  := 3;
            l_last := l_c1;
        ELSE
            -- Same key: store the value in the next free attribute.
            CASE l_cnt
                WHEN 3 THEN l_data.c3 := l_c2;
                WHEN 4 THEN l_data.c4 := l_c2;
                WHEN 5 THEN l_data.c5 := l_c2;
                WHEN 6 THEN l_data.c6 := l_c2;
                WHEN 7 THEN l_data.c7 := l_c2;
                ELSE RAISE program_error;   -- more values than attributes
            END CASE;

            l_cnt := l_cnt + 1;
        END IF;
    END LOOP;

    -- Emit the last group, if any row was read.
    IF (l_data IS NOT NULL) THEN
        PIPE ROW(l_data);
    END IF;
    CLOSE p_cur;
    RETURN;
EXCEPTION
    -- PIPE ROW raises NO_DATA_NEEDED when the consuming query stops fetching
    -- early; without this handler the CLOSE above would be skipped.
    WHEN NO_DATA_NEEDED THEN
        IF p_cur%ISOPEN THEN
            CLOSE p_cur;
        END IF;
        RETURN;
END pivot;
/

-- Let user cheon query scott.emp, the table read by the pivot examples.
-- NOTE(review): presumably cheon is the schema running this script; adjust as needed.
GRANT SELECT
    ON scott.emp
    TO cheon;

-- One row per department, with its employee names spread across c2 .. c7.
-- The ORDER BY keeps rows of the same department adjacent, as pivot requires.
SELECT *
FROM TABLE(
    pivot(
        CURSOR(SELECT deptno, ename FROM scott.emp ORDER BY deptno)
    )
);

-- One row per department, with its hire dates spread across c2 .. c7.
-- The date is formatted explicitly (8 characters) because an implicit
-- DATE-to-VARCHAR2 conversion follows the session's NLS_DATE_FORMAT and
-- could exceed the VARCHAR2(9) attributes of the row type.
SELECT *
FROM TABLE(
    pivot(
        CURSOR(
            SELECT
                deptno,
                TO_CHAR(hiredate, 'YYYYMMDD') AS hiredate
            FROM scott.emp
            ORDER BY deptno
        )
    )
);

-- Clean-up: remove the two pivoting functions created by this script.
-- The types, packages and tables created above are left in place.
DROP FUNCTION pivot;

DROP FUNCTION stockpivot;

-- Reference: http://psoug.org/reference and many others