Thursday, April 9, 2015

#393 BAM 12c Continuous Queries and Streams

First to the docs -

The Continuous Query Service (CQS) is a BAM-specific wrapper around the
Continuous Query Language (CQL) engine within the Oracle Complex Event
Processing Service Engine. The CQS is a pure push system: query results are delivered
automatically. The CQS supports both stream (non-persistent) and archived relation
(persistent) data objects.
When you create a query, the CQS sets up tables in the CQL engine, registers the
query, and listens for data changes from the persistence engine. The query result is
processed in the CQL engine, then pushed to the CQS and on to the report cache.
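
To make the "pure push" idea concrete, here is a minimal sketch in Python. It is only an illustration of the pattern the docs describe (register a query, listen for changes, push matches onward). The class `ToyQueryService`, its methods and the `report_cache` list are hypothetical names of mine, not Oracle BAM or CQL APIs.

```python
from typing import Callable, List, Tuple


class ToyQueryService:
    """Illustrative push pipeline; hypothetical names, not Oracle BAM classes."""

    def __init__(self) -> None:
        self._queries: List[Tuple[Callable[[dict], bool], Callable[[dict], None]]] = []

    def register(self, predicate: Callable[[dict], bool],
                 listener: Callable[[dict], None]) -> None:
        # Registering a query pairs a condition with the consumer of its results.
        self._queries.append((predicate, listener))

    def on_data_change(self, row: dict) -> None:
        # Every change is evaluated as it arrives and matches are pushed out;
        # the consumer never has to poll for results.
        for predicate, listener in self._queries:
            if predicate(row):
                listener(row)


report_cache: List[dict] = []  # stand-in for the report cache
service = ToyQueryService()
service.register(lambda row: row["symbol"] == "ORCL", report_cache.append)
service.on_data_change({"symbol": "ORCL", "price": 750})
service.on_data_change({"symbol": "IBM", "price": 750})
```

Only the first row ends up in `report_cache`, and it gets there without anyone asking for it. That is the difference from a pull-style report that re-runs its query on a timer.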

Now to the creation of a Continuous Query -
Continuous Queries act on Streams, so we have to begin by defining a Stream data object (DO).


Stream DO



I will use a simple StockSale scenario here.

Here is my process -




















Here are my BI settings -














I create a simple BAM dashboard to output stock symbol and price.

I output the salient data in a table -












I now create a DO of type Simple/Stream -



Note I specify one String column for the StockSymbol
and one decimal column for the stock price.

I add these via Add Column
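
For readers who think in code, the shape of this Stream DO can be sketched as a simple record. This is just a Python illustration, not something BAM generates. The class name `StockPriceEvent` and the field names are my assumptions, and the sample values are made up.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class StockPriceEvent:
    """Hypothetical stand-in for one row of the Stream DO (not a BAM API)."""
    stock_symbol: str      # the String column
    stock_price: Decimal   # the decimal column


# Made-up sample row, for illustration only.
event = StockPriceEvent(stock_symbol="ORCL", stock_price=Decimal("750.00"))
```

Using `Decimal` rather than `float` mirrors the choice of a decimal column for prices.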















Now, I add this new DO to my Designtime project.


Inserting Data into the Stream DO


In Designtime, I create an Alert that inserts data
into the StockPriceStream1 DO for sales over 500.
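
In code terms, the Alert amounts to a filter plus an insert. The sketch below is Python pseudologic for that rule, not how BAM implements alerts. The function `on_sale`, the list `stock_price_stream` and the `StockPrice` key are hypothetical, and I am assuming the value compared against 500 is the price field.

```python
from decimal import Decimal

THRESHOLD = Decimal("500")  # "sales over 500" from the alert condition

stock_price_stream = []  # stand-in for the StockPriceStream1 DO


def on_sale(symbol: str, price: Decimal) -> bool:
    """Mimic the alert: insert into the stream only when the sale exceeds the threshold."""
    if price > THRESHOLD:
        # This dict plays the role of the data mapping step.
        stock_price_stream.append({"StockSymbol": symbol, "StockPrice": price})
        return True
    return False


on_sale("ORCL", Decimal("750"))  # high value: inserted
on_sale("ORCL", Decimal("120"))  # below the threshold: ignored
```

The steps that follow configure the same three pieces in the UI: the condition, the action and the field mapping.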




Choose the option - When a data field...




























I add an Action -










I do the data mapping -

























I now create a high value stock sale.













I check the Data tab of the Stream DO










Create a Continuous Query based on the Stream DO












I want to detect duplicates on high price sales -



















Note the rolling window I am using -


I will insert duplicates into a separate DO - DuplicateSales.
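
The logic of the query can be sketched outside BAM as a rolling-window duplicate check. This is a toy Python version, not the CQL the engine generates. The class `DuplicateDetector`, the 10 minute window and the sample values are my assumptions, and events are assumed to arrive in time order.

```python
from collections import deque
from datetime import datetime, timedelta
from decimal import Decimal


class DuplicateDetector:
    """Toy rolling-window duplicate check; not the CQL engine's implementation."""

    def __init__(self, window: timedelta):
        self.window = window
        self.recent = deque()       # (timestamp, symbol, price), oldest first
        self.duplicate_sales = []   # stand-in for the DuplicateSales DO

    def on_event(self, ts: datetime, symbol: str, price: Decimal) -> bool:
        # Expire events that have slid out of the window.
        while self.recent and ts - self.recent[0][0] > self.window:
            self.recent.popleft()
        # A duplicate is an event matching one still inside the window.
        is_dup = any(s == symbol and p == price for _, s, p in self.recent)
        if is_dup:
            self.duplicate_sales.append((symbol, price))
        self.recent.append((ts, symbol, price))
        return is_dup


detector = DuplicateDetector(window=timedelta(minutes=10))  # window length is assumed
t0 = datetime(2015, 4, 9, 12, 0, 0)
first = detector.on_event(t0, "ORCL", Decimal("750"))
second = detector.on_event(t0 + timedelta(minutes=1), "ORCL", Decimal("750"))
late = detector.on_event(t0 + timedelta(minutes=30), "ORCL", Decimal("750"))
```

Here only the second event is flagged. The third arrives after the earlier two have left the window, so it counts as a fresh sale. The sketch keys on symbol and price only.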





































I realise that the above should contain custNr and orderNr but this
is just a quick demo ;--)

I add the new DO to the DesignTime project.










Now to the insert - I do this via an action, configured on the
Continuous Query -






Here is the list of available actions -


















So I choose the last option - Insert values...

Map Fields -
















Test by creating 2 processes for large orders with the same payload.



Here I see the duplicate orders in the Stream
on which the Continuous Query is based.





The duplicate has been detected and written to the
DuplicateSales DO -














