This article covers the implementation of a data ingestion use case: ingesting a large file, processing it, and loading it into a schemaless database. The entire use case is set up on Azure, using Azure Storage as the landing place for the file dump, Azure Functions to process the data, and Cosmos DB as the final persistence step.
Data Engineering has been a growing topic, especially with the increased use of the Cloud, which has given many companies capabilities that were otherwise very difficult to obtain. Software Engineers and Architects are now, more than ever, responsible for coming up with solutions for integrating systems based on data processes and pipelines. Some of these problems stem from the inability to control every system within one or multiple organizations. The opportunity to set up a new, clean end-to-end data pipeline is rare, so alternative methods must be put in place to solve data ingestion challenges.
In this article we take a look at a specific problem and how to solve it with the help of Cloud native approaches. Take, for example, a system that needs to ingest a huge chunk of data every day, with little to no control over how that data is delivered. Here you are bound to ingest a large CSV file into a database, where it will be consumed by other applications.
Sound interesting? Let's get into it then.
TL;DR: The full implementation is available in the solution git repo.
Before going into the schematics, it's necessary to explain how we will approach the solution. Our main goal here is to ingest a large file using serverless computation. This doesn't come without challenges, from limited storage capabilities to failing executions. Serverless gives us a huge improvement in overall compute and setup timelines, but not without its downsides. In this case we want to increase the parallelism of the execution while preventing large chunks of data from depending on a single function execution.
For that, we decided to begin the process by splitting the file into several smaller chunks that can be processed in parallel, which reduces the impact of a failing process. Then we want to make sure we have independent streams, each processing one of the files, and what better way to do that than to use the Fan-out/Fan-in pattern for Azure Durable Functions (official documentation here).
The example architecture is based on three main components:
- Azure Storage: Container where the external system dumps the initial file. In this case we will use it to store the file shards as well, but other solutions could be feasible;
- Azure Functions: Compute resource that integrates with Azure Storage to get the file, split it, and then run multiple executions to read the file shards and upload the data to Cosmos DB;
- Cosmos DB: Azure NoSQL Database, where we will store the ingested data.
Regarding the Azure Function, we will use the Azure Durable Functions features to split the execution into multiple streams.
The Azure Function execution follows these steps:
- A trigger initiates the Main execution, which starts an Orchestrator instance.
- The Orchestrator instance starts a ProcessCsv Activity, in which the file is read from Azure Storage and split, and the shards are uploaded back to Azure Storage. The list of shard filenames is then returned to the Orchestrator.
- After the Orchestrator receives the list with the names of the files in the Azure Storage container, it will trigger a batch of activities based on the number of available files to be processed.
- Each ProcessData Activity receives one filename. The file is read and processed, and every row is inserted into Cosmos DB as a new document.
Note: The current design does not include redundancy or re-ingestion in the data processing. However, it can be extended by recording the documents that fail to insert into Cosmos DB in a "failed insert" list, which can then be re-ingested.
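The extension mentioned in the note could look roughly like the sketch below. It is written in plain Python for brevity (the article's project is C#) and is not part of the repository: `insert_with_failures`, `reingest`, and the `insert_one` callback are hypothetical names, and `insert_one` stands in for whatever call writes a single document to Cosmos DB.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def insert_with_failures(
    rows: Iterable[Row],
    insert_one: Callable[[Row], None],
) -> List[Dict[str, Any]]:
    """Insert each row and return the "failed insert" list."""
    failed: List[Dict[str, Any]] = []
    for row in rows:
        try:
            insert_one(row)  # hypothetical: writes one document
        except Exception as exc:  # a real client would catch its own error type
            failed.append({"row": row, "error": str(exc)})
    return failed


def reingest(
    failed: List[Dict[str, Any]],
    insert_one: Callable[[Row], None],
    max_attempts: int = 3,
) -> List[Dict[str, Any]]:
    """Retry failed rows a bounded number of times; return what still fails."""
    for _ in range(max_attempts):
        if not failed:
            break
        failed = insert_with_failures([entry["row"] for entry in failed], insert_one)
    return failed
```

Bounding the number of attempts keeps a permanently invalid row from looping forever; whatever is still in the list after the last attempt would need to be logged or stored for manual inspection.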
Let's start by looking at the baseline versions: the example is developed using .NET 6 and Azure Functions v4. The full implementation code project is available at this git repo.
The solution is composed of a single project, divided into a Functions folder, containing all the triggers, orchestrators and activities; a Models folder, containing the document structure to be inserted into Cosmos DB; and a Services folder, where the integrations with Azure Storage and Cosmos DB are implemented.
The Main class contains the trigger for the instance that will run the orchestrator. This is important because instances are durable until the orchestrator ends or there is an error in the execution (for more information regarding instance management, check Durable Orchestrations).
The FunctionOrchestrator class is responsible for running the orchestration, from which the Activities are instantiated.
First we start by initiating the ProcessFile Activity, which processes the large file, splits it into several shards, uploads them to Azure Storage, and returns a list with the new file names. The logic behind the file processing is implemented in the StorageService ProcessCSV method, and includes a partition cleaner for old file shards. The file reading and splitting is done through an asynchronous Stream Reader to keep memory usage low and stay within the limits of low-end serverless plans.
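To illustrate the idea of splitting the file as a stream, here is a minimal Python sketch (the actual ProcessCSV method is C# and lives in the repo). The function names, the `shard-0000.csv` naming scheme, the in-memory `storage` dictionary standing in for the Azure Storage container, and the choice to repeat the header row in every shard are all assumptions made for the example.

```python
import io
from typing import Dict, Iterator, List, TextIO


def split_csv(source: TextIO, lines_per_shard: int) -> Iterator[List[str]]:
    """Yield shards as lists of lines, each starting with the header row."""
    header = source.readline()
    if not header:
        return  # empty file: no shards
    shard: List[str] = []
    for line in source:  # reads one line at a time, never the whole file
        shard.append(line)
        if len(shard) == lines_per_shard:
            yield [header] + shard
            shard = []
    if shard:  # last, possibly smaller, shard
        yield [header] + shard


def process_file(
    source: TextIO, lines_per_shard: int, storage: Dict[str, str]
) -> List[str]:
    """Write every shard to `storage` and return the shard names."""
    names: List[str] = []
    for index, lines in enumerate(split_csv(source, lines_per_shard)):
        name = f"shard-{index:04d}.csv"  # hypothetical naming scheme
        storage[name] = "".join(lines)
        names.append(name)
    return names


if __name__ == "__main__":
    container: Dict[str, str] = {}
    big_file = io.StringIO("id,name\n1,a\n2,b\n3,c\n")
    print(process_file(big_file, 2, container))
```

Only one shard is held in memory at a time, which is the point of streaming the file. Note that splitting on raw lines assumes no quoted CSV field contains an embedded line break.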
Then the list of files is split into batches of 5 parallel workers. Each batch launches 5 asynchronous Activities, each of which processes one file and returns to the Orchestrator when its work is finished. After all 5 Activities finish, another 5 are started, and so on until all the files have been processed.
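The batching logic can be sketched with Python's `asyncio` as a stand-in for the orchestrator (in Durable Functions for .NET the equivalent is typically a list of activity tasks awaited with `Task.WhenAll`). `run_in_batches` and the `activity` callback are hypothetical names for this example, not the repo's code.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence


async def run_in_batches(
    file_names: Sequence[str],
    activity: Callable[[str], Awaitable[int]],
    batch_size: int = 5,
) -> List[int]:
    """Process files in consecutive batches of `batch_size` concurrent activities."""
    results: List[int] = []
    for start in range(0, len(file_names), batch_size):
        batch = file_names[start:start + batch_size]
        # Fan out: create one awaitable per file in this batch.
        pending = [activity(name) for name in batch]
        # Fan in: gather runs them concurrently and waits for all of them
        # before the loop moves on to the next batch.
        results.extend(await asyncio.gather(*pending))
    return results


async def fake_process_data(name: str) -> int:
    """Stand-in for the ProcessData Activity; returns a dummy row count."""
    await asyncio.sleep(0)
    return len(name)


if __name__ == "__main__":
    shards = [f"shard-{i}.csv" for i in range(12)]
    print(asyncio.run(run_in_batches(shards, fake_process_data)))
```

Waiting for a whole batch before starting the next keeps the number of concurrent workers capped at 5, at the cost of idle time when one file in a batch takes much longer than the others.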
Inside the ProcessData Activity, the file is loaded from Azure Storage and transformed into a local list of data objects by the TransformCsv method in StorageService, ready to be inserted into the Cosmos DB database. To avoid Cosmos DB throttling, the data is uploaded in batches of 100 lines by the UploadData method in the CosmosService class.
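As a rough Python illustration of this step (the real TransformCsv and UploadData methods are C#), the sketch below parses a shard into dictionaries and hands them to an uploader 100 rows at a time. `transform_csv`, `batched`, `upload_data`, and the `upload_batch` callback are hypothetical names; `upload_batch` stands in for the Cosmos DB write.

```python
import csv
import io
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, str]


def transform_csv(text: str) -> List[Row]:
    """Parse CSV text into one dictionary per row, keyed by the header."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


def batched(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield consecutive lists of at most `size` rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def upload_data(
    rows: Iterable[Row],
    upload_batch: Callable[[List[Row]], None],
    batch_size: int = 100,
) -> int:
    """Send rows to `upload_batch` in fixed-size batches; return the row count."""
    uploaded = 0
    for batch in batched(rows, batch_size):
        upload_batch(batch)  # hypothetical: writes one batch of documents
        uploaded += len(batch)
    return uploaded
```

With 250 rows, for example, `upload_batch` is called three times, with 100, 100, and 50 rows.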
After all the batches finish, the orchestration execution is complete. As stated in the previous section, there is no retry mechanism for failed inserts into the Cosmos DB database.
External data ingestion can be complicated in several ways. Not having clear control over how the data is ingested, or even over the data model behind it, can be very challenging. By taking advantage of custom code and cloud serverless capabilities, we can develop a clean and reusable solution to manage ingestions from external or even unmanaged internal sources.
Azure Functions have come a long way, and with the rise of reactive systems their use will probably keep growing in the coming years. However, even for data pipelines, it's extremely important for development and architecture teams to be able to extend and use tools as they see fit for the problem.
As usual, thank you for reading. I appreciate all the feedback I can get, so feel free to drop a comment below or reach me on social media (links below as well).