Bookmark and Share

Tuesday, April 21, 2009

User-Defined DML LCRs

One of the great features that comes with Oracle Streams is the ability you have to use the APIs to extend the framework and build almost anything you have in mind. Streams provides an easy way to share data across databases and with applications. In this post, I'll be providing an example of PL/SQL code that publishes DML LCRs in a queue; An Apply Handler will execute the content of those LCRs.

Test Schema

For the purpose of this demonstration, you can build one table T1 in the schema of your choice. Below is the script that creates that table and displays its content:
create table T1
(id number,
text varchar2(10),
constraint T1_PK primary key(id));

insert into T1 values (1, 'Text 1');

commit;

col id format 99
col text format a6

select id, text
from T1;

ID TEXT
-- ------
1 Text 1

Streams Queue and Apply

I use the same schema for the table, the queue and the apply process. To speed up the setup, I've granted DBA to the schema owner; There are 3 points worth to mention:
  • I've set apply_capture parameter to false to make sure the apply process dequeues persistent messages
  • I did not add any rule set to the apply so that it dequeues every message in the queue
  • There is no need to define an instantiation SCN or to setup a SCN in the LCR
begin
dbms_streams_adm.set_up_queue(
queue_table => 'custom_queue_table',
queue_name => 'custom_queue');
end;
/
declare
v_name varchar2(256);
begin
select value into v_name
from v$parameter
where name='db_unique_name';
dbms_apply_adm.create_apply(
queue_name => 'custom_queue',
apply_name => 'custom_apply',
apply_captured => false,
source_database => v_name );
end;
/

exec dbms_apply_adm.start_apply('CUSTOM_APPLY');

Create and Enqueue DML LCRs

Once everything ready you can build the LCRs and enqueue them in the queue so that the DELETE, UPDATE or INSERT can be consumed and executed by the apply processes; You'll find below 3 example of such user defined DML LCR
  • User Defined DELETE LCR
declare
v_name varchar2(256);
lcr sys.lcr$_row_record;
v_oldlist sys.lcr$_row_list;
v_oldid sys.lcr$_row_unit;
v_oldtxt sys.lcr$_row_unit;
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
begin
-- Get DB Name
select value into v_name
from v$parameter
where name='db_unique_name';

mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);

v_oldid:=sys.lcr$_row_unit(
'ID',
anydata.convertnumber(1),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_oldtxt:=sys.lcr$_row_unit(
'TEXT',
anydata.convertvarchar2('Text 1'),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_oldlist:=sys.lcr$_row_list(v_oldid,v_oldtxt);
lcr:=sys.lcr$_row_record.construct(
source_database_name=>v_name,
command_type => 'DELETE',
object_owner=> user,
object_name => 'T1',
tag => null,
transaction_id => null,
scn => null,
old_values => v_oldlist);
DBMS_AQ.ENQUEUE(
queue_name => 'custom_queue',
enqueue_options => enqopt,
message_properties => mprop,
payload => anydata.ConvertObject(lcr),
msgid => enq_msgid);

end;
/
commit;

select *
from t1;

no rows selected
  • User Defined INSERT LCR
declare
v_name varchar2(256);
lcr sys.lcr$_row_record;
v_newlist sys.lcr$_row_list;
v_newid sys.lcr$_row_unit;
v_newtxt sys.lcr$_row_unit;
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
begin
-- Get DB Name
select value into v_name
from v$parameter
where name='db_unique_name';

mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);

v_newid:=sys.lcr$_row_unit(
'ID',
anydata.convertnumber(1),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_newtxt:=sys.lcr$_row_unit(
'TEXT',
anydata.convertvarchar2('Text 9'),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_newlist:=sys.lcr$_row_list(v_newid,v_newtxt);
lcr:=sys.lcr$_row_record.construct(
source_database_name=>v_name,
command_type => 'INSERT',
object_owner=> user,
object_name => 'T1',
tag => null,
transaction_id => null,
scn => null,
new_values => v_newlist);
DBMS_AQ.ENQUEUE(
queue_name => 'custom_queue',
enqueue_options => enqopt,
message_properties => mprop,
payload => anydata.ConvertObject(lcr),
msgid => enq_msgid);

end;
/
commit;

select *
from t1;

ID TEXT
-- ------
1 Text 9
  • User Defined UPDATE LCR
declare
v_name varchar2(256);
lcr sys.lcr$_row_record;
v_oldlist sys.lcr$_row_list;
v_oldid sys.lcr$_row_unit;
v_oldtxt sys.lcr$_row_unit;
v_newlist sys.lcr$_row_list;
v_newid sys.lcr$_row_unit;
v_newtxt sys.lcr$_row_unit;
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
begin
-- Get DB Name
select value into v_name
from v$parameter
where name='db_unique_name';

mprop.SENDER_ID := SYS.AQ$_AGENT(user, null, null);

v_oldid:=sys.lcr$_row_unit(
'ID',
anydata.convertnumber(1),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_newid:=sys.lcr$_row_unit(
'ID',
anydata.convertnumber(1),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_oldtxt:=sys.lcr$_row_unit(
'TEXT',
anydata.convertvarchar2('Text 9'),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_newtxt:=sys.lcr$_row_unit(
'TEXT',
anydata.convertvarchar2('Text 1'),
DBMS_LCR.NOT_A_LOB,
null,
null);
v_oldlist:=sys.lcr$_row_list(v_oldid,v_oldtxt);
v_newlist:=sys.lcr$_row_list(v_newid,v_newtxt);
lcr:=sys.lcr$_row_record.construct(
source_database_name=>v_name,
command_type => 'UPDATE',
object_owner=> user,
object_name => 'T1',
tag => null,
transaction_id => null,
scn => null,
old_values => v_oldlist,
new_values => v_newlist);
DBMS_AQ.ENQUEUE(
queue_name => 'custom_queue',
enqueue_options => enqopt,
message_properties => mprop,
payload => anydata.ConvertObject(lcr),
msgid => enq_msgid);

end;
/
commit;

select *
from t1;

ID TEXT
-- ------
1 Text 1

Conclusion

As you can see, creating and enqueuing DML LCRs is very simple.

I'm hiting an issue with DDL LCRs on top of 11.1.0.7/Linux x86 and, for now, the only way I manage to create one is by using dbms_streams.convert_xml_to_lcr. I'll write a new post as soon as my issue is fixed.

2 comments:

  1. For user defined DDL LCRs, I'm hitting what seems to be (still not published) bug 8338362. I've requested Oracle to publish that bug so that we know when the fix will be available.

    ReplyDelete
  2. After investigation, it appears the issue with the User-Defined DDL LCR doesn't appear with WE8* and MSWIN * character sets. You can follow up on 8464470 (to be confirmed based bug 8338362) if you are using AL32UTF8 or another character set with the same issue.

    ReplyDelete