Lambda with Kinesis Event Source Mapping

Last updated 4 years ago

A Lambda function triggered by a Kinesis stream.

Event Source Config
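The event source mapping can be created with the console, the CLI, or boto3. A minimal boto3 sketch is below; the stream ARN is taken from the sample event further down, while the function name and batch settings are assumptions, not values from this post.

```python
import boto3

lambda_client = boto3.client('lambda')

# Map the Kinesis stream to the Lambda function.
# FunctionName and the batch settings below are assumed example values.
response = lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc',
    FunctionName='labKinesisConsumer',        # assumed function name
    StartingPosition='LATEST',                # or 'TRIM_HORIZON' to read from the start
    BatchSize=100,                            # records per invocation
    MaximumBatchingWindowInSeconds=5,         # wait up to 5s to fill a batch
)
```

`StartingPosition` and `BatchSize` control how far back the function starts reading and how many records arrive per invocation; tune them to the stream's throughput.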

Handle event in lambda function

The event argument received by the Lambda function looks like this:

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "testString94",
        "sequenceNumber": "49615617802757578954455366724613282670234824178078318610",
        "data": "ewogICAgImRhdGEiOiB7CiAgICAgICAgImlkIjogMzE1MjA5LAogICAgICAgICJjcmVhdGVkX2J5X3VzZXJfaWQiOiAxLAogICAgICAgICJvd25lcnNoaXBfaWQiOiAxLAogICAgICAgICJuYW1lIjogImRzZHNkIGRzZHNkZHMiLAogICAgICAgICJmaXJzdF9uYW1lIjogImRzZHNkIiwKICAgICAgICAibGFzdF9uYW1lIjogImRzZHNkZHMiLAogICAgICAgICJsZWFkX3R5cGUiOiAiSGVsbG8gTGVhZHMiLAogICAgICAgICJsZWFkX3N0YXR1cyI6ICJDb250YWN0ZWQiLAogICAgICAgICJwaG9uZSI6ICIxMjM0NSIsCiAgICAgICAgImVtYWlsIjogImZtZmpkZEBxcS5jb20iLAogICAgICAgICJjb3VudHJ5IjogIkNCIiwKICAgICAgICAiY3JlYXRlZF9hdCI6ICIyMDIwLTAxLTA5VDA0OjEzOjM4WiIsCiAgICAgICAgInVwZGF0ZWRfYXQiOiAiMjAyMC0wMS0xNVQyMjo0NToxN1oiLAogICAgICAgICJkZWxldGVkX2F0IjogIjIwMjAtMDEtMTVUMjI6NDU6MTdaIiwKICAgICAgICAibGFzdF92aWV3ZWRfZGF0ZXRpbWUiOiAiMjAyMC0wMS0xNVQyMjo0NTowMVoiLAogICAgICAgICJkZXNjcmlwdGlvbiI6ICJkc2hsIGFzZGpzYWtsIGhhYSBoYXVzYWEgcyIKICAgIH0sCiAgICAibWV0YWRhdGEiOiB7CiAgICAgICAgInRpbWVzdGFtcCI6ICIyMDIwLTEyLTE1VDA1OjQ3OjQyLjcyNDQ1OFoiLAogICAgICAgICJyZWNvcmQtdHlwZSI6ICJkYXRhIiwKICAgICAgICAib3BlcmF0aW9uIjogImxvYWQiLAogICAgICAgICJwYXJ0aXRpb24ta2V5LXR5cGUiOiAicHJpbWFyeS1rZXkiLAogICAgICAgICJzY2hlbWEtbmFtZSI6ICJsYWJfbG9naXhfY3JtX3JlcGxpY2EiLAogICAgICAgICJ0YWJsZS1uYW1lIjogImFjeV9sZWFkcyIKICAgIH0KfQ==",
        "approximateArrivalTimestamp": 1613626130.312
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000001:49615617802757578954455366724613282670234824178078318610",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::757914573505:role/service-role/labKinesisConsumer-role-2dnjmg4b",
      "awsRegion": "ap-southeast-2",
      "eventSourceARN": "arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "testString59",
        "sequenceNumber": "49615617802757578954455366724614491596054438875972501522",
        "data": "eyJwcm9wIjogIjU5IiwgInRpbWVzdGFtcCI6ICIxNjEzNjI2MTMxIiwgInRoZV9kYXRhIjogInRlc3RTdHJpbmc1OSJ9",
        "approximateArrivalTimestamp": 1613626131.331
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000001:49615617802757578954455366724614491596054438875972501522",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::757914573505:role/service-role/labKinesisConsumer-role-2dnjmg4b",
      "awsRegion": "ap-southeast-2",
      "eventSourceARN": "arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc"
    }
  ]
}

event.Records.*.eventSourceARN can be used to determine which Kinesis stream the record came from.
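When one function consumes several streams, the stream name can be pulled from that ARN, since it is the segment after "stream/". A small helper (hypothetical, not part of the original post):

```python
def stream_name(record: dict) -> str:
    """Return the Kinesis stream name a record came from.

    The eventSourceARN has the form
    arn:aws:kinesis:<region>:<account>:stream/<stream-name>,
    so the name is the last '/'-separated segment.
    """
    return record['eventSourceARN'].split('/')[-1]


record = {
    'eventSourceARN': 'arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc'
}
print(stream_name(record))  # mock-mt4_report_db-acy-cdc
```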

event.Records.*.kinesis.data holds the actual record payload, which is Base64-encoded and should be decoded to a string:

import base64
import json

def lambda_handler(event, context):
    for record in event['Records']:

        # decode the Base64-encoded payload to a UTF-8 string
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')

        # log the payload on a single line
        print('Decoded payload: ' + ''.join(payload.splitlines()))

        # if the payload is a JSON string, we can convert it to a python dictionary
        data: dict = json.loads(payload)

        # TODO: process data ...
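Applying the same decoding to the second sample record above shows the round trip end to end:

```python
import base64
import json

# the "data" field of the second record in the sample event
sample = 'eyJwcm9wIjogIjU5IiwgInRpbWVzdGFtcCI6ICIxNjEzNjI2MTMxIiwgInRoZV9kYXRhIjogInRlc3RTdHJpbmc1OSJ9'

# Base64 -> UTF-8 string -> dict
payload = base64.b64decode(sample).decode('utf-8')
data = json.loads(payload)

print(data['the_data'])  # testString59
```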