Serverless AWS Lambda architecture for large scale data ingestion

bitsofinfo
Level Up Coding
Published in
9 min readNov 29, 2021

--

Recently was faced with a requirement to build out an extensible data import framework that would be able to consume various file formats provided by 3rd parties…. but make it faster than the current implementation. The current mechanism that was in place was using a proprietary packaged legacy file ETL product who’s output was an executable artifact that would run on a single VM. I won’t name the particular vendor, but it was one of those tools that an analyst would use drag and drop components to essentially read a file (CSV) and “do something” row by row. This was adequate years ago but did not scale well as the CSV files were getting extremely large, new formats on the horizon and due to the nature of the tool implementation… the processing of the records was sequential and single threaded; it was slow.

Requirements

The primary requirement was to be able to process the data much faster. The client also wanted to be able to process more than just CSV formats… TSV, JSON, XML, EDI (X12) etc. and there was no interest in any of the numerous 3rd party cloud SaaS integration offerings (Fivetran, Stitch, Matillion etc). The desire was to have it running within their existing AWS environment and were good with their developers modifying it to made adjustments or add support for new integrations. Overall just wanted it to be faster and new formats to be supportable.

Speed

Given speed was one of the primary issues to solve this was the first thing looked at. 90% of the file types that needed to be processed were just row oriented data, where each row was independent and had no dependencies between its predecessor row or ancestor rows. Given this, we could look at introducing a level of parallelism that the prior system lacked. The decision was to introduce a default capability to essentially “split” the file row by row and permit each row to processed independently. Each row would have a unique identifier as well as a record number decorated by the framework upon splitting. These would be used for correlation of processing events to track overall success/failure. For file formats that did have dependencies or were more complicated than line separated row oriented data like CSV/TSV, we also needed to support a non-splitting scenario. Since the platform was on AWS, leveraging services like SQS and SNS was likely going to be on the table.

Lifecycle

The next part of the design was how to address the overall “flow” you would typically want to enforce in a data import system. Some key rules needed to apply to any file imported, regardless of “type” and we wanted to ensure these rules would be addressed universally. For example the lifecycle: files drop into a landing-zone, they then are validated, optionally split apart and then go into a processing state and one done are either completed, failed or rejected. Finally files should be archived. The handling of transitions across these states is something that could be handled in a code, at a framework level that should not have to be implemented by each individual developer working on a new file import type. The developer just wants to focus on being able to process the data in the file.

Storage

How the data is stored is another important factor. Given the target AWS cloud environment, using S3 as the primary location to store the imported data was the logical choice. S3 is basically ubiquitous at this point and getting data into it would be a non-issue as the integration options out there are virtually unlimited; even for 3rd parties with legacy systems; for example you can bridge FTP/SFTP to it. For all data imported we would want to be able to have a history of what was imported, copies of the data as if moved through different states and have it archived as well. With this in mind we decided to have an enforced structure to the data lake that we would be creating:

s3://[import-bucket-name]/
landing-zone/
[data_import_type_a]/[yyyy]/[mm]/[dd]/[import_id]
[data_import_type_b]/[yyyy]/[mm]/[dd]/[import_id]
...
archived/
...
processing/
...
rejected/
...
completed/
...
failed/
...Scalability

Given some file types could contains N number of rows, from thousands to tens of thousands and beyond, not only was sequential processing slowing things down, but depending on the import file type, the requirements for what was actually being done with each row varied. Sometimes the rows were being slightly modified and inserted into a database, other times there was a back and forth sequence of API calls going against yet another system; point being is that it was not just the volume of items to be processed but also the time involved to process each item; it could be time consuming.

With this in mind, we needed the processing to be elastic and the team did not want to have to manage a bunch of infrastructure and manually or automatically scale it or even guess what the capacity ceiling would be… the data just needed to be processed as fast as possible (with some high ceilings of course..). Given this we decided to write the framework as AWS Lambda functions. The team was primarily versed in Python and Javascript so likewise, we decided to go with those languages and a polygot implementation.

Metadata

With a data import system, one is going to want to be able to track everything that has come into it and what happened; what succeeded/failed overall as well as what happened on a record by record level. Given the number of individual records per file had no upper bound, we would need an underlying storage platform that was N scale. In this case we decided to utilize DynamoDB with a few tables:

  • data_import_event: this table tracked the overall state of the imported file event, its identifier, type, current state (landing-zone, processing, rejected, failed, completed) and timestamps of each state change. There were also various local secondary indices as necessary.
  • data_import_record: for each record processed, it will fail or succeed and be recorded here with a reason/status. The record would contain a status, row identifier, row number, received at and a trace identifier (created by the consumer) There were also various local secondary indices as necessary.

Solution overview

Given the requirements and design decisions above, the framework solution would roughly break down into the following high level design parts:

Serverless Framework

When developing serverless solutions on AWS you always have the challenge of development. How do we efficiently develop locally for something that is using services that are in the cloud? What do I use to ensure all the cloud resources exist that I need for my functions to work? There are a few strategies for this, you can run test local code that just connects to cloud services over the wire, or you can look at various frameworks out there that are all encompassing: they provide local mock cloud services, as well as simplified abstractions that allow you define what cloud resources you need and generate all the cloud specific IaC code. (in this case AWS cloudformation). There are several frameworks out there for this, such as AWS’s own SAM framework (serverless application model). The one we settled on to go with was the Serverless Framework. The framework has a tremendous amount of plugins and a pretty good set of simplified declarative abstractions for cloud resources in its serverless.yml format.... but it also comes with its own set of quirks for offline usage and some buggy plugins that you need to work around... but at the end of the day it works.

