# Polymorphic Table Functions

I have been working on a presentation on Polymorphic Table Functions. During this time I was looking for a real use case for Polymorphic Table Functions. I came up with an example which is not very useful in real life, but very useful to explain the technique.
At my current job I came across a piece of code that I had to copy and adjust to fit the needs of each specific case. The idea was always the same: I get a table with semi-colon separated values in one column that have to be split into the correct number of columns before comparing the data with the current data in a specific table.
I thought: ‘Maybe I can solve this copy-paste-adjust process by using a Polymorphic Table Function.’

Let’s first set the current scene.
We have two tables. The well known EMP and DEPT tables.

```create table emp
(empno    number(4) not null
,ename    varchar2(10)
,job      varchar2(9)
,mgr      number(4)
,hiredate date
,sal      number(7, 2)
,comm     number(7, 2)
,deptno   number(2)
)
/
-- NOTE(review): no primary keys are declared; the merge logic later in this
-- article relies on EMPNO/DEPTNO being unique -- consider adding PK constraints
create table dept
(deptno number(2)
,dname  varchar2(14)
,loc    varchar2(14)
)
/
```

And we add the well known data:

```insert into emp values (7369, 'SMITH',  'CLERK',     7902, to_date('17-DEC-1980', 'DD-MON-YYYY'),  800, null, 20);
insert into emp values (7499, 'ALLEN',  'SALESMAN',  7698, to_date('20-FEB-1981', 'DD-MON-YYYY'), 1600,  300, 30);
insert into emp values (7521, 'WARD',   'SALESMAN',  7698, to_date('22-FEB-1981', 'DD-MON-YYYY'), 1250,  500, 30);
insert into emp values (7566, 'JONES',  'MANAGER',   7839, to_date('2-APR-1981', 'DD-MON-YYYY'),  2975, null, 20);
insert into emp values (7654, 'MARTIN', 'SALESMAN',  7698, to_date('28-SEP-1981', 'DD-MON-YYYY'), 1250, 1400, 30);
insert into emp values (7698, 'BLAKE',  'MANAGER',   7839, to_date('1-MAY-1981', 'DD-MON-YYYY'),  2850, null, 30);
insert into emp values (7782, 'CLARK',  'MANAGER',   7839, to_date('9-JUN-1981', 'DD-MON-YYYY'),  2450, null, 10);
insert into emp values (7788, 'SCOTT',  'ANALYST',   7566, to_date('09-DEC-1982', 'DD-MON-YYYY'), 3000, null, 20);
insert into emp values (7839, 'KING',   'PRESIDENT', null, to_date('17-NOV-1981', 'DD-MON-YYYY'), 5000, null, 10);
insert into emp values (7844, 'TURNER', 'SALESMAN',  7698, to_date('8-SEP-1981', 'DD-MON-YYYY'),  1500,    0, 30);
insert into emp values (7876, 'ADAMS',  'CLERK',     7788, to_date('12-JAN-1983', 'DD-MON-YYYY'), 1100, null, 20);
insert into emp values (7900, 'JAMES',  'CLERK',     7698, to_date('3-DEC-1981', 'DD-MON-YYYY'),   950, null, 30);
insert into emp values (7902, 'FORD',   'ANALYST',   7566, to_date('3-DEC-1981', 'DD-MON-YYYY'),  3000, null, 20);
insert into emp values (7934, 'MILLER', 'CLERK',     7782, to_date('23-JAN-1982', 'DD-MON-YYYY'), 1300, null, 10);
-- the four classic departments
insert into dept values (10, 'ACCOUNTING', 'NEW YORK');
insert into dept values (20, 'RESEARCH',   'DALLAS');
insert into dept values (30, 'SALES',      'CHICAGO');
insert into dept values (40, 'OPERATIONS', 'BOSTON');
```

We get data from a different system, which is in a semi-colon separated format. So we load that in a couple of staging tables:

```create table empstg
(line varchar2(4000)
)
/
-- same single-column layout: one semi-colon separated record per row
create table deptstg
(line varchar2(4000)
)
/```

And then we add some data:

