Building an Expandable IoT Data Pipeline - Part 2
Over the past year I've been building a cloud-based data pipeline for a client focusing on IoT data aggregation. There have been many bumps and bruises along the way, but in the end my team has been able to build out a scalable, efficient data processing pipeline to ingest and aggregate data from many different devices across a variety of purposes. Each new device type brought a new spin on processing, so the pipeline needed to be adaptable and fault tolerant. In the end, I believe we made an excellent version 1, leaving lots of room for expansion and growth.
In my head, a data pipeline means several things:
- Data Archival
- Data Warehousing
- Data Cleaning
- Metadata Storage
- Structured querying
We chose a cloud infrastructure for rapid development. While Azure and AWS both offer similar and valid tools, we decided on AWS. Specifically, we used AWS S3, SQS, Lambda, DynamoDB, EC2, and RDS.
Each IoT device is equipped with a tool to insert files into AWS S3. S3's purpose is long-term storage of the data in its original format, CSV in our case. You could just as easily use JSON or any other data storage format, but we were constrained by our requirements to use CSV.
Once the data is in S3, a very small Lambda function that we wrote is triggered by the bucket insert event. It takes an S3 file key and inserts it into an SQS processing queue. This gives us increased logging and reproducibility via CloudWatch, as well as dead-letter queue functionality for storing (and taking action on) invalid messages.
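To make the hand-off concrete, here is a minimal sketch of what such a Lambda could look like. It is not our production code: the QUEUE_URL environment variable, the function names, and the choice to send the bucket name alongside the key as JSON are all assumptions made for illustration.

```python
import json
import os
from urllib.parse import unquote_plus

# Hypothetical environment variable holding the processing queue's URL.
QUEUE_URL = os.environ.get("QUEUE_URL", "")


def extract_keys(event):
    """Return (bucket, key) pairs from an S3 event notification."""
    pairs = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        # Object keys arrive URL-encoded in S3 event notifications.
        pairs.append((s3["bucket"]["name"], unquote_plus(s3["object"]["key"])))
    return pairs


def enqueue(event, sqs, queue_url):
    """Send one SQS message per uploaded object; return how many were sent."""
    sent = 0
    for bucket, key in extract_keys(event):
        body = json.dumps({"bucket": bucket, "key": key})  # assumed message shape
        sqs.send_message(QueueUrl=queue_url, MessageBody=body)
        sent += 1
    return sent


def handler(event, context):
    """Lambda entry point. boto3 is imported lazily so the helpers above
    can be exercised locally without AWS credentials."""
    import boto3

    return enqueue(event, boto3.client("sqs"), QUEUE_URL)
```

Keeping the function this small means nearly all of the processing logic lives downstream of the queue, where a failed message can be retried or moved to the dead-letter queue.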
We also configured an EC2 instance to poll the queue every 15 minutes for new messages. The device data comes in on a regular, nonstreaming cadence, so we didn't need direct streaming capabilities. The longest a message could sit in the queue (provided the EC2 instance was up) was 15 minutes, which was totally fine for our use case.
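A polling pass along these lines could be sketched as follows. The function names and the process callback are hypothetical, and the sqs argument is assumed to be a boto3 SQS client (for example boto3.client("sqs")). How the 15-minute cadence is scheduled (cron, a sleep loop, or something else) is also an assumption; a simple sleep loop is shown.

```python
import time

POLL_INTERVAL_SECONDS = 15 * 60  # the 15-minute cadence described above


def drain_queue(sqs, queue_url, process):
    """Receive messages until the queue looks empty.

    A message is deleted only after process() succeeds. Failed messages
    stay on the queue, so a redrive policy (if one is configured) can
    eventually move them to the dead-letter queue.
    """
    handled = 0
    while True:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # the most SQS returns per call
            WaitTimeSeconds=5,       # long polling to avoid empty responses
        )
        messages = response.get("Messages", [])
        if not messages:
            return handled
        for message in messages:
            try:
                process(message["Body"])
            except Exception:
                continue  # leave the message for a retry or the DLQ
            sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
            handled += 1


def poll_forever(sqs, queue_url, process):
    """Drain the queue, then sleep until the next polling window."""
    while True:
        drain_queue(sqs, queue_url, process)
        time.sleep(POLL_INTERVAL_SECONDS)
```

Deleting only after successful processing is what lets the dead-letter queue do its job: anything that keeps failing is set aside rather than lost.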
The EC2 instance ran a Python script that was knowledgeable about S3, SQS, DynamoDB and RDS (Postgres). This script solves points 2-4 above (Data Warehousing, Data Cleaning, and Metadata Storage). It reads a CSV file and validates it based on its contents. If there are errors in the CSV, the script is able to fix them in many cases. Additionally, if there is junk or otherwise unexpected data, the script can accommodate that as well. The goal of the script is to insert data into our data stores as uniformly as possible, so that it's standardized and consistent.
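The exact cleaning rules depend on the device, so the following is only an illustrative sketch of the idea, not our actual script. The rules shown here (lowercased headers, trimmed cells, empty cells stored as None, ISO 8601 timestamps, rows with the wrong column count rejected) and the "timestamp" column name are all assumptions.

```python
import csv
import io
from datetime import datetime


def clean_csv(text, timestamp_column="timestamp"):
    """Return (clean_rows, rejected_count) for raw CSV text."""
    reader = csv.reader(io.StringIO(text))
    try:
        raw_headers = next(reader)
    except StopIteration:
        return [], 0  # empty file
    headers = [h.strip().lower() for h in raw_headers]

    rows, rejected = [], 0
    for raw in reader:
        if not any(cell.strip() for cell in raw):
            continue  # skip blank lines silently
        if len(raw) != len(headers):
            rejected += 1  # wrong number of columns
            continue
        # Trim every cell and store empty cells as None.
        row = {h: (cell.strip() or None) for h, cell in zip(headers, raw)}
        try:
            parsed = datetime.fromisoformat(row[timestamp_column] or "")
        except (KeyError, ValueError):
            rejected += 1  # missing or unparseable timestamp
            continue
        row[timestamp_column] = parsed.isoformat()  # one canonical format
        rows.append(row)
    return rows, rejected
```

Returning the rejected count alongside the clean rows makes it easy to log how much of each file had to be dropped.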
Once the script has generated a valid, standard, and consistent version of the CSV, the required metadata about the CSV is stored in Postgres, and the structured version of its contents is stored in DynamoDB. We needed to store metadata for each CSV in Postgres to ensure targeted scans and queries of DynamoDB. Otherwise, we would see performance issues. When making data requests, metadata for a query is first obtained from Postgres, then DynamoDB is queried based on that metadata.
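The two-step lookup might look something like the sketch below. The shape of the metadata rows (device_id, first_ts, last_ts), the table name, and the use of epoch seconds stored as a DynamoDB Number are assumptions for illustration. The key names DeviceId and Timestamp match the schema described in the next section.

```python
def build_device_query(table_name, device_id, start_ts, end_ts):
    """Build keyword arguments for a low-level DynamoDB Query call.

    Timestamp is a DynamoDB reserved word, so it is referenced through
    the #ts placeholder in the key condition.
    """
    return {
        "TableName": table_name,
        "KeyConditionExpression": "DeviceId = :device AND #ts BETWEEN :start AND :end",
        "ExpressionAttributeNames": {"#ts": "Timestamp"},
        "ExpressionAttributeValues": {
            ":device": {"S": device_id},
            ":start": {"N": str(start_ts)},  # assumes epoch-second timestamps
            ":end": {"N": str(end_ts)},
        },
    }


def queries_from_metadata(metadata_rows, table_name="DeviceData"):
    """Turn metadata rows (as fetched from Postgres) into targeted queries.

    Each row is assumed to be a dict with device_id, first_ts and last_ts.
    """
    return [
        build_device_query(table_name, row["device_id"], row["first_ts"], row["last_ts"])
        for row in metadata_rows
    ]
```

With a boto3 DynamoDB client, each result could then be executed as client.query(**kwargs). Because every query names a partition key and a bounded sort-key range, DynamoDB never has to scan the whole table.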
The Extra Specifics
Special consideration should be made when creating a schema for DynamoDB. There are relatively few options for performance tuning, so the schema itself needs to be thought out pretty extensively before going live. For our purposes, we settled on a schema that required a device identifier (DeviceId) as a partition key, and a data timestamp (Timestamp) as a sort key. Additional columns are stored based on their CSV values. This introduces some complications in querying across multiple CSV versions. Specifically, not all CSVs contained the same headers. We solved this issue by creating a second DynamoDB table that stored a DeviceId and a list of its file headers.
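As a rough sketch, the key schema described above could be expressed as the arguments to a DynamoDB CreateTable call. The table name, the attribute types (string device ID, numeric timestamp), and the on-demand billing mode are assumptions. The merge_headers helper is likewise a hypothetical illustration of how the header list for a device might be kept up to date as new CSV versions arrive.

```python
# Hypothetical CreateTable arguments for the main data table.
DEVICE_DATA_TABLE = {
    "TableName": "DeviceData",
    "AttributeDefinitions": [
        {"AttributeName": "DeviceId", "AttributeType": "S"},
        {"AttributeName": "Timestamp", "AttributeType": "N"},
    ],
    "KeySchema": [
        {"AttributeName": "DeviceId", "KeyType": "HASH"},    # partition key
        {"AttributeName": "Timestamp", "KeyType": "RANGE"},  # sort key
    ],
    "BillingMode": "PAY_PER_REQUEST",
}


def merge_headers(known_headers, new_headers):
    """Return known headers plus any unseen ones, preserving first-seen order."""
    merged = list(known_headers)
    seen = set(known_headers)
    for header in new_headers:
        if header not in seen:
            merged.append(header)
            seen.add(header)
    return merged
```

Only the key attributes need to be declared up front; every other CSV column can be written as an ordinary item attribute, which is why the separate header table is needed to know which columns exist for a given device.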
We needed to solve Archival, Warehousing, Cleaning, Metadata and Structured Querying. We solved each of those problems performantly and consistently. The point of this pipeline, however, was to get something up and running quickly. Each component can be optimized or replaced depending on the needs of the system. If that means replacing DynamoDB with a Hadoop or Cassandra cluster, it shouldn't require monumental changes to the rest of the system. Extensibility was important, so it was at the forefront of my mind while architecting this system.