Bookmark and Share

Friday, December 18, 2009

Oracle 11gR2, GoldenGate and Koala Jump Start

It was some time ago now, but the Oracle GoldenGate statement of direction made me sad. Sometimes I still wish the person who wrote it were somehow as far away as possible from the database server development team. But I know that's not the case. The sentence that kills me for sure is:
"Given the strategic nature of Oracle GoldenGate, Oracle Streams will continue to be supported, but will not be actively enhanced. Rather, the best elements of Oracle Streams will be evaluated for inclusion with Oracle GoldenGate."

Some Background

Even today, a few weeks after reading the GoldenGate documentation, testing the product and listing its benefits for myself (openness, simplicity, completeness, ...), I still have trouble swallowing the fact that I will miss some of the unique features of Streams.

Not that I'm intolerant or stuck in the "good old days". I mean, I know there are good and bad things about Streams. Over the past years, I've spent quite some energy(*) on understanding what is so unique about it, and I believe most of its advantages come from its "built inside" architecture. To the point where it could be very difficult for Oracle to bring Streams' strengths to GoldenGate, because their architectures are so different.

You may wonder what I am talking about. Well, I'm talking about:
  • Streams' in-memory, end-to-end propagation, which provides an extremely scalable framework without the need to log changes in a separate file (trail)
  • The reliability of the staging (the database) and of the propagation (buffered queues)
  • Streams' lightweight footprint, which relies (most of the time) on internal object ids, can easily be offloaded to a separate server without any database copy, and doesn't access the original data at all
  • The ability Streams has to treat sets of changes without "deferring" the constraints
  • The flexibility of a framework that runs inside the database, with hooks at every stage that you can completely customize and extend to your needs
  • The openness for developers, who can directly use the APIs to provide advanced features to their applications by leveraging their Oracle knowledge
  • The ubiquity of a solution that is already installed with Oracle 11g, works with RAC without any change, is extended by the Change Data Capture feature, and is easy to open to the outside world with AQ or XStream
  • The maturity, ease and simplicity it has gained over years of hard work by the development team, and that you can leverage with a simple SQL script
  • The cost model: it comes for free with Oracle Database Enterprise Edition, and with a limited feature set with 11g Standard Edition
At some point, I was hoping XStream would be the missing link between Streams and GoldenGate, so that GoldenGate could leverage the power of Oracle Database 11g. That would for sure have been my preferred scenario, and the fact that XStream is licensed with GoldenGate made me hope for a while. But reading the statement of direction suggests a very different ending.

Now Oracle GoldenGate is very likely to, if not succeed, at least do a better job than Streams at bringing real-time data integration to the masses ;-). Nevertheless, I have no clue how Oracle could, in a timely manner, bring Streams' strengths to it. An interesting challenge for the product managers and development team... The future will tell!

Koala jump start

As you can guess, the time has come for all of us to learn more about Oracle GoldenGate. Since my laptop runs Karmic Koala, I've given it a try on Ubuntu 9.10. It's not supported, but I haven't faced any issue so far. I've downloaded and installed GoldenGate from Oracle E-Delivery; I've chosen the following distribution:
  • Select a product Pack: "Oracle Fusion Middleware"
  • Platform: "Linux x86"
  • Description: "Oracle GoldenGate on Oracle Media Pack for Linux x86"
  • Name: "Oracle GoldenGate V10.4.0.x for Oracle 11g on RedHat 5.0"
To perform the installation, I've just unzipped/untarred the file in a directory and set the environment so that I can access my Oracle database:
. oraenv
ORACLE_SID = [WHITE] ? BLACK
The Oracle base for ORACLE_HOME=/u01/app/oracle/product/11.2.0/db_1 is /u01/app/oracle
That done, you should be ready to use Oracle GoldenGate; run the command line interface like below:
./ggsci 

Oracle GoldenGate Command Interpreter for Oracle
Version 10.4.0.19 Build 002
Linux, x86, 32bit (optimized), Oracle 11 on Sep 29 2009 08:50:50

Copyright (C) 1995, 2009, Oracle and/or its affiliates. All rights reserved.


exit

Prepare the database

There are a few settings to check on the database. It must be in archivelog mode, and GoldenGate must be able to access the archived logs and redo logs:
sqlplus / as sysdba

archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/oradata/BLACK/archivelogs
Oldest online log sequence 1
Next log sequence to archive 3
Current log sequence 3
I've set NLS_LANG so that it matches my database settings:
sqlplus / as sysdba

select parameter, value
from nls_database_parameters
where parameter in ('NLS_LANGUAGE',
'NLS_TERRITORY',
'NLS_CHARACTERSET',
'NLS_LENGTH_SEMANTICS');

parameter VALUE
-------------------- -------------
NLS_LANGUAGE AMERICAN
NLS_TERRITORY AMERICA
NLS_CHARACTERSET WE8MSWIN1252
NLS_LENGTH_SEMANTICS BYTE


exit

export NLS_LANG=AMERICAN_AMERICA.WE8MSWIN1252
The configuration has to rely on a user with a high level of privileges to perform several operations, like extracting the content of a UDT or a LOB with flashback queries. It should also be able to add supplemental log groups to the tables that are part of the replication:
sqlplus / as sysdba

create user gg
identified by gg
default tablespace users
temporary tablespace temp;

grant create session, resource, dba to gg;
The database must also have the minimal supplemental logging enabled:
alter database add supplemental log data;

alter system switch logfile;
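To double-check that last setting, a quick query run as sysdba should now report minimal supplemental logging; a minimal sketch:
select supplemental_log_data_min
from v$database;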
In my case, that was it. Obviously, based on what you want to do, your database may require more changes. It could require that some tables be created to audit DDL or to store checkpoints. To get the complete list of what needs to be done, check the GoldenGate documentation, and more specifically the "Oracle Installation and Setup Guide".

A Demo Schema

I did not try to set up anything advanced: I've just replicated scott.dept into a demo schema. To make the example even simpler, I've assumed nobody was accessing the table, and I did not pay any attention to pending transactions or out-of-sync instantiation. I've just created a table named demo.dept and loaded it with scott.dept's data:
create user demo identified by demo
default tablespace users
temporary tablespace temp
quota unlimited on users;

grant connect, resource to demo;

create table demo.dept
( deptno number(2,0),
dname varchar2(14),
loc varchar2(13),
constraint pk_dept primary key(deptno))
tablespace users;

insert into demo.dept
select * from scott.dept;

commit;

Configure GoldenGate Process Manager

Once the database is configured, refer to the "Administration Guide" to continue and get more details about the GoldenGate setup. First, GoldenGate requires some directories to store its configuration, logs and trail files. You can choose an alternate location for them, but that wasn't really my concern either:
./ggsci 

create subdirs

Creating subdirectories under current directory /gg

Parameter files /gg/dirprm: created
Report files /gg/dirrpt: created
Checkpoint files /gg/dirchk: created
Process status files /gg/dirpcs: created
SQL script files /gg/dirsql: created
Database definitions files /gg/dirdef: created
Extract data files /gg/dirdat: created
Temporary files /gg/dirtmp: created
Veridata files /gg/dirver: created
Veridata Lock files /gg/dirver/lock: created
Veridata Out-Of-Sync files /gg/dirver/oos: created
Veridata Out-Of-Sync XML files /gg/dirver/oosxml: created
Veridata Parameter files /gg/dirver/params: created
Veridata Report files /gg/dirver/report: created
Veridata Status files /gg/dirver/status: created
Veridata Trace files /gg/dirver/trace: created
Stdout files /gg/dirout: created
Once done, I've edited the Manager configuration file, named MGR, to set the port parameter, and I've started it:
edit params mgr

view params mgr
port 7809

start manager

status manager
Manager is running (IP port arkzoyd.7809).

Source Table Supplemental Log Group

Like Streams (there is no secret!), GoldenGate needs to be able to identify rows in order to apply captured changes. It provides some generic tools to enable and check additional logging on the tables from ggsci:
dblogin userid gg, password gg
Successfully logged into database.

add trandata scott.dept
Logging of supplemental redo data enabled for table SCOTT.DEPT.

info trandata scott.dept
Logging of supplemental redo log data is enabled for table SCOTT.DEPT
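Behind the scenes, add trandata creates a supplemental log group on the table. If you prefer to verify it from SQL*Plus, a hedged sketch like the one below lists the groups; the exact group name GoldenGate generates may differ on your system:
select log_group_name, log_group_type
from dba_log_groups
where owner='SCOTT'
and table_name='DEPT';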

Parameter Files

I've named the extract scott. It captures changes made to the SCOTT.DEPT table and sends them to the remote trail file that, in my case, is managed by the same manager. I've named the replicat demo. The parameter files for scott and demo look like the ones below:
edit params scott

view params scott

extract scott
userid gg, password gg
rmthost localhost mgrport 7809
rmttrail SC
table SCOTT.DEPT;


edit params demo

view params demo

replicat demo
assumetargetdefs
userid gg, password gg
map SCOTT.DEPT, target DEMO.DEPT;
Note:
With Oracle, you have to use double quotes to manage case-sensitive table names. However, that's not the case with all database engines. As a result, depending on the parameter, GoldenGate may or may not differentiate strings with different cases. To avoid any issue, I use uppercase for parameter values, unless I specifically want a different case, as illustrated below.
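For the record, on the Oracle side only quoted identifiers are case-sensitive. The sketch below, built around a hypothetical "Dept2" table, shows the kind of name that would force double quotes into the parameter files:
create table demo."Dept2"
( deptno number(2,0),
constraint "Dept2_pk" primary key(deptno));
-- "Dept2" must then be double-quoted everywhere it is referenced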

Extract and Replicat

Once the parameter files are defined, I've added the extract, the replicat and the trail files from ggsci:
add extract scott, tranlog, begin now
EXTRACT added.

add rmttrail SC, extract scott
RMTTRAIL added.

add replicat demo, exttrail SC, nodbcheckpoint, begin now
REPLICAT added.
And I've started them both:
start er *
Sending START request to MANAGER ...
EXTRACT SCOTT starting

Sending START request to MANAGER ...
REPLICAT DEMO starting


info all

Program Status Group Lag Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING SCOTT 00:00:00 00:00:02
REPLICAT RUNNING DEMO 00:00:00 00:00:08


exit

Are Changes Replicated?

Once the setup was completed, I've tested the replication with the script below:
sqlplus / as sysdba

update scott.dept
set dname='OPERATIONS2'
where deptno=40;

commit;

select dname from demo.dept
where deptno=40;

DNAME
--------------
OPERATIONS2


update scott.dept
set dname='OPERATIONS'
where deptno=40;

commit;

select dname from demo.dept
where deptno=40;

DNAME
--------------
OPERATIONS


exit

Configuration Cleanup

Obviously that's just a start. Once happy with it, I've cleaned up my configuration to avoid any issue with my next tests:
./ggsci

stop er *
Sending STOP request to EXTRACT SCOTT ...
Request processed.

Sending STOP request to REPLICAT DEMO ...
Request processed.


delete er *
Are you sure you want to delete all groups? y
Deleted EXTRACT SCOTT.
Deleted REPLICAT DEMO.


stop manager
Manager process is required by other GGS processes.
Are you sure you want to stop it (y/n)? y

Sending STOP request to MANAGER ...
Request processed.
Manager stopped.


exit
And I've dropped the demo and gg users:
sqlplus / as sysdba

drop user gg cascade;
drop user demo cascade;

exit
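Note that dropping the gg user doesn't remove the supplemental log group created on SCOTT.DEPT by add trandata. If you want to revert that change too, something like the hedged sketch below should do; the group name is hypothetical, so pick the real one from DBA_LOG_GROUPS first:
select log_group_name
from dba_log_groups
where owner='SCOTT' and table_name='DEPT';

alter table scott.dept
drop supplemental log group ggs_dept_12345;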
(*) I know at least one person who considers me a useless clown and thinks the time I spend on Streams should be judged less mercifully. I admit it easily: I'm aware I don't have one percent of his knowledge about what remains an awesome technology. Anyway, good or bad, I've spent some energy digging into Streams internals, and I've always tried to listen to both positive and negative feedback.

Read more...

Tuesday, December 8, 2009

Hidden and Undocumented "Cardinality Feedback"

I know this blog is supposed to be about Oracle Streams and AQ only... but I could not resist sharing a recent finding about some undocumented, automatic plan re-evaluation behavior I've encountered with 11.2.0.1 on 32-bit Linux. Hopefully you'll forgive me this post...

If you've read the Oracle Database 11.1 documentation, you've found a feature called "Intelligent or Adaptive Cursor Sharing". From the documentation, the feature seems related to bind peeking, and you'll find an excellent article about it in the Optimizer Magic blog post "Update on Adaptive Cursor Sharing". This story seems both very close and very different.

However, as often with Oracle, it's more subtle than it appears; I've found a case, which you should be able to reproduce, where the query is re-parsed even though there is no bind variable at all.

My demo schema

The behavior may be related to some of my laptop's characteristics. My very simple database is running 11.2.0.1 on 32-bit Linux x86 with the following parameters:
sga_target=260M
pga_aggregate_target=180M
filesystemio_options=setall
I create a DEMO user and a set of tables, data and statistics. You can download the script from my website.

The query

Execute the query below; the first time, you should see that it gets a first plan with a hash value of 1851413986:
-- To change the behavior, set this parameter to none:
-- alter session set "_optimizer_extended_cursor_sharing_rel"=none;
-- Other than that, leave it to simple that is the default value for 11.2.0.1:
-- alter session set "_optimizer_extended_cursor_sharing_rel"=simple;

set timing on
select /* GG */ t.year_id, sum(f.metric1)
from fact f, time t, dim2 d2, dim3 d3, dim4 d4
where f.time_id=t.time_id
and f.dim2_id=d2.dim2_id
and f.dim3_id1=d3.dim3_id1
and f.dim3_id2=d3.dim3_id2
and f.dim4_id=d4.dim4_id
and d2.dim2_lib='Value 5'
and d3.dim3_lib='Value (2,2)'
and d4.dim4_l2='L2.1'
and attr2='ZZ4'
and t.time_id=trunc(t.time_id,'W')
group by t.year_id
order by t.year_id;

YEAR_ID SUM(F.METRIC1)
--------- --------------
01-JAN-09 38490

Elapsed: 00:00:06.10

select *
from table(dbms_xplan.display_cursor(format=>'basic note'));

PLAN_TABLE_OUTPUT
-----------------
EXPLAINED SQL STATEMENT:
------------------------
select /* GG_2 */ t.year_id, sum(f.metric1) from fact f, time t, dim2
d2, dim3 d3, dim4 d4 where f.time_id=t.time_id and
f.dim2_id=d2.dim2_id and f.dim3_id1=d3.dim3_id1 and
f.dim3_id2=d3.dim3_id2 and f.dim4_id=d4.dim4_id and
d2.dim2_lib='Value 5' and d3.dim3_lib='Value (2,2)' and
d4.dim4_l2='L2.1' and attr2='ZZ4' and
t.time_id=trunc(t.time_id,'W') group by t.year_id order by t.year_id

Plan hash value: 1851413986

-----------------------------------------------------------------
| Id | Operation | Name |
-----------------------------------------------------------------
| 0 | SELECT STATEMENT | |
| 1 | SORT GROUP BY | |
| 2 | NESTED LOOPS | |
| 3 | NESTED LOOPS | |
| 4 | HASH JOIN | |
| 5 | PART JOIN FILTER CREATE | :BF0000 |
| 6 | NESTED LOOPS | |
| 7 | NESTED LOOPS | |
| 8 | MERGE JOIN CARTESIAN | |
| 9 | PARTITION RANGE ALL | |
| 10 | TABLE ACCESS FULL | TIME |
| 11 | BUFFER SORT | |
| 12 | TABLE ACCESS FULL | DIM3 |
| 13 | PARTITION RANGE ITERATOR | |
| 14 | PARTITION HASH ALL | |
| 15 | BITMAP CONVERSION TO ROWIDS | |
| 16 | BITMAP AND | |
| 17 | BITMAP INDEX SINGLE VALUE | FACT_TIME_IDX |
| 18 | BITMAP INDEX SINGLE VALUE | FACT_DIM3_IDX |
| 19 | TABLE ACCESS BY LOCAL INDEX ROWID| FACT |
| 20 | PARTITION HASH JOIN-FILTER | |
| 21 | TABLE ACCESS FULL | DIM2 |
| 22 | INDEX UNIQUE SCAN | DIM4_PK |
| 23 | TABLE ACCESS BY INDEX ROWID | DIM4 |
-----------------------------------------------------------------
When I executed the exact same query again, I found a second plan with a hash value of 1094455219. Though that plan is better than the first one in my case, I would have expected the cursor to be reused:
set timing on
select /* GG */ t.year_id, sum(f.metric1)
from fact f, time t, dim2 d2, dim3 d3, dim4 d4
where f.time_id=t.time_id
and f.dim2_id=d2.dim2_id
and f.dim3_id1=d3.dim3_id1
and f.dim3_id2=d3.dim3_id2
and f.dim4_id=d4.dim4_id
and d2.dim2_lib='Value 5'
and d3.dim3_lib='Value (2,2)'
and d4.dim4_l2='L2.1'
and attr2='ZZ4'
and t.time_id=trunc(t.time_id,'W')
group by t.year_id
order by t.year_id;

YEAR_ID SUM(F.METRIC1)
--------- --------------
01-JAN-09 38490

Elapsed: 00:00:00.18

select *
from table(dbms_xplan.display_cursor(format=>'basic note'));

PLAN_TABLE_OUTPUT
-----------------
EXPLAINED SQL STATEMENT:
------------------------
select /* GG_2 */ t.year_id, sum(f.metric1) from fact f, time t, dim2
d2, dim3 d3, dim4 d4 where f.time_id=t.time_id and
f.dim2_id=d2.dim2_id and f.dim3_id1=d3.dim3_id1 and
f.dim3_id2=d3.dim3_id2 and f.dim4_id=d4.dim4_id and
d2.dim2_lib='Value 5' and d3.dim3_lib='Value (2,2)' and
d4.dim4_l2='L2.1' and attr2='ZZ4' and
t.time_id=trunc(t.time_id,'W') group by t.year_id order by t.year_id

Plan hash value: 1094455219

--------------------------------------------------------------
| Id | Operation | Name |
--------------------------------------------------------------
| 0 | SELECT STATEMENT | |
| 1 | SORT GROUP BY | |
| 2 | HASH JOIN | |
| 3 | NESTED LOOPS | |
| 4 | NESTED LOOPS | |
| 5 | MERGE JOIN CARTESIAN | |
| 6 | MERGE JOIN CARTESIAN | |
| 7 | PARTITION HASH ALL | |
| 8 | TABLE ACCESS FULL | DIM2 |
| 9 | BUFFER SORT | |
| 10 | TABLE ACCESS FULL | DIM3 |
| 11 | BUFFER SORT | |
| 12 | PARTITION RANGE ALL | |
| 13 | TABLE ACCESS FULL | TIME |
| 14 | PARTITION RANGE ITERATOR | |
| 15 | PARTITION HASH ITERATOR | |
| 16 | BITMAP CONVERSION TO ROWIDS | |
| 17 | BITMAP AND | |
| 18 | BITMAP INDEX SINGLE VALUE | FACT_TIME_IDX |
| 19 | BITMAP INDEX SINGLE VALUE | FACT_DIM3_IDX |
| 20 | BITMAP INDEX SINGLE VALUE | FACT_DIM2_IDX |
| 21 | TABLE ACCESS BY LOCAL INDEX ROWID| FACT |
| 22 | TABLE ACCESS FULL | DIM4 |
--------------------------------------------------------------

Note
-----
- cardinality feedback used for this statement
As you can see from the plan note, something has changed in the calculation; if you collect the 10053 trace, you'll see that not only is the cursor re-parsed at the second execution, but some opt_estimate hints are also used to correct the cardinality estimates. Interesting?

Conclusion

From my tests, the value of _optimizer_extended_cursor_sharing_rel impacts this behavior. However, the data the CBO relies on to decide it's better to perform a plan change doesn't look obvious to me. In addition, the first time the query is parsed, the IS_SHAREABLE column of V$SQL is set to 'Y' for the cursor, and if you query V$SQL_SHARED_CURSOR when the 2 cursors are in the shared pool, you'll find no property that explains the plan change. I'm very curious about the how and the why... Any idea?
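If you want to have a look for yourself, a hedged sketch like the one below, which assumes the /* GG */ comment makes the statement easy to spot, lists the child cursors with their plan hash values and shareability:
select sql_id, child_number, plan_hash_value,
executions, is_shareable
from v$sql
where sql_text like 'select /* GG */%';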

Read more...

Saturday, October 17, 2009

Turning a Physical Standby and its Broker into a Logical Standby

Turning a physical standby into a logical standby is well documented; check Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - Creating a Logical Standby Database for all the details. Nevertheless, the documentation assumes you set up the log transport service manually and doesn't explain how to do it when the broker manages the configuration. You'll find below what needs to be done to perform that change with the broker in place. It's by far easier than setting the log_archive_dest_n parameters.
Notes:
  • It's a pity you cannot (yet!) convert a physical standby into a logical standby with one single dgmgrl command.
  • This post has been tested with Oracle Database 11g Release 2 (11.2.0.1) on Linux x86.
  • Before you proceed, make sure the temporary files are specified correctly on the standby; a quick check is sketched below.
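A minimal sketch for that last check, run on the standby:
select name, status, bytes
from v$tempfile;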

A Physical Standby

To begin, let's consider a Data Guard configuration made of 2 databases: BLACK is the primary and WHITE the physical standby; we'll transform the latter into a logical standby database. The first step consists in stopping the redo apply process and checking that the database is mounted:
edit configuration
set PROTECTION MODE AS MaxPerformance;

Succeeded.

edit database white
set state=APPLY-OFF;

Succeeded.

show database white;

Database - white

Role: PHYSICAL STANDBY
Intended State:
APPLY-OFF
Transport Lag: 0 seconds
Apply Lag: 0 seconds
Real Time Query:
OFF
Instance(s):
WHITE

Database Status:
SUCCESS

exit

Checking for datatypes and tables without keys

Before you transform the standby database, you can check that the database doesn't contain tables without unique keys or tables that cannot be maintained by the logical standby. Run the following queries on BLACK:
select owner, table_name
from dba_logstdby_not_unique
where (owner, table_name) not in
(select distinct owner, table_name
from dba_logstdby_unsupported)
and bad_column='Y';

no rows selected
select distinct owner, table_name
from dba_logstdby_unsupported;
[...]

Capturing the dictionary on the primary

To proceed with the logical standby setup, you have to capture the dictionary on the source database (BLACK) and make sure there are no pending transactions. For that purpose, you can use the dbms_logstdby package like below:
. oraenv
BLACK

sqlplus / as sysdba

exec dbms_logstdby.build;

exit;
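If you want to check for pending transactions yourself before running the build, a simple hedged query:
select count(*) from v$transaction;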

Applying the logs on the standby

The standby database can now be synchronized to the point where the physical standby should be turned into a logical standby; proceed like below:
. oraenv
WHITE

sqlplus / as sysdba

alter database recover
to logical standby WHITE;

Database altered.

Opening the Physical Standby with ResetLogs

The next step consists in opening the standby database with resetlogs:
shutdown
ORA-01507: database not mounted
ORACLE instance shut down.


startup mount
ORACLE instance started.

Total System Global Area 263639040 bytes
Fixed Size 1335892 bytes
Variable Size 104861100 bytes
Database Buffers 150994944 bytes
Redo Buffers 6447104 bytes
Database mounted.


alter database open resetlogs;

Database altered.

exit;

Rebuilding the broker configuration

You can now change the Data Guard broker configuration to remove the physical standby and add the logical standby instead:
. oraenv
BLACK

dgmgrl /

remove database white
preserve destinations;

Removed database "white" from the configuration

show configuration

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS


add database white
as connect identifier is white
maintained as logical;

Database "white" added

enable configuration;

Enabled.

show configuration;

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
DISABLED


enable configuration;

Enabled.

show configuration

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS
Note:
It can take a few minutes for the standby and primary to get to the right state. However, if it still fails after a few minutes, check the alert.log for any unexpected errors.

Changing Properties

You can change some properties to ease the switchover and change the protection mode like below:
edit database white
set property StaticConnectIdentifier=white;

Property "staticconnectidentifier" updated

edit database black
set property StaticConnectIdentifier=black;

Property "staticconnectidentifier" updated

edit configuration
set protection mode as MaxAvailability;

Succeeded.

show configuration;

Configuration - worldwide

Protection Mode: MaxAvailability
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS

Testing the Logical Standby

Once the configuration is done, you can test it; make sure your changes to the primary database are replicated to the logical standby database:
. oraenv
BLACK

sqlplus / as sysdba

update scott.emp
set sal=10000
where ename='KING';

1 row updated.

commit;

Commit complete.

exit;

. oraenv
WHITE

sqlplus / as sysdba

select sal
from scott.emp
where ename='KING';

SAL
-----
10000
You can also test the switchover:
. oraenv
BLACK

dgmgrl /
switchover to white;

show configuration;

Conclusion

You may wonder what the link is between Streams and the logical standby. Obviously they both rely on the same technology but, in fact, that's not the main reason why I'm investigating logical standby. No; instead, it's because, with 11.2, you can now configure a Streams capture from a logical standby, as described in Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - 10.6.6 Running an Oracle Streams Capture Process on a Logical Standby Database.

Read more...

Sunday, October 11, 2009

LOB Chunks

To continue with the previous test case, I'll insert a LOB large enough that the change record is split into several chunks. To perform that operation:
  • Set up the example as described in Streams LCRs and LOBs
  • Insert a LOB that is large enough to guarantee it is split into several chunks
What is interesting is less the output itself than the number of chunks and the size of each chunk: as you'll see below, the 20480-character CLOB is sent as three LOB WRITE chunks, at offsets 1, 8061 and 16121.

Insert a large CLOB

The script below creates a LOB as a temporary resource and, once it is built, inserts it into SOURCE.T9:
connect source/source

declare
z clob;
begin
DBMS_LOB.CREATETEMPORARY(z,true,DBMS_LOB.SESSION);
for i in 1..20 loop
dbms_lob.append(z,rpad('Z',1024,'Z'));
end loop;
insert into source.T9 values (2,z);
end;
/
commit;

connect strmadmin/strmadmin
set lines 1000
set serveroutput on
exec print_xml_fromq('MYQUEUE');

The result

You'll find below the output that matches the set of LCRs generated by that single insert:
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>INSERT</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
</data>
<lob_information>EMPTY LOB</lob_information>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>8061</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>16121</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB TRIM</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_operation_size>20480</lob_operation_size>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>UPDATE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<old_values>
<old_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</old_value>
</old_values>
</row_lcr>
---------------------------------------------------------
No more messages

Read more...

Friday, October 9, 2009

Streams LCRs and LOBs

LOBs (BLOB, CLOB, NCLOB) have been designed to provide a rich and efficient set of APIs. With SecureFiles, the LOB storage model is highly efficient and optimized for compression. But LOBs don't work like anything else within an Oracle database; check my previous post entitled Triggers and LOBs if you are not convinced yet.

As a result, Streams doesn't handle LOBs like anything else either, and using LOBs with Streams can quickly turn into a headache:
  • Many tools, including the ones provided by the documentation, don't provide a clean display of LCRs containing LOBs,
  • Some packages require specific LOB settings, like DBMS_APPLY_ADM.SET_DML_HANDLER and its assemble_lobs parameter,
  • The Streams feature set associated with LOBs is quite different from the feature set associated with other types: no conflict detection, no support for the synchronous capture mechanism, and several specific bugs (search for "Streams" and "LOB" in the Metalink Bug Search Engine).
For all these reasons, it's very useful to be able to figure out how LOBs and Streams work together. To help, I've built a very simple model that lets you see, if not the exact content of LCRs containing LOBs, at least an XML representation of them.

The test case

You'll find below a description of the test case I've built; it is made of several parts:
  1. The source table is named SOURCE.T9. You can easily change its structure (from CLOB to BLOB, for example). All the DML commands you execute and commit on that table will be captured.
  2. Because I use an asynchronous capture process, I need to make sure the change vectors are written by LGWR to the redo logs and archived logs.
  3. I've created a strmadmin schema to store all the objects I use for the setup (queue, procedures, rules...); I've named the capture process streams_capture; it captures the changes from the redo logs and enqueues them into the streams_queue queue.
  4. The apply process dequeues the LCRs from streams_queue.
  5. It relies on a DML handler to manage the LCRs. That handler converts the LCRs into XMLType objects with the DBMS_STREAMS.CONVERT_LCR_TO_XML function and enqueues them into the myqueue queue.
  6. The print_xml_fromq procedure is used to display the content of the XMLType objects that match the LCRs.

The script

The script below performs the whole setup of the test case described above:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

create table source.t9(
id number,
text clob,
constraint t9_pk primary key (id))
lob(text) store as securefile;

var first_scn number;

set serveroutput on
declare
scn number;
begin
dbms_capture_adm.build(
first_scn => scn);
dbms_output.put_line('First SCN Value = ' || scn);
:first_scn := scn;
end;
/

col first_scn format 999999999999 -
new_value first_scn
select :first_scn first_scn from dual;

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'
size 25M autoextend on maxsize 256M;

create user strmadmin identified by strmadmin
default tablespace streams_tbs
quota unlimited on streams_tbs;

grant dba to strmadmin;
grant execute on dbms_aq to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

connect strmadmin/strmadmin

var first_scn number;
exec :first_scn:=&&first_scn

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true,
and_condition => null);
end;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

set lines 120
col streams_name format a16
col streams_type format a9
col rule_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select rule_owner,
streams_name,
streams_type,
table_owner||'.'||table_name table_name,
rule_type,
rule_name
from dba_streams_table_rules;

select table_owner||'.'||table_name table_name,
scn,
timestamp,
supplemental_log_data_pk
from dba_capture_prepared_tables;

connect strmadmin/strmadmin

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'apply',
streams_name => 'streams_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true);
end;
/

col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

col instantiation_scn format 999999999999 -
new_value instantiation_scn

select dbms_flashback.get_system_change_number instantiation_scn
from dual;

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t9',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end;
/

col source_database format a6
col object format a10
col instantiation_scn format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

connect strmadmin/strmadmin

begin
dbms_aqadm.create_queue_table(
queue_table => 'myqueue_table',
queue_payload_type => 'sys.xmltype',
multiple_consumers => false);
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
dbms_aqadm.start_queue('myqueue');
end;
/

create or replace procedure myhandler(in_any in anydata)
is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.persistent;
dbms_aq.enqueue(
queue_name => 'STRMADMIN.MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => dbms_streams.convert_lcr_to_xml(in_any),
msgid => message_handle);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T9',
object_type => 'TABLE',
operation_name => 'DEFAULT',
error_handler => false,
user_procedure => 'strmadmin.myhandler',
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('streams_apply');

create or replace procedure print_xml_fromq(queue_name varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message sys.xmltype;
v_out pls_integer;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.persistent;
loop
begin
dbms_aq.dequeue(
queue_name => queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line(message.extract('/*').getstringval());
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
If you are used to Streams, there is not much in that script that you don't already know. The table contains a LOB but can easily be changed, even after you've started the capture, to match your needs. The very powerful feature I use here, which comes with Streams, is the ability to transform an LCR into an XML document with the DBMS_STREAMS.CONVERT_LCR_TO_XML function.

Play with LOBs

Once the configuration is done, you can use implicit LOB conversion or the LOB built-ins to create and modify LOB values from SQL and PL/SQL, like below:
connect source/source

insert into source.T9 values (1,empty_clob());
commit;

declare
my_x clob;
begin
select text into my_x
from source.T9 where id=1
for update;
dbms_lob.append(my_x,'Z');
commit;
end;
/

declare
my_x clob;
amount number;
begin
select text into my_x
from source.T9 where id=1
for update;
amount:=1;
dbms_lob.erase(my_x,amount,1);
commit;
end;
/

Display XMLized LCRs

You can now display the content of the LCRs that have been captured with the script below:
connect strmadmin/strmadmin
set lines 1000
set serveroutput on
exec print_xml_fromq('MYQUEUE');
The result looks like below. Pay attention to the various command types "LOB WRITE", "LOB TRIM" and "LOB ERASE":
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>INSERT</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>EMPTY LOB</lob_information>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>UPDATE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<old_values>
<old_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</old_value>
</old_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>Z</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB TRIM</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB ERASE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>7.19.625</transaction_id>
<scn>1352034</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
No more messages

Drop the test case

Once your tests are done, you can drop the whole test case by running the script below:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

exec dbms_apply_adm.stop_apply('STREAMS_APPLY')
exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE')

exec dbms_apply_adm.delete_all_errors('STREAMS_APPLY');
exec dbms_apply_adm.drop_apply('STREAMS_APPLY')
exec dbms_capture_adm.drop_capture('STREAMS_CAPTURE')

drop procedure strmadmin.myhandler;

begin
dbms_aqadm.stop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue_table('strmadmin.myqueue_table', true);
end;
/

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
In a later blog post, I'll dig further into how LOBs work within LCRs. If you want to write your own LCRs containing LOBs, you can check the documentation; there is a short section called "Oracle® Streams Extended Examples - Logical Change Records With LOBs Example".

Read more...

Monday, October 5, 2009

My XStream FAQ

I wrote 2 programs a few months back to use and experiment with Oracle XStream; when I discovered Oracle had launched XStream right after closing the GoldenGate deal (a few hours ago as I write this...), I thought it might be clever to publish those Java programs to help people set up an XStream Inbound configuration and an XStream Outbound configuration. I really hope those 2 programs can help.

However, after a few conversations on IM, Twitter (@arkzoyd) and the blog, I'm now convinced I did it wrong. I should have started at the beginning instead of pushing those programs out with no explanation. I should have talked about why it's important to track message positions, how subtle XStream actually is, and what benefit you get from XStream compared to what used to exist before.

I'll try to correct that situation and provide simple answers to the questions below. If you have high-level questions about XStream, feel free to comment on this post; if you have technical issues, I suggest you post directly on the Streams Forum...

What is Oracle XStream?

XStream is a new way to interact between Oracle Streams and the outside world. It uses a publish/subscribe model that is different from the regular AQ queuing model. The purpose is to provide to the outside world the same kind of features Oracle uses to propagate messages between databases.

As a result, with XStream, you can now propagate messages between a database and a program written in OCI or Java/OCI. In addition, you can guarantee that:
  • no messages are written to disk: everything stays in memory during the propagation to and from the program; and you can guarantee that behavior not only between the database and your program, but along the whole chain of programs from the source to the destination of your messages
  • if something crashes, whether it's a database instance or your program, you'll be able to recover all your messages, in the right order and without losing a single one
Obviously, if you face some bottleneck or saturation, you may decide, like Streams does internally, that it's more efficient for your messages to spill out of memory. In that case, you can turn them into persistent messages, or you can slow down the capture or propagation processes.

Why could you not get to the same result before XStream?

There are 2 cases XStream addresses: (1) inbound messages and (2) outbound messages.
  • Inbound messages
Before XStream, the simplest way to send a message to a database was to make it persistent; you could, for example, insert the message into a persistent queue or insert it into a table. In both cases, the Oracle database guarantees you will not lose your message. However, if the database you put your messages in isn't their destination database, you have to pay the price of making the messages persistent in that staging area.

Obviously, you could also decide to enqueue your messages into a non-persistent queue on the staging database and use a propagation job to send them to their destination database. The problem with that method is that there is no guarantee that, if an instance crashes, the messages will ever reach their destination; otherwise, non-persistent queues would be called persistent queues!

Until Oracle Database 11g Release 2, there was no database built-in you could rely on to both guarantee a message gets to its destination and avoid the price of making it persistent on the staging database... But now, you have XStream!
  • Outbound messages
The principle is basically the same for outbound messages. A Streams captured message could be turned into a persistent message, which guarantees you will not lose it, but requires you to pay the price of making captured messages persistent on the staging database.

You could also turn your messages into non-persistent messages; yes, that's possible if, say, you create an apply handler that enqueues the messages into a buffered queue. In that case, however, there is no guarantee you will never lose a message.

As for inbound messages, there was no built-in Oracle feature you could use to guarantee that a message from inside a database would be sent to a destination outside the database without making it persistent on the staging database.

What problem does it solve?

As you can now guess, XStream enables you to propagate Streams messages between Oracle databases and the outside world in an extremely efficient way. It propagates messages in memory only (in most cases!) and, at the same time, guarantees that messages will be delivered to their destination.

Why is it cool?

Because Streams is cool! And providing the power of Streams to the outside world is VERY cool, don't you think? ;-)

How does XStream relate to GoldenGate?

XStream, like Oracle Active Data Guard, is included in the GoldenGate license. But unlike Active Data Guard, XStream cannot be licensed separately from GoldenGate. For now, that's the only link between XStream and GoldenGate, even if we can guess GoldenGate could leverage the XStream feature sooner or later.

What does Oracle want to do with XStream?

That's actually a very good question, and I assume we can only guess for now. There are a bunch of things Oracle could do with XStream, like the ones below:
  • leveraging the technology in GoldenGate beyond a simple license bundle
  • leveraging the technology for existing features of the Oracle Database, like the In-Memory Database Cache or Oracle Coherence
  • leveraging the technology to build new features, like file-to-database integration or MySQL-to-Oracle integration (if we consider MySQL could become part of the Oracle offering one day soon)
  • probably many more things I cannot even think of...

How does XStream work?

XStream relies on a few facts:
  • Messages are consumed once at the destination; whether they are made persistent or not doesn't actually matter. Once consumed, a message at the destination will never be requested again.
  • There is an order to the messages; in addition, there is a raw value that strictly increases and uniquely identifies each message's position in that order.
  • If a message is lost in the chain, that's because the chain has broken; in that case, all the subsequent messages are lost too.
  • The message order is kept during the propagation from the source to the destination.
  • Messages are persistent on the source; or, to say it another way, if the destination doesn't get a message, the source will be able to resend it, in the same order... assuming it gets back the message position to restart from!
As a result, an XStream program deals with its server as follows:
  • In the case of XStream Inbound, the program gets the last position from the server and then enqueues its messages from that point. On a regular basis, it gets feedback from the server saying which messages have been consumed at the destination, so that it can release those messages on its side.
  • In the case of XStream Outbound, the program sends the last position of the applied messages and then dequeues messages from that point. On a regular basis, it provides feedback to the server saying which messages have been consumed, to allow the server to release them from its source.

How can I know more about XStream?

To know everything about XStream, read the Oracle® Database XStream Guide 11g Release 2 (11.2).

Read more...

Sunday, October 4, 2009

XStream Outbound... A sample Java Program

Like with XStream Inbound in my previous post, you'll find below a simple Java program for XStream Outbound. This new program subscribes to Streams captured changes. Does it sound easy? It is, as you'll figure out below.

Step 1: Create a Sample Schema

For this sample program, create a schema and a table:
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t7(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t7(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t7(id, text1, text2)
values (2,'Text 2','Text 2');

commit;

Step 2: Create a Streams Administrator

To subscribe to Streams changes, you must create a Streams administrator:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

Step 3: Create a Simple XStream Out Configuration

For our Java program to dequeue messages, you must create an XStream outbound server:
connect strmadmin/strmadmin

begin
dbms_xstream_adm.create_outbound(
server_name => 'DEMO_SERVER',
source_database => '&&dbname',
table_names => 'source.t7',
schema_names => null,
capture_user => null,
connect_user => null,
comment => 'XStream OutBound Server Demonstration');
end;
/

SELECT *
FROM DBA_XSTREAM_OUTBOUND;

select *
from dba_apply
where purpose='XSTREAM OUT';
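Once the Java program below starts consuming changes, you should be able to monitor the acknowledged position from the database side; a hedged sketch, assuming the DEMO_SERVER name:
select server_name, processed_low_position
from dba_xstream_outbound_progress;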

Step 4: Create a JAVA XStream Outbound Client

Create an XStreamOutDemo.java file like the one below. Change the connection URL, user and password to match your own configuration:
import java.sql.Connection;
import java.sql.DriverManager;

import oracle.jdbc.internal.OracleConnection;

import oracle.streams.XStreamOut;
import oracle.streams.ChunkColumnValue;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;

public class XStreamOutDemo {

public static void main(String args[])
{
String out_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

/*
* Connect to the Database
*/
Connection out_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
out_conn=DriverManager.getConnection(out_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + out_url);
e.printStackTrace();
}

/*
* Get a XStream Out Handler
*/
XStreamOut xsOut=null;
byte[] lastPosition = null;

try
{
// when attach to an outbound server, client needs to tell outbound
// server the last position.
xsOut = XStreamOut.attach((OracleConnection) out_conn, "DEMO_SERVER",
lastPosition, XStreamOut.DEFAULT_MODE);
System.out.println("Attached to outbound server: DEMO_SERVER");
System.out.print("Last Position is: ");
if (lastPosition != null) { printHex(lastPosition); }
else { System.out.println("NULL");}
}
catch(Exception e)
{
System.out.println("cannot attach to outbound server: DEMO_SERVER");
System.out.println(e.getMessage());
e.printStackTrace();
}

byte[] processedLowPosition = null;
try
{
while(true)
{
// receive an LCR from outbound server
LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);

if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
{
assert alcr != null;

// also get chunk data for this LCR if any
if (alcr instanceof RowLCR)
{
// drain the chunk data for this LCR
if (((RowLCR)alcr).hasChunkData())
{
ChunkColumnValue chunk = null;
do
{
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
} while (!chunk.isEndOfRow());
}
}

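// Pretty-print the LCR: command type, new/old column values and the
// generated DML statement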
String command=alcr.getCommandType();
if (!command.equals("COMMIT"))
System.out.print(command+" on "+ alcr.getObjectOwner()+"."+
alcr.getObjectName()+"\n");
else System.out.print(command+"\n");
if (command.equals("INSERT") || command.equals("UPDATE")) {
System.out.print(" -- NEW VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getNewValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
if (command.equals("UPDATE") || command.equals("DELETE")) {
System.out.print(" -- OLD VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getOldValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
System.out.print(" -- DML ----------\n");
System.out.println(" "+((DefaultRowLCR) alcr).getStatement(false));
processedLowPosition = alcr.getPosition();
if (null != processedLowPosition)
xsOut.setProcessedLowWatermark(processedLowPosition,
XStreamOut.DEFAULT_MODE);
System.out.print("Last Position is: ");
printHex(processedLowPosition);
} else // batch is end
{ assert alcr == null; }
}
} catch(Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public static void printHex(byte[] b) {
// Print each byte as two hex digits (same helper as in the Inbound post)
for (int i = 0; i < b.length; ++i) {
System.out.print(
Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
}
System.out.println("");
}

// Pad (or truncate) a string to a fixed width so the columns line up
public static String fixsize(String text, int length) {
String output;
if (text.length() > length) {
output=text.substring(0,length);
} else {
output=text;
for (int i=0; i<(length-text.length()); i++) {
output=output+" ";
}
}
return output;
}
}
To compile and execute the program, run the script below:
export CLASSPATH=.:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH

javac XStreamOutDemo.java
java XStreamOutDemo
To test the program, open another session to the database and insert, update, and delete rows in the table as below:
insert into source.t7 values (3,'X','X');
commit;
update source.t7 set text2='Y' where id=3;
commit;
delete from source.t7 where id=3;
commit;
The output looks like below:
INSERT on SOURCE.T7
-- NEW VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:X
Last Position is: 0000001d856d00000001000000010000001d856c000000010000000101
COMMIT
Last Position is: 0000001d856d00000001000000010000001d856d000000010000000101
UPDATE on SOURCE.T7
-- NEW VALUES ----------
Column:TEXT2 Value:Y
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT2 Value:X
Last Position is: 0000001d856f00000001000000010000001d856e000000010000000101
COMMIT
Last Position is: 0000001d856f00000001000000010000001d856f000000010000000101
DELETE on SOURCE.T7
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:Y
Last Position is: 0000001d857200000001000000010000001d8570000000010000000101
COMMIT
Last Position is: 0000001d857200000001000000010000001d8572000000010000000101
To exit the program, type:
CTRL+C
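
Note that CTRL+C simply kills the JVM. If you'd rather detach from the outbound server first, you can register a small helper like the sketch below from main; this is only a sketch, and it assumes the detach(int) call of the oracle.streams API, which the demo above doesn't use:
import oracle.streams.XStreamOut;

public class CleanExit {
// Register a JVM shutdown hook that detaches from the outbound server,
// so that CTRL+C still closes the XStream session cleanly
public static void register(final XStreamOut xsOut) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
xsOut.detach(XStreamOut.DEFAULT_MODE);
System.out.println("Detached from outbound server");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
Call CleanExit.register(xsOut) right after the attach succeeds.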

Step 5: Drop the Configuration

I hope you've found this example useful. To drop the configuration, run the script below:
connect strmadmin/strmadmin

begin
dbms_xstream_adm.drop_outbound(
server_name=>'DEMO_SERVER');
end;
/

SELECT *
FROM DBA_XSTREAM_OUTBOUND;

select * from dba_apply;
select * from dba_capture;
select * from dba_streams_table_rules;

connect / as sysdba

drop user source cascade;
drop user strmadmin cascade;
You're done! It's nice to enter the new Oracle Data Integration generation with GoldenGate, don't you think?


XStream Inbound... A Sample Java Program

Oracle XStream is a new feature of Oracle Database 11g Release 2. Though it was developed by the Streams team and ships with the database, XStream is licensed as part of GoldenGate! And now that the GoldenGate deal has closed, 7 days before Oracle OpenWorld, it's easy to guess why XStream came out a few hours ago.

This post is about XStream; it provides a Java program that uses "XStream In" to enqueue messages to a buffered queue. A Streams apply process applies them to a table named SOURCE.T8.

To know everything about XStream, read the following documentation:

Step 1: Create a Sample Schema

For this example, you'll need a schema and a table:
sqlplus / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t8(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

commit;

Step 2: Create a Streams Administrator

To use XStream, you'll also need a Streams administrator and a queue to stage buffered messages:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

Step 3: Create a Simple XStream In Configuration

You must create and start an XStream In configuration before you use a Java or OCI client to enqueue messages:
connect strmadmin/strmadmin

BEGIN
DBMS_XSTREAM_ADM.CREATE_INBOUND(
server_name => 'xin',
queue_name => 'xin_queue');
END;
/

SELECT *
FROM DBA_XSTREAM_INBOUND;

set pages 1000
select *
from dba_apply
where purpose='XSTREAM IN';

exec DBMS_APPLY_ADM.START_APPLY('xin');

Step 4: Create a JAVA XStream Inbound Client

Create an XStreamInDemo.java file that enqueues messages to the buffered queue, and change the connection string to match your environment:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Date;

import oracle.jdbc.internal.OracleConnection;

import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.RowLCR;
import oracle.streams.XStreamIn;

public class XStreamInDemo {

public static void main(String args[])
{
String in_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

/*
* Connect to the Database
*/
Connection in_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
in_conn=DriverManager.getConnection(in_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + in_url);
e.printStackTrace();
}

/*
* Get a XStream In Handler
*/
XStreamIn xsIn=null;
String xsinName="XIN";
byte[] lastPosition = null;
int transaction=0;
int rank=0;
try
{
xsIn = XStreamIn.attach ((OracleConnection)in_conn, xsinName,
"HI2" , XStreamIn.DEFAULT_MODE);

// use the last position to decide where we should start sending LCRs
System.out.println("Attached to inbound server:"+xsinName);
System.out.print("Inbound Server Last Position is: ");
lastPosition = xsIn.getLastPosition();
if (null == lastPosition)
{
System.out.println("null");
transaction=1;
rank=1;
}
else {
printHex(lastPosition); System.out.println("");
transaction = getTransaction(lastPosition);
rank = getRank(lastPosition);
if (rank==1) rank=2; else { rank=1; transaction++; }
}
}
catch(Exception e)
{
System.out.println("cannot attach to inbound server: "+xsinName);
System.out.println(e.getMessage());
e.printStackTrace();
}

/*
* Generate transactions: send an INSERT LCR, then a COMMIT LCR, in a loop
*/

try {
DATE mydate;
DefaultRowLCR alcr;
byte[] processedLowPosition;

while(true) {
if (rank==1) {
mydate = new DATE();
System.out.println("-- " +
Integer.toString(transaction) +
" -------------------------");
alcr=new DefaultRowLCR(
"BLACK", RowLCR.INSERT, "SOURCE", "T8",
"X."+Integer.toString(transaction), null,
encode2bytes(transaction, rank), mydate);
ColumnValue[] newcolumn= new ColumnValue[3];
newcolumn[0]= new DefaultColumnValue("ID",
new oracle.sql.NUMBER(transaction));
newcolumn[1]= new DefaultColumnValue("TEXT1",
new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
newcolumn[2]= new DefaultColumnValue("TEXT2",
new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
alcr.setNewValues(newcolumn);
xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
System.out.println(" " +alcr.getStatement(false));
xsIn.flush(XStreamIn.DEFAULT_MODE);
rank++;
Thread.sleep(500);
} else {
mydate = new DATE();
alcr=new DefaultRowLCR(
"BLACK", RowLCR.COMMIT, null, null,
"X."+Integer.toString(transaction), null,
encode2bytes(transaction, rank), mydate);
xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
xsIn.flush(XStreamIn.DEFAULT_MODE);
System.out.println(" " +alcr.getStatement(false));
rank=1;
transaction++;
Thread.sleep(500);
}
System.out.print(" ");
processedLowPosition =
xsIn.getProcessedLowWatermark();
if (processedLowPosition != null) {
System.out.print("processedLowPosition: ");
printHex(processedLowPosition);
System.out.print(" (" +
Integer.toString(getTransaction(processedLowPosition))+
", " +
Integer.toString(getRank(processedLowPosition)) +
")");
}
else {
System.out.print("processedLowPosition: null");
}
lastPosition =
xsIn.getLastPosition();
System.out.println("");
}
} catch (Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

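// Print each byte of a position as two hex digits, with no separators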
public static void printHex(byte[] b)
{
for (int i = 0; i < b.length; ++i)
{
System.out.print(
Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
}
}

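// Build the demo's 5-byte position: bytes 0-3 hold the transaction number
// in big-endian order, byte 4 holds the rank inside the transaction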
public static byte[] encode2bytes(int transaction, int rank)
{
byte[] mybyte= new byte[5];
mybyte[0] =(byte)( transaction >> 24 );
mybyte[1] =(byte)( (transaction << 8) >> 24 );
mybyte[2] =(byte)( (transaction << 16) >> 24 );
mybyte[3] =(byte)( (transaction << 24) >> 24 );
mybyte[4] =(byte)( rank ) ;
return mybyte;
}

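// Decode the transaction number from the first 4 bytes of a position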
public static int getTransaction(byte[] mybyte)
{
int i = 0;
int pos = 0;
i += ((int) mybyte[pos++] & 0xFF) << 24;
i += ((int) mybyte[pos++] & 0xFF) << 16;
i += ((int) mybyte[pos++] & 0xFF) << 8;
i += ((int) mybyte[pos] & 0xFF);
return i;
}

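// Decode the rank from the last byte of a position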
public static int getRank(byte[] mybyte)
{
int foo;
foo =((int)mybyte[4] & 0xFF);
return foo;
}
}
Notes:
  • getLastPosition should be called once, right after you attach to the server, to get the last message handled by XStream In
  • The client has to use OCI or Java over OCI, and you must include the xstreams.jar library in the classpath

To compile the code, set the CLASSPATH, the PATH and the JAVA_HOME. Once done, you can simply run it:
export CLASSPATH=.:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH

javac XStreamInDemo.java
java XStreamInDemo

Check that messages are applied to the SOURCE.T8 table

The program enqueues messages continuously. You can verify they are applied to the table with a script like the one below:
sqlplus / as sysdba

select count(*)
from source.T8;

COUNT(*)
----------
1073


/

COUNT(*)
----------
1076

Clean Up the Environment

To clean up the environment, execute the script below:
connect strmadmin/strmadmin

exec DBMS_APPLY_ADM.STOP_APPLY('xin');

BEGIN
DBMS_XSTREAM_ADM.DROP_INBOUND(
server_name => 'xin');
END;
/

select *
from DBA_XSTREAM_INBOUND;

select *
from dba_apply
where purpose='XSTREAM IN';

connect / as sysdba

drop user source cascade;
drop user strmadmin cascade;


Monday, September 21, 2009

Triggers and LOBs: Synchronous CDC and Synchronous Streams Capture

Just a quick post to outline some of the limits of using LOBs and triggers together. There is no secret; it's written, black on white, in the manual: depending on how a LOB column is updated, triggers on the table will fire... or NOT! The reason is related to the design of the LOB locator, which allows data chunk updates without any reference to the changed row.

Anyway, I don't want to dig too much into the details of why things are the way they are. Instead, you'll find below some PL/SQL code samples. They show that if you change the data through the LOB locator, the triggers won't fire; on the other hand, if you update the LOB columns directly, the triggers will fire.

Create a table and a "for each row" trigger

To begin, we'll create a table with a trigger. The trigger records the number of times it fires in a separate table named xlog:
create table x (id number, text clob)
lob(text) store as securefiles;

create table xlog (id number, call number);

create or replace trigger x_trigger
after insert or update or delete on x
for each row
begin
if inserting then
insert into xlog (id, call) values (:new.id, 1);
else
update xlog set call=call+1 where id=:old.id;
end if;
end;
/
Note:
To test the Oracle Database 11g Release 2 Streams new features (in a later post), I've used a SecureFile LOB.

Changes to LOBs do fire triggers

To begin, we change some row data without using any persistent LOB locator. As you'll see in the xlog table, each row change fires the trigger:
insert into x values (1,null);
update x set text='X' where id=1;
update x set text='XY' where id=1;
commit;

select x.text, xlog.call
from x, xlog
where x.id=1
and xlog.id=1;

TEXT CALL
---- ----
XY 3

Changes through the LOB locator don't fire triggers

On the other hand, if you change the row data through the persistent LOB locator as below, you'll find that the trigger doesn't fire:
declare
my_x clob;
begin
select text into my_x
from x where id=1
for update;
dbms_lob.append(my_x,'Z');
dbms_lob.append(my_x,'Z');
commit;
end;
/

select x.text, xlog.call
from x, xlog
where x.id=1
and xlog.id=1;

TEXT CALL
---- ----
XYZZ 3

Conclusion & Cleanup

If you just think about it, this may feed your thoughts about the LOB support (or actually the lack of it!) in Synchronous Change Data Capture (CDC) and Synchronous Streams Capture. But enough conclusions for today; let's just drop the tables and the trigger:
drop table x purge;
drop table xlog purge;


Sunday, September 6, 2009

The New Streams 11.2 SQL Generation Facility

I must admit, I'm kind of disappointed by the Oracle 11.2 Streams new features. Not that there is nothing important: the support for SecureFile LOBs and compressed tables are things I've been eagerly waiting for, and this new release is huge and looks promising! However, I'm disappointed for the following 3 reasons:
  • I have not managed to use statement handlers yet, even though I find them very easy to use for some specific use cases. Whether created by hand or with dbms_streams_adm.maintain_change_table, they have always failed for me with ORA-01008: not all variables bound
  • I was kind of hoping for the following restriction to be lifted: "The DBMS_STREAMS_ADVISOR_ADM package does not gather information about synchronous captures or messaging clients."
  • Last but not least, there are obvious hidden and undocumented features, which raises my level of frustration to its maximum
Anyway, I imagine I have no other choice than to be patient and hope those features get fixed and documented as soon as possible. In the meantime, I propose we explore one of those new features, called SQL Generation.

SQL Generation is the ability you get, in a custom DML handler or any piece of code that deals with LCRs, to transform an LCR into its canonical SQL command. What we'll do is write a very simple example that uses a DML handler to store that SQL command in a table instead of applying it to the destination schema.
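
As a side note, the same facility surfaces in the XStream Java API used in the posts above: DefaultRowLCR.getStatement(boolean) returns the generated SQL of a row LCR. The sketch below reuses the outbound receive loop; I'm assuming the boolean argument toggles bind variables, since the demos on this blog pass false to get literal values:
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;
import oracle.streams.XStreamOut;

public class GeneratedSqlSketch {
// Print the canonical SQL of every row LCR received from an already
// attached outbound server (see the XStream Outbound post above for
// the attach code)
public static void dumpGeneratedSql(XStreamOut xsOut) throws Exception {
while (true) {
LCR lcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);
if (xsOut.getBatchStatus() == XStreamOut.EXECUTING
&& lcr instanceof RowLCR) {
// Same output as lcr$_row_record.get_row_text in PL/SQL
System.out.println(((DefaultRowLCR) lcr).getStatement(false));
}
}
}
}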

Step 1: Create a sample Schema

To begin with the sample, create a source schema with a table and a few rows:
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t5(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t5(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t5(id, text1, text2)
values (2,'Text 2','Text 2');

commit;

Step 2: Create the Streams administrator and a queue

Once done, you can create a Streams Administrator and a queue to use in your configuration:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table =>'strmadmin.streams_queue_table',
queue_name =>'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

USERNAME LOC ACC
--------- --- ---
STRMADMIN YES YES

Step 3: Create a Capture Process

Then create a capture process on the source schema:
connect / as sysdba

var first_scn number;
set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/

col first_scn new_value first_scn
select :first_scn first_scn
from dual;

connect strmadmin/strmadmin

prompt &&first_scn
prompt &&dbname

BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'SQLGEN_CAPTURE',
rule_set_name => NULL,
source_database => '&&dbname',
use_database_link => false,
first_scn => &&first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'capture',
streams_name => 'SQLGEN_CAPTURE',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/

set lines 120
col streams_name format a16
col streams_type format a9
col table format a15
col rule_type format a8
col rule_name format a15
col rule_condition format a60 word_wrapped

select streams_name,
streams_type,
table_owner||'.'||table_name "TABLE",
rule_type,
rule_name,
rule_condition
from dba_streams_table_rules
where streams_name='SQLGEN_CAPTURE'
and table_owner='SOURCE'
and table_name='T5';

Step 4: Create a procedure and a table to store the SQL of the LCR

Rather than applying the LCRs to a destination table, create a table and a procedure that uses SQL Generation to store each change vector as a DML command in that table:
connect strmadmin/strmadmin

create table mydml_tab(
id number,
sqltext clob);

create sequence mydml_seq;

create or replace procedure mydml_handler(in_any in anydata)
is
lcr sys.lcr$_row_record;
v_sqltext clob:=' /* COMMENT */';
rc number;
begin
rc := in_any.GETOBJECT(lcr);
lcr.get_row_text(v_sqltext);
insert into mydml_tab
values (mydml_seq.nextval,v_sqltext);
end;
/

Step 5: Create an Apply process with a DML handler

Create an apply process and add the DML handler to it:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'apply',
streams_name => 'SQLGEN_APPLY',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/
col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status
from dba_apply;

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a11
col table_name format a15
col rule_type format a8
col rule_name format a15

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where STREAMS_NAME='SQLGEN_APPLY';

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t5',
source_database_name => '&&dbname',
instantiation_scn => &&first_scn);
end;
/

col "OBJ3CT" format a15
col operation_name format a8
col user_procedure format a28
col apply_name format a13

select object_owner||'.'||object_name "OBJECT",
operation_name,
user_procedure,
apply_name
from dba_apply_dml_handlers;

col source_database format a10
col "OBJECT" format a15
set numwidth 15
select source_database,
SOURCE_OBJECT_OWNER||'.'||SOURCE_OBJECT_NAME "OBJECT",
instantiation_scn
from dba_apply_instantiated_objects;

Step 6: Start the Apply and Capture processes

Now start the apply and the capture processes and wait for them to keep up with the current position of the log writer:
exec dbms_apply_adm.start_apply('SQLGEN_APPLY')
exec dbms_capture_adm.start_capture('SQLGEN_CAPTURE')

Step 7: Test the SQL Generation

You can test your settings by inserting or updating a row in the source table; after some time, query the table that stores the generated SQL:
insert into source.t5 values (3,'Hello','Hello');
commit;

-- Wait a few minutes
col id format 99
col sqltext format a50
set long 1000
set longchunksize 1000
select * from mydml_tab;
The table content should look like below:
 ID SQLTEXT
--- --------------------------------------------------
1 /* COMMENT */ INSERT INTO "SOURCE"."T5"("ID","TEX
T1","TEXT2" ) VALUES ( 3,'Hello','Hello')
You can perform more tests with UPDATE or DELETE statements.

Step 8: Drop the test environment

As always with my examples, I propose you drop the whole configuration before you leave:
connect / as sysdba

exec dbms_apply_adm.stop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.stop_capture('SQLGEN_CAPTURE')

exec dbms_apply_adm.delete_all_errors('SQLGEN_APPLY');
exec dbms_apply_adm.drop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.drop_capture('SQLGEN_CAPTURE')

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