```insert into empstg values ('7369;SMITH;CLERK;7902;17121980;800; ;20');
insert into empstg values ('7499;ALLEN;SALESMAN;7698;20021981;1600; 300;30');
insert into empstg values ('7521;WARD;SALESMAN;7698;22021981;1250; 500;30');
insert into empstg values ('7566;JONES;MANAGER;7839;02041981; 2975; ;20');
insert into empstg values ('7654;MARTIN;SALESMAN;7698;28091981;1250;1400;30');
insert into empstg values ('7698;BLAKE;MANAGER;7839;01051981; 2850; ;30');
insert into empstg values ('7782;CLARK;MANAGER;7839;09061981; 2450; ;10');
insert into empstg values ('7788;SCOTT;ANALYST;7566;09121982;3000; ;20');
insert into empstg values ('7839;KING;PRESIDENT; ;17111981;5000; ;10');
insert into empstg values ('7844;TURNER;SALESMAN;7698;08091981; 1500;0;30');
insert into empstg values ('7876;ADAMS;CLERK;7788;12011983;1100; ;20');
insert into empstg values ('7900;JAMES;CLERK;7698;03121981;  950; ;30');
insert into empstg values ('7902;FORD;ANALYST;7566;03121981; 3000; ;20');
insert into empstg values ('7934;MILLER;CLERK;7782;23011982;1300; ;10');
-- BAREL (2912) does not exist in EMP: exercises the insert path
insert into empstg values ('2912;BAREL;DEVELOPER;7839;29122017;4000; ;50');

insert into deptstg values ('10;ACCOUNTING;NEW YORK');
insert into deptstg values ('20;RESEARCH;DALLAS');
-- LOC changed from CHICAGO: exercises the update path
insert into deptstg values ('30;SALES;NORTH CAROLINA');
insert into deptstg values ('40;OPERATIONS;BOSTON');
-- department 50 does not exist in DEPT: exercises the insert path
insert into deptstg values ('50;DEVELOPMENT;SAN FRANCISCO');```

To process the data and merge it into the main tables we use a package. We could have used a single MERGE statement, but that would update even the rows that have not changed, which results in a lot of journalling data being written by triggers. Using the EMP and DEPT tables this wouldn’t be too much of a problem, but we are talking 250k+ rows each time (at least once a day).
So we want a little more control and only insert/update when it’s really necessary.