Producer: data-importer

The primary import framework solution was implemented in Python as a Serverless Framework based set of AWS Lambda functions. The data-importer project provided the following functionality:

  • created the primary S3 bucket that housed the data lake for all supported states landing-zone, processing, archived, failed, completed
  • created/managed all DynamoDB tables and secondary indices for data_import_event and data_import_record
  • created/managed a central SNS topic where all data_import_event record event's would be published to, annotated with the data_import_type so that consumers could only subscribe to the events they cared about.
  • provided the core AWS Lambda function handleDataImportEvent() that would react to all S3 object creation events within the s3://[import-bucket-name]/landing-zone bucket location, trigger validation and broadcast of individual records or file reference to SNS
  • created AWS Event Bridge scheduled tasks for each data_import_type to invoke the function evaluateDataImportRecords()
  • provided the AWS Lambda function recordProcessingEvents() which consumers could call to notify the producer of data_import_record success/failures.
  • provided various markImportData[Completed|Rejected|Failed]() lambda functions to let the consumer flag the overall status
  • provided a getDataImportRecord() function to retrieve the underlying audit record for the event and its status.

A key part of the producer codebase is the individual python modules it consults during handleDataImportEvent(), one for each data_import_type. Each of these implementations fulfills an interface contract that the producer calls to validate the file received and return the data as a PETL table, once validated and in this format, the generic code can proceed to work with it and emit it to consumers regardless of the underlying data file format. Note that the PETL conversion only occurs for data_import_types with mode EMIT_ROWS. Those with EMIT_FILE_REF just extract simple meta-data such the row counts etc.

Consumers

For each data_import_type two things are created:

  • python module for the data_import_type that is used by the data_importer producer for PETL conversion and validation
  • a separate Serverless Framework project, written in any language, however most of them are written in Javascript NodeJS.

Each consumer serverless framework project is its own project in Git and can be independently managed and developed on its own. To interoperate with the producer each consumer project is responsible for:

  • managing a dedicated SQS queue that subscribes to events from the producers SNS topic where the data_import_type is relevant
  • provide an Lambda function (typically named handleDataImportEvent()) that is invoked by it's SQS queue for each data import record (if mode is EMIT_ROWS) or a single record containing the file reference (if mode is EMIT_FILE_REF)
  • as the data is processed, invoke the producer's recordProcessingEvents() and/or markImportData[Completed|Rejected|Failed]() functions as necessary.

Some of the flow within consumer itself is generic, as well as code or invoking the producer's status reporting functions. For NodeJS based implementations, we ended up creating a npm module that can be used that provides much of this boilerplate implementation.

Overall flow

So lets re-cap how this works:

  1. File is placed in landing-zone/<data_import_type>
  2. S3 event fired and invokes handleDataImportEvent()
  3. File validated
  4. If valid: file is moved to processing/yyyy/mm/dd/<data_import_id>/ and SNS events are published (one for the file OR one for each row)
  5. If invalid: file is moved to rejected/yyyy/mm/dd/<data_import_id>/
  6. Regardless of valid/invalid, a record is written to dynamodb w/ primary key data_import_id / data_import_type
  7. One or more queues/applications can subscribe to the SNS topic and listen for only messages where the data_import_type is the one they are responsible for. The messages may be one message/event per record in the file (EMIT_ROWS), or a single event which contains a reference to the original file (EMIT_FILE_REF).
  8. As the data_import_type consumer lambda processes the rows or data file, it optionally can make api-gateway calls, or direct Lambda function calls to the producers recordProcessingEvents() function for annotating the data_import_record with supplementary info. Each "processing event" item is either status: success or status: failed. Calling recordProcessingEvents() is optional as some data_import_types, specifically the EMIT_FILE_REF types, may not need to emit individual processing events per row and instead choose to manually callback via one of the markImportData[Rejected|Completed|Failed] methods which flag the entire file as completed, rejected or failed.
  9. On demand via api-gateway or via a scheduled cron via event bridge, the evaluateDataImportRecords() can be called to evaluate data import records in the processing state and determined when they are completed or failed base don the rules in the import_types/ modules. This method then call markImportDataCompleted() or markImportDataFailed() which will copy the file from processing/ to either completed/yyyy/mm/dd/<data_import_id>/ or failed/yyyy/mm/dd/<data_import_id>/. Either of these methods can also be called directly via api-gateway on demand.
producer: import file received, validated and events distributed to consumers
producer: import file received, validated and events distributed to consumers
producer: evaluateDataImportRecords() via Event Bridge or direct invocation
consumer: invoking producer functions for recording event processing results
consumer: reporting overall status of the file to the producer

Summary

Overall the solution works pretty well. When new files dropped the time to completion for processing was much faster as the number of rows processed concurrently is now only bound by the capacity of any systems that consumers have to interact with. The AWS side scales infinitely. Since inception, the solution has been used to process much more than its original design (CSVs), but expanded to cover more unique scenarios where the consumer is much more complicated: EDI (X12) file processing, with returning EDI responses back to the origin as well as managing other systems like OpenSearch, populating data and managing index rotation etc. The implementation currently services more than a dozen unique integrations.

Hopefully this article was helpful and might provide some inspiration for others out there looking to design solutions.

Originally published at http://bitsofinfo.wordpress.com on November 29, 2021.

--

--