
Real-time data processing with Kinesis Data Analytics


Nick van Hoof

Passionate about technology. Driven by an urge to create valuable change to the world.

Follow: @stefceyssens

TL;DR - Using Kinesis Data Analytics to get real-time insights into Flanders' traffic situation!

 

Why Real-time Data Analytics?

Real-time analytics allow businesses to react on the spot.
Seize opportunities when they occur, avoid problems before they happen or alert immediately when necessary.

 

 

Traditionally, batch-processing analytics was generally considered adequate as most BI users could meet their business goals by looking at weekly or monthly business numbers.
In this approach, recorded events were collected based on the number of records or a certain period of time and stored somewhere, to be processed as a finite data set at a later point in time.

Recently however, a shift has occurred towards low latency real-time analytics, where data is processed as it arrives. Driving this shift are several factors such as increased access to data sources and improved computing power resulting from cloud computing.

Examples where businesses could benefit from real-time analytics are:

  • The tracking of client behavior on websites (a visitor that is hesitating to convert may be persuaded to buy by a final incentive).
  • Viewing orders as they happen allowing for the faster identification of and reaction to trends.
  • The monitoring of traffic data to improve traffic flows or generate alerts for traffic jams.

 

How Can We Achieve Real-Time Analytics?

We can use AWS Kinesis Data Analytics to author SQL code that continuously reads and processes data in near real time. When using data in real time we speak of the hot path for the data. "Hot", meaning that you want to do something with the data while it is still hot from just being produced.

For your hot path you want to think about your data access patterns upfront!

Correctly defining your data access patterns upfront (and structuring your data accordingly) is extremely important. It will ensure you have data available that is compatible with the manner in which it is to be used, allowing for (near) real time reactions to it.
(Note: by data access pattern we mean the way you will interact with your data: which fields you will query on, which attributes you want to extract from nested data structures, and so on.)

 

What Kind Of Real-Time Analytics Will We Do?

For this blog post, we would like to explore the following practical example.
In Flanders, the "Flemish Traffic Institute" is continuously monitoring traffic on the highways. The visualization directly below shows all of the different measurement locations, where traffic is monitored.

measurement-locations

This data is made available, on a per minute basis, via an API.
For a couple of critical locations in Flanders, we set out to do the following:

  • Send out an alert when a traffic jam occurs.
  • Analyze whether a traffic jam is emerging or disappearing.
  • Get a real time status of the current situation.

 

Big Picture of Architecture

The government publishes the traffic data every minute as a big blob of XML data, containing the information for all 4500 measurement locations in Flanders. We immediately convert this data into JSON and divide it into per-location measurement events. This preprocessing is achieved using AWS Lambda. As the focus of this blog post is real-time analytics, we will not go deeper into the particulars of how we used Lambda to accomplish this.
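Although the splitting step is out of scope here, a minimal sketch may help to picture it. The element name `meetpunt` and the overall XML layout are assumptions made for illustration; the real feed may be structured differently.

```python
import json
import xml.etree.ElementTree as ET
from typing import List


def split_measurements(xml_blob: str) -> List[str]:
    """Turn one XML document into one JSON string per measurement location.

    Assumption: every location is a <meetpunt> element whose attributes and
    child elements hold simple text values.
    """
    root = ET.fromstring(xml_blob)
    events = []
    for location in root.iter("meetpunt"):
        event = dict(location.attrib)
        for child in location:
            event[child.tag] = (child.text or "").strip()
        events.append(json.dumps(event))
    return events
```

Each returned string is one per-location event that can be put on the stream individually.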

The per-location measurement events are then streamed over Firehose.
This Firehose is used as an input for our Kinesis Data Analytics application, which will provide real-time insights. Next, the results of our real-time analytics with Kinesis Data Analytics are sent to a Kinesis Data Stream, which can then be used by a Lambda to, for example, generate traffic jam alerts or save the results in DynamoDB.
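As a rough illustration of the streaming step, the sketch below sends events to Firehose in batches. It assumes a boto3 Firehose client is passed in; the function name and the newline-delimited JSON encoding are choices made for this example, not necessarily what we run in production.

```python
import json
from typing import Iterable

MAX_BATCH_SIZE = 500  # PutRecordBatch accepts at most 500 records per call


def send_events(firehose_client, stream_name: str, events: Iterable[dict]) -> int:
    """Send events to Firehose in batches and return the number of failed records."""
    records = [{"Data": (json.dumps(e) + "\n").encode("utf-8")} for e in events]
    failed = 0
    for start in range(0, len(records), MAX_BATCH_SIZE):
        batch = records[start:start + MAX_BATCH_SIZE]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name, Records=batch
        )
        failed += response.get("FailedPutCount", 0)
    return failed
```

It could be called as `send_events(boto3.client("firehose"), "traffic-events", events)`, where the stream name is hypothetical. A non-zero return value means some records should be retried.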

analytic flow

The format of the data arriving on Firehose is shown below. For readers who do not speak Dutch, this data contains:

  • A timestamp telling us when the measurement was taken.
  • A unique id for the measurement location which can be linked to a physical place in Flanders.
  • Information about the status of the measurement sensor.
  • Information about the speed of vehicles of certain classes.

These classes represent the type of vehicle, e.g. motorcycle, truck or car. For future reference, let's remember that class 2 is the class representing cars.

{
  "beschrijvende_id": "H101L20",
  "unieke_id": "3159", // unique location id
  "lve_nr": "18",
  "tijd_waarneming": "2020-04-06T17:51:00+01:00", // time of the observation
  "tijd_laatst_gewijzigd": "2020-04-06T17:52:20+01:00",
  "actueel_publicatie": "1",
  "beschikbaar": "1",
  "defect": "0", // is the sensor working correctly
  "geldig": "0",
  "verkeersintensiteit_klasse1": "0",
  "voertuigsnelheid_rekenkundig_klasse1": "0",
  "voertuigsnelheid_harmonisch_klasse1": "252",
  "verkeersintensiteit_klasse2": "1",
  "voertuigsnelheid_rekenkundig_klasse2": "110", // average speed of the cars (class2)
  "voertuigsnelheid_harmonisch_klasse2": "110",
  "...": "...",
  "rekendata_bezettingsgraad": "6",
  "rekendata_beschikbaarheidsgraad": "100",
  "rekendata_onrustigheid": "86"
}
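To make the field meanings concrete, here is a small sketch that turns such an event into a typed record. The class and function names are made up for this example, and it assumes that speeds are reported in km/h and that a `defect` value of "0" means the sensor works correctly.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class CarMeasurement:
    location_id: str
    observed_at: datetime
    sensor_defect: bool
    car_speed_kmh: int  # assumption: speeds are reported in km/h


def parse_measurement(event: dict) -> CarMeasurement:
    """Extract the fields this post cares about; all raw values are strings."""
    return CarMeasurement(
        location_id=event["unieke_id"],
        observed_at=datetime.fromisoformat(event["tijd_waarneming"]),
        sensor_defect=event["defect"] != "0",
        car_speed_kmh=int(event["voertuigsnelheid_rekenkundig_klasse2"]),
    )
```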

 

Kinesis Data Analytics: Nuts And Bolts


Source / Incoming Data

Let's dig deeper into the architecture. We'll start with the source for our analytics application, which is a Kinesis Firehose stream.

Kinesis Firehose is a near real-time serverless service that can load data into your data lake or analytics tool and scales automatically.

Let's dissect that definition:

  • Near real-time: data arriving on the stream is buffered and flushed towards the destination once a buffer limit is reached; the minimum settings are 60 seconds or 1 MiB.
  • Serverless: you don't have to manage this stream yourself.
  • Scales automatically: you don't have to worry about sharding your stream. AWS will take care of this for you.

It is important to note that there are two main options for streaming your data: Kinesis Firehose or Kinesis Data Streams. We decided to use Kinesis Firehose, as we did not wish to scale the number of shards up or down ourselves, as is required when using Kinesis Data Streams. Firehose also allows 5000 writes per second, whereas Data Streams will throttle you at 1000 writes per second (per shard). Firehose comes with the extra advantage that it can land your original data on S3, allowing you to build a data lake for batch processing later on. The flip side is that Firehose makes you near real-time instead of real-time.
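To get a feeling for what the per-shard limit means, here is a back-of-the-envelope sketch. It only looks at the record-count limit and assumes records are spread evenly over the minute, which is optimistic for a feed that publishes everything at once.

```python
import math

RECORDS_PER_SECOND_PER_SHARD = 1000  # Kinesis Data Streams write limit per shard


def shards_needed(records_per_minute: int) -> int:
    """Estimate the shard count from the record rate alone."""
    records_per_second = records_per_minute / 60
    return max(1, math.ceil(records_per_second / RECORDS_PER_SECOND_PER_SHARD))
```

For the 4500 per-location events per minute in this post, that is 75 records per second on average, so `shards_needed(4500)` returns 1.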

If you would like to know more about Firehose vs Data Streams visit this page on the Lumigo Blog.

The Kinesis Firehose/Data Stream that you choose as your input is your Streaming source. You point this streaming source to an in-application stream that is automatically created and will be named SOURCE_SQL_STREAM_001 by AWS.

 

Analytics

 

Now we dive into the heart of our real-time analytics flow, namely Kinesis Data Analytics.

Kinesis Data Analytics is a way to analyze streaming data in real-time using SQL or integrated Java applications. https://aws.amazon.com/kinesis/data-analytics/

In this case we chose to use SQL to write our real-time analytics. In our Analytics Application we'll use the Firehose as the source for our application.

kinesis data analytics source

Notice that:

  • The incoming data from `Firehose` will be available on the `In-application stream` named `SOURCE_SQL_STREAM_001`. As mentioned above this stream is named by AWS.
  • There is a record preprocessor allowing you to process or filter the data before it enters the SQL analytics app.

 

Record preprocessor

This is a Lambda Function which will receive batches of events and can transform these, drop these or let them pass on a one-by-one basis. The pseudo code below shows what the Lambda does:

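import base64
import json


def handle(event, context):
    output = []

    for record in event['records']:
        # Record data arrives base64-encoded; decode it and parse the JSON event.
        payload = json.loads(base64.b64decode(record['data']))

        # dropped_or_okay() and preprocess_payload() are defined elsewhere.
        result = dropped_or_okay(payload)  # returns 'Ok' or 'Dropped'
        if result == 'Ok':
            payload = preprocess_payload(payload)

        output_record = {
            'recordId': record['recordId'],
            'result': result,
            'data': base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
        }
        output.append(output_record)

    return {'records': output}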

 

We see that the Lambda uses the dropped_or_okay() function to filter records. A record that will be dropped gets the result Dropped, one that can pass gets the result Ok.

The preprocess_payload function is used to modify the payload. In it, I remove some unnecessary fields from the payload. (The function is not shown here.) In our case, we are only interested in cars (vehicle class 2 in our payload), so we remove the data of the other vehicle classes to avoid storing and having to deal with unnecessary data.
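The two helpers are not part of this post, but a minimal sketch of what they could look like follows. The drop rule (sensor reports a defect) and the key pattern are assumptions based on the sample event shown earlier, not the actual implementation.

```python
import re

# Keys such as "verkeersintensiteit_klasse1" end in "_klasse" plus a number.
CLASS_KEY = re.compile(r"_klasse(\d+)$")
CAR_CLASS = "2"


def dropped_or_okay(payload: dict) -> str:
    """Drop measurements from sensors that report a defect (assumed rule)."""
    return "Dropped" if payload.get("defect") != "0" else "Ok"


def preprocess_payload(payload: dict) -> dict:
    """Keep the car (class 2) fields and all class-independent fields."""
    cleaned = {}
    for key, value in payload.items():
        match = CLASS_KEY.search(key)
        if match is None or match.group(1) == CAR_CLASS:
            cleaned[key] = value
    return cleaned
```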

 

Real time analytics with a Kinesis Data Analytics application

kinesis data analytics real time application

Kinesis Data Analytics allows you to do real time analytics using SQL concepts.

Before I dive into the actual queries, there are two very important concepts to discuss:


In-application-SQL-Streams
As mentioned above, the streaming source (a Kinesis Firehose in our case) is mapped to an in-application stream named SOURCE_SQL_STREAM_001. In-application streams are an Amazon Kinesis Data Analytics concept. The stream continuously receives data from your source. Think of it as a table that you can query using SQL. Since a continuous stream of data is flowing over it, we call it an in-application stream.

 

In-application-SQL-pumps
You can actually create multiple in-application streams. This means you need a way to move and insert data or query results from one stream to another. This is done by a SQL pump. AWS puts it as follows: "A pump is a continuously running insert query that moves data from one in-application stream to another in-application stream." (source: AWS doc)

 

In-application SQL streams and SQL pumps are the core concepts of a Kinesis Data Analytics Application.

Let's see a basic example of what that looks like. Remember the structure of the streamed events shown above and the name of the source in-application stream, SOURCE_SQL_STREAM_001.

CREATE OR REPLACE STREAM "INCOMING_STREAM" (
    "uniqueId" INTEGER,
    "speed" INTEGER,
    "bezettingsgraad" INTEGER,
    "recordTimestamp" TIMESTAMP);

CREATE OR REPLACE PUMP "INCOMING_STREAM_PUMP" AS
INSERT INTO "INCOMING_STREAM"
SELECT STREAM
    "unieke_id",
    "voertuigsnelheid_rekenkundig_klasse2",
    "rekendata_bezettingsgraad",
    TO_TIMESTAMP(CAST("tijd_waarneming" AS BIGINT) * 1000) AS "recordTimestamp"
FROM "SOURCE_SQL_STREAM_001";

 

Explanation:

  1. I am creating an intermediate in-application stream `INCOMING_STREAM`.
  2. I create a pump to insert data towards the intermediate stream.
  3. I define the query that will yield the results which will be pumped towards the intermediate stream.
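If SQL pumps feel abstract, it may help to think of one as a loop that never ends: read a row from one stream, reshape it, emit it on the next. The Python generator below is only an analogy for the pump above, not something that runs inside Kinesis Data Analytics. Like the SQL, it treats tijd_waarneming as epoch seconds.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, Iterator


def incoming_stream_pump(source: Iterable[Dict[str, str]]) -> Iterator[dict]:
    """Mimic the pump: read each source row and emit a reshaped row."""
    for row in source:
        yield {
            "uniqueId": int(row["unieke_id"]),
            "speed": int(row["voertuigsnelheid_rekenkundig_klasse2"]),
            "bezettingsgraad": int(row["rekendata_bezettingsgraad"]),
            # The SQL casts tijd_waarneming to BIGINT, i.e. epoch seconds.
            "recordTimestamp": datetime.fromtimestamp(
                int(row["tijd_waarneming"]), tz=timezone.utc
            ),
        }
```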

 

Windowed queries

So, now we know about (intermediary) in-application streams and the pumps which move data between those streams. Let's now have a look at how we can define a window on our stream and aggregate results within that window. There are two kinds of windows, time-based and row-based, and three types: stagger, tumbling and sliding windows.

Concerning time- and row-based windows, the names say it all. You specify the window size either in terms of time or in terms of number of rows.

Different types of windows:

Sliding Windows
A continuously aggregating query, using a fixed time or rowcount interval.

Tumbling Windows
A continuously aggregating query, using definite time-based windows that open and close at regular intervals.

Stagger Windows
Stagger windows can help with use cases where related records do not fall into the same (by ROWTIME) time-restricted window, a challenge which you cannot solve using tumbling windows.
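To illustrate what sets tumbling windows apart, here is a plain Python sketch (not Kinesis code): every event belongs to exactly one fixed, non-overlapping window. Timestamps are assumed to be epoch seconds.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def tumbling_average(
    events: Iterable[Tuple[int, float]], window_seconds: int
) -> Dict[int, float]:
    """Average values per fixed, non-overlapping window.

    events are (epoch_seconds, value) pairs; the result maps each window's
    start time to the average of the values that fall inside it.
    """
    buckets = defaultdict(list)
    for timestamp, value in events:
        window_start = timestamp - (timestamp % window_seconds)
        buckets[window_start].append(value)
    return {start: sum(vals) / len(vals) for start, vals in buckets.items()}
```

A sliding window, in contrast, is re-evaluated for every incoming row and looks back over a fixed interval from that row.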


A detailed explanation and example of these windows can be found in the AWS docs. Originally it was hard for me to remember the syntax of each of these windows. Let me show you how you can recognize each type:


Sliding Windows syntax:

... WINDOW W1 AS (PARTITION BY ... RANGE INTERVAL 'x' MINUTE PRECEDING)


Tumbling Windows syntax:

... GROUP BY ... , STEP("YOUR_IN_APPLICATION_STREAM".ROWTIME BY INTERVAL 'x' MINUTE)


Stagger Windows syntax:

... WINDOWED BY STAGGER (PARTITION BY FLOOR(EVENT_TIME TO MINUTE), ... RANGE INTERVAL 'x' MINUTE)

In our application we use the Sliding window to find out what the average speed over the last x minutes was. Below you can recognize three windows, indicating the last 10 minutes, 2 minutes and the current timestamp.

CREATE OR REPLACE PUMP "STREAM_PUMP_SPEED" AS
INSERT INTO "SPEED_SQL_STREAM"
SELECT STREAM
    "uniqueId",
    AVG("speed") OVER W0,
    AVG("speed") OVER W2,
    AVG("speed") OVER W10
FROM "INCOMING_STREAM"
WINDOW
    W0 AS (PARTITION BY "uniqueId"
        RANGE INTERVAL '0' MINUTE PRECEDING),
    W2 AS (PARTITION BY "uniqueId"
        RANGE INTERVAL '2' MINUTE PRECEDING),
    W10 AS (PARTITION BY "uniqueId"
        RANGE INTERVAL '10' MINUTE PRECEDING);
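To show what such a sliding window computes, here is a plain Python sketch of a per-location average over the last N seconds. It is an illustration rather than how Kinesis implements it, and it assumes events arrive in timestamp order with timestamps in epoch seconds.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class SlidingAverage:
    """Average speed per location over the last `window_seconds` seconds."""

    def __init__(self, window_seconds: int) -> None:
        self.window_seconds = window_seconds
        self._events: Dict[int, Deque[Tuple[int, float]]] = {}

    def add(self, location_id: int, timestamp: int, speed: float) -> float:
        events = self._events.setdefault(location_id, deque())
        events.append((timestamp, speed))
        # Evict events that fell out of the window (mirrors RANGE ... PRECEDING).
        while events[0][0] < timestamp - self.window_seconds:
            events.popleft()
        return sum(s for _, s in events) / len(events)
```

Creating one instance with 120 seconds and one with 600 seconds would correspond to the W2 and W10 windows in the query.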


When using a timestamp to start your window from, you can only use either ROWTIME or APPROXIMATE_ARRIVAL_TIME.

  • `ROWTIME` represents the time at which the event was inserted into the first in-application stream.
  • `APPROXIMATE_ARRIVAL_TIME` represents the time at which the event was added to the streaming source, that is, the source which is feeding data towards your Kinesis Data Analytics application.

Important: you cannot window by a timestamp that originates from a field in your event. This actually makes sense: you are working with real-time data, which implies that the data should arrive in real time!

Using the LAG operator we can look back in our window and access the data of the previous event(s).

In the following example, I am using LAG to look back in the current Sliding Window and extract the speed from the previous event. This allows me to output a new event with both the current speed and the previous speed.

CREATE OR REPLACE PUMP "SPEED_CHANGE_PUMP" AS
INSERT INTO "SPEED_CHANGE_SQL_STREAM"
SELECT STREAM "s"."uniqueId",
    LAG("s"."speed", 1, "s"."speed") OVER CURRENT_WINDOW AS "previousSpeed",
    "s"."speed" AS "currentSpeed"
FROM "SPEED_SQL_STREAM" AS "s"
WINDOW CURRENT_WINDOW AS (PARTITION BY "s"."uniqueId" ROWS 3 PRECEDING);
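For readers less familiar with LAG, the plain Python sketch below mimics what the query does per location. It is an analogy only. The third argument of LAG is the default value, which is why the first row of a location gets its own speed as the previous speed.

```python
from typing import Dict, Iterable, Iterator, Tuple


def with_previous_speed(
    rows: Iterable[Tuple[int, float]]
) -> Iterator[Tuple[int, float, float]]:
    """Yield (uniqueId, previousSpeed, currentSpeed) for each incoming row.

    Like LAG(speed, 1, speed): the first row of a location has no predecessor,
    so its previous speed defaults to its current speed.
    """
    last_seen: Dict[int, float] = {}
    for unique_id, speed in rows:
        yield unique_id, last_seen.get(unique_id, speed), speed
        last_seen[unique_id] = speed
```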

 

Using static reference data

kinesis data analytics using reference data

You can add reference data that you can use to enrich the query results of your application.

In our case the data of our application already contains an ID of the place where the data was measured. The name of the places themselves is not included in the data. However the ID of a place is statically linked to the name of that measurement location and thus could be found using reference data. This reference data has the following format:

kinesis data analytics reference data

It's a CSV file which is tab-delimited. You can also use another separator. This file must be located on S3. When your application starts, it will read the file from S3 and make the data available as a table that you can use in your queries. Below, I join a query result on location ID to retrieve the name of the measurement location from the reference data.

CREATE OR REPLACE PUMP "YOUR_IN_APPLICATION_PUMP" AS
INSERT INTO "YOUR_IN_APPLICATION_STREAM" ("uniqueId", "currentSpeed", ..., "location")
SELECT STREAM
    "sdi"."uniqueId",
    "sdi"."currentSpeed",
    ...,
    "ml"."locatie"
FROM "SPEED_DIFF_INDICATOR_SQL_STREAM" AS "sdi" LEFT JOIN "measurementLocations" AS "ml"
    ON "sdi"."uniqueId" = "ml"."id";
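The same enrichment can be pictured in plain Python: load the tab-delimited file into a lookup and attach the location name to every result. This sketch assumes the file has a header row with the columns `id` and `locatie`, as used in the join above. It only illustrates the left-join behaviour and is not code that runs inside the application.

```python
import csv
import io
from typing import Dict, Iterable, Iterator


def load_locations(tsv_text: str) -> Dict[str, str]:
    """Read tab-delimited reference data into an id -> location name lookup."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    return {row["id"]: row["locatie"] for row in reader}


def enrich(rows: Iterable[dict], locations: Dict[str, str]) -> Iterator[dict]:
    """Left join: rows without a matching id get location None."""
    for row in rows:
        yield {**row, "location": locations.get(str(row["uniqueId"]))}
```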

 

Destination / outcome

Your Kinesis Data Analytics application outputs its results towards a destination.

kinesis data analytics output

We saw that our intermediary results are always pumped towards an in-application stream. To get these results out of our application, we have to couple the in-application stream to an external data stream, like a Kinesis Firehose or a Kinesis Data Stream.

Mind that an in-application stream can only be coupled to one external data stream. If you want to output the same result towards two different destinations, you'll have to create another in-application stream which receives the same data. That is also the reason why you see two different in-application streams coupled to the two destinations.

 

What to do with the real time results

alerting architecture

In the architecture diagram above, you will notice that the kinesis data stream, which receives the analytics results, is coupled to a Lambda Function. That gives you opportunities. You could directly send out alerts based on the data that the function receives from the stream. Or you can save the results in a real time data store which you can use to always query for the current situation.

Here I choose the latter. I am storing the real-time results in DynamoDB. This table holds the current situation for each of the different measuring points in Belgium. I then provide an API through which a client can fetch the current traffic situation in Belgium for a certain point.
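A minimal sketch of such a storing function is shown below. It assumes the analytics output is JSON, that a boto3 DynamoDB `Table` resource is passed in, and that every result contains the table's key (for example `uniqueId`), so that a newer result overwrites the older one. The function name is made up for this example.

```python
import base64
import json
from decimal import Decimal


def store_results(event: dict, table) -> int:
    """Write every analytics result in a Kinesis Lambda event to DynamoDB."""
    stored = 0
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])
        # boto3's DynamoDB resource rejects floats, so parse them as Decimal.
        item = json.loads(raw, parse_float=Decimal)
        table.put_item(Item=item)
        stored += 1
    return stored
```

In a Lambda handler this could be called as `store_results(event, boto3.resource("dynamodb").Table("traffic-status"))`, where the table name is hypothetical.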

Another Lambda Function is listening on the change stream of this table. It monitors whether a traffic jam is present or not. If the traffic jam flag switches between True and False, we send out a Slack message to notify interested parties that a traffic jam has appeared or has dissolved.
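The flag comparison could look roughly like the sketch below. The attribute name `trafficJam` is hypothetical, and the sketch assumes that the table's stream exposes both old and new images, that `uniqueId` is stored as a number and that the flag is a boolean attribute.

```python
from typing import List, Tuple


def jam_transitions(event: dict, flag: str = "trafficJam") -> List[Tuple[str, bool]]:
    """Return (uniqueId, new_flag_value) for every record whose flag changed."""
    changes = []
    for record in event["Records"]:
        if record["eventName"] != "MODIFY":
            continue
        old = record["dynamodb"]["OldImage"]
        new = record["dynamodb"]["NewImage"]
        # Stream images use DynamoDB's typed JSON, e.g. {"BOOL": True}.
        was_jam = old.get(flag, {}).get("BOOL", False)
        is_jam = new.get(flag, {}).get("BOOL", False)
        if was_jam != is_jam:
            changes.append((new["uniqueId"]["N"], is_jam))
    return changes
```

Every returned pair is a candidate for a notification: True means a traffic jam has appeared, False means it has dissolved.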

alerting architecture

 

Key Takeaways

Great, we just learned how we can use Kinesis Data Analytics to get real-time insights into our streamed data. In our case it gave us the possibility to get an on-demand view of the traffic jams in Belgium and send out alerts for emerging traffic jams.

Kinesis Data Analytics is a great tool for real-time analytics. There are some twists which I think are really good to know!


Here are once again the key takeaways from this blog:

  • Separate cold and hot flow of your data completely (real time vs batch).
  • Real-time data should be produced in real time and arrive in real time.
  • Think about your data access pattern upfront.
  • Mind the differences between `Kinesis Firehose` and `Kinesis Data Streams` to stream your data.
  • Preprocess and/or filter your records before they go into your `Kinesis analytics application` by using a record preprocessor Lambda Function.
  • You can use `Windowing` to aggregate or correlate results over a certain timespan.
  • You can only use the timestamps `ROWTIME` and `APPROXIMATE_ARRIVAL_TIME`.
  • Add static reference data to your application by making it available via S3.
  • The core SQL concepts of the `Kinesis analytics app` are `SQL STREAMS` and `SQL PUMPS`.
