Lambda demo function to produce to Kinesis

import boto3
from datetime import datetime
import calendar
import random
import time
import json


class EventFormatError(Exception):
    pass


def lambda_handler(event, context):
    
    stream_name = event.get('stream_name')
    if not stream_name:
        raise EventFormatError('stream name is null')
    
    producer = Producer(stream_name)
    # Create 10 data records and write them to the stream.
    # Each record consists of a random number between 0 and 100,
    # a timestamp, and the string 'testString' concatenated with
    # that random number.
    # NB: only event['stream_name'] is read from the event; the context
    # parameter is unused, as this Lambda is not running in response to
    # an event source.
    #
    for i in range(10):
        property_value = random.randint(0, 100)
        # current UTC time as whole epoch seconds (avoids the deprecated datetime.utcnow())
        property_timestamp = calendar.timegm(time.gmtime())
        the_data = 'testString' + str(property_value)
        
        # write the data to the stream
        producer.put_to_stream(the_data, property_value, property_timestamp)

        # wait for 1 second
        time.sleep(1)


class Producer(object):
    def __init__(self, stream_name):
        self.stream_name = stream_name
        self.k_client = boto3.client('kinesis', region_name='ap-southeast-2')

        
    def put_to_stream(self, the_data, property_value, property_timestamp):
        # build the record body; numeric values are sent as strings
        payload = {
            'prop': str(property_value),
            'timestamp': str(property_timestamp),
            'the_data': the_data
        }

        print(payload)

        # the_data doubles as the partition key, which selects the target shard
        put_response = self.k_client.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(payload),
            PartitionKey=the_data)
        return put_response
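
The function expects a test event carrying a `stream_name` key, and each record it sends is a small JSON object. The sketch below shows that event shape and the resulting record without touching AWS. `FakeKinesisClient`, `build_payload`, and the stream name `demo-stream` are hypothetical names introduced only for illustration; the fake client merely mimics the keyword arguments of `put_record` and is not part of boto3.

```python
import json


class FakeKinesisClient:
    """Hypothetical stand-in for the boto3 Kinesis client; it stores calls instead of sending them."""

    def __init__(self):
        self.records = []

    def put_record(self, StreamName, Data, PartitionKey):
        # Keep what would have been sent so it can be inspected afterwards
        self.records.append(
            {"StreamName": StreamName, "Data": Data, "PartitionKey": PartitionKey}
        )
        return {"SequenceNumber": str(len(self.records))}


def build_payload(the_data, property_value, property_timestamp):
    # Same three fields as the demo function, with numbers sent as strings
    return {
        "prop": str(property_value),
        "timestamp": str(property_timestamp),
        "the_data": the_data,
    }


# Assumed shape of the test event passed to lambda_handler
test_event = {"stream_name": "demo-stream"}

client = FakeKinesisClient()
payload = build_payload("testString42", 42, 1700000000)
client.put_record(
    StreamName=test_event["stream_name"],
    Data=json.dumps(payload),
    PartitionKey=payload["the_data"],
)

print(client.records[0]["Data"])
```

With a real stream, the same event can be pasted into the Lambda console as a test event, provided the function's role is allowed to call `kinesis:PutRecord` on that stream.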


Last updated 4 years ago