```create or replace package process_stg is
-- merge DEPTSTG into DEPT: insert new rows, update only rows that changed
procedure dept;

-- merge EMPSTG into EMP: insert new rows, update only rows that changed
procedure emp;
end process_stg;
/
create or replace package body process_stg is
-- ORA-24381 is raised by FORALL ... SAVE EXCEPTIONS when at least one row fails
failure_in_forall exception;
pragma exception_init(failure_in_forall, -24381);
c_limit     constant number := 10; -- bulk fetch size, kept small for demo purposes

procedure dept is
-- staging rows that are not yet present in DEPT
cursor c_inserts is
with import as
(select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno
,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
from deptstg stg)
select i.deptno
,i.dname
,i.loc
from import i
left outer join dept d on (d.deptno = i.deptno)
where d.deptno is null;
-- staging rows that already exist in DEPT but differ in at least one column
-- (fix: this "cursor c_updates is" declaration was missing, the query was dangling)
cursor c_updates is
with import as
(select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno
,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
from deptstg stg)
select i.deptno
,i.dname
,i.loc
from import i
join dept d on (d.deptno = i.deptno)
-- fix: use one sentinel ('-NULL-') on both sides; the original mixed '-NULL'
-- and '-NULL-', so two NULLs compared as different and forced needless updates
where (   coalesce(d.dname, '-NULL-') <> coalesce(i.dname, '-NULL-')
or coalesce(d.loc, '-NULL-')  <> coalesce(i.loc, '-NULL-')
);
type data_t is table of c_inserts%rowtype index by pls_integer;
l_data data_t;
begin
-- insert the new departments in batches of c_limit
open c_inserts;
loop
fetch c_inserts bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
insert into dept(deptno, dname, loc)
values (l_data(indx).deptno, l_data(indx).dname, l_data(indx).loc);
exception
-- failing rows are skipped on purpose; inspect sql%bulk_exceptions to log them
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_inserts;
-- update only the departments that really changed
open c_updates; -- fix: the cursor was fetched without ever being opened
loop
fetch c_updates bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
update dept
set dname = l_data(indx).dname
, loc = l_data(indx).loc
where deptno = l_data(indx).deptno;
exception
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_updates; -- fix: the cursor was never closed
end dept;

procedure emp is
-- staging rows that are not yet present in EMP
cursor c_inserts is
with import as
(select trim(regexp_substr(line, '[^;]+', 1, 1)) empno
,trim(regexp_substr(line, '[^;]+', 1, 2)) ename
,trim(regexp_substr(line, '[^;]+', 1, 3)) job
,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr
,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
,trim(regexp_substr(line, '[^;]+', 1, 6)) sal
,trim(regexp_substr(line, '[^;]+', 1, 7)) comm
,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno
from empstg stg)
select i.empno
,i.ename
,i.job
,i.mgr
,i.hiredate
,i.sal
,i.comm
,i.deptno
from import i
left outer join emp e on (e.empno = i.empno)
where e.empno is null;
-- staging rows that already exist in EMP but differ in at least one column
-- (fix: this "cursor c_updates is" declaration was missing, the query was dangling)
cursor c_updates is
with import as
(select trim(regexp_substr(line, '[^;]+', 1, 1)) empno
,trim(regexp_substr(line, '[^;]+', 1, 2)) ename
,trim(regexp_substr(line, '[^;]+', 1, 3)) job
,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr
,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
,trim(regexp_substr(line, '[^;]+', 1, 6)) sal
,trim(regexp_substr(line, '[^;]+', 1, 7)) comm
,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno
from empstg stg)
select i.empno
,i.ename
,i.job
,i.mgr
,i.hiredate
,i.sal
,i.comm
,i.deptno
from import i
join emp e on (e.empno = i.empno) -- fix: was LEFT OUTER JOIN, which also returned brand-new rows
-- fix: a bare <> is never true when one side is NULL, so NULL <-> value
-- transitions (e.g. MGR or COMM being set or cleared) were missed
where (   coalesce(e.ename, '-NULL-') <> coalesce(i.ename, '-NULL-')
or coalesce(e.job, '-NULL-') <> coalesce(i.job, '-NULL-')
or coalesce(e.mgr, -1) <> coalesce(to_number(i.mgr), -1)
or coalesce(to_char(e.hiredate, 'DDMMYYYY'), '-NULL-') <> coalesce(i.hiredate, '-NULL-')
or coalesce(e.sal, -1) <> coalesce(to_number(i.sal), -1)
or coalesce(e.comm, -1) <> coalesce(to_number(i.comm), -1)
or coalesce(e.deptno, -1) <> coalesce(to_number(i.deptno), -1)
);
type data_t is table of c_inserts%rowtype index by pls_integer;
l_data data_t;
begin
-- insert the new employees in batches of c_limit
open c_inserts;
loop
fetch c_inserts bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
insert into emp(empno, ename, job, mgr, hiredate, sal, comm, deptno)
values (l_data(indx).empno, l_data(indx).ename, l_data(indx).job, l_data(indx).mgr, to_date(l_data(indx).hiredate, 'DDMMYYYY'), l_data(indx).sal, l_data(indx).comm, l_data(indx).deptno);
exception
-- failing rows are skipped on purpose; inspect sql%bulk_exceptions to log them
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_inserts;
-- update only the employees that really changed
open c_updates; -- fix: the cursor was fetched without ever being opened
loop
fetch c_updates bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
update emp
set ename    = l_data(indx).ename
, job      = l_data(indx).job
, mgr      = l_data(indx).mgr
, hiredate = to_date(l_data(indx).hiredate, 'DDMMYYYY')
, sal      = l_data(indx).sal
, comm     = l_data(indx).comm
, deptno   = l_data(indx).deptno
where empno  = l_data(indx).empno;
exception
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_updates; -- fix: the cursor was never closed
end emp;
end process_stg;
/```

