Kinesis Streams — Using AWS Lambda to Process Kinesis Streams

In this article, we are going to learn how to use AWS Lambda to process Kinesis Streams.

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. By the end of the article, we will complete a hands-on lab that uses AWS Lambda to process event data from Kinesis Streams.

I have just published a new course — AWS Lambda & Serverless — Developer Guide with Hands-on Labs.

We can build applications with Kinesis Streams and take action based on their contents. When it comes to processing them with AWS Lambda functions, Kinesis Streams are very similar to DynamoDB Streams.

We can use an AWS Lambda function to process records in an Amazon Kinesis data stream.

A Kinesis data stream is a set of shards. Each shard contains a sequence of data records. A consumer is an application that processes the data from a Kinesis data stream. We can map a Lambda function to a shared-throughput consumer (a standard iterator), or to a dedicated-throughput consumer with enhanced fan-out.
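Here is how a record ends up on a particular shard. Kinesis computes the MD5 hash of the record's partition key, treats it as a 128-bit integer, and picks the shard whose hash key range contains that value. The sketch below assumes the shards split the hash key space into equal ranges, which should be the case for a newly created stream that has never been resharded. `shardIndexFor` is a hypothetical helper, not an AWS API. Real code would read each shard's actual range from `ListShards` instead.

```javascript
const crypto = require("crypto");

// Hash keys in Kinesis are unsigned 128-bit integers.
const HASH_SPACE = 2n ** 128n;

// Hypothetical helper: index of the shard that would receive a record with
// this partition key, ASSUMING `shardCount` shards with equal, contiguous
// hash key ranges (shard 0 owns the lowest range).
function shardIndexFor(partitionKey, shardCount) {
  // MD5 of the UTF-8 key, read as a big-endian 128-bit integer.
  const hex = crypto.createHash("md5").update(partitionKey, "utf8").digest("hex");
  const hashKey = BigInt("0x" + hex);
  // Equal ranges: shard i covers roughly [i * space / n, (i + 1) * space / n).
  return Number((hashKey * BigInt(shardCount)) / HASH_SPACE);
}

// The same partition key always maps to the same shard, which is what
// keeps records for one key in order.
for (const key of ["1", "user-42", "partitionKey-03"]) {
  console.log(key, "-> shard", shardIndexFor(key, 4));
}
```

Because the mapping depends only on the key, a hot partition key always lands on one shard. Spreading writes therefore means choosing keys with many distinct values.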

For standard iterators, Lambda polls each shard in your Kinesis stream for records using the HTTP protocol. The event source mapping shares read throughput with other consumers of the shard.

Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records. Lambda reads records in batches and invokes your function to process records from the batch. Each batch contains records from a single shard of a single data stream.
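Conceptually, for a standard iterator the event source mapping runs a loop like the one below for each shard. It fetches a batch, invokes the function synchronously, and moves the shard iterator forward only once the invocation succeeds. A failing batch is retried, which holds up the rest of that shard.

This is a simplified sketch, not Lambda's actual implementation. A few parts are assumptions:
- The `client` object is a hypothetical stand-in whose `getRecords` returns the same shape as the Kinesis `GetRecords` response (`Records`, `NextShardIterator`).
- `maxAttempts` is an assumed parameter that makes the sketch terminate. By default, Lambda keeps retrying until the records expire.

```javascript
// Simplified, hypothetical model of how an event source mapping drains one
// shard with a standard iterator. This is NOT Lambda's actual implementation.
async function pollShard(client, shardIterator, invokeFunction, maxAttempts = 3) {
  let iterator = shardIterator;
  while (iterator) {
    // Response shape mirrors Kinesis GetRecords: { Records, NextShardIterator }.
    const { Records, NextShardIterator } = await client.getRecords({ ShardIterator: iterator });

    if (Records.length > 0) {
      // Every record in this batch comes from the same shard. The data is
      // handed to the function base64-encoded, as in the example event.
      const event = {
        Records: Records.map((r) => ({
          kinesis: {
            partitionKey: r.PartitionKey,
            sequenceNumber: r.SequenceNumber,
            data: Buffer.from(r.Data).toString("base64"),
          },
          eventSource: "aws:kinesis",
        })),
      };

      // Synchronous invocation: wait for the function before reading further.
      // A failing batch is retried, so the rest of this shard has to wait.
      for (let attempt = 1; ; attempt++) {
        try {
          await invokeFunction(event);
          break;
        } catch (err) {
          if (attempt >= maxAttempts) throw err;
        }
      }
    }

    // Advance only after the batch was processed (or was empty). A null
    // iterator means the shard is closed, which ends this sketch's loop.
    iterator = NextShardIterator;
  }
}
```

The sketch leaves out two details of a real open shard. The iterator never becomes null while the shard is open, and Lambda waits between polls instead of spinning.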

Here is an example Kinesis record event:

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
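The `data` field of each record is base64-encoded. As a quick check, the small sketch below decodes the two records from the example event. Only the fields it needs are copied in, and `decodeData` is just an illustrative name. Both records share partition key `1` and come from the same shard (`shardId-000000000006`), in sequence-number order.

```javascript
// Decode the base64 `data` field of a Kinesis record from a Lambda event.
function decodeData(record) {
  return Buffer.from(record.kinesis.data, "base64").toString("utf8");
}

// The two records from the example event, trimmed to the relevant fields.
const exampleEvent = {
  Records: [
    { kinesis: { partitionKey: "1", data: "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==" } },
    { kinesis: { partitionKey: "1", data: "VGhpcyBpcyBvbmx5IGEgdGVzdC4=" } },
  ],
};

for (const record of exampleEvent.Records) {
  console.log(decodeData(record));
}
// Hello, this is a test.
// This is only a test.
```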

By default, Lambda invokes your function as soon as records are available. If the batch that Lambda reads from the event source has only one record in it, Lambda sends only one record to the function.

Configuring your data stream and function

Our Lambda function is a consumer application for the data stream. It processes one batch of records at a time from each shard. So, to consume a Kinesis stream from a Lambda function, we have two prerequisites:
1- Add the required permissions for Lambda to poll Kinesis shards
2- Create an event source mapping between Lambda and Kinesis

Create the execution role that gives your function permission to access AWS resources. The AWSLambdaKinesisExecutionRole managed policy has the permissions that the function needs to read items from Kinesis and write logs to CloudWatch Logs. It includes the following Kinesis permissions:

  • kinesis:DescribeStream
  • kinesis:DescribeStreamSummary
  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards
  • kinesis:ListStreams
  • kinesis:SubscribeToShard
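If you would rather attach an inline policy than the managed one, these permissions can be written as an IAM policy document. The sketch below builds one as a plain JavaScript object. It rests on two assumptions about what the managed policy grants, so check the current AWSLambdaKinesisExecutionRole in the IAM console before relying on it:
- The actions are granted on all resources (`"*"`).
- The CloudWatch Logs actions `logs:CreateLogGroup`, `logs:CreateLogStream` and `logs:PutLogEvents` are included.

`buildKinesisConsumerPolicy` is a hypothetical helper.

```javascript
// Kinesis actions a Lambda consumer needs (same list as above).
const KINESIS_ACTIONS = [
  "kinesis:DescribeStream",
  "kinesis:DescribeStreamSummary",
  "kinesis:GetRecords",
  "kinesis:GetShardIterator",
  "kinesis:ListShards",
  "kinesis:ListStreams",
  "kinesis:SubscribeToShard",
];

// CloudWatch Logs actions so the function can write its logs.
const LOG_ACTIONS = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"];

// Hypothetical helper: an IAM policy document (as an object) that is
// ASSUMED to be roughly equivalent to AWSLambdaKinesisExecutionRole.
function buildKinesisConsumerPolicy() {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: [...KINESIS_ACTIONS, ...LOG_ACTIONS],
        Resource: "*",
      },
    ],
  };
}

// Print JSON that could be pasted into the IAM console's policy editor.
console.log(JSON.stringify(buildKinesisConsumerPolicy(), null, 2));
```

For production, you may want to narrow `Resource` to your own stream and log group ARNs instead of `"*"`.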

To configure your function to read from Kinesis in the Lambda console, create a Kinesis trigger.

  • Open the Functions page of the Lambda console and choose the name of your function. Under Function overview, choose Add trigger, choose Kinesis as the trigger type, configure the required options, and then choose Add.

Lambda supports the following options for Kinesis event sources.

  • Kinesis stream - The Kinesis stream to read records from.
  • Consumer (optional) - Use a stream consumer to read from the stream over a dedicated connection (enhanced fan-out).
  • Batch size - The number of records to send to the function in each batch, up to 10,000.
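The same trigger can also be created programmatically with the Lambda `CreateEventSourceMapping` API. The sketch below only builds that API's input object, to show how the console options map onto API parameters:
- `EventSourceArn` is the stream ARN, or the consumer ARN when you use enhanced fan-out.
- `BatchSize` accepts 1 to 10,000 for Kinesis, with a default of 100.
- `StartingPosition` is `LATEST` or `TRIM_HORIZON`. `AT_TIMESTAMP` also exists but needs an extra timestamp parameter, which this sketch leaves out.

`buildMappingInput` is hypothetical, and the names and ARN in the usage call are placeholders. With the AWS SDK for JavaScript v3, you would pass such an object to `CreateEventSourceMappingCommand` from `@aws-sdk/client-lambda`.

```javascript
// AT_TIMESTAMP is deliberately not accepted: it also needs
// StartingPositionTimestamp, which this sketch does not set.
const STARTING_POSITIONS = ["LATEST", "TRIM_HORIZON"];

// Hypothetical helper: builds CreateEventSourceMapping input for a Kinesis source.
// `sourceArn` is the stream ARN, or a stream consumer ARN for enhanced fan-out.
function buildMappingInput({ functionName, sourceArn, batchSize = 100, startingPosition = "LATEST" }) {
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10000) {
    throw new RangeError("Kinesis BatchSize must be an integer between 1 and 10000");
  }
  if (!STARTING_POSITIONS.includes(startingPosition)) {
    throw new RangeError(`Unsupported StartingPosition in this sketch: ${startingPosition}`);
  }
  return {
    FunctionName: functionName,
    EventSourceArn: sourceArn,
    BatchSize: batchSize,
    StartingPosition: startingPosition,
  };
}

// Placeholder account ID and region; names match the lab's function and stream.
console.log(
  buildMappingInput({
    functionName: "KinesisTest",
    sourceArn: "arn:aws:kinesis:us-east-1:123456789012:stream/teststream",
    startingPosition: "TRIM_HORIZON",
  })
);
```

`TRIM_HORIZON` starts from the oldest record still in the stream, which is convenient in a lab. `LATEST` only picks up records written after the mapping is created.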

Hands-on Lab: Process Kinesis Streams using AWS Lambda

Now we are going to do the hands-on lab: processing Kinesis Streams using AWS Lambda.

Let's make a plan:
1- Create a Kinesis data stream
2- Create a Lambda function
3- Add the required permissions for Lambda to poll Kinesis shards
4- Create an event source mapping (polling invocation) between Lambda and Kinesis
5- Develop the Lambda function code to handle incoming Kinesis records
6- Test end to end

Now let's follow this plan step by step.

1- Create Kinesis Data Streams

In the Kinesis console, we are interested in Kinesis Data Streams ("Collect streaming data with a data stream").

  • Create a data stream named teststream.
  • Notice the Producers and Consumers sections for the stream.
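For the end-to-end test at the end of the plan, something has to act as a producer. A producer writes each record with a partition key and a data blob. The sketch below only builds the input for the Kinesis `PutRecord` API (`StreamName`, `PartitionKey`, `Data`) from a JSON payload. `toPutRecordInput` and the sample order payload are hypothetical. With the AWS SDK for JavaScript v3, you would send the object with `new PutRecordCommand(input)` on a `KinesisClient` from `@aws-sdk/client-kinesis`.

```javascript
// Hypothetical helper: turns a JSON-serializable payload into PutRecord input.
function toPutRecordInput(streamName, partitionKey, payload) {
  return {
    StreamName: streamName,
    // Records with the same partition key go to the same shard, in order.
    PartitionKey: partitionKey,
    // The SDK takes raw bytes; Lambda later receives them base64-encoded.
    Data: Buffer.from(JSON.stringify(payload), "utf8"),
  };
}

const input = toPutRecordInput("teststream", "order-1", { orderId: 1, status: "created" });
console.log(input.PartitionKey, input.Data.toString("utf8"));
// order-1 {"orderId":1,"status":"created"}
```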

2- Create Lambda Function

Create a new function named KinesisTest. Then go to the test event templates and choose the Kinesis Data Streams template:

{
  "Records": [
    {
      "kinesis": {
        "partitionKey": "partitionKey-03",
        "kinesisSchemaVersion": "1.0",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961",
        "approximateArrivalTimestamp": 1428537600
      },
      "eventSource": "aws:kinesis",
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventVersion": "1.0",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "awsRegion": "us-east-1"
    }
  ]
}
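Instead of hand-editing base64 strings in the console test template, you can generate an event with the same shape locally. `makeTestEvent` is a hypothetical helper that wraps plain strings into records that follow the template above. Its sequence numbers and ARN are made-up placeholders, so use it for local testing only.

```javascript
// Hypothetical helper: builds a Kinesis-style Lambda test event from strings.
function makeTestEvent(payloads, { partitionKey = "partitionKey-03", shardId = "shardId-000000000000" } = {}) {
  return {
    Records: payloads.map((text, i) => {
      // Placeholder sequence number; real ones are long numeric strings.
      const sequenceNumber = String(1000 + i);
      return {
        kinesis: {
          partitionKey,
          kinesisSchemaVersion: "1.0",
          // Same encoding Lambda uses for real records.
          data: Buffer.from(text, "utf8").toString("base64"),
          sequenceNumber,
          approximateArrivalTimestamp: Date.now() / 1000,
        },
        eventSource: "aws:kinesis",
        eventID: `${shardId}:${sequenceNumber}`,
        eventVersion: "1.0",
        eventName: "aws:kinesis:record",
        eventSourceARN: "arn:aws:kinesis:EXAMPLE",
        awsRegion: "us-east-1",
      };
    }),
  };
}

console.log(JSON.stringify(makeTestEvent(["Hello, this is a test 123."]), null, 2));
```

Encoding "Hello, this is a test 123." this way reproduces the template's `data` value, `SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=`.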

Notice that this event is almost the same as the events for DynamoDB Streams and Amazon SQS: it has a Records array whose items carry eventSource, eventID, eventName, and so on.

3- Add the required permissions for Lambda to poll Kinesis shards

Create the execution role that gives your function permission to access AWS resources. Attaching the AWSLambdaKinesisExecutionRole managed policy to it grants these Kinesis permissions:

  • kinesis:DescribeStream
  • kinesis:DescribeStreamSummary
  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards
  • kinesis:ListStreams
  • kinesis:SubscribeToShard

4- Create an event source mapping (polling invocation) between Lambda and Kinesis

Next, we configure the stream as an event source. To create a trigger:

  • Under Function overview, choose Add trigger, choose Kinesis as the trigger type, select teststream, configure the required options, and then choose Add.

5- Develop the Lambda function code to handle incoming Kinesis records

Based on the incoming event, we develop our Lambda function:

exports.handler = async (event) => {
  // Log the full incoming event for debugging
  console.log(JSON.stringify(event, null, 2));
  for (const record of event.Records) {
    // Kinesis data is base64-encoded, so decode it here
    // (utf8 rather than ascii so non-ASCII characters survive)
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    console.log('Decoded payload:', payload);
  }
};
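With the handler above, an error thrown by the function makes Lambda retry the whole batch. That shard makes no progress until the batch succeeds or its records expire. If you enable the `ReportBatchItemFailures` function response type on the event source mapping (`FunctionResponseTypes: ["ReportBatchItemFailures"]`), the handler can instead report which record failed.

The sketch below makes two assumptions:
- Payloads are JSON, as a JSON producer would write them.
- `processOrder` is a hypothetical business step that you would replace with your own processing.

For streams, the handler returns the first failing record's sequence number right away. Lambda checkpoints at the lowest failed sequence number and retries from there, so the later records in the batch would be re-sent anyway.

```javascript
// Hypothetical business logic; replace with your own processing.
async function processOrder(order) {
  if (typeof order.orderId !== "number") {
    throw new Error("orderId missing");
  }
  console.log("Processed order", order.orderId);
}

// Handler that reports partial batch failures. Only takes effect when the
// event source mapping has FunctionResponseTypes: ["ReportBatchItemFailures"].
async function handler(event) {
  for (const record of event.Records) {
    try {
      const text = Buffer.from(record.kinesis.data, "base64").toString("utf8");
      await processOrder(JSON.parse(text));
    } catch (err) {
      console.error("Failed record", record.kinesis.sequenceNumber, err.message);
      // Stop at the first failure: Lambda checkpoints before this record
      // and retries from it, so later records would be re-sent anyway.
      return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }] };
    }
  }
  // An empty list tells Lambda the whole batch succeeded.
  return { batchItemFailures: [] };
}

// Export it as the Lambda entry point (CommonJS, like the handler above).
exports.handler = handler;
```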

As you can see, we have created a Kinesis stream, developed our Lambda function, and created an event source mapping between Kinesis and Lambda. The main difference from DynamoDB Streams is that Kinesis delivers the record data base64-encoded, so the function has to decode it. DynamoDB stream records, by contrast, carry the item images as DynamoDB JSON.

We have now completed the hands-on lab: using AWS Lambda to process Kinesis Streams. To see the full development of this hands-on lab, you can check the course on Udemy below.

Step by Step Design AWS Architectures w/ Course

I have just published a new course — AWS Lambda & Serverless — Developer Guide with Hands-on Labs.

In this course, we will learn almost all the AWS serverless services in depth. We are going to build serverless applications using AWS Lambda, Amazon API Gateway, Amazon DynamoDB, Amazon Cognito, Amazon S3, Amazon SNS, Amazon SQS, Amazon EventBridge, AWS Step Functions, and DynamoDB and Kinesis Streams. This course is 100% hands-on, and you will develop a real-world application step by step through hands-on labs.

Source Code

Get the source code from the Serverless Microservices GitHub repository. Clone or fork the repository, and if you like it, don't forget to star it. If you find a problem or have a question, you can open an issue on the repository.


Mehmet Ozkaya
AWS Lambda & Serverless — Developer Guide with Hands-on Labs

Software Architect | Udemy Instructor | AWS Community Builder | Cloud-Native and Serverless Event-driven Microservices https://github.com/mehmetozkaya