Lambda demo function to produce to Kinesis

import boto3
from datetime import datetime
import calendar
import random
import time
import json


class EventFormatError(Exception):
    pass


def lambda_handler(event, context):
    
    stream_name = event.get('stream_name')
    if not stream_name:
        raise EventFormatError('stream name is null')
    
    producer = Producer(stream_name)
    # create 10 data records and write them to the stream
    # Data records consist of a random number between 0 and 100,
    # A timestamp and a string consisting of the text 'testString'
    # concatenated with the random number that we generated
    # NB We don't use the event or context parameters as this lambda is not running
    # in repsonse to an event
    #
    for i in range(10):
        property_value = random.randint(0, 100)
        property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
        the_data = 'testString' + str(property_value)
        
        # write the data to the stream
        producer.put_to_stream(the_data, property_value, property_timestamp)

        # wait for 1 second
        time.sleep(1)


class Producer(object):
    def __init__(self, stream_name):
        self.stream_name = stream_name
        self.k_client = boto3.client('kinesis', region_name='ap-southeast-2')

        
    def put_to_stream(self, the_data, property_value, property_timestamp):
            
            payload = {
                    'prop': str(property_value),
                    'timestamp': str(property_timestamp),
                    'the_data': the_data
                  }
    
            print (payload)
    
            put_response = self.k_client.put_record(
                            StreamName=self.stream_name,
                            Data=json.dumps(payload),
                            PartitionKey=the_data)

Last updated

Was this helpful?