As you can see, the code, especially for the cursors, is pretty much the same. Only difference is the number of columns that are generated from the semi-colon separated line.
I really don’t like to do the same thing over and over again, especially when the only difference is the number of columns and their names. But since this is what changes between the tables I think there is no way of making this generic in 12c or earlier. But then 18c came into play and they provide us with Polymorphic Table Functions.

This is what the documentation says (summary):
Polymorphic Table Functions
Polymorphic Table Functions (PTF) are user-defined functions that can be invoked in the FROM clause.
They are capable of processing tables whose row type is not declared at definition time and producing a
result table whose row type may or may not be declared at definition time. Polymorphic Table Functions
allow application developers to leverage the long-defined dynamic SQL capabilities to create powerful
and complex custom functions.

In my own words: Call a function, supplying a table and get a set of columns back. You can supply the names (and number) of columns as a parameter. Also, these columns don’t have to exist in the table, you can create them on the fly. That is exactly what I need. I have different tables with pretty much the same layout but the results I need are completely different.
So I came up with the following Polymorphic Table Function to do what I want. First there is the specification of the package. What I need is the DESCRIBE function (which is mandatory) and a procedure to fetch the rows, where I can alter the results.

```create or replace package separated_ptf is
-- DESCRIBE is invoked by the SQL compiler; it receives the source table
-- metadata (tab) and the requested column names (cols) and defines the
-- shape of the result set
function describe(tab  in out dbms_tf.table_t
,cols in dbms_tf.columns_t default null) return dbms_tf.describe_t;

-- FETCH_ROWS is invoked at execution time to produce the new column values
procedure fetch_rows;
end separated_ptf;
/```

Then there is the implementation of the package:

```create or replace package body separated_ptf as
g_colcount pls_integer;   -- number of result columns requested by the caller
g_colname  varchar2(128); -- name of the source column that holds the separated line

-- Compile/describe time: hide the source column and register the
-- caller-supplied names as new VARCHAR2 result columns.
function describe(tab  in out dbms_tf.table_t
,cols in dbms_tf.columns_t default null) return dbms_tf.describe_t as
-- table of columns to add
l_new_cols dbms_tf.columns_new_t;
begin
-- Don't pass the source column through to the result set ...
tab.column(1).pass_through := false;
-- ... but keep reading it so fetch_rows can split its value
-- (NOTE(review): the original listing omitted this; without for_read the
-- hidden column is not available in get_row_set -- confirm on your version)
tab.column(1).for_read := true;
-- Save the name of the first column for use in the fetch_rows procedure
g_colname := tab.column(1).description.name;
-- Save the number of columns for use in the fetch_rows procedure
g_colcount := cols.count;
-- Add one new column per requested name, defaulting to VARCHAR2
for indx in 1 .. cols.count loop
-- fix: the original assigned an undeclared variable l_new_col here and
-- never built any metadata; construct the column metadata inline instead
l_new_cols(l_new_cols.count + 1) := dbms_tf.column_metadata_t(name => cols(indx));
end loop;
-- Instead of returning NULL we return a specific DESCRIBE_T
-- that adds the new columns
return dbms_tf.describe_t(new_columns => l_new_cols);
end;

procedure fetch_rows is
-- one varchar2 collection per generated column
type colset is table of dbms_tf.tab_varchar2_t index by pls_integer;
l_rowset    dbms_tf.row_set_t; -- the rows as retrieved from the source
l_rowcount  pls_integer;       -- number of rows retrieved
l_newcolset colset;            -- the values for the generated columns
begin
-- fetch rows into a local rowset; at this point the rows have the
-- columns of the table/view/query that was passed in
dbms_tf.get_row_set(l_rowset, l_rowcount);
for rowindx in 1 .. l_rowcount loop
for colindx in 1 .. g_colcount loop
-- take the colindx-th ';'-separated token of the source column
-- splitting the regexp way: http://nuijten.blogspot.com/2009/07/splitting-comma-delimited-string-regexp.html
-- fix: the JSON path was written as '\$.' -- the stray backslash made it invalid
l_newcolset(colindx)(rowindx) := trim(regexp_substr(json_value(dbms_tf.row_to_char(l_rowset, rowindx), '$.' || g_colname)
,'[^;]+'
,1
,colindx));
end loop; -- every column
end loop; -- every row in the rowset
-- hand the populated collections back, one per new column
for indx in 1 .. g_colcount loop
dbms_tf.put_col(columnid => indx, collection => l_newcolset(indx));
end loop;
end;
end separated_ptf;
/```

