EVER
Window that retains the latest events based on a given unique keys. When a new event arrives with the same key it replaces the one that exist in the window.
This function is not recommended if the maximum number of unique attributes are undefined, as there is a risk of system going out to memory.
Syntax
WINDOW UNIQUE:EVER(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key)
WINDOW UNIQUE:EVER(<INT|LONG|FLOAT|BOOL|DOUBLE|STRING> unique.key, <INT|LONG|FLOAT|BOOL|DOUBLE|STRING> ...)
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|---|---|---|---|---|---|
| unique.key | The attribute used to checked for uniqueness. | INT LONG FLOAT BOOL DOUBLE STRING | No | Yes |
Example 1
CREATE STREAM LoginEvents (timestamp long, ip string);
INSERT events INTO UniqueIps
SELECT count(ip) AS ipCount
FROM LoginEvents WINDOW UNIQUE:EVER(ip)
This query collects all unique events based on the ip attribute by
retaining the latest unique events from the LoginEvents stream. Then
the query counts the unique ips arrived so far and outputs the
ipCount via the UniqueIps stream.
Example 2
CREATE STREAM DriverChangeStream (trainID string, driver string);
INSERT expired events INTO PreviousDriverChangeStream;
SELECT trainID, driver
FROM DriverChangeStream WINDOW UNIQUE:EVER(trainID)
Query collects all unique events based on the trainID attribute by
retaining the latest unique events from the DriverChangeStream.
The query outputs the previous unique event stored in the window as the
expired events are emitted via PreviousDriverChangeStream.
Example 3
CREATE STREAM StockStream (symbol string, price float);
CREATE SINK STREAM PriceRequestStream(symbol string);
INSERT events INTO PriceResponseStream;
SELECT s.symbol AS symbol, s.price AS price
FROM StockStream WINDOW UNIQUE:EVER(symbol) AS s JOIN PriceRequestStream AS p
ON s.symbol == p.symbol
Query stores the last unique event for each symbol attribute of
StockStream, and joins them with events arriving on the
PriceRequestStream for equal symbol attributes to fetch the latest
price for each requested symbol and output it to PriceResponseStream.