Event Sourcing with AWS Lambda

The API Gateway Method is then configured to forward those requests to an AWS Lambda that acts a loader.AWS LambdaLambdas are leveraged for the following purposes:Load Lambda: used to load incoming requests from the API Gateway into the Kinesis Event Stream.Pump Lambda: receives events from the incoming stream and puts them in the Event Store DB.Write Lambda: receives events from the stream and stores only the latest state in the Data Store DB..Any business logic can be applied here before writing to the DB just like in a regular application.Playback Lambda: can be triggered manually to read all or a subset of events from the Events Table and send them to the Kinesis Playback Stream..The reason this does not write directly to the Data Store is so you later attach additional subscribers to the Kinesis Playback stream as your application needs evolve.Microservice Lambda: this Lambda will contain your application’s business logic and process the incoming event.DynamoDBDynamoDB is used to store the events..We have two tables per service.Events Table: stores every single event that is submitted to the system..This is the immutable system of record for the application.Data Store: stores the latest state for quick access by the application..This is not the system of record and can be wiped and rebuilt from the Event Table when necessary.Sample CodeTerraform for Creating a Lambda############################## Lambda#############################resource "aws_lambda_function" "terraform_kinesis_streamer_func" { function_name = "EventSourcing"s3_bucket = "terraform-event-sourcing" s3_key = "v1.0.0/lambda-code.zip"handler = "com.slalom.lambda.handler.ProxyWithStream::handleRequest" runtime = "java8"role = "${aws_iam_role.iam_for_terraform_lambda.arn}"timeout = 300 memory_size = 512}resource "aws_lambda_permission" "apigw" { statement_id = "AllowAPIGatewayInvoke" action = "lambda:InvokeFunction" function_name = "${aws_lambda_function.terraform_kinesis_streamer_func.arn}" principal = "apigateway.amazonaws.com"# The /*/* portion grants access from any method on any resource # within the API Gateway "REST API"..source_arn = "${aws_api_gateway_deployment.event_sourcing.execution_arn}/*/*"}Terraform for Configuring API Gateway to Invoke the Lambda############################## API Gateway Event Sourcing#############################resource "aws_api_gateway_resource" "event_sourcing" { rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}" parent_id = "${aws_api_gateway_rest_api.event_sourcing.root_resource_id}" path_part = "event-sourcing"}resource "aws_api_gateway_method" "event_sourcring" { rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}" resource_id = "${aws_api_gateway_resource.event_sourcing.id}" http_method = "POST" authorization = "NONE"}# Integrationresource "aws_api_gateway_integration" "event_sourcing_lambda" { rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}" resource_id = "${aws_api_gateway_method.event_sourcring.resource_id}" http_method = "${aws_api_gateway_method.event_sourcring.http_method}"integration_http_method = "POST" type = "AWS_PROXY" uri = "${aws_lambda_function.terraform_kinesis_streamer_func.invoke_arn}"}# Deploymentresource "aws_api_gateway_deployment" "event_sourcing" { depends_on = [ "aws_api_gateway_integration.event_sourcing_lambda", ]rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}" stage_name = "dev"}output "base_url" { value = "${aws_api_gateway_deployment.event_sourcing.invoke_url}"}Load Lambda Code (API Gateway to Kinesis Event Stream)The below example is written in Java..The Lambda:Receives requests from the AWS API Gateway.Parses the Request.Uses the data from the request to send an event the Kinesis Event Stream.Responds to the API Gateway, which in turn, response to the client.public class ProxyWithStream implements RequestStreamHandler { JSONParser parser = new JSONParser();public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {LambdaLogger logger = context.getLogger(); logger.log("Loading Java Lambda handler of ProxyWithStream");BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); JSONObject responseJson = new JSONObject(); String responseCode = "200";try { JSONObject event = (JSONObject) parser.parse(reader); if (event.get("queryStringParameters") != null) { JSONObject qps = (JSONObject) event.get("queryStringParameters"); }if (event.get("pathParameters") != null) { JSONObject pps = (JSONObject) event.get("pathParameters"); }if (event.get("headers") != null) { JSONObject hps = (JSONObject) event.get("headers"); }responseJson.put("isBase64Encoded", false); responseJson.put("statusCode", responseCode);JSONObject headerJson = new JSONObject(); responseJson.put("headers", headerJson);JSONObject responseBody = new JSONObject(); responseBody.put("message", event.toJSONString());AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().build();PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName("event-stream"); putRecordRequest.setData(ByteBuffer.wrap((event.toJSONString().getBytes()))); putRecordRequest.setPartitionKey(UUID.randomUUID().toString()); kinesis.putRecord(putRecordRequest);responseJson.put("body", responseBody.toString());} catch (ParseException pex) { responseJson.put("statusCode", "400"); responseJson.put("exception", pex); }logger.log(responseJson.toJSONString()); OutputStreamWriter writer = new OutputStreamWriter(outputStream, "UTF-8"); writer.write(responseJson.toJSONString()); writer.close(); }}Write Lambda Code (Kinesis Event Stream Client to Data Store)The below example is written in Java..The Lambda:Receives events from the Kinesis Event Stream.Parses the Event and executes any transformation or business logic.Writes the data from the event into the application Data Store to be later read by the application.public class KinesisLambdaReceiver implements RequestHandler<KinesisEvent, Void> {public Void handleRequest(KinesisEvent event, Context context) { LambdaLogger logger = context.getLogger(); logger.log("Received " + event.getRecords().size() + " raw Event Records.");RecordDeaggregator.stream(event.getRecords().stream(), dataRecord -> { logger.log(new String(dataRecord.getData().array())); AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() .standard() .build();DynamoDB dynamoDB = new DynamoDB(client);Table table = dynamoDB.getTable("EventSourcingDataStore");final Map<String, Object> dataMap = new HashMap<>(); dataMap.put("data", new String(dataRecord.getData().array()));PutItemOutcome outcome = table .putItem(new Item().withPrimaryKey("id", dataRecord.getExplicitHashKey()).withMap("data", dataMap));});return null; }}Deploying the SolutionDeploying the solution requires two main parts:Compiling the Java Lambda code.Applying the Terraform configs to deploy the Lambda and other infrastructure.Compiling the Java Lambda CodeThe project I used to develop the Lambda code is a standard Java Gradle project..Which ever way you prefer to structure your project, you need to ensure that your build packages your project into a self contained zip file..This zip file will later be used by your Terraform config to deploy the lambda.build.gradleapply plugin: 'java'repositories { mavenCentral()}dependencies { compile ( 'com.amazonaws:aws-lambda-java-core:1.1.0', 'com.amazonaws:aws-lambda-java-events:1.1.0', 'com.amazonaws:aws-lambda-java-log4j:1.0.0', 'com.amazonaws:aws-java-sdk-kinesis:1.11.373', 'com.amazonaws:amazon-kinesis-client:1.8.8', 'com.amazonaws:amazon-kinesis-deaggregator:1.0.3' )}task buildZip(type: Zip) { from compileJava from processResources into('lib') { from configurations.compileClasspath } }build.dependsOn buildZipCreating a .zip Deployment Package (Java) — AWS LambdaThis section provides examples of creating .zip file as your deployment package..You can use any build and packaging…docs.aws.amazon.comDeploying the LambdaTo deploy the Lambda you can use the Terraform examples mentioned previously in this article.. More details

Leave a Reply