Polymorphic Table Functions

I have been working on a presentation on Polymorphic Table Functions, and while preparing it I was looking for a real use case. I came up with an example which is not very useful in real life, but very useful for explaining the technique.
At my current job I came across a piece of code that I had to copy and adjust for each specific case. The idea was always the same: I get a table with semicolon-separated values in one column, and these have to be split into the correct number of columns before the data is compared with the current data in a specific table.
I thought: ‘Maybe I can solve this copy-paste-adjust process by using a Polymorphic Table Function.’

Let’s first set the scene.
We have two tables: the well-known EMP and DEPT tables.

create table emp
(empno    number(4) not null
,ename    varchar2(10)
,job      varchar2(9)
,mgr      number(4)
,hiredate date
,sal      number(7, 2)
,comm     number(7, 2)
,deptno   number(2)
)
/
create table dept
(deptno number(2)
,dname  varchar2(14)
,loc    varchar2(14)
)
/        

And we add the well known data:

insert into emp values (7369, 'SMITH',  'CLERK',     7902, to_date('17-DEC-1980', 'DD-MON-YYYY'),  800, null, 20);
insert into emp values (7499, 'ALLEN',  'SALESMAN',  7698, to_date('20-FEB-1981', 'DD-MON-YYYY'), 1600,  300, 30);
insert into emp values (7521, 'WARD',   'SALESMAN',  7698, to_date('22-FEB-1981', 'DD-MON-YYYY'), 1250,  500, 30);
insert into emp values (7566, 'JONES',  'MANAGER',   7839, to_date('2-APR-1981', 'DD-MON-YYYY'),  2975, null, 20);
insert into emp values (7654, 'MARTIN', 'SALESMAN',  7698, to_date('28-SEP-1981', 'DD-MON-YYYY'), 1250, 1400, 30);
insert into emp values (7698, 'BLAKE',  'MANAGER',   7839, to_date('1-MAY-1981', 'DD-MON-YYYY'),  2850, null, 30);
insert into emp values (7782, 'CLARK',  'MANAGER',   7839, to_date('9-JUN-1981', 'DD-MON-YYYY'),  2450, null, 10);
insert into emp values (7788, 'SCOTT',  'ANALYST',   7566, to_date('09-DEC-1982', 'DD-MON-YYYY'), 3000, null, 20);
insert into emp values (7839, 'KING',   'PRESIDENT', null, to_date('17-NOV-1981', 'DD-MON-YYYY'), 5000, null, 10);
insert into emp values (7844, 'TURNER', 'SALESMAN',  7698, to_date('8-SEP-1981', 'DD-MON-YYYY'),  1500,    0, 30);
insert into emp values (7876, 'ADAMS',  'CLERK',     7788, to_date('12-JAN-1983', 'DD-MON-YYYY'), 1100, null, 20);
insert into emp values (7900, 'JAMES',  'CLERK',     7698, to_date('3-DEC-1981', 'DD-MON-YYYY'),   950, null, 30);
insert into emp values (7902, 'FORD',   'ANALYST',   7566, to_date('3-DEC-1981', 'DD-MON-YYYY'),  3000, null, 20);
insert into emp values (7934, 'MILLER', 'CLERK',     7782, to_date('23-JAN-1982', 'DD-MON-YYYY'), 1300, null, 10);
insert into dept values (10, 'ACCOUNTING', 'NEW YORK');
insert into dept values (20, 'RESEARCH',   'DALLAS');
insert into dept values (30, 'SALES',      'CHICAGO');
insert into dept values (40, 'OPERATIONS', 'BOSTON');

We get data from a different system, which is in a semi-colon separated format. So we load that in a couple of staging tables:

create table empstg
(line varchar2(4000)
)
/
create table deptstg
(line varchar2(4000)
)
/

And then we add some data:

insert into empstg values ('7369;SMITH;CLERK;7902;17121980;800; ;20');
insert into empstg values ('7499;ALLEN;SALESMAN;7698;20021981;1600; 300;30');
insert into empstg values ('7521;WARD;SALESMAN;7698;22021981;1250; 500;30');
insert into empstg values ('7566;JONES;MANAGER;7839;02041981; 2975; ;20');
insert into empstg values ('7654;MARTIN;SALESMAN;7698;28091981;1250;1400;30');
insert into empstg values ('7698;BLAKE;MANAGER;7839;01051981; 2850; ;30');
insert into empstg values ('7782;CLARK;MANAGER;7839;09061981; 2450; ;10');
insert into empstg values ('7788;SCOTT;ANALYST;7566;09121982;3000; ;20');
insert into empstg values ('7839;KING;PRESIDENT; ;17111981;5000; ;10');
insert into empstg values ('7844;TURNER;SALESMAN;7698;08091981; 1500;0;30');
insert into empstg values ('7876;ADAMS;CLERK;7788;12011983;1100; ;20');
insert into empstg values ('7900;JAMES;CLERK;7698;03121981;  950; ;30');
insert into empstg values ('7902;FORD;ANALYST;7566;03121981; 3000; ;20');
insert into empstg values ('7934;MILLER;CLERK;7782;23011982;1300; ;10');
insert into empstg values ('2912;BAREL;DEVELOPER;7839;29122017;4000; ;50');

insert into deptstg values ('10;ACCOUNTING;NEW YORK');
insert into deptstg values ('20;RESEARCH;DALLAS');
insert into deptstg values ('30;SALES;NORTH CAROLINA');
insert into deptstg values ('40;OPERATIONS;BOSTON');
insert into deptstg values ('50;DEVELOPMENT;SAN FRANCISCO');

To process the data and merge it into the main tables we use a package. We could have used a merge statement, but a plain merge updates every matching row, even when nothing has changed, and each of those updates produces journaling data through triggers. With the EMP and DEPT tables this wouldn’t be much of a problem, but we are talking about 250k+ rows each time (at least once a day).
So we want a little more control and only insert or update when it’s really necessary.
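
For comparison, here is a sketch of what a merge for DEPT could look like when the update branch is restricted with a WHERE clause, so unchanged rows are left alone. It is untested against the real tables and only meant as an illustration; it does not give the per-row error handling (SAVE EXCEPTIONS) that the package below uses.

```sql
-- Sketch only: merge DEPTSTG into DEPT and skip rows that did not change.
-- decode() treats two NULLs as equal, so the comparison is null-safe.
merge into dept d
using (select to_number(trim(regexp_substr(line, '[^;]+', 1, 1))) deptno
             ,trim(regexp_substr(line, '[^;]+', 1, 2))            dname
             ,trim(regexp_substr(line, '[^;]+', 1, 3))            loc
         from deptstg) i
   on (d.deptno = i.deptno)
 when matched then
   update set d.dname = i.dname
            , d.loc   = i.loc
    where decode(d.dname, i.dname, 0, 1) = 1
       or decode(d.loc,   i.loc,   0, 1) = 1
 when not matched then
   insert (deptno, dname, loc)
   values (i.deptno, i.dname, i.loc);
```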

create or replace package process_stg is
  procedure dept;
  
  procedure emp;
end process_stg;
/
create or replace package body process_stg is
  failure_in_forall exception;
  pragma exception_init(failure_in_forall, -24381);
  c_limit     constant number := 10;
  procedure dept is
    cursor c_inserts is
      with import as
       (select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno
              ,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
              ,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
          from deptstg stg)
      select i.deptno
            ,i.dname
            ,i.loc
        from import i
        left outer join dept d on (d.deptno = i.deptno)
       where 1 = 1
         and d.deptno is null;
    cursor c_updates is
      with import as
       (select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno
              ,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
              ,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
          from deptstg stg)
      select i.deptno
            ,i.dname
            ,i.loc
        from import i
        join dept d on (d.deptno = i.deptno)
       where 1 = 1
         and (   coalesce(d.dname,'-NULL-') <> coalesce(i.dname,'-NULL-')
              or coalesce(d.loc, '-NULL-')  <> coalesce(i.loc, '-NULL-')
             );
    type data_t is table of c_inserts%rowtype index by pls_integer;
    l_data data_t;
  begin
    open c_inserts;
    loop
      fetch c_inserts bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            insert into dept(deptno, dname, loc)
            values (l_data(indx).deptno, l_data(indx).dname, l_data(indx).loc);
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_inserts;
--
    open c_updates;
    loop
      fetch c_updates bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            update dept
               set dname = l_data(indx).dname
                 , loc = l_data(indx).loc
             where 1=1
               and deptno = l_data(indx).deptno;
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_updates;
  end dept;

  procedure emp is
    cursor c_inserts is
      with import as
       (select trim(regexp_substr(line, '[^;]+', 1, 1)) empno   
              ,trim(regexp_substr(line, '[^;]+', 1, 2)) ename   
              ,trim(regexp_substr(line, '[^;]+', 1, 3)) job     
              ,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr     
              ,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
              ,trim(regexp_substr(line, '[^;]+', 1, 6)) sal     
              ,trim(regexp_substr(line, '[^;]+', 1, 7)) comm    
              ,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno  
          from empstg stg)
      select i.empno   
            ,i.ename   
            ,i.job     
            ,i.mgr     
            ,i.hiredate
            ,i.sal     
            ,i.comm    
            ,i.deptno  
        from import i
        left outer join emp e on (e.empno = i.empno)
       where 1 = 1
         and e.empno is null;
    cursor c_updates is
      with import as
       (select trim(regexp_substr(line, '[^;]+', 1, 1)) empno   
              ,trim(regexp_substr(line, '[^;]+', 1, 2)) ename   
              ,trim(regexp_substr(line, '[^;]+', 1, 3)) job     
              ,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr     
              ,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
              ,trim(regexp_substr(line, '[^;]+', 1, 6)) sal     
              ,trim(regexp_substr(line, '[^;]+', 1, 7)) comm    
              ,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno  
          from empstg stg)
      select i.empno   
            ,i.ename   
            ,i.job     
            ,i.mgr     
            ,i.hiredate
            ,i.sal     
            ,i.comm    
            ,i.deptno  
        from import i
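        join emp e on (e.empno = i.empno)
       where 1 = 1
         -- compare null-safe; -1 is assumed never to occur as a real value
         and (   coalesce(e.ename,    '-NULL') <> coalesce(i.ename,    '-NULL')
              or coalesce(e.job,      '-NULL') <> coalesce(i.job,      '-NULL')
              or coalesce(e.mgr,      -1)      <> coalesce(to_number(i.mgr),    -1)
              or coalesce(to_char(e.hiredate,'DDMMYYYY'), '-NULL') <> coalesce(i.hiredate, '-NULL')
              or coalesce(e.sal,      -1)      <> coalesce(to_number(i.sal),    -1)
              or coalesce(e.comm,     -1)      <> coalesce(to_number(i.comm),   -1)
              or coalesce(e.deptno,   -1)      <> coalesce(to_number(i.deptno), -1)
             );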
    type data_t is table of c_inserts%rowtype index by pls_integer;
    l_data data_t;
  begin
    open c_inserts;
    loop
      fetch c_inserts bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            insert into emp(empno, ename, job, mgr, hiredate, sal, comm, deptno)
            values (l_data(indx).empno, l_data(indx).ename, l_data(indx).job, l_data(indx).mgr, to_date(l_data(indx).hiredate, 'DDMMYYYY'), l_data(indx).sal, trim(l_data(indx).comm), l_data(indx).deptno);
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_inserts;
--
    open c_updates;
    loop
      fetch c_updates bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            update emp
              set ename       = l_data(indx).ename    
                , job         = l_data(indx).job      
                , mgr         = l_data(indx).mgr      
                , hiredate    = to_date(l_data(indx).hiredate, 'DDMMYYYY') 
                , sal         = l_data(indx).sal      
                , comm        = l_data(indx).comm     
                , deptno      = l_data(indx).deptno   
            where 1=1
              and empno       = l_data(indx).empno;
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_updates;
  end emp;
end process_stg;
/

As you can see, the code, especially for the cursors, is pretty much the same. The only difference is the number of columns that are generated from the semicolon-separated line.
I really don’t like doing the same thing over and over again, especially when the only difference is the number of columns and their names. But since this is exactly what changes between the tables, I think there is no way of making this generic in 12c or earlier. Then 18c came along and gave us Polymorphic Table Functions.

This is what the documentation says (summary):
Polymorphic Table Functions
Polymorphic Table Functions (PTF) are user-defined functions that can be invoked in the FROM clause.
They are capable of processing tables whose row type is not declared at definition time and producing a
result table whose row type may or may not be declared at definition time. Polymorphic Table Functions
allow application developers to leverage the long-defined dynamic SQL capabilities to create powerful
and complex custom functions.

In my own words: call a function, supply a table, and get a set of columns back. You can supply the names (and number) of columns as a parameter. These columns don’t have to exist in the table; you can create them on the fly. That is exactly what I need: I have different tables with pretty much the same layout, but the results I need are completely different.
So I came up with the following Polymorphic Table Function to do what I want. First there is the specification of the package. What I need is the DESCRIBE function (which is mandatory) and a procedure to fetch the rows, where I can alter the results.

create or replace package separated_ptf is
  function describe(tab  in out dbms_tf.table_t
                   ,cols in dbms_tf.columns_t default null) return dbms_tf.describe_t;

  procedure fetch_rows;
end separated_ptf;
/

Then there is the implementation of the package:

create or replace package body separated_ptf as
  g_colcount pls_integer; -- save the number of columns requested
  g_colname  varchar2(128); -- save the name of the first column
  function describe(tab  in out dbms_tf.table_t
                   ,cols in dbms_tf.columns_t default null) return dbms_tf.describe_t as
    -- metadata for column to add
    l_new_col dbms_tf.column_metadata_t;
    -- table of columns to add
    l_new_cols dbms_tf.columns_new_t;
  begin
    -- Mark the first column ReadOnly and don't display it anymore
    tab.column(1).for_read := true;
    tab.column(1).pass_through := false;
    -- Save the name of the first column for use in the fetch_rows procedure
    g_colname := tab.column(1).description.name;
    -- Save the number of columns for use in the fetch_rows procedure
    g_colcount := cols.count;
    -- Add the new columns, as specified in the cols parameter
    for indx in 1 .. cols.count loop
      -- define metadata for column named cols(indx)
      -- that will default to a datatype of varchar2 with
      -- a length of 4000
      l_new_col := dbms_tf.column_metadata_t(name => cols(indx));
      -- add the new column to the list of new columns
      l_new_cols(l_new_cols.count + 1) := l_new_col;
    end loop;
    -- Instead of returning NULL we will RETURN a specific
    -- DESCRIBE_T that adds new columns
    return dbms_tf.describe_t(new_columns => l_new_cols);
  end;

  procedure fetch_rows is
    -- define a table type of varchar2 tables
    type colset is table of dbms_tf.tab_varchar2_t index by pls_integer;
    -- variable to hold the rowset as retrieved
    l_rowset   dbms_tf.row_set_t;
    -- variable to hold the number of rows as retrieved
    l_rowcount pls_integer;
    -- variable to hold the new values
    l_newcolset colset;
  begin
    -- fetch rows into a local rowset
    -- at this point the rows will have columns
    -- from the table/view/query passed in
    dbms_tf.get_row_set(l_rowset, l_rowcount);
    -- for every row in the rowset...
    for rowindx in 1 .. l_rowcount loop
      -- for every column
      for colindx in 1 .. g_colcount loop
        -- split the row into separate values
        -- splitting the regexp way: http://nuijten.blogspot.com/2009/07/splitting-comma-delimited-string-regexp.html
        l_newcolset(colindx)(rowindx) := trim(regexp_substr(json_value(dbms_tf.row_to_char(l_rowset, rowindx), '$.' || g_colname)
                                                      ,'[^;]+'
                                                      ,1
                                                      ,colindx));
      end loop; -- every column
    end loop; -- every row in the rowset
    -- add the newly populated columns to the rowset
    for indx in 1 .. g_colcount loop
      dbms_tf.put_col(columnid => indx, collection => l_newcolset(indx));
    end loop;
  end;
end separated_ptf;
/
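
One caveat with the package above: DESCRIBE runs when the statement is compiled, while FETCH_ROWS runs at execution time, so the package variables g_colcount and g_colname may not be set if an already parsed cursor is executed later or from another session. The following variant is a sketch of how the same splitting could be done without package state, using DBMS_TF.GET_ENV and DBMS_TF.GET_COL. It is untested and assumes that the first column of the input table is a VARCHAR2 and is the only column marked for_read.

```sql
-- Sketch only: same behaviour as above, but without package-level variables.
create or replace package body separated_ptf as
  function describe(tab  in out dbms_tf.table_t
                   ,cols in dbms_tf.columns_t default null) return dbms_tf.describe_t as
    l_new_cols dbms_tf.columns_new_t;
  begin
    -- read the first column, but do not pass it through to the result
    tab.column(1).for_read     := true;
    tab.column(1).pass_through := false;
    -- one new column (default datatype varchar2) per requested name
    for indx in 1 .. cols.count loop
      l_new_cols(indx) := dbms_tf.column_metadata_t(name => cols(indx));
    end loop;
    return dbms_tf.describe_t(new_columns => l_new_cols);
  end describe;

  procedure fetch_rows is
    -- execution-time metadata: the new columns and the size of this row set
    l_env    dbms_tf.env_t := dbms_tf.get_env();
    l_lines  dbms_tf.tab_varchar2_t; -- values of the single column marked for_read
    l_values dbms_tf.tab_varchar2_t; -- values for one new column
  begin
    -- column 1 is the first (and only) column marked for_read in describe
    dbms_tf.get_col(1, l_lines);
    for colindx in 1 .. l_env.put_columns.count loop
      for rowindx in 1 .. l_env.row_count loop
        l_values(rowindx) := trim(regexp_substr(l_lines(rowindx), '[^;]+', 1, colindx));
      end loop;
      dbms_tf.put_col(colindx, l_values);
    end loop;
  end fetch_rows;
end separated_ptf;
/
```

Reading the column directly with GET_COL also avoids the detour through ROW_TO_CHAR and JSON_VALUE, at the price of only supporting a VARCHAR2 input column.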

After creating this Polymorphic Table Function we need an interface to use it in a SQL statement:

-- create a 'wrapper' function for the polymorphic table function
CREATE OR REPLACE FUNCTION separated_fnc(p_tbl IN TABLE,  
                cols columns DEFAULT NULL)
   RETURN TABLE PIPELINED
   ROW POLYMORPHIC USING separated_ptf;
/

Now, with this wrapper function in place we can start using it:

select *
  from separated_fnc(deptstg, columns(deptno, dname, loc))
/

Which is a lot easier than:

select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno
      ,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
      ,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
  from deptstg stg
/

And similarly we can access the same code to retrieve data from the other table:

select *
  from separated_fnc(empstg, columns(empno, ename, job, mgr, hiredate, sal, comm, deptno))
/

That looks pretty much the same as the other one, but is definitely a lot simpler than:

select trim(regexp_substr(line, '[^;]+', 1, 1)) empno
      ,trim(regexp_substr(line, '[^;]+', 1, 2)) ename
      ,trim(regexp_substr(line, '[^;]+', 1, 3)) job
      ,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr
      ,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
      ,trim(regexp_substr(line, '[^;]+', 1, 6)) sal
      ,trim(regexp_substr(line, '[^;]+', 1, 7)) comm
      ,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno
  from empstg stg
/
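
Keep in mind that every column the function adds is a VARCHAR2, because DESCRIBE creates the new columns with the default datatype. If real numbers or dates are needed, the conversion has to happen in the calling query. A small sketch, using the 'DDMMYYYY' format of the staging data:

```sql
-- Sketch only: convert the VARCHAR2 columns returned by the PTF where needed.
select to_number(empno)              as empno
      ,ename
      ,to_date(hiredate, 'DDMMYYYY') as hiredate
      ,to_number(sal)                as sal
  from separated_fnc(empstg, columns(empno, ename, job, mgr, hiredate, sal, comm, deptno))
 where to_number(deptno) = 30
/
```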

The new implementation of my package is now like this:

create or replace package body process_stg is
  failure_in_forall exception;
  pragma exception_init(failure_in_forall, -24381);
  c_limit     constant number := 10;
  procedure dept is
    cursor c_inserts is
      with import as
       (select *
          from separated_fnc(deptstg, columns(deptno, dname, loc))
       )
      select i.deptno
            ,i.dname
            ,i.loc
        from import i
        left outer join dept d on (d.deptno = i.deptno)
       where 1 = 1
         and d.deptno is null;
    cursor c_updates is
      with import as
       (select *
          from separated_fnc(deptstg, columns(deptno, dname, loc))
       )
      select i.deptno
            ,i.dname
            ,i.loc
        from import i
        join dept d on (d.deptno = i.deptno)
       where 1 = 1
         and (   coalesce(d.dname,'-NULL-') <> coalesce(i.dname,'-NULL-')
              or coalesce(d.loc, '-NULL-')  <> coalesce(i.loc, '-NULL-')
             );
    type data_t is table of c_inserts%rowtype index by pls_integer;
    l_data data_t;
  begin
    open c_inserts;
    loop
      fetch c_inserts bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            insert into dept(deptno, dname, loc)
            values (l_data(indx).deptno, l_data(indx).dname, l_data(indx).loc);
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_inserts;
--
    open c_updates;
    loop
      fetch c_updates bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            update dept
               set dname = l_data(indx).dname
                 , loc = l_data(indx).loc
             where 1=1
               and deptno = l_data(indx).deptno;
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_updates;
  end dept;

  procedure emp is
    cursor c_inserts is
      with import as
       (select *
          from separated_fnc(empstg, columns(empno,ename,job,mgr,hiredate,sal,comm,deptno))
       )
      select i.empno   
            ,i.ename   
            ,i.job     
            ,i.mgr     
            ,i.hiredate
            ,i.sal     
            ,i.comm    
            ,i.deptno  
        from import i
        left outer join emp e on (e.empno = i.empno)
       where 1 = 1
         and e.empno is null;
    cursor c_updates is
      with import as
       (select *
          from separated_fnc(empstg, columns(empno,ename,job,mgr,hiredate,sal,comm,deptno))
       )
      select i.empno   
            ,i.ename   
            ,i.job     
            ,i.mgr     
            ,i.hiredate
            ,i.sal     
            ,i.comm    
            ,i.deptno  
        from import i
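        join emp e on (e.empno = i.empno)
       where 1 = 1
         -- compare null-safe; -1 is assumed never to occur as a real value
         and (   coalesce(e.ename,    '-NULL') <> coalesce(i.ename,    '-NULL')
              or coalesce(e.job,      '-NULL') <> coalesce(i.job,      '-NULL')
              or coalesce(e.mgr,      -1)      <> coalesce(to_number(i.mgr),    -1)
              or coalesce(to_char(e.hiredate,'DDMMYYYY'), '-NULL') <> coalesce(i.hiredate, '-NULL')
              or coalesce(e.sal,      -1)      <> coalesce(to_number(i.sal),    -1)
              or coalesce(e.comm,     -1)      <> coalesce(to_number(i.comm),   -1)
              or coalesce(e.deptno,   -1)      <> coalesce(to_number(i.deptno), -1)
             );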
    type data_t is table of c_inserts%rowtype index by pls_integer;
    l_data data_t;
  begin
    open c_inserts;
    loop
      fetch c_inserts bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            insert into emp(empno, ename, job, mgr, hiredate, sal, comm, deptno)
            values (l_data(indx).empno, l_data(indx).ename, l_data(indx).job, l_data(indx).mgr, to_date(l_data(indx).hiredate, 'DDMMYYYY'), l_data(indx).sal, trim(l_data(indx).comm), l_data(indx).deptno);
        exception
          when failure_in_forall then
            dbms_output.put_line('error');
            dbms_output.put_line(sqlcode);
            dbms_output.put_line(sqlerrm);
            for indx in 1 .. sql%bulk_exceptions.count loop
              dbms_output.put_line('Error ' || indx || ' occurred on index ' || sql%bulk_exceptions(indx).error_index);
              dbms_output.put_line('Oracle error is ' ||
                                sqlerrm(-1 * sql%bulk_exceptions(indx).error_code));
            end loop;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_inserts;
--
    open c_updates;
    loop
      fetch c_updates bulk collect into l_data limit c_limit;
      if l_data.count > 0 then
        begin
          forall indx in l_data.first .. l_data.last save exceptions
            update emp
              set ename       = l_data(indx).ename    
                , job         = l_data(indx).job      
                , mgr         = l_data(indx).mgr      
                , hiredate    = to_date(l_data(indx).hiredate, 'DDMMYYYY')
                , sal         = l_data(indx).sal      
                , comm        = l_data(indx).comm     
                , deptno      = l_data(indx).deptno   
            where 1=1
              and empno       = l_data(indx).empno;
        exception
          when failure_in_forall then null;
        end;
      end if;
      exit when l_data.count < c_limit;
    end loop;
    close c_updates;
  end emp;
end process_stg;
/

There is certainly room for improvement in the current implementation, such as supporting duplicate separators, making the column to be split a parameter, and making the separator character a parameter as well, but that is a nice project for a later time.
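
As a starting point for the duplicate separators, here is a small sketch of the difference between the pattern used in this post and a pattern that keeps empty fields. With '[^;]+' an empty field is skipped, so every following value moves one position to the left, which is why the staging data above uses a single space for missing values. The sample string is made up for this illustration.

```sql
-- Sketch only: the 4th field of the sample string is empty (two separators in a row).
select regexp_substr('7839;KING;PRESIDENT;;17111981', '[^;]+', 1, 4)               as skips_empty
      ,regexp_substr('7839;KING;PRESIDENT;;17111981', '(.*?)(;|$)', 1, 4, null, 1) as keeps_empty
  from dual
/
-- skips_empty returns '17111981' (the 5th field), keeps_empty returns NULL
```

The second pattern matches lazily up to the next separator or the end of the line and returns only the first subexpression, so an empty field comes back as NULL instead of being skipped.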

I hope it all makes a bit of sense. If you have any improvements, don’t hesitate to comment.