
Thursday, August 27, 2009

Tracking Streams Changes with V$STREAMS_MESSAGE_TRACKING

V$STREAMS_MESSAGE_TRACKING is one of the best new features of Oracle Streams 11g. For a full description and an example, check the "Tracking LCRs Through a Stream" section of the documentation. In this post, we'll create a procedure after setting up a one-way Streams replication and message tracking; you'll see how easy it is to use. To keep this post short, I won't describe how to configure the replication; the two attached scripts should help you get there quickly. Connect to the source database (i.e. BLACK), set the tracking label and create a procedure in the DEMO schema:
. oraenv
BLACK

sqlplus / as sysdba

set serveroutput on

begin
dbms_streams_adm.set_message_tracking(
tracking_label => 'ARKZOYD',
actions => DBMS_STREAMS_ADM.ACTION_MEMORY);
end;
/

create or replace procedure demo.hizoyd is
begin
null;
end;
/
Check the view to see how the message has been managed by Streams:
set lines 120
col component_name format a19
col component_type format a18
col action format a23
col object_owner format a14
col object_name format a14
col command_type format a16
select component_name,
component_type,
action,
object_owner,
object_name,
command_type
from V$STREAMS_MESSAGE_TRACKING
where tracking_label='ARKZOYD'
order by timestamp;
Here is the result:
COMPONENT_NAME      COMPONENT_TYPE     ACTION                  OBJECT_OWNER   OBJECT_NAME    COMMAND_TYPE
------------------- ------------------ ----------------------- -------------- -------------- ----------------
STREAMS_CAPTURE     CAPTURE            Created                 Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_CAPTURE     CAPTURE            Rule evaluation         Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_CAPTURE     CAPTURE            Enqueue                 Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_CAPTURE     CAPTURE            Created                                               COMMIT
STREAMS_CAPTURE     CAPTURE            Enqueue                                               COMMIT
STREAMS_PROPAGATION PROPAGATION SENDER Dequeued                Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION SENDER Propagation Sender sent Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION SENDER Dequeued                                              COMMIT
STREAMS_PROPAGATION PROPAGATION SENDER Propagation Sender sent                               COMMIT
The tracking is propagated to the destination database (i.e. WHITE), as you can see below:
exit

. oraenv
WHITE

sqlplus / as sysdba

set pages 20
set lines 120
col component_name format a19
col component_type format a20
col action format a28
col object_owner format a14
col object_name format a14
col command_type format a16
select component_name,
component_type,
action,
object_owner,
object_name,
command_type
from V$STREAMS_MESSAGE_TRACKING
where tracking_label='ARKZOYD'
order by timestamp;
Here is the result:
COMPONENT_NAME      COMPONENT_TYPE       ACTION                       OBJECT_OWNER   OBJECT_NAME    COMMAND_TYPE
------------------- -------------------- ---------------------------- -------------- -------------- ----------------
STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation              Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation              Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue                               COMMIT
STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation                                            COMMIT
STREAMS_PROPAGATION PROPAGATION RECEIVER Rule evaluation                                            COMMIT
STREAMS_PROPAGATION PROPAGATION RECEIVER Propagation receiver enqueue                               COMMIT
STREAMS_APPLY       APPLY READER         Dequeued                     Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_APPLY       APPLY READER         Dequeued                                                   COMMIT
STREAMS_APPLY       APPLY SERVER         Apply executed               Missing MVDD:0 Missing MVDD:0 CREATE PROCEDURE
STREAMS_APPLY       APPLY SERVER         Commit                                                     COMMIT
And you can check that the procedure has been replicated to the WHITE database:
select text    
from dba_source
where owner='DEMO'
and name='HIZOYD'
order by line;

TEXT
-------------------
procedure hizoyd is
begin
null;
end;


Are your triggers triggered by Streams?

That sounds like a fair question, don't you think? As is often the case with Oracle, the answer is split between "It depends" and "It's not as easy as I first thought". Let's start with how it's supposed to work. According to the documentation, how an apply process handles a trigger depends on the trigger's fire_once property:
  • If fire_once is TRUE, the trigger is not executed by an apply process
  • If fire_once is FALSE, the trigger is supposed to always execute, whether the change comes from a regular DML/DDL command or from a Streams apply process
That sounds easy. Let's have a look at a concrete example!

To build the test case, we'll implement a simple Streams One-Way Replication. To do it in 10 minutes, assuming your database is in archivelog, follow Step 1 to Step 9 of the "Oracle Streams One Way Table Replication 101" post. Ready?

Test when fire_once is TRUE (i.e. Default)

To begin, we'll create a table and a trigger in the DESTINATION schema; we'll check if the trigger is supposed to fire ONCE or ALWAYS:
connect / as sysdba

create table destination.t1_x
(id number primary key,
text varchar2(80));

create or replace trigger destination.t1_x_trigger
after insert on destination.T1 for each row
begin
insert into T1_X(id, text)
values (:new.id, :new.text);
end;
/

set serveroutput on
begin
if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))
then
dbms_output.put_line('Trigger FIRE_ONCE=TRUE');
else
dbms_output.put_line('Trigger FIRE_ONCE=FALSE');
end if;
end;
/

Trigger FIRE_ONCE=TRUE
So that's indeed the default, and this trigger is not supposed to be executed by an apply process. But first, let's check that the trigger works by manually inserting a row into the DESTINATION.T1 table (that will make the two tables diverge, but who cares?):
insert into DESTINATION.T1
values (9999, 'Text 9999');
commit;

select *
from DESTINATION.T1_X;

ID TEXT
---- ---------
9999 Text 9999
Now that we are sure the trigger works, we can verify that it is not executed by the apply process. This time we'll change the SOURCE.T1 table and check that the new row appears in DESTINATION.T1 but not in DESTINATION.T1_X, as expected:
insert into SOURCE.T1
values (4, 'Text 4');
commit;

pause

select *
from DESTINATION.T1;

ID TEXT
---- ---------
1 Text 1
2 Text 2
3 Text 3
9999 Text 9999
4 Text 4

select *
from DESTINATION.T1_X;

ID TEXT
---- ---------
9999 Text 9999
Very good!

Test when fire_once is FALSE

Now we'll change the trigger's behavior and we'll perform another test to see if the trigger is executed when the change is applied to the DESTINATION.T1 table:
begin
DBMS_DDL.SET_TRIGGER_FIRING_PROPERTY(
trig_owner => 'DESTINATION',
trig_name => 'T1_X_TRIGGER',
fire_once => false);
end;
/

set serveroutput on
begin
if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))
then
dbms_output.put_line('Trigger FIRE_ONCE=TRUE');
else
dbms_output.put_line('Trigger FIRE_ONCE=FALSE');
end if;
end;
/

Trigger FIRE_ONCE=FALSE

insert into SOURCE.T1
values (5, 'Text 5');
commit;

pause

select *
from DESTINATION.T1;

ID TEXT
---- ---------
1 Text 1
2 Text 2
3 Text 3
9999 Text 9999
4 Text 4
5 Text 5

select *
from DESTINATION.T1_X;

ID TEXT
---- ---------
9999 Text 9999
But wait a minute! Wasn't that trigger supposed to be executed? Let's restart all the processes to see if anything changes:
exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE');
exec dbms_apply_adm.stop_apply('STREAMS_APPLY');
exec dbms_capture_adm.start_capture('STREAMS_CAPTURE');
exec dbms_apply_adm.start_apply('STREAMS_APPLY');

set serveroutput on
begin
if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))
then
dbms_output.put_line('Trigger FIRE_ONCE=TRUE');
else
dbms_output.put_line('Trigger FIRE_ONCE=FALSE');
end if;
end;
/

Trigger FIRE_ONCE=FALSE

insert into SOURCE.T1
values (6, 'Text 6');

commit;

select *
from DESTINATION.T1;
ID TEXT
---- ----------
1 Text 1
2 Text 2
3 Text 3
9999 Text 9999
4 Text 4
5 Text 5
6 Text 6

select *
from DESTINATION.T1_X;

ID TEXT
---- ----------
9999 Text 9999
Nope, same story! Actually, I had to recreate the trigger to make it work:
exec dbms_apply_adm.stop_apply('STREAMS_APPLY');
drop trigger destination.t1_x_trigger;

create or replace trigger destination.t1_x_trigger
after insert on destination.T1 for each row
begin
insert into T1_X(id, text)
values (:new.id, :new.text);
end;
/

begin
DBMS_DDL.SET_TRIGGER_FIRING_PROPERTY(
trig_owner => 'DESTINATION',
trig_name => 'T1_X_TRIGGER',
fire_once => false);
end;
/

set serveroutput on
begin
if (dbms_ddl.IS_TRIGGER_FIRE_ONCE('DESTINATION', 'T1_X_TRIGGER'))
then
dbms_output.put_line('Trigger FIRE_ONCE=TRUE');
else
dbms_output.put_line('Trigger FIRE_ONCE=FALSE');
end if;
end;
/

Trigger FIRE_ONCE=FALSE

exec dbms_apply_adm.start_apply('STREAMS_APPLY');

insert into SOURCE.T1
values (7, 'Text 7');

commit;

select *
from DESTINATION.T1;

ID TEXT
---- ----------
1 Text 1
2 Text 2
3 Text 3
9999 Text 9999
4 Text 4
5 Text 5
6 Text 6
7 Text 7

select *
from DESTINATION.T1_X;

ID TEXT
---- ----------
9999 Text 9999
7 Text 7
Expected behavior? Bug? I'm still split! What do you think? By the way, don't forget to clean up your Streams configuration.
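If it helps, here is a minimal, hypothetical cleanup sketch for this specific test case; the process, trigger and table names match the ones used above, but adapt them to your own configuration:

```sql
-- Hypothetical cleanup sketch: stop the Streams processes used in this
-- test, then drop the demo trigger and its audit table on the destination.
exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE');
exec dbms_apply_adm.stop_apply('STREAMS_APPLY');

drop trigger destination.t1_x_trigger;
drop table destination.t1_x purge;
```

That only removes the test artifacts; tearing down the replication itself is covered by the post the test case was built from.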


Data Comparison with DBMS_COMPARISON

The new 11g DBMS_COMPARISON package helps detect how the data of two tables differ. The interface is easy to use and quite powerful: not only can you compare and converge the whole content of two tables, you can also work with data samples and with views. This post shows how to use this feature in combination with a flashback query. That combination can be very useful on a production system with constantly changing data that you can only block for a few seconds.

Limits and Requirements of DBMS_COMPARISON

Before we start, refer to the documentation for the limits and requirements of DBMS_COMPARISON.

DBMS_COMPARISON and FLASHBACK Query

We'll compare the content of a table with a view that contains a flashback clause. In that case, the view is on the table itself; here is the script to create our example:
create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource to demo;

create table DEMO.T1 (
id number,
field1 varchar2(1000),
field2 varchar2(1000),
constraint T1_PK primary key(id));

insert into demo.T1
(select rownum, to_char(rownum),to_char(rownum)
from dual
connect by level <=10000);

commit;

col scn new_value scn

select dbms_flashback.get_system_change_number scn
from dual;

insert into demo.T1
(select 10000+rownum, to_char(rownum),to_char(rownum)
from dual connect by level <=10000);

commit;

create view demo.t1v
as select * from t1 as of scn &&scn;
Once the schema is created, we can use create_comparison and compare to find the differences between the table and the view:
begin
dbms_comparison.create_comparison(
COMPARISON_NAME => 'mycomparison',
SCHEMA_NAME => 'demo',
OBJECT_NAME => 't1v',
DBLINK_NAME => null,
REMOTE_SCHEMA_NAME => 'demo',
REMOTE_OBJECT_NAME => 'T1',
COMPARISON_MODE =>
DBMS_COMPARISON.CMP_COMPARE_MODE_OBJECT,
COLUMN_LIST => '*',
SCAN_MODE =>
DBMS_COMPARISON.CMP_SCAN_MODE_FULL);
end;
/

var scanid number;

SET SERVEROUTPUT ON
DECLARE
consistent BOOLEAN;
scan_info DBMS_COMPARISON.COMPARISON_TYPE;
BEGIN
consistent := DBMS_COMPARISON.COMPARE(
comparison_name => 'mycomparison',
scan_info => scan_info,
perform_row_dif => TRUE);
:scanid:=scan_info.scan_id;
DBMS_OUTPUT.PUT_LINE('Scan ID: '||scan_info.scan_id);
IF consistent=TRUE THEN
DBMS_OUTPUT.PUT_LINE('No differences were found.');
ELSE
DBMS_OUTPUT.PUT_LINE('Differences were found.');
END IF;
END;
/

COLUMN OWNER HEADING 'Comparison Owner' FORMAT A16
COLUMN COMPARISON_NAME HEADING 'Comparison Name' FORMAT A20
COLUMN SCHEMA_NAME HEADING 'Schema Name' FORMAT A11
COLUMN OBJECT_NAME HEADING 'Object Name' FORMAT A11
COLUMN CURRENT_DIF_COUNT HEADING 'Differences' FORMAT 9999999

SELECT c.OWNER,
c.COMPARISON_NAME,
c.SCHEMA_NAME,
c.OBJECT_NAME,
s.CURRENT_DIF_COUNT
FROM DBA_COMPARISON c, DBA_COMPARISON_SCAN_SUMMARY s
WHERE c.COMPARISON_NAME = s.COMPARISON_NAME AND
c.OWNER = s.OWNER AND
s.SCAN_ID = :scanid;

Comparison Owner Comparison Name Schema Name Object Name Differences
---------------- -------------------- ----------- ----------- -----------
SYS MYCOMPARISON DEMO T1V 10000
We can even make the table converge with the view:
SET SERVEROUTPUT ON
DECLARE
scan_info DBMS_COMPARISON.COMPARISON_TYPE;
BEGIN
DBMS_COMPARISON.CONVERGE(
comparison_name => 'mycomparison',
scan_id => :scanid,
scan_info => scan_info,
converge_options => DBMS_COMPARISON.CMP_CONVERGE_LOCAL_WINS);
DBMS_OUTPUT.PUT_LINE('Local Rows Merged: '||scan_info.loc_rows_merged);
DBMS_OUTPUT.PUT_LINE('Remote Rows Merged: '||scan_info.rmt_rows_merged);
DBMS_OUTPUT.PUT_LINE('Local Rows Deleted: '||scan_info.loc_rows_deleted);
DBMS_OUTPUT.PUT_LINE('Remote Rows Deleted: '||scan_info.rmt_rows_deleted);
END;
/

Local Rows Merged: 0
Remote Rows Merged: 0
Local Rows Deleted: 0
Remote Rows Deleted: 10000

select count(*) from demo.t1;

COUNT(*)
----------
10000

select max(id) from demo.t1;

MAX(ID)
----------
10000
You'll be able to do much more from this example; to learn more, check the documentation.
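For instance, the post mentioned that you can work with data samples. A hypothetical variation of the earlier create_comparison call, using the documented random scan mode instead of a full scan, could look like this:

```sql
-- Hypothetical variation of the earlier call: compare only a random
-- sample (about 10%) of the rows instead of performing a full scan.
begin
  dbms_comparison.create_comparison(
    comparison_name    => 'mysamplecomparison',
    schema_name        => 'demo',
    object_name        => 't1v',
    dblink_name        => null,
    remote_schema_name => 'demo',
    remote_object_name => 'T1',
    comparison_mode    => dbms_comparison.cmp_compare_mode_object,
    column_list        => '*',
    scan_mode          => dbms_comparison.cmp_scan_mode_random,
    scan_percent       => 10);
end;
/
```

A sampled comparison obviously cannot prove the tables are identical, but it is much cheaper on large tables when you only want a quick confidence check.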
You can drop the comparison and the demo schema for your next test:
exec dbms_comparison.drop_comparison('mycomparison');
drop user demo cascade;

How does DBMS_COMPARISON work?

The Streams Replication Administrator's Guide explains how the comparison is performed and that ora_hash is used. To get more details, I've traced the compare procedure; the main query looks like this:
SELECT ll.l_rowid, rr.r_rowid, NVL(ll."ID", rr."ID") idx_val
FROM
(SELECT l.rowid l_rowid, l."ID",
ora_hash(
NVL(to_char(l."ID"),'ORA$STREAMS$NV'),
4294967295,
ora_hash(
NVL((l."FIELD1"), 'ORA$STREAMS$NV'),
4294967295,
ora_hash(
NVL((l."FIELD2"), 'ORA$STREAMS$NV'),
4294967295,
0
)
)
) l_hash
FROM "DEMO"."T1" l
WHERE l."ID">=:scan_min1 AND l."ID"<=:scan_max1 ) ll
FULL OUTER JOIN
(SELECT /*+ NO_MERGE REMOTE_MAPPED */ r.rowid r_rowid, r."ID",
ora_hash(
NVL(to_char(r."ID"), 'ORA$STREAMS$NV'),
4294967295,
ora_hash(
NVL((r."FIELD1"), 'ORA$STREAMS$NV'),
4294967295,
ora_hash(
NVL((r."FIELD2"), 'ORA$STREAMS$NV'),
4294967295,
0
)
)
) r_hash
FROM "DEMO"."T2" r
WHERE r."ID">=:scan_min1
AND r."ID"<=:scan_max1 ) rr
ON ll."ID"=rr."ID"
WHERE ll.l_hash IS NULL
OR rr.r_hash IS NULL
OR ll.l_hash <> rr.r_hash;
It gives a good idea of what we can expect from the tool, no?
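You can try the nested ora_hash seeding pattern on its own; the query below is a minimal, hypothetical illustration (literal values instead of table columns) of how one hash per row is built by chaining each column's hash as the seed of the next call:

```sql
-- Minimal illustration of the nested ora_hash pattern in the traced
-- query: the inner hash of one value seeds the outer hash of the next,
-- collapsing several columns into a single 32-bit row hash.
select ora_hash(
         nvl(to_char(1), 'ORA$STREAMS$NV'),
         4294967295,
         ora_hash(
           nvl('some value', 'ORA$STREAMS$NV'),
           4294967295,
           0)) row_hash
from dual;
```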


Wednesday, August 26, 2009

Oracle AQ Buffered Queues 101 (Part 2)

I would like to get back to the Oracle AQ Buffered Queues 101 post that I wrote a few months back. What you'll find below is an enhanced example that may better suit your needs. As a matter of fact, this is something I've recently used to study the impact of the Oracle Net SDU, buffer size and latency parameters on propagation performance. But that's another story...

I strongly invite you to read the original post. It shows interesting things like "queue paused in flow control", "spilled messages", and how to lose buffered messages. However, the previous example uses only one queue; the one below exchanges messages between 2 queues in different databases. You'll see how to set up subscribers and schedule propagation to share your messages. To install it, assuming you have 2 databases running 11.1 (or very likely 10.2), follow these steps. You can start right away...

Set Up the environment to use SQL*Plus and to connect as SYSDBA

To begin, you'll set the SQL*Plus variables that follow:
  • sourcegdb defines the global_name AND the network alias of the source database. We assume global_names=true, even if that's not strictly mandatory. It helps to make things more readable. Make sure you have the aliases setup everywhere and they match the database global names.
  • destgdb defines the global_name AND the network alias of the destination database.
  • source_user and dest_user define names of SYSDBA users on the source and on the destination databases.
  • source_pwd and dest_pwd define the passwords of the corresponding source_user and dest_user.
accept sourcegdb default 'BLACK' -
prompt "Enter the Source Database Global Name [BLACK]: "

accept destgdb default 'WHITE' -
prompt "Enter the Destination Database Global Name [WHITE]: "

accept source_user default 'sys' -
prompt "Enter the Source SYSDBA user [sys]: "

accept source_pwd default 'change_on_install' -
prompt "Enter the Source SYSDBA password [change_on_install]: "

accept dest_user default 'sys' -
prompt "Enter the Destination SYSDBA user [sys]: "

accept dest_pwd default 'change_on_install' -
prompt "Enter the Destination SYSDBA password [change_on_install]: "

Create a User DEMO and a User Defined Type in the 2 databases

To go on with the example, you'll need to create the DEMO user in the 2 databases and create a type you'll use to send and receive messages. The script below creates those users and types.
Note:
For this example, we assume there is no user named DEMO. We also assume the 2 tablespaces USERS and TEMP exist in both databases. Neither database needs to run in ARCHIVELOG mode.

connect &&source_user/&&source_pwd@&&sourcegdb as sysdba

create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;

connect &&dest_user/&&dest_pwd@&&destgdb as sysdba

create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;

connect demo/demo@&&sourcegdb

create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/

connect demo/demo@&&destgdb

create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/

Create a database link between the source and the destination databases

To start the propagation job, you'll need the source database to connect to the destination database. Create and test a database link for that purpose:
connect demo/demo@&&sourcegdb

create database link &&destgdb
connect to demo
identified by demo
using '&&destgdb';

select * from dual@&&destgdb;

Create and start queues

Then, create and start the queues:
connect demo/demo@&&destgdb

begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/

begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue('myqueue');
end;
/

connect demo/demo@&&sourcegdb

begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/

begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue('myqueue');
end;
/

Create subscribers on the source queue and schedule propagation to the destination queue

The next step consists of adding subscribers that match the destination queue to the source queue. In this example, we add 2 subscribers because we will eventually dequeue the messages from 2 separate programs (or for 2 distinct purposes). Once done, check that the queue types are compatible and schedule the queue-to-queue propagation. dba_queue_schedules provides detailed information about what is scheduled:
connect demo/demo@&&sourcegdb

begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('RED','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/

begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('BLUE','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/

set serveroutput on

declare
rc binary_integer;
begin
dbms_aqadm.verify_queue_types(
src_queue_name => 'myqueue'
, dest_queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, rc => rc);
dbms_output.put_line('If result is 1, it''s OKAY: '||rc);
end;
/

begin
dbms_aqadm.schedule_propagation(
queue_name => 'myqueue'
, destination => '&&destgdb'
, destination_queue => 'demo.myqueue');
end;
/

set pages 1000
select schema
, qname
, destination
, start_time
, latency
, schedule_disabled
, session_id
, total_number
, failures
, last_error_msg
, message_delivery_mode
from dba_queue_schedules;

Create an enqueue procedure in the source database

Create an enqueue procedure demo_enqueue, that enqueues a message in the buffered part of the queue:
connect demo/demo@&&sourcegdb

create or replace procedure demo_enqueue(p_mytype mytype) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => p_mytype,
msgid => message_handle);
commit;
end;
/

Create a dequeue procedure in the destination database

demo_dequeue dequeues messages from the destination queue based on the consumer name:
connect demo/demo@&&destgdb

select * from aq$myqueue_table_S;

set serveroutput on

create or replace procedure demo_dequeue(p_consumer varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
v_mytype mytype;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_consumer;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => v_mytype,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('Message for Consumer "'||p_consumer||'": ');
dbms_output.put_line('ID :'||to_char(v_mytype.id));
dbms_output.put_line('FIELD1:'||v_mytype.field1);
dbms_output.put_line('FIELD2:'||v_mytype.field2);
dbms_output.put_line('---------------------------------------------------------');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/

Enqueue a message on one end and dequeue it on the other end

You are ready to test your case. Enqueue a message and check that you get the messages for the 2 subscribers:
connect demo/demo@&&sourcegdb
set serveroutput on
declare
v_mytype mytype;
begin
v_mytype := mytype(1, 'BLUE AND RED','Red And Blue');
demo_enqueue(v_mytype);
end;
/

select * from aq$myqueue_table;


connect demo/demo@&&destgdb

select * from aq$myqueue_table;

set serveroutput on
exec demo_dequeue('BLUE')
exec demo_dequeue('RED')
Note:
The aq$myqueue_table views help monitor the messages. Since the propagation has been scheduled without any time window, messages are always sent from the source to the destination; the latency (which defaults to 60 seconds in that case) is the only thing that can slightly delay the moment a message becomes available to the consumers.
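If that delay matters to you, the schedule can be tuned. The sketch below assumes the propagation created earlier and uses DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE to set the latency to 0, so messages are pushed as soon as they are enqueued:

```sql
-- Hypothetical tuning sketch: lower the propagation latency to 0 so
-- messages are sent as soon as possible (at the cost of more frequent
-- pushes). Run as DEMO on the source database.
begin
  dbms_aqadm.alter_propagation_schedule(
    queue_name        => 'myqueue'
  , destination       => '&&destgdb'
  , destination_queue => 'demo.myqueue'
  , latency           => 0);
end;
/
```

You can verify the new value in the latency column of dba_queue_schedules.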

Clean Up the environment

You are done! Before you leave, remove the AQ propagation schedule, the queues and the DEMO users:
connect &&source_user/&&source_pwd@&&sourcegdb as sysdba

begin
dbms_aqadm.UNSCHEDULE_PROPAGATION(
queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, destination_queue => 'DEMO.MYQUEUE');
end;
/

exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)

drop user demo cascade;

select * from dba_queue_schedules;

connect &&dest_user/&&dest_pwd@&&destgdb as sysdba

exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)

drop user demo cascade;


Monday, August 24, 2009

Googling The Oracle Dictionary

This doesn't sound like it has much to do with Oracle Streams, but it has helped me speed up my understanding of a few internals and quickly get to a better reading of traces. "It" is actually a procedure I've named findString. It detects patterns stored in Oracle tables. It works with VARCHAR2 only, but you can easily make it work with NUMBER or whatever you need... Obviously, don't use it on anything other than a small TEST database, or with non-selective patterns.

Remember, it's just a tool; it won't explain anything about how things work. Keep in mind also that some data can be held in sessions, and this procedure won't find it because it doesn't read temporary data. Enough talk, here is the code:
create or replace procedure findString(pattern varchar2)
is
cnt number:=0;
begin
for i in (select c.owner, c.table_name, c.column_name
from DBA_TAB_COLS c, dba_tables t
where c.data_type in ('VARCHAR2','NVARCHAR2')
and c.owner=t.owner
and c.table_name=t.table_name) loop
begin
execute immediate 'select count(*) from "'
||i.owner||'"."'||i.table_name||
'" where "'||i.column_name||'" like :x'
into cnt using pattern;
exception when others then
cnt:=0; -- reset so the previous column's count is not reported here
dbms_output.put_line('Error in "'||i.owner||
'"."'||i.table_name||'" column "'||
i.column_name||'"');
dbms_output.put_line(DBMS_UTILITY.
FORMAT_ERROR_STACK);
end;
if (cnt>0) then
dbms_output.put_line('Pattern Detected in "'||
i.owner||'"."'||i.table_name||
'" column "'||i.column_name||'"');
end if;
end loop;
end;
/
To use it with SQL*Plus, set serveroutput to on and pass your search pattern in the parameter, like this:
set serveroutput on
exec findString('%ONEWAY%TABLE%')

Pattern Detected in "SYSTEM"."LOGMNRC_GTLO" column "LVL0NAME"
Pattern Detected in "SYSTEM"."LOGMNR_OBJ$" column "NAME"
Pattern Detected in "SYS"."OBJ$" column "NAME"
Pattern Detected in "SYS"."HISTGRM$" column "EPVALUE"
Pattern Detected in "SYS"."STREAMS$_RULES" column "OBJECT_NAME"
Pattern Detected in "SYS"."STREAMS$_RULES" column "RULE_CONDITION"
Pattern Detected in "SYS"."STREAMS$_RULES" column "RULE_NAME"
Pattern Detected in "SYS"."WRH$_SEG_STAT_OBJ" column "BASE_OBJECT_NAME"
Pattern Detected in "SYS"."WRH$_SEG_STAT_OBJ" column "OBJECT_NAME"


Propagation Error And Exception Queue Management

If you've tried the example from my previous blog post named "Transfer Files From One Server To Another With Oracle Streams", you may have faced some propagation issues. Step 7 of that post helps to quickly work around any issue by stopping and restarting the propagation. However, that is just a workaround; in a real-case scenario, you would want to manage the message in the exception queue.

This post goes deeper into the right way to fix a propagation issue. To begin, it simulates a propagation error, just in case you did not hit one by yourself ;-). The next section shows how you can fix the situation and manage the exception message on your own.

1. Simulate a Propagation Error

Like with DBMS_FILE_TRANSFER, external files transported with Oracle Streams must be a multiple of 512 bytes in size. We'll use that limit to make a propagation process fail. To set up the example, follow "Step 1" to "Step 5" of the previous post "Transfer Files From One Server To Another With Oracle Streams". Once done, instead of executing "Step 6", execute the step below, which will make the propagation fail.
On BLACK, create a file that is NOT a multiple of 512 bytes in size, instantiate a BFILE locator that points to it, and enqueue a message containing that locator in the Streams queue:
connect strmadmin/strmadmin

select * from GLOBAL_NAME;

GLOBAL_NAME
----------
BLACK

!dd if=/dev/zero of=/tmp/black/demo2.zero bs=511 count=1

declare
v_demotyp demotyp:=demotyp(2, bfilename('tmp_dir','demo2.zero'));
v_any anydata;
begin
v_any:=anydata.convertobject(v_demotyp);
dbms_streams_messaging.enqueue(
'strmadmin.streams_queue',
v_any);
end;
/
commit;
This time the propagation fails with a message like the one below:
col error_message format a100
set lines 100
set long 10000
set longchunksize 10000

select error_message
from dba_propagation
where propagation_name='BFILE_PROPAGATION';

ERROR_MESSAGE
-------------------------------------------------------
ORA-19505: failed to identify file "/tmp/black/demo2.zero"
ORA-27046: file size is not a multiple of logical block size
Additional information: 1
You'll find the same error message in the queue propagation schedule:
select last_error_msg
from dba_queue_schedules
where schema='STRMADMIN'
and qname='STREAMS_QUEUE'
and message_delivery_mode='PERSISTENT';

LAST_ERROR_MSG
-------------------------------------------------------
ORA-19505: failed to identify file "/tmp/black/demo2.zero"
ORA-27046: file size is not a multiple of logical block size
Additional information: 1

2. Fix the Error

Fixing the error depends on what it is. In this case, let's say you'll fix it by padding the file with a 0x00 byte; you can do that with dd and its seek parameter:
Important note:
Don't use such a command on a real file without evaluating first what the impact would be on the file itself!
!dd if=/dev/zero of=/tmp/black/demo2.zero count=1 bs=1 seek=511 

!ls -ltra /tmp/black/demo2.zero
-rw-r--r-- 1 oracle oinstall 512 2009-08-24 14:26 /tmp/black/demo2.zero
That done, dequeue the message from the exception queue and re-enqueue it in the Streams queue. In that situation, it's probably a good idea to have only one queue per queue table, so you can tell which queue a message in the exception queue originally came from. Here is how to do it; if you want more examples, refer to Metalink Note 233103.1 - "Dequeuing Messages from an Exception Queue":
connect strmadmin/strmadmin

begin
dbms_aqadm.start_queue(
queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E',
enqueue => false,
dequeue => true);
end;
/

SET SERVEROUTPUT ON

DECLARE
v_deq_options DBMS_AQ.dequeue_options_t;
v_msg_properties DBMS_AQ.message_properties_t;
o_msgid RAW(16);
v_any anydata;
BEGIN
v_deq_options.consumer_name:=null;
v_deq_options.dequeue_mode := DBMS_AQ.remove;
v_deq_options.navigation := DBMS_AQ.NEXT_TRANSACTION;
DBMS_AQ.dequeue(
queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E',
dequeue_options => v_deq_options,
message_properties => v_msg_properties,
payload => v_any,
msgid => o_msgid);
dbms_streams_messaging.enqueue('strmadmin.streams_queue',v_any);
COMMIT;
END;
/

begin
dbms_aqadm.stop_queue(
queue_name => 'strmadmin.AQ$_STREAMS_QUEUE_TABLE_E' ,
enqueue => true,
dequeue => true);
end;
/
Once done, you'll see the error go away from the propagation and the schedule. The file, now of a correct size, will be transferred from BLACK to WHITE:
select error_message
from dba_propagation
where propagation_name='BFILE_PROPAGATION';

ERROR_MESSAGE
-------------------------------------------------------

select last_error_msg
from dba_queue_schedules
where schema='STRMADMIN'
and qname='STREAMS_QUEUE'
and message_delivery_mode='PERSISTENT';

LAST_ERROR_MSG
-------------------------------------------------------
Check the file has been received on the WHITE side:
!ls -ltra /tmp/white/demo2.zero
-rw-r----- 1 oracle oinstall 512 2009-08-24 14:33 /tmp/white/demo2.zero
This is it: a very simple way to fix propagation errors. Once again, clean up the environment before you leave it for your next demo; delete the demo2.zero file on both sides of your configuration and execute "Step 8 of the previous post" to delete queues, tablespaces, administrators and more...


Sunday, August 23, 2009

Transfer Files From One Server To Another With Oracle Streams

You can leverage Oracle Streams to transfer files stored on a file system from one server to another. Not only can those files be images or documents (assuming their size is a multiple of 512 bytes), they can also be Oracle files, like Data Pump exports, backups or data files. As a matter of fact, to me, sharing files across servers is probably the #1 reason why you would want to use Streams user messages.

In this post, you'll find an easy-to-reproduce sample application. You'll create a User Defined Type (UDT) containing a BFILE (i.e. a pointer to a file located outside the database, on a file system or in ASM). You'll then create an instance of that type, wrap it in an ANYDATA payload and share it between databases with Streams. You'll see that not only the message with its BFILE locator, but also the corresponding file, is transferred from the source database to the destination database.

This post is made of 8 steps, from the database configuration to the final clean-up:

Step 1: Configure Databases and Streams Administrators

If you read this blog or the documentation, you've seen it dozens of times already! In order for Streams to work, you must set up a Streams Administrator and a queue and, though it's not mandatory, it's strongly recommended to set the global_names parameter of the two databases to TRUE. This sample application assumes 2 databases called BLACK, the source, and WHITE, the destination. You'll find below the commands to run on the 2 databases to set up the Streams Administrator and the parameters:
On BLACK:
select * from global_name;

GLOBAL_NAME
-----------
BLACK

alter system set global_names=true;

On WHITE:

select * from global_name;

GLOBAL_NAME
-----------
WHITE

alter system set global_names=true;

On BLACK:

connect / as sysdba

CREATE TABLESPACE streams_tbs DATAFILE
'/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'
SIZE 25M AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/

begin
dbms_streams_auth.grant_admin_privilege(
grantee => 'strmadmin',
grant_privileges => true);
end;
/

On WHITE:

connect / as sysdba

CREATE TABLESPACE streams_tbs DATAFILE
'/u01/app/oracle/oradata/WHITE/streams_tbs.dbf'
SIZE 25M AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/

begin
dbms_streams_auth.grant_admin_privilege(
grantee => 'strmadmin',
grant_privileges => true);
end;
/
Note:
Because we don't use any Streams capture process, those databases don't have to be in archivelog mode.
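If in doubt, you can check the log mode of each database; with no capture process involved, a NOARCHIVELOG result is perfectly fine for this demo:

```sql
connect / as sysdba

select log_mode from v$database;
```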

Step 2: Configure Directories and the User Defined Type

Once the Streams Administrator is created, create a directory with the same name in the 2 databases as well as a User Defined Type. To allow the instances associated with the 2 databases to run on the same server, the directory path on BLACK differs from the directory path on WHITE:
On BLACK:
!mkdir -p /tmp/black

create directory tmp_dir
as '/tmp/black';

grant read, write, execute
on directory tmp_dir
to strmadmin;

On WHITE:

!mkdir -p /tmp/white

create directory tmp_dir
as '/tmp/white';

grant read, write, execute
on directory tmp_dir
to strmadmin;

On BLACK:

connect strmadmin/strmadmin

create type demotyp as object
(id number,
myfile bfile);
/

On WHITE:

connect strmadmin/strmadmin

create type demotyp as object
(id number,
myfile bfile);
/

Step 3: Create the Database Link and the Propagation Between the Queues

Make sure the Oracle*Net configuration of the BLACK database (e.g. its tnsnames.ora file) contains an alias WHITE that points to the WHITE database. Once done, you can create a database link from BLACK to WHITE and a propagation process to send messages from the source queue to the destination queue. Connect to BLACK and run the script below:
connect strmadmin/strmadmin

select * from GLOBAL_NAME;

GLOBAL_NAME
----------
BLACK

CREATE DATABASE LINK WHITE
connect to STRMADMIN
identified by strmadmin
using 'WHITE';

BEGIN
DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
propagation_name => 'bfile_propagation',
source_queue => 'strmadmin.streams_queue',
destination_queue => 'strmadmin.streams_queue',
destination_dblink => 'white',
rule_set_name => null,
queue_to_queue => TRUE);
END;
/
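For reference, the WHITE alias in the tnsnames.ora file of the BLACK server could look like the sketch below; the host and port are assumptions, adjust them to your environment:

```
WHITE =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521))
    (CONNECT_DATA = (SERVICE_NAME = WHITE))
  )
```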

Step 4: Create A Streams Message Consumer And A Dequeue Program

You can now configure the WHITE database so that a program of yours dequeues messages from its Streams queue. The script below creates a Streams DEQUEUE client and executes an anonymous PL/SQL block that receives and removes any message whose ID is greater than 0:
connect strmadmin/strmadmin

begin
DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (
message_type => 'strmadmin.demotyp',
rule_condition => ':MSG.ID > 0',
streams_type => 'DEQUEUE',
streams_name => 'demostrm',
queue_name => 'strmadmin.streams_queue');
end;
/

select streams_name,
queue_owner||'.'||queue_name queue
from DBA_STREAMS_MESSAGE_CONSUMERS;

set serveroutput on
declare
v_demotyp demotyp;
v_any anydata;
v_out pls_integer;
begin
dbms_streams_messaging.dequeue(
queue_name => 'strmadmin.streams_queue',
streams_name => 'demostrm',
payload => v_any,
dequeue_mode => 'REMOVE',
navigation => 'NEXT TRANSACTION',
wait => DBMS_STREAMS_MESSAGING.FOREVER);
v_out := v_any.getobject(v_demotyp);
dbms_output.put_line('Message Received: '||
to_char(v_demotyp.id));
commit;
end;
/

Step 5: Test The Streams Configuration

You can now test your settings: enqueue a message on the BLACK side. For this first test, don't "attach" any file to the message; leave myfile set to NULL:
connect strmadmin/strmadmin

declare
v_demotyp demotyp:=demotyp(1, null);
v_any anydata;
begin
v_any:=anydata.convertobject(v_demotyp);
dbms_streams_messaging.enqueue(
'strmadmin.streams_queue',
v_any);
end;
/
commit;
The program running on the WHITE side should display the following line and stop:
Message Received: 1

Step 6: Create a File and Transfer It with Streams

You'll perform the same test, but this time with a file attached. Restart the program on WHITE:
select * from GLOBAL_NAME; 

GLOBAL_NAME
----------
WHITE

set serveroutput on

declare
v_demotyp demotyp;
v_any anydata;
v_out pls_integer;
begin
dbms_streams_messaging.dequeue(
queue_name => 'strmadmin.streams_queue',
streams_name => 'demostrm',
payload => v_any,
dequeue_mode => 'REMOVE',
navigation => 'NEXT TRANSACTION',
wait => DBMS_STREAMS_MESSAGING.FOREVER);
v_out := v_any.getobject(v_demotyp);
dbms_output.put_line('Message Received: '||
to_char(v_demotyp.id));
commit;
end;
/
Create a file that is a multiple of 512 bytes in size on the BLACK side, allocate a BFILE that points to it and send it to WHITE with Streams:
connect strmadmin/strmadmin

select * from GLOBAL_NAME;

GLOBAL_NAME
----------
BLACK

!dd if=/dev/zero of=/tmp/black/demo.zero bs=512 count=1

declare
v_demotyp demotyp:=demotyp(2, bfilename('tmp_dir','demo.zero'));
v_any anydata;
begin
v_any:=anydata.convertobject(v_demotyp);
dbms_streams_messaging.enqueue(
'strmadmin.streams_queue',
v_any);
end;
/
commit;
The program you've restarted on WHITE should display something like:
Message Received: 2
And the file should now be present on the WHITE side too:
!ls -l /tmp/white
total 4
-rw-r----- 1 oracle oinstall 512 2009-08-24 00:25 demo.zero
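Beyond ls, you can also check that the received BFILE locator resolves to an actual file from inside the WHITE database. Here is a minimal sketch using DBMS_LOB.FILEEXISTS, which returns 1 when the file exists; note that the directory argument of BFILENAME is case-sensitive, and since TMP_DIR was created without quotes its stored name is uppercase:

```sql
connect strmadmin/strmadmin

set serveroutput on
declare
-- directory object names created without quotes are stored in uppercase
v_file bfile := bfilename('TMP_DIR','demo.zero');
begin
dbms_output.put_line('File exists: '||dbms_lob.fileexists(v_file));
end;
/
```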

Step 7: Troubleshooting

Troubleshooting this sample is beyond the scope of the post. However, if the configuration is not working, it's very likely related to the propagation of the file (the file size, privileges on the /tmp/white directory, etc.). You can query DBA_PROPAGATION and the queue tables to learn more about the status of your message. If there is an error at the propagation level, fix it and restart the propagation:
exec dbms_propagation_adm.stop_propagation('bfile_propagation',true);
exec dbms_propagation_adm.start_propagation('bfile_propagation');
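As a starting point, the queries below (a sketch, to be run as strmadmin on BLACK) show the propagation status and the state of the messages still sitting in the queue table:

```sql
col propagation_name format a20
col status format a10
col error_message format a40
select propagation_name, status, error_message
from dba_propagation
where propagation_name='BFILE_PROPAGATION';

select msg_state, count(*)
from aq$streams_queue_table
group by msg_state;
```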

Step 8: Clean-up the environment

As always, leave the databases as they were when you first started. The scripts below remove all the components:
On BLACK:
connect / as sysdba

!rm /tmp/black/demo.zero

begin
dbms_propagation_adm.stop_propagation(
'bfile_propagation',true);
dbms_propagation_adm.drop_propagation(
'bfile_propagation',true);
end;
/

drop directory tmp_dir;

begin
DBMS_STREAMS_ADM.REMOVE_QUEUE(
queue_name => 'strmadmin.streams_queue',
cascade => true,
drop_unused_queue_table=> true);
end;
/

drop user strmadmin cascade;
drop tablespace streams_tbs
including contents and datafiles;

On WHITE:

connect / as sysdba

!rm /tmp/white/demo.zero

col streams_name format a15
col queue format a40
select streams_name,
queue_owner||'.'||queue_name queue
from DBA_STREAMS_MESSAGE_CONSUMERS;

begin
for i in (select rule_owner, rule_name
from DBA_STREAMS_RULES
where STREAMS_TYPE='DEQUEUE'
and STREAMS_NAME='DEMOSTRM')
loop
dbms_streams_adm.remove_rule(
rule_name => i.rule_owner||'.'||i.rule_name,
streams_type => 'DEQUEUE',
streams_name => 'DEMOSTRM',
drop_unused_rule => true);
end loop;
end;
/

col streams_name format a15
col queue format a40
select streams_name,
queue_owner||'.'||queue_name queue
from DBA_STREAMS_MESSAGE_CONSUMERS;

drop directory tmp_dir;
drop user strmadmin cascade;
drop tablespace streams_tbs
including contents and datafiles;

In the next post, we'll dig a bit deeper into the troubleshooting part: we'll generate a propagation error and see how to fix it. Stay tuned!
