Saturday, October 17, 2009

Turning a Physical Standby and its Broker into a Logical Standby

Turning a Physical Standby into a Logical Standby is well documented. Check Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - Creating a Logical Standby Database for all the details. Nevertheless, the documentation assumes you set up the log transport service manually and doesn't explain how to do it with the broker in place. You'll find below what needs to be done to perform that change with the broker in place. It is, by far, easier than setting the log_archive_dest_n parameters by hand.
Notes:
  • It's a pity you cannot (yet!) convert a physical standby into a logical standby with a single dgmgrl command.
  • This post has been tested with Oracle Database 11g Release 2 (11.2.0.1) on Linux x86.
  • Before you proceed, make sure the temporary files are specified correctly.

A Physical Standby

To begin, let's consider a Data Guard configuration made of 2 databases: BLACK is the primary and WHITE the physical standby. We'll transform the latter into a logical standby database. The first step consists in stopping the redo apply process and checking the database is mounted:
edit configuration
set PROTECTION MODE AS MaxPerformance;

Succeeded.

edit database white
set state=APPLY-OFF;

Succeeded.

show database white;

Database - white

Role: PHYSICAL STANDBY
Intended State:
APPLY-OFF
Transport Lag: 0 seconds
Apply Lag: 0 seconds
Real Time Query:
OFF
Instance(s):
WHITE

Database Status:
SUCCESS

exit
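
The broker output above only reports the intended state; the same facts can be cross-checked from SQL*Plus. What follows is a minimal sketch, assuming you are connected as sysdba on WHITE. The views are standard dynamic performance views, and the values mentioned in the comments are what a mounted physical standby with redo apply stopped would normally show, not output captured from this configuration:

```sql
-- Run on WHITE (the standby), connected as sysdba.
-- A mounted physical standby normally reports PHYSICAL STANDBY / MOUNTED.
select database_role, open_mode
from v$database;

-- With the state set to APPLY-OFF, no MRP (managed recovery) process
-- is expected to be listed here.
select process, status
from v$managed_standby
where process like 'MRP%';

-- Temporary files known to the standby (see the note about
-- temporary files at the top of this post).
select name
from v$tempfile;
```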

Checking for datatypes and tables without keys

Before you transform the standby database, you can check that the database contains neither tables without unique keys nor tables that cannot be maintained by the logical standby. Run the following queries on BLACK:
select owner, table_name
from dba_logstdby_not_unique
where (owner, table_name) not in
(select distinct owner, table_name
from dba_logstdby_unsupported)
and bad_column='Y';

no rows selected
select distinct owner, table_name
from dba_logstdby_unsupported;
[...]
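
If the second query returns tables you care about, it can help to see which columns are responsible. A small sketch, assuming the standard columns of dba_logstdby_unsupported (column_name and data_type); the result obviously depends on your own schemas:

```sql
-- Run on BLACK. Lists each unsupported column with its datatype,
-- so you can see why a table cannot be maintained by SQL Apply.
select owner, table_name, column_name, data_type
from dba_logstdby_unsupported
order by owner, table_name, column_name;
```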

Capturing the dictionary on the primary

To proceed with the logical standby setup, you have to capture the dictionary on the source database (BLACK) and make sure there are no pending transactions. For that purpose, you can use the dbms_logstdby package like below:
. oraenv
BLACK

sqlplus / as sysdba

exec dbms_logstdby.build;

exit;
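
As far as I understand from the documentation, the build also turns on supplemental logging for primary key and unique key columns on the primary when it is not already enabled. A quick way to check it, as a sketch rather than a required step:

```sql
-- Run on BLACK after dbms_logstdby.build.
-- Both columns are expected to show YES once the build has completed.
select supplemental_log_data_pk, supplemental_log_data_ui
from v$database;
```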

Applying the logs on the standby

The standby database can now be synchronized to the point where the physical standby should be turned into a logical standby; to perform that step, proceed like below:
. oraenv
WHITE

sqlplus / as sysdba

alter database recover
to logical standby WHITE;

Database altered.

Opening the Physical Standby with ResetLogs

The next step consists in opening the standby database with resetlogs:
shutdown
ORA-01507: database not mounted
ORACLE instance shut down.


startup mount
ORACLE instance started.

Total System Global Area 263639040 bytes
Fixed Size 1335892 bytes
Variable Size 104861100 bytes
Database Buffers 150994944 bytes
Redo Buffers 6447104 bytes
Database mounted.


alter database open resetlogs;

Database altered.

exit;
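
Before touching the broker, you may want to confirm that the conversion actually happened. A minimal sketch, assuming a sysdba connection on WHITE; the role is expected to be LOGICAL STANDBY, and the guard status is usually ALL at that point, but I give those values as expectations, not as captured output:

```sql
-- Run on WHITE after "alter database open resetlogs".
select name, database_role, open_mode, guard_status
from v$database;
```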

Rebuilding the broker configuration

You can now change the Data Guard broker configuration to remove the physical standby configuration and add the logical standby instead:
. oraenv
BLACK

dgmgrl /

remove database white
preserve destinations;

Removed database "white" from the configuration

show configuration

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS


add database white
as connect identifier is white
maintained as logical;

Database "white" added

enable configuration;

Enabled.

show configuration;

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
DISABLED


enable configuration;

Enabled.

show configuration

Configuration - worldwide

Protection Mode: MaxPerformance
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS
Note:
It can take a few minutes for the standby and primary to get to the right state. However, if it still fails after a few minutes, check the alert.log for any unexpected errors.
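
Besides the alert.log, the logical standby exposes its own views that help to see where SQL Apply stands. The queries below are a sketch under the assumption that you are connected as sysdba on WHITE; the state names in the comment are examples of values the view can return:

```sql
-- Run on WHITE.
-- Overall state of SQL Apply (for example INITIALIZING, APPLYING, IDLE).
select session_id, realtime_apply, state
from v$logstdby_state;

-- How far apply has progressed compared with the redo received.
select applied_scn, latest_scn, mining_scn
from v$logstdby_progress;

-- Most recent SQL Apply events, including errors.
select event_time, status
from dba_logstdby_events
order by event_time desc;
```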

Changing Properties

You can change some properties to ease the switchover and change the protection mode like below:
edit database white
set property StaticConnectIdentifier=white;

Property "staticconnectidentifier" updated

edit database black
set property StaticConnectIdentifier=black;

Property "staticconnectidentifier" updated

edit configuration
set protection mode as MaxAvailability;

Succeeded.

show configuration;

Configuration - worldwide

Protection Mode: MaxAvailability
Databases:
black - Primary database
white - Logical standby database

Fast-Start Failover: DISABLED

Configuration Status:
SUCCESS

Testing the Logical Standby

Once the configuration is done, you can test it; make sure your changes to the primary database are replicated to the logical standby database:
. oraenv
BLACK

sqlplus / as sysdba

update scott.emp
set sal=10000
where ename='KING';

1 row updated.

commit;

Commit complete.

exit;

. oraenv
WHITE

sqlplus / as sysdba

select sal
from scott.emp
where ename='KING';

SAL
-----
10000
You can also test the switchover:
. oraenv
BLACK

dgmgrl /
switchover to white;

show configuration;

Conclusion

You may wonder what the link is between Streams and the Logical Standby. Obviously they both rely on the same technology but, in fact, that's not the main reason why I'm investigating logical standby. The real reason is that, with 11.2, you can now configure a Streams capture from a logical standby, as described in Oracle® Data Guard Concepts and Administration - 11g Release 2 (11.2) - 10.6.6 Running an Oracle Streams Capture Process on a Logical Standby Database.

Sunday, October 11, 2009

LOB Chunks

To continue with the previous test case, I'll insert a LOB large enough for the change record to be split into several chunks. To perform that operation:
  • Set up the example as described in Streams LCRs and LOBs
  • Insert a LOB that is large enough to guarantee it will be split
What is interesting is less the output itself than the number of chunks and the size of each chunk.

Insert a large CLOB

The script below creates a LOB as a temporary resource and, once built, inserts it into SOURCE.T9:
connect source/source

declare
z clob;
begin
DBMS_LOB.CREATETEMPORARY(z,true,DBMS_LOB.SESSION);
for i in 1..20 loop
dbms_lob.append(z,rpad('Z',1024,'Z'));
end loop;
insert into source.T9 values (2,z);
end;
/
commit;

connect strmadmin/strmadmin
set lines 1000
set serveroutput on
exec print_xml_fromq('MYQUEUE');

The result

You'll find below the output that matches the set of LCRs generated by that single insert:
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>INSERT</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
</data>
<lob_information>EMPTY LOB</lob_information>
</new_value>
</new_values>
</ROW_LCR>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
</new_value>
</new_values>
</ROW_LCR>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>8061</lob_offset>
</new_value>
</new_values>
</ROW_LCR>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>ZZZ[...]ZZZ</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>16121</lob_offset>
</new_value>
</new_values>
</ROW_LCR>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB TRIM</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_operation_size>20480</lob_operation_size>
</new_value>
</new_values>
</ROW_LCR>
---------------------------------------------------------
---------------------------------------------------------
<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>UPDATE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>5.5.913</transaction_id>
<scn>1448259</scn>
<old_values>
<old_value>
<column_name>ID</column_name>
<data>
<number>2</number>
</data>
</old_value>
</old_values>
</ROW_LCR>
---------------------------------------------------------
No more messages
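
Reading the lob_offset values above (1, 8061 and 16121) and the lob_operation_size of the LOB TRIM (20480), the three LOB WRITE chunks appear to carry 8060, 8060 and 4360 characters. The 8060 figure is only inferred from the difference between the offsets in this output; I don't present it as a documented constant. The sketch below checks the length of the inserted CLOB and rebuilds that layout with plain arithmetic:

```sql
-- Run on BLACK as source/source.
-- Length of the CLOB inserted above: 20 appends of 1024 characters = 20480.
select id, dbms_lob.getlength(text) as len
from source.t9
where id = 2;

-- Rebuild the chunk layout, assuming (from the offsets above) a chunk
-- size of 8060 characters and a total length of 20480 characters.
select level as chunk_no,
       (level - 1) * 8060 + 1 as lob_offset,
       least(8060, 20480 - (level - 1) * 8060) as chunk_size
from dual
connect by level <= ceil(20480 / 8060);
```

The second query returns three rows whose offsets are 1, 8061 and 16121, which matches the three LOB WRITE LCRs.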

Friday, October 9, 2009

Streams LCRs and LOBs

LOBs (BLOB, CLOB, NCLOB) have been designed to provide a rich and efficient set of APIs. With SecureFiles, the LOB storage model is highly efficient and optimized for compression. LOBs don't work like anything else within an Oracle database. You can check my previous post entitled Triggers and LOBs if you are not convinced yet.

As a result, Streams doesn't handle LOBs like anything else, and using LOBs with Streams can quickly turn into a headache:
  • Many tools, including the ones provided by the documentation, don't provide a clean display of LCRs with LOBs,
  • Some packages require specific LOB settings, like DBMS_APPLY_ADM.SET_DML_HANDLER and its assemble_lobs parameter,
  • The Streams feature set associated with LOBs is quite different from the feature set associated with other types: no conflict detection, no support for synchronous capture and several specific bugs (search for "Streams" and "LOB" in the Metalink Bug Search Engine).
For all these reasons, it's very useful to be able to figure out how LOBs and Streams work together. To help, I've built a very simple model that lets you see, if not the exact content of LCRs containing LOBs, at least an XML representation of them.

The test case

You'll find below a description of the test case I've built:

To work, the test case is made of several parts:
  1. The source table is named SOURCE.T9. You can easily change its structure (from CLOB to BLOB for example). All the DML commands you execute and commit on that table will be captured.
  2. Because I use an asynchronous capture process, I need to make sure the change vectors are stored by LGWR in the redo logs and archive logs.
  3. I've created a strmadmin schema to store all the objects I use for the setup (queue, procedure, rules...); I've named the capture process streams_capture; it captures the changes from the redo logs and enqueues them in the streams_queue queue.
  4. The apply process dequeues the LCRs from streams_queue.
  5. It relies on a DML handler to manage the LCRs. That handler converts the LCRs into XMLType objects with the DBMS_STREAMS.CONVERT_LCR_TO_XML function and enqueues them in the myqueue queue.
  6. The print_xml_fromq procedure is used to display the content of the XMLType objects that match LCRs.

The script

The script below performs the whole setup of the test case described above:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

create table source.t9(
id number,
text clob,
constraint t9_pk primary key (id))
lob(text) store as securefile;

var first_scn number;

set serveroutput on
declare
scn number;
begin
dbms_capture_adm.build(
first_scn => scn);
dbms_output.put_line('First SCN Value = ' || scn);
:first_scn := scn;
end;
/

col first_scn format 999999999999 -
new_value first_scn
select :first_scn first_scn from dual;

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'
size 25M autoextend on maxsize 256M;

create user strmadmin identified by strmadmin
default tablespace streams_tbs
quota unlimited on streams_tbs;

grant dba to strmadmin;
grant execute on dbms_aq to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

connect strmadmin/strmadmin

var first_scn number;
exec :first_scn:=&&first_scn

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true,
and_condition => null);
end;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

set lines 120
col streams_name format a16
col streams_type format a9
col rule_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select rule_owner,
streams_name,
streams_type,
table_owner||'.'||table_name table_name,
rule_type,
rule_name
from dba_streams_table_rules;

select table_owner||'.'||table_name table_name,
scn,
timestamp,
supplemental_log_data_pk
from dba_capture_prepared_tables;

connect strmadmin/strmadmin

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'apply',
streams_name => 'streams_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true);
end;
/

col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

col instantiation_scn format 999999999999 -
new_value instantiation_scn

select dbms_flashback.get_system_change_number instantiation_scn
from dual;

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t9',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end;
/

col source_database format a6
col object format a10
col instantiation_scn format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

connect strmadmin/strmadmin

begin
dbms_aqadm.create_queue_table(
queue_table => 'myqueue_table',
queue_payload_type => 'sys.xmltype',
multiple_consumers => false);
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
dbms_aqadm.start_queue('myqueue');
end;
/

create or replace procedure myhandler(in_any in anydata)
is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.persistent;
dbms_aq.enqueue(
queue_name => 'STRMADMIN.MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => dbms_streams.convert_lcr_to_xml(in_any),
msgid => message_handle);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T9',
object_type => 'TABLE',
operation_name => 'DEFAULT',
error_handler => false,
user_procedure => 'strmadmin.myhandler',
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('streams_apply');

create or replace procedure print_xml_fromq(queue_name varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message sys.xmltype;
v_out pls_integer;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.persistent;
loop
begin
dbms_aq.dequeue(
queue_name => queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line(message.extract('/*').getstringval());
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
If you are used to Streams, there is not much in that script that you don't already know. The table contains a LOB, but its structure can easily be changed to match your needs, even after you've started the capture. The very powerful feature I use, and that comes with Streams, is the ability to transform an LCR into an XML document with the DBMS_STREAMS.CONVERT_LCR_TO_XML function.
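
Before playing with the table, it's worth checking that both processes are running and that the handler doesn't raise errors. The queries below are a sketch that assumes the object names used in the setup script above (streams_capture, streams_apply and the myqueue_table queue table):

```sql
-- Run as strmadmin once the setup script has completed.
-- Both processes are expected to be ENABLED.
select capture_name, status
from dba_capture;

select apply_name, status
from dba_apply;

-- Any LCR the handler failed on ends up in the error queue.
select apply_name, local_transaction_id, error_message
from dba_apply_error;

-- Messages currently stored in the myqueue queue table,
-- through the AQ$<queue table> view created with it.
select count(*)
from strmadmin.aq$myqueue_table;
```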

Play with LOBs

Once the configuration is done, you can use implicit LOB conversion or the LOB built-ins to create and modify LOB values from SQL and PL/SQL like below:
connect source/source

insert into source.T9 values (1,empty_clob());
commit;

declare
my_x clob;
begin
select text into my_x
from source.T9 where id=1
for update;
dbms_lob.append(my_x,'Z');
commit;
end;
/

declare
my_x clob;
amount number;
begin
select text into my_x
from source.T9 where id=1
for update;
amount:=1;
dbms_lob.erase(my_x,amount,1);
commit;
end;
/
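
Other DBMS_LOB built-ins can be tried the same way. The block below is an additional sketch, not part of the original test: it appends 3 characters with dbms_lob.writeappend and then cuts the LOB back to 1 character with dbms_lob.trim. I don't claim which LCRs it generates; the point is to run it and look at the queue afterwards.

```sql
connect source/source

declare
my_x clob;
begin
select text into my_x
from source.T9 where id=1
for update;
-- append three characters at the end of the LOB
dbms_lob.writeappend(my_x,3,'ABC');
-- then cut the LOB back to a single character
dbms_lob.trim(my_x,1);
commit;
end;
/
```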

Display XMLized LCRs

You can now display the content of the LCRs that have been captured with the script below:
connect strmadmin/strmadmin
set lines 1000
set serveroutput on
exec print_xml_fromq('MYQUEUE');
The result looks like below. Pay attention to the various command types, "LOB WRITE", "LOB TRIM" and "LOB ERASE":
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>INSERT</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>EMPTY LOB</lob_information>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>UPDATE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<old_values>
<old_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</old_value>
</old_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>Z</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB TRIM</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB ERASE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>7.19.625</transaction_id>
<scn>1352034</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
No more messages

Drop the test case

Once your tests are done, you can drop the whole test case by running the script below:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

exec dbms_apply_adm.stop_apply('STREAMS_APPLY')
exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE')

exec dbms_apply_adm.delete_all_errors('STREAMS_APPLY');
exec dbms_apply_adm.drop_apply('STREAMS_APPLY')
exec dbms_capture_adm.drop_capture('STREAMS_CAPTURE')

drop procedure strmadmin.myhandler;

begin
dbms_aqadm.stop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue_table('strmadmin.myqueue_table', true);
end;
/

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
In a next blog post, I'll dig further into how LOBs work within LCRs. If you want to write your own LCRs that contain LOBs, you can check the documentation; there is a short section called "Oracle® Streams Extended Examples - Logical Change Records With LOBs Example".

Monday, October 5, 2009

My XStream FAQ

I wrote 2 programs a few months back to use and experiment with Oracle XStream; when I discovered Oracle had launched XStream right after closing the deal with GoldenGate (a few hours ago...), I thought it might be clever to publish those Java programs to help people set up an XStream Inbound configuration and an XStream Outbound configuration. I really hope those 2 programs can help people.

However, after a few conversations on IM, Twitter (@arkzoyd), and the blog, I'm now convinced I did it wrong. I should have started at the beginning instead of pushing those programs with no explanation. I should have talked about the reasons why it's important to track message positions, how subtle XStream actually is, and what benefit you could get from XStream compared to what used to exist before.

I'll try to correct the current situation and provide simple answers to the questions below.
If you have some high-level questions about XStream, feel free to comment on this post; if you have technical issues, I suggest you post directly on the Streams Forum...

What is Oracle XStream?

XStream is a new way for Oracle Streams to interact with the outside world. It uses a publish/subscribe model that is different from the regular AQ queuing model. The purpose is to provide, to the outside world, the same kind of features Oracle uses to propagate messages between databases.

As a result, with XStream, you can now propagate messages between a database and a program written in OCI or Java/OCI. In addition, you can guarantee that:
  • No messages are written to disk; everything stays in memory during the propagation to and from the program. You can guarantee that behavior not only between the database and your program but also along the whole chain of programs from the source to the destination of your messages
  • If something crashes, whether it's a database instance or your program, you'll be able to recover all your messages, in the right order and without losing a single one
Obviously, if you face some bottleneck/saturation, you may decide, like Streams does internally, that it's more efficient for your messages to spill out of memory. In that case, you can turn them into persistent messages or you can slow down the capture or propagation processes.

Why could you not get to the same result before XStream?

There are 2 cases XStream addresses: (1) Inbound messages and (2) Outbound messages.
  • Inbound messages
Before XStream, the simplest way to send a message to a database was to make it persistent; you could, for example, insert the message into a persistent queue or insert it into a table. In both cases, the Oracle database guarantees you will not lose your message. However, if the database you put your messages in isn't your message destination database, you have to pay the price of making the messages persistent in that staging area.

Obviously you could also decide to enqueue your messages into a non-persistent queue on the staging database and use a propagation job to send the message to its destination database. The problem with that method is that there is no guarantee that, if an instance crashes, the message will ever get to its destination; otherwise, non-persistent queues would be called persistent queues!

Until Oracle Database 11g Release 2, you could not rely on any Oracle database built-in to both guarantee that the message gets to its destination and avoid the price of making it persistent on the staging databases... But now, you have XStream!
  • Outbound messages
The principle is basically the same for outbound messages. A Streams captured message could be turned into a persistent message, which guarantees you will not lose your message but requires you to pay the price of making captured messages persistent on the staging database.

You could also turn your messages into non-persistent messages; yes, it's possible if, say, you create an apply handler that enqueues the message in a buffered queue. In that case, there is no guarantee you will never lose a message.

Like for Inbound messages, there was no built-in feature of Oracle you could use to guarantee a message from inside a database could be sent to a destination outside the database without turning it into a persistent message on the staging database.

What problem does it solve?

As you can now guess, XStream enables you to propagate Streams messages between Oracle databases and the outside world in an extremely efficient way. It lets you propagate messages in memory only (in most cases!) and, at the same time, it guarantees messages will be delivered to their destination.

Why is it cool?

Because Streams is cool! And providing the power of Streams to the outside world is VERY cool, don't you think? ;-)

How does XStream relate to GoldenGate?

XStream, like Oracle Active Data Guard, is included in the GoldenGate license. But unlike Active Data Guard, XStream cannot be licensed separately from GoldenGate. For now, that's the only link between XStream and GoldenGate, even if we can guess GoldenGate could leverage the XStream feature sooner or later.

What does Oracle want to do with XStream?

That's actually a very good question, and I assume we can only guess for now. There are a number of things Oracle could do with XStream, like the ones below:
  • leveraging the technology in GoldenGate beyond just bundling it
  • leveraging the technology for existing features of the Oracle Database like the In-Memory Database Cache or Oracle Coherence
  • leveraging the technology to build new features like file-to-database integration or MySQL-to-Oracle integration (if we consider that MySQL could become an Oracle offering one day soon).
  • probably many more things I cannot even think about...

How does XStream work?

XStream relies on a few facts:
  • Messages are consumed once on the destination; whether they are made persistent or not doesn't actually matter. Once consumed on the destination, a message will never be requested again.
  • Messages are ordered; in addition, a raw value that strictly increases uniquely identifies each message's position in that order.
  • If a message is lost in the chain, that's because the chain has broken; and, in that case, all the subsequent messages are lost too.
  • The message order is kept during the propagation from the source to the destination.
  • Messages are persistent on the source; or, to put it another way, if the destination doesn't get a message, the source is able to resend it, in the same order... assuming it gets back the message position to restart from!
As a result, an XStream program deals with its server as follows:
  • In the case of XStream Inbound, the program gets the last position from the server and then enqueues its messages from that point. On a regular basis, it gets feedback from the server saying which messages have been consumed on the destination, and it can release those messages from the source.
  • In the case of XStream Outbound, the program sends the last position of the applied messages and then dequeues messages from that point. On a regular basis, it provides feedback to the server saying which messages have been consumed, to allow the server to release them from its source.
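
The restart handshake described above can be illustrated with a small, self-contained simulation. It does not use the XStream API: the Source and Destination classes, the long positions, and the method names are all hypothetical stand-ins. They are only meant to show why a strictly increasing position plus a source that retains its messages is enough to recover from a broken chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation of position-based restart; not the XStream API.
class PositionRestartDemo {

    // The source keeps every message until the destination confirms it.
    static class Source {
        private final TreeMap<Long, String> retained = new TreeMap<>();
        private long nextPosition = 1;

        void produce(String payload) {
            retained.put(nextPosition++, payload);
        }

        // Positions strictly after the given one, in increasing order.
        List<Long> positionsAfter(long position) {
            return new ArrayList<>(retained.tailMap(position, false).keySet());
        }

        String payloadAt(long position) {
            return retained.get(position);
        }

        // Feedback from the destination: everything up to position is consumed.
        void release(long position) {
            retained.headMap(position, true).clear();
        }

        int retainedCount() {
            return retained.size();
        }
    }

    // The destination remembers only the last position it consumed.
    static class Destination {
        private long lastPosition = 0;
        private final List<String> applied = new ArrayList<>();

        long lastPosition() { return lastPosition; }
        List<String> applied() { return applied; }

        void consume(long position, String payload) {
            if (position <= lastPosition) {
                return; // already consumed once, never applied twice
            }
            applied.add(payload);
            lastPosition = position;
        }
    }

    // Sends at most 'limit' messages, then stops, as if the chain had broken.
    static void propagate(Source source, Destination destination, int limit) {
        int sent = 0;
        for (long position : source.positionsAfter(destination.lastPosition())) {
            if (sent++ == limit) {
                break;
            }
            destination.consume(position, source.payloadAt(position));
        }
        source.release(destination.lastPosition());
    }

    public static void main(String[] args) {
        Source source = new Source();
        Destination destination = new Destination();
        for (int i = 1; i <= 5; i++) {
            source.produce("message " + i);
        }
        propagate(source, destination, 2);                 // chain breaks after 2 messages
        System.out.println("after failure: " + destination.applied());
        propagate(source, destination, Integer.MAX_VALUE); // restart from the last position
        System.out.println("after restart: " + destination.applied());
        System.out.println("retained on source: " + source.retainedCount());
    }
}
```

In this sketch nothing is staged persistently between the two ends: the only state the destination has to keep is its last position, and the source releases a message only once that position has moved past it.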

How can I know more about XStream?

To know everything about XStream, read the following documentation:


Sunday, October 4, 2009

XStream Outbound... A sample Java Program

As with XStream Inbound in my previous post, you'll find below a simple Java program for XStream Outbound. This new program subscribes to Streams captured changes. Does it sound easy? It is, as you'll see below.

Step 1: Create a Sample Schema

For this sample program, create a schema and a table :
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t7(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t7(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t7(id, text1, text2)
values (2,'Text 2','Text 2');

commit;

Step 2: Create a Streams Administrator

To subscribe to Streams changes, you must create a Streams administrator:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

Step 3: Create a Simple XStream Out Configuration

For our Java program to dequeue messages, you must create an XStream Outbound server:
connect strmadmin/strmadmin

begin
dbms_xstream_adm.create_outbound(
server_name => 'DEMO_SERVER',
source_database => '&&dbname',
table_names => 'source.t7',
schema_names => null,
capture_user => null,
connect_user => null,
comment => 'XStream OutBound Server Demonstration');
end;
/

SELECT *
FROM DBA_XSTREAM_OUTBOUND;

select *
from dba_apply
where purpose='XSTREAM OUT';

Step 4: Create a JAVA XStream Outbound Client

Create an XStreamOutDemo.java file like the one below. Change the environment-specific strings, such as the connection URL and the credentials, to match your own configuration:
import java.sql.Connection;
import java.sql.DriverManager;

import oracle.jdbc.internal.OracleConnection;

import oracle.streams.XStreamOut;
import oracle.streams.ChunkColumnValue;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;

public class XStreamOutDemo {

public static void main(String args[])
{
String out_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

/*
* Connect to the Database
*/
Connection out_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
out_conn=DriverManager.getConnection(out_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + out_url);
e.printStackTrace();
}

/*
* Get a XStream Out Handler
*/
XStreamOut xsOut=null;
byte[] lastPosition = null;

try
{
// when attaching to an outbound server, the client needs to tell the
// outbound server its last position (null here: no position is known).
xsOut = XStreamOut.attach((OracleConnection) out_conn, "DEMO_SERVER",
lastPosition, XStreamOut.DEFAULT_MODE);
System.out.println("Attached to outbound server: DEMO_SERVER");
System.out.print("Last Position is: ");
if (lastPosition != null) { printHex(lastPosition); }
else { System.out.println("NULL");}
}
catch(Exception e)
{
System.out.println("cannot attach to outbound server: DEMO_SERVER");
System.out.println(e.getMessage());
e.printStackTrace();
}

byte[] processedLowPosition = null;
try
{
while(true)
{
// receive an LCR from outbound server
LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);

if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
{
assert alcr != null;

// also get chunk data for this LCR if any
if (alcr instanceof RowLCR)
{
// receive (and discard) the chunks of this row until the end of the row
if (((RowLCR)alcr).hasChunkData())
{
ChunkColumnValue chunk = null;
do
{
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
} while (!chunk.isEndOfRow());
}
}

String command=alcr.getCommandType();
if (!command.equals("COMMIT"))
System.out.print(command+" on "+ alcr.getObjectOwner()+"."+
alcr.getObjectName()+"\n");
else System.out.print(command+"\n");
if (command.equals("INSERT") || command.equals("UPDATE")) {
System.out.print(" -- NEW VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getNewValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
if (command.equals("UPDATE") || command.equals("DELETE")) {
System.out.print(" -- OLD VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getOldValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
System.out.print(" -- DML ----------\n");
System.out.println(" "+((DefaultRowLCR) alcr).getStatement(false));
processedLowPosition = alcr.getPosition();
if (null != processedLowPosition)
xsOut.setProcessedLowWatermark(processedLowPosition,
XStreamOut.DEFAULT_MODE);
System.out.print("Last Position is: ");
printHex(processedLowPosition);
} else // batch is end
{ assert alcr == null; }
}
} catch(Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

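// print a position as a hexadecimal string, followed by a newline
public static void printHex(byte[] b)
{
for (int i = 0; i < b.length; ++i)
{
System.out.print(
Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
}
System.out.println("");
}

// pad text with spaces, or truncate it, to exactly length characters
public static String fixsize(String text, int length)
{
String output;
if (text.length()>length) {
output=text.substring(0,length);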
} else {
output=text;
for (int i=0; i<(length-text.length()); i++) {
output=output+" ";
}
}
return output;
}
}
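
The program above always attaches with a null last position. Following the principle described earlier (the outbound client sends the last position of the applied messages when it attaches), a client that has to resume where it stopped needs to keep that position somewhere durable. Below is a minimal sketch of one way to do it with a plain file; the PositionStore class and the file names are hypothetical and are not part of the XStream API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of the XStream API: keeps the last processed
// position in a small file so that a restarted client can hand it back
// when it attaches.
class PositionStore {
    private final Path file;

    PositionStore(Path file) {
        this.file = file;
    }

    // Returns the saved position, or null when nothing has been saved yet
    // (the sample program attaches with null in that case).
    byte[] load() throws IOException {
        return Files.exists(file) ? Files.readAllBytes(file) : null;
    }

    // Writes to a temporary file first, then moves it over the real one, so
    // that a crash in the middle of a save does not leave a truncated position.
    void save(byte[] position) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, position);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("xstream-position", ".bin");
        Files.delete(file); // behave like a client that has never run before
        PositionStore store = new PositionStore(file);
        System.out.println("first start, position is null: " + (store.load() == null));
        store.save(new byte[] {0x00, 0x1d, (byte) 0x85, 0x6d});
        System.out.println("restart, bytes restored: " + store.load().length);
        Files.delete(file);
    }
}
```

One plausible place to call save is right after setProcessedLowWatermark in the receive loop, and load would provide the value of lastPosition before attaching. Whether saving on every LCR is acceptable depends on your throughput; this is a sketch, not a tuned implementation.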
To compile and execute the program, run the script below:
export CLASSPATH=.:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH

javac XStreamOutDemo.java
java XStreamOutDemo
To test the program, open another session to the database and change the content of the table, as shown below:
insert into source.t7 values (3,'X','X');
commit;
update source.t7 set text2='Y' where id=3;
commit;
delete from source.t7 where id=3;
commit;
The output looks like below:
INSERT on SOURCE.T7
-- NEW VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:X
Last Position is: 0000001d856d00000001000000010000001d856c000000010000000101
COMMIT
Last Position is: 0000001d856d00000001000000010000001d856d000000010000000101
UPDATE on SOURCE.T7
-- NEW VALUES ----------
Column:TEXT2 Value:Y
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT2 Value:X
Last Position is: 0000001d856f00000001000000010000001d856e000000010000000101
COMMIT
Last Position is: 0000001d856f00000001000000010000001d856f000000010000000101
DELETE on SOURCE.T7
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:Y
Last Position is: 0000001d857200000001000000010000001d8570000000010000000101
COMMIT
Last Position is: 0000001d857200000001000000010000001d8572000000010000000101
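
The positions printed above can be checked against the "strictly increasing" property mentioned in the XStream overview. The sketch below parses the six hexadecimal strings copied from this output and compares them byte by byte as unsigned values. That comparison rule is an assumption made for this illustration, and the PositionOrderCheck class is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: checks that positions, given as hexadecimal strings,
// are strictly increasing when compared byte by byte as unsigned values.
class PositionOrderCheck {

    static byte[] fromHex(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    // Negative, zero, or positive, like Comparator.compare.
    static int compare(byte[] a, byte[] b) {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    static boolean strictlyIncreasing(List<String> hexPositions) {
        for (int i = 1; i < hexPositions.size(); i++) {
            byte[] previous = fromHex(hexPositions.get(i - 1));
            byte[] current = fromHex(hexPositions.get(i));
            if (compare(previous, current) >= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The six positions from the sample output, in the order they were printed.
        List<String> positions = Arrays.asList(
            "0000001d856d00000001000000010000001d856c000000010000000101",
            "0000001d856d00000001000000010000001d856d000000010000000101",
            "0000001d856f00000001000000010000001d856e000000010000000101",
            "0000001d856f00000001000000010000001d856f000000010000000101",
            "0000001d857200000001000000010000001d8570000000010000000101",
            "0000001d857200000001000000010000001d8572000000010000000101");
        System.out.println("strictly increasing: " + strictlyIncreasing(positions));
    }
}
```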
To exit the program, type:
CTRL+C

Step 5: Drop the Configuration

I hope you've found this example useful. To drop the configuration, run the script below:
connect strmadmin/strmadmin

begin
dbms_xstream_adm.drop_outbound(
server_name=>'DEMO_SERVER');
end;
/

SELECT *
FROM DBA_XSTREAM_OUTBOUND;

select * from dba_apply;
select * from dba_capture;
select * from dba_streams_table_rules;

connect / as sysdba

drop user source cascade;
drop user strmadmin cascade;
You're done! It's nice to enter the new Oracle Data Integration generation with GoldenGate, don't you think?


XStream Inbound... A Sample Java Program

Oracle XStream is a new feature of Oracle Database 11g Release 2. Though it has been developed by the Streams team and ships with the database, XStream is part of GoldenGate! And now that the GoldenGate deal has been closed, 7 days before Oracle OpenWorld, it's easy to guess why XStream was released a few hours ago.

This post is about XStream; it provides a Java program that uses "XStream In" to enqueue messages to a buffered queue. A Streams apply process applies them to a table named SOURCE.T8.

To know everything about XStream, read the following documentation:

Step 1: Create a Sample Schema

For this example, you'll need a schema and a table:
sqlplus / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t8(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

commit;

Step 2: Create a Streams Administrator

To use XStream, you'll also need a Streams administrator and a queue to stage buffered messages :
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

Step 3: Create a Simple XStream In Configuration

You must create and start an XStream In configuration before you use a Java or OCI client to enqueue messages:
connect strmadmin/strmadmin

BEGIN
DBMS_XSTREAM_ADM.CREATE_INBOUND(
server_name => 'xin',
queue_name => 'xin_queue');
END;
/

SELECT *
FROM DBA_XSTREAM_INBOUND;

set pages 1000
select *
from dba_apply
where purpose='XSTREAM IN';

exec DBMS_APPLY_ADM.START_APPLY('xin');

Step 4: Create a JAVA XStream Inbound Client

Create an XStreamInDemo.java file that enqueues messages to the buffered queue. Change the environment-specific parts, such as the connection URL and the credentials, to match your environment:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Date;

import oracle.jdbc.internal.OracleConnection;

import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.RowLCR;
import oracle.streams.XStreamIn;

public class XStreamInDemo {

public static void main(String args[])
{
String in_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

/*
* Connect to the Database
*/
Connection in_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
in_conn=DriverManager.getConnection(in_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + in_url);
e.printStackTrace();
}

/*
* Get a XStream In Handler
*/
XStreamIn xsIn=null;
String xsinName="XIN";
byte[] lastPosition = null;
int transaction=0;
int rank=0;
try
{
xsIn = XStreamIn.attach ((OracleConnection)in_conn, xsinName,
"HI2" , XStreamIn.DEFAULT_MODE);

// use last position to decide where should we start sending LCRs
System.out.println("Attached to inbound server:"+xsinName);
System.out.print("Inbound Server Last Position is: ");
lastPosition = xsIn.getLastPosition();
if (null == lastPosition)
{
System.out.println("null");
transaction=1;
rank=1;
}
else {
printHex(lastPosition); System.out.println("");
transaction = getTransaction(lastPosition);
rank = getRank(lastPosition);
if (rank==1) rank=2; else { rank=1; transaction++; }
}
}
catch(Exception e)
{
System.out.println("cannot attach to inbound server: "+xsinName);
System.out.println(e.getMessage());
e.printStackTrace();
}

/*
* Create and send the LCRs: for each transaction, an INSERT (rank 1)
* followed by a COMMIT (rank 2)
*/

try {
DATE mydate;
DefaultRowLCR alcr;
byte[] processedLowPosition;

while(true) {
if (rank==1) {
mydate = new DATE();
System.out.println("-- " +
Integer.toString(transaction) +
" -------------------------");
alcr=new DefaultRowLCR(
"BLACK", RowLCR.INSERT, "SOURCE", "T8",
"X."+Integer.toString(transaction), null,
encode2bytes(transaction, rank), mydate);
ColumnValue[] newcolumn= new ColumnValue[3];
newcolumn[0]= new DefaultColumnValue("ID",
new oracle.sql.NUMBER(transaction));
newcolumn[1]= new DefaultColumnValue("TEXT1",
new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
newcolumn[2]= new DefaultColumnValue("TEXT2",
new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
alcr.setNewValues(newcolumn);
xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
System.out.println(" " +alcr.getStatement(false));
xsIn.flush(XStreamIn.DEFAULT_MODE);
rank++;
Thread.sleep(500);
} else {
mydate = new DATE();
alcr=new DefaultRowLCR(
"BLACK", RowLCR.COMMIT, null, null,
"X."+Integer.toString(transaction), null,
encode2bytes(transaction, rank), mydate);
xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
xsIn.flush(XStreamIn.DEFAULT_MODE);
System.out.println(" " +alcr.getStatement(false));
rank=1;
transaction++;
Thread.sleep(500);
}
System.out.print(" ");
processedLowPosition =
xsIn.getProcessedLowWatermark();
if (processedLowPosition != null) {
System.out.print("processedLowPosition: ");
printHex(processedLowPosition);
System.out.print(" (" +
Integer.toString(getTransaction(processedLowPosition))+
", " +
Integer.toString(getRank(processedLowPosition)) +
")");
}
else {
System.out.print("processedLowPosition: null");
}
lastPosition =
xsIn.getLastPosition();
System.out.println("");
}
} catch (Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public static void printHex(byte[] b)
{
for (int i = 0; i < b.length; ++i)
{
System.out.print(
Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
}
}

public static byte[] encode2bytes(int transaction, int rank)
{
byte[] mybyte= new byte[5];
mybyte[0] =(byte)( transaction >> 24 );
mybyte[1] =(byte)( (transaction << 8) >> 24 );
mybyte[2] =(byte)( (transaction << 16) >> 24 );
mybyte[3] =(byte)( (transaction << 24) >> 24 );
mybyte[4] =(byte)( rank ) ;
return mybyte;
}

public static int getTransaction(byte[] mybyte)
{
int i = 0;
int pos = 0;
i += ((int) mybyte[pos++] & 0xFF) << 24;
i += ((int) mybyte[pos++] & 0xFF) << 16;
i += ((int) mybyte[pos++] & 0xFF) << 8;
i += ((int) mybyte[pos] & 0xFF);
return i;
}

public static int getRank(byte[] mybyte)
{
int foo;
foo =((int)mybyte[4] & 0xFF);
return foo;
}
}
Notes:
  • getLastPosition should be called once, right after you attach to the server, to get the position of the last message handled by XStream In
  • The client has to be OCI or Java/OCI. You must include the xstreams.jar library in the classpath
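
The program builds its own positions: encode2bytes packs the transaction number into 4 bytes, most significant byte first, followed by 1 byte for the rank of the LCR inside the transaction. The sketch below restates that layout with java.nio.ByteBuffer, together with the rule the program applies after it attaches to decide which LCR to send next. The InboundPosition class and its next method are hypothetical names introduced for this illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical restatement of the position layout used by XStreamInDemo:
// 4 bytes for the transaction number (big-endian) followed by 1 byte for
// the rank of the LCR inside the transaction (1 = INSERT, 2 = COMMIT).
class InboundPosition {
    final int transaction;
    final int rank;

    InboundPosition(int transaction, int rank) {
        this.transaction = transaction;
        this.rank = rank;
    }

    byte[] encode() {
        // ByteBuffer is big-endian by default, like the shifts in encode2bytes.
        return ByteBuffer.allocate(5).putInt(transaction).put((byte) rank).array();
    }

    static InboundPosition decode(byte[] position) {
        ByteBuffer buffer = ByteBuffer.wrap(position);
        int transaction = buffer.getInt();
        int rank = buffer.get() & 0xFF;
        return new InboundPosition(transaction, rank);
    }

    // The position to send after this one: the COMMIT of the same
    // transaction, or the INSERT of the next transaction.
    InboundPosition next() {
        return rank == 1
            ? new InboundPosition(transaction, 2)
            : new InboundPosition(transaction + 1, 1);
    }

    public static void main(String[] args) {
        // Pretend the server reported the COMMIT of transaction 1073 as its last position.
        InboundPosition last = decode(new InboundPosition(1073, 2).encode());
        InboundPosition resume = last.next();
        System.out.println(resume.transaction + ", " + resume.rank); // prints "1074, 1"
    }
}
```

Putting the transaction number first and the rank last keeps the encoded positions increasing from one LCR to the next, which is what lets the inbound server report a meaningful last position after a restart.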

To compile the code, set the CLASSPATH, the PATH and the JAVA_HOME. Once done, you can simply run it:
export CLASSPATH=.:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH

javac XStreamInDemo.java
java XStreamInDemo

Check messages are applied to the SOURCE.T8 table

The program enqueues messages. You can verify that the messages are applied to the table with a script like the one below:
sqlplus / as sysdba

select count(*)
from source.T8;

COUNT(*)
----------
1073


/

COUNT(*)
----------
1076

Clean Up the Environment

To clean up the environment, execute the script below:
connect strmadmin/strmadmin

exec DBMS_APPLY_ADM.STOP_APPLY('xin');

BEGIN
DBMS_XSTREAM_ADM.DROP_INBOUND(
server_name => 'xin');
END;
/

select *
from DBA_XSTREAM_INBOUND;

select *
from dba_apply
where purpose='XSTREAM IN';

connect / as sysdba

drop user source cascade;
drop user strmadmin cascade;
