Lambda with Kinesis Event Source Mapping
Lambda Triggered by Kinesis Stream.
The event argument received by the Lambda function:
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "testString94",
"sequenceNumber": "49615617802757578954455366724613282670234824178078318610",
"data": "ewogICAgImRhdGEiOiB7CiAgICAgICAgImlkIjogMzE1MjA5LAogICAgICAgICJjcmVhdGVkX2J5X3VzZXJfaWQiOiAxLAogICAgICAgICJvd25lcnNoaXBfaWQiOiAxLAogICAgICAgICJuYW1lIjogImRzZHNkIGRzZHNkZHMiLAogICAgICAgICJmaXJzdF9uYW1lIjogImRzZHNkIiwKICAgICAgICAibGFzdF9uYW1lIjogImRzZHNkZHMiLAogICAgICAgICJsZWFkX3R5cGUiOiAiSGVsbG8gTGVhZHMiLAogICAgICAgICJsZWFkX3N0YXR1cyI6ICJDb250YWN0ZWQiLAogICAgICAgICJwaG9uZSI6ICIxMjM0NSIsCiAgICAgICAgImVtYWlsIjogImZtZmpkZEBxcS5jb20iLAogICAgICAgICJjb3VudHJ5IjogIkNCIiwKICAgICAgICAiY3JlYXRlZF9hdCI6ICIyMDIwLTAxLTA5VDA0OjEzOjM4WiIsCiAgICAgICAgInVwZGF0ZWRfYXQiOiAiMjAyMC0wMS0xNVQyMjo0NToxN1oiLAogICAgICAgICJkZWxldGVkX2F0IjogIjIwMjAtMDEtMTVUMjI6NDU6MTdaIiwKICAgICAgICAibGFzdF92aWV3ZWRfZGF0ZXRpbWUiOiAiMjAyMC0wMS0xNVQyMjo0NTowMVoiLAogICAgICAgICJkZXNjcmlwdGlvbiI6ICJkc2hsIGFzZGpzYWtsIGhhYSBoYXVzYWEgcyIKICAgIH0sCiAgICAibWV0YWRhdGEiOiB7CiAgICAgICAgInRpbWVzdGFtcCI6ICIyMDIwLTEyLTE1VDA1OjQ3OjQyLjcyNDQ1OFoiLAogICAgICAgICJyZWNvcmQtdHlwZSI6ICJkYXRhIiwKICAgICAgICAib3BlcmF0aW9uIjogImxvYWQiLAogICAgICAgICJwYXJ0aXRpb24ta2V5LXR5cGUiOiAicHJpbWFyeS1rZXkiLAogICAgICAgICJzY2hlbWEtbmFtZSI6ICJsYWJfbG9naXhfY3JtX3JlcGxpY2EiLAogICAgICAgICJ0YWJsZS1uYW1lIjogImFjeV9sZWFkcyIKICAgIH0KfQ==",
"approximateArrivalTimestamp": 1613626130.312
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000001:49615617802757578954455366724613282670234824178078318610",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::757914573505:role/service-role/labKinesisConsumer-role-2dnjmg4b",
"awsRegion": "ap-southeast-2",
"eventSourceARN": "arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "testString59",
"sequenceNumber": "49615617802757578954455366724614491596054438875972501522",
"data": "eyJwcm9wIjogIjU5IiwgInRpbWVzdGFtcCI6ICIxNjEzNjI2MTMxIiwgInRoZV9kYXRhIjogInRlc3RTdHJpbmc1OSJ9",
"approximateArrivalTimestamp": 1613626131.331
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000001:49615617802757578954455366724614491596054438875972501522",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::757914573505:role/service-role/labKinesisConsumer-role-2dnjmg4b",
"awsRegion": "ap-southeast-2",
"eventSourceARN": "arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc"
}
]
}
event.Records.*.eventSourceARN identifies the source of each record, that is, which Kinesis stream it came from.
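As a minimal sketch of telling streams apart, the stream name can be pulled out of eventSourceARN. The helper name stream_name_from_arn is hypothetical, and the sketch assumes the ARN has the arn:aws:kinesis:region:account:stream/name shape seen in the sample event above.

```python
def stream_name_from_arn(arn: str) -> str:
    """Extract the stream name from a Kinesis stream ARN (hypothetical helper).

    Assumes the shape arn:aws:kinesis:<region>:<account>:stream/<name>.
    """
    parts = arn.split(":", 5)
    if len(parts) != 6:
        raise ValueError("Not a well-formed ARN: " + arn)
    # The last part is the resource, e.g. "stream/my-stream-name".
    kind, _, name = parts[5].partition("/")
    if kind != "stream" or not name:
        raise ValueError("Not a Kinesis stream ARN: " + arn)
    return name


arn = "arn:aws:kinesis:ap-southeast-2:757914573505:stream/mock-mt4_report_db-acy-cdc"
print(stream_name_from_arn(arn))
```

Inside a handler this could be called per record, for example stream_name_from_arn(record["eventSourceARN"]), to route records when one function consumes from more than one stream.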
event.Records.*.kinesis.data is the actual stream payload. It is base64-encoded and must be decoded to a string:
import base64
import json

# `event` is the first argument passed to the Lambda handler
records = event['Records']
for record in records:
    # base64-decode the data, then decode the bytes to a UTF-8 string
    payload = base64.b64decode(record["kinesis"]["data"]).decode('utf-8')
    # log the payload on a single line
    print("Decoded payload: " + "".join(payload.splitlines()))
    # if the payload is a JSON string, convert it to a Python dictionary
    data: dict = json.loads(payload)
    # TODO: process data ...
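The decoding steps above can be wrapped into a complete handler. The following is only a sketch under stated assumptions: the names decode_record and lambda_handler and the returned summary dictionary are illustrative choices, and skipping payloads that are not valid JSON is one possible policy, not something the original snippet prescribes.

```python
import base64
import json


def decode_record(record):
    """Return (payload_text, parsed_json_or_None) for one Kinesis record."""
    payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
    try:
        return payload, json.loads(payload)
    except json.JSONDecodeError:
        # Payload is text but not JSON; let the caller decide what to do.
        return payload, None


def lambda_handler(event, context):
    """Hypothetical handler: decode every record and count the JSON ones."""
    processed = 0
    for record in event["Records"]:
        payload, data = decode_record(record)
        if data is None:
            print("Skipping non-JSON payload: " + "".join(payload.splitlines()))
            continue
        # TODO: process data ...
        processed += 1
    return {"processed": processed}
```

Keeping the decoding in its own function makes it possible to test it locally with a saved sample event, without deploying the function.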