After creating this Polymorphic Table Function we need an interface to use it in a SQL statement:

```-- wrapper: exposes the polymorphic table function to plain SQL
create or replace function separated_fnc(p_tbl IN TABLE,
cols columns DEFAULT NULL)
return table pipelined
row polymorphic using separated_ptf;
/```

Now, with this wrapper function in place we can start using it:

```select * -- the PTF generates the three requested columns on the fly
from separated_fnc(deptstg, columns(deptno, dname, loc))
/```

Which is a lot easier than:

```select trim(regexp_substr(line, '[^;]+', 1, 1)) deptno -- one regexp call per column, repeated for every table
,trim(regexp_substr(line, '[^;]+', 1, 2)) dname
,trim(regexp_substr(line, '[^;]+', 1, 3)) loc
from deptstg stg
/```

And similarly we can access the same code to retrieve data from the other table:

```select * -- same wrapper, different table and column list
from separated_fnc(empstg, columns(empno, ename, job, mgr, hiredate, sal, comm, deptno))
/```

That looks pretty much the same as the other one, but is definitely a lot simpler than

```select trim(regexp_substr(line, '[^;]+', 1, 1)) empno -- eight near-identical regexp calls: the copy-paste-adjust problem
,trim(regexp_substr(line, '[^;]+', 1, 2)) ename
,trim(regexp_substr(line, '[^;]+', 1, 3)) job
,trim(regexp_substr(line, '[^;]+', 1, 4)) mgr
,trim(regexp_substr(line, '[^;]+', 1, 5)) hiredate
,trim(regexp_substr(line, '[^;]+', 1, 6)) sal
,trim(regexp_substr(line, '[^;]+', 1, 7)) comm
,trim(regexp_substr(line, '[^;]+', 1, 8)) deptno
from empstg stg
/```

The new implementation of my package is now like this:

```create or replace package body process_stg is
-- ORA-24381: raised by FORALL ... SAVE EXCEPTIONS when at least one row fails
failure_in_forall exception;
pragma exception_init(failure_in_forall, -24381);
c_limit     constant number := 10; -- bulk fetch size
procedure dept is
-- staging rows that are not yet present in DEPT
cursor c_inserts is
with import as
(select *
from separated_fnc(deptstg, columns(deptno, dname, loc))
)
select i.deptno
,i.dname
,i.loc
from import i
left outer join dept d on (d.deptno = i.deptno)
where d.deptno is null;
-- staging rows that already exist in DEPT but differ in at least one column
-- (fix: this "cursor c_updates is" declaration was missing, the query was dangling)
cursor c_updates is
with import as
(select *
from separated_fnc(deptstg, columns(deptno, dname, loc))
)
select i.deptno
,i.dname
,i.loc
from import i
join dept d on (d.deptno = i.deptno)
-- fix: use one sentinel ('-NULL-') on both sides; the original mixed '-NULL'
-- and '-NULL-', so two NULLs compared as different and forced needless updates
where (   coalesce(d.dname, '-NULL-') <> coalesce(i.dname, '-NULL-')
or coalesce(d.loc, '-NULL-')  <> coalesce(i.loc, '-NULL-')
);
type data_t is table of c_inserts%rowtype index by pls_integer;
l_data data_t;
begin
-- insert the new departments in batches of c_limit
open c_inserts;
loop
fetch c_inserts bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
insert into dept(deptno, dname, loc)
values (l_data(indx).deptno, l_data(indx).dname, l_data(indx).loc);
exception
-- failing rows are skipped on purpose; inspect sql%bulk_exceptions to log them
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_inserts;
-- update only the departments that really changed
open c_updates; -- fix: the cursor was fetched without ever being opened
loop
fetch c_updates bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
update dept
set dname = l_data(indx).dname
, loc = l_data(indx).loc
where deptno = l_data(indx).deptno;
exception
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_updates; -- fix: the cursor was never closed
end dept;

procedure emp is
-- staging rows that are not yet present in EMP
cursor c_inserts is
with import as
(select *
from separated_fnc(empstg, columns(empno,ename,job,mgr,hiredate,sal,comm,deptno))
)
select i.empno
,i.ename
,i.job
,i.mgr
,i.hiredate
,i.sal
,i.comm
,i.deptno
from import i
left outer join emp e on (e.empno = i.empno)
where 1 = 1
and e.empno is null;
-- NOTE(review): the query below is not attached to any cursor; it needs a
-- "cursor c_updates is" in front of it to compile
with import as
(select *
from separated_fnc(empstg, columns(empno,ename,job,mgr,hiredate,sal,comm,deptno))
)
select i.empno
,i.ename
,i.job
,i.mgr
,i.hiredate
,i.sal
,i.comm
,i.deptno
from import i
-- NOTE(review): a LEFT OUTER JOIN also returns brand-new staging rows here;
-- an inner join would restrict this to rows that already exist in EMP
left outer join emp e on (e.empno = i.empno)
where 1 = 1
-- NOTE(review): the bare <> comparisons never fire when one side is NULL,
-- so NULL <-> value changes in MGR/SAL/COMM/DEPTNO are missed
and (   coalesce(e.ename,    '-NULL') <> coalesce(i.ename,    '-NULL')
or coalesce(e.job,      '-NULL') <> coalesce(i.job,      '-NULL')
or          e.mgr                <>          i.mgr
or coalesce(to_char(e.hiredate,'DDMMYYYY'), '-NULL') <> coalesce(i.hiredate, '-NULL')
or          e.sal                <>          i.sal
or          e.comm               <>          i.comm
or          e.deptno             <>          i.deptno
);
type data_t is table of c_inserts%rowtype index by pls_integer;
l_data data_t;
begin
-- insert the new employees in batches of c_limit
open c_inserts;
loop
fetch c_inserts bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
insert into emp(empno, ename, job, mgr, hiredate, sal, comm, deptno)
values (l_data(indx).empno, l_data(indx).ename, l_data(indx).job, l_data(indx).mgr, to_date(l_data(indx).hiredate, 'DDMMYYYY'), l_data(indx).sal, trim(l_data(indx).comm), l_data(indx).deptno);
exception
-- report every failed row of the bulk insert via dbms_output
when failure_in_forall then
dbms_output.put_line(q'[error]');
dbms_output.put_line(sqlcode);
dbms_output.put_line(sqlerrm);
for indx in 1 .. sql%bulk_exceptions.count loop
dbms_output.put_line('Error ' || indx || ' occurred on index ' || sql%bulk_exceptions(indx).error_index);
dbms_output.put_line('Oracle error is ' ||
sqlerrm(-1 * sql%bulk_exceptions(indx).error_code));
end loop;
null;
end;
end if;
exit when l_data.count < c_limit;
end loop;
close c_inserts;
--
loop
-- NOTE(review): c_updates is never declared, opened or closed in this listing
fetch c_updates bulk collect into l_data limit c_limit;
if l_data.count > 0 then
begin
forall indx in l_data.first .. l_data.last save exceptions
update emp
set ename       = l_data(indx).ename
, job         = l_data(indx).job
, mgr         = l_data(indx).mgr
, hiredate    = to_date(l_data(indx).hiredate, 'DDMMYYYY')
, sal         = l_data(indx).sal
, comm        = l_data(indx).comm
, deptno      = l_data(indx).deptno
where 1=1
and empno       = l_data(indx).empno;
exception
when failure_in_forall then null;
end;
end if;
exit when l_data.count < c_limit;
end loop;