Architecture for a data lake REST API using Delta Lake, Fugue & Spark

by bitsofinfo · Published in Level Up Coding · Aug 15, 2023

“Hey, we need some kind of a REST API over all our data lakes to let analysts and other integrations query records on demand. Can we please get this done?”

That was the use-case laid out, and it needed a solution. If you’ve had any experience with data lakes you know that they can be very complicated, very large, and built on unique file formats. In any case, this project sounded interesting, so here we go.

CURRENT STATE OF THE OCEAN

We needed to do this for N data lakes, not just one. Thankfully the current architecture of the data lakes in question followed a fairly consistent pattern.

Each lake followed the Delta Lake architecture pattern of bronze, silver and gold zones, each progressing from more primitive data (bronze) to more refined data (gold). For each lake, the gold zone held the final, official consumable copy of the data; curated data, if you’d like to call it that. The “gold” location of each lake was generally the data we wanted to make consumable via a REST API. At the end of the day, despite each silo holding different refinement levels of data, they all followed the same layout: a top-level root folder, then a set of sub-folders organizing the data into a categorization tree of directories, and finally a last folder that was the delta table representing some “entity”.

For example it would look something like this:

/[data_lake_name]/[project]/some/path/to/the/data/[delta-table-name]/

The above is pretty typical in lakes anyway and is nothing unique. The lakes themselves generally resided in Azure Data Lake Storage (ADLS) accounts. The end delta tables were also often cataloged in Databricks and utilized via various Spark jobs/notebooks, etc. Some tables were small, others medium sized, and others were pretty large. Individual records within each table generally were not that big, and well within a size reasonable to return in an API response.

REQUIREMENTS

The API requirements were pretty basic, as most of the integration use-cases were simple. The primary use was being able to select any subset of data from a single entity (delta table); joining across tables was not needed (yet). Given that, we wanted to expose the following functionality via REST endpoints:

  • Get a schema for any target entity in the lake
  • Retrieve data for any single entity (delta table) in the lake and return the results. Support the ability to filter the data and specify the specific columns to return. Sorting, grouping, and paging through results should all be supported.
  • API clients need to be restricted to only the parts of a given lake they have access to; however, row-level security was not necessary.

Ok the R in CRUD; cool, pretty simple.

DELTA-RS & DUCKDB

Given the variety of entity/table sizes residing across the lakes, it became apparent that we would need a variety of execution methods to access the data. For example, when accessing smaller tables it would be ideal (for speed) to simply access the data locally or directly from the storage account, while for larger ones we might have to formulate queries that execute over Databricks/Spark.

The first goal was to start writing some Python code to see what libraries might work well for accessing the Delta tables natively, using just Python and not having to rely on something like Spark.

The first library we looked at was delta-rs, which provides low-level access to Delta tables in Rust with higher-level bindings for Python. Using this you could get at the tables and load data into pyarrow tables and go from there, but it would not be an ideal layer to directly bind to code that is mediating a REST API. With delta-rs you could also feed the pyarrow tables directly into DuckDB, an in-process OLAP DBMS (database management system), which is much better suited for this kind of use-case: load the target delta table with delta-rs, feed it into DuckDB, then use plain SQL to query it. Prototyping with this proceeded and it worked quite well. For small to medium sized tables this was pretty performant when the delta tables were accessible locally on the filesystem or via an Azure ABFSS URI and a supporting driver library.
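
For illustration, a minimal sketch of that approach (assuming the deltalake and duckdb Python packages); the table path and column names are placeholders, and reading over abfss:// would additionally require storage credentials passed via storage_options:

import duckdb
from deltalake import DeltaTable

# Load the delta table via the delta-rs bindings into an in-memory pyarrow table
dt = DeltaTable("/data/my_lake/project/some/path/my_delta_table")
arrow_table = dt.to_pyarrow_table()

# Register the pyarrow table with DuckDB and query it with plain SQL
con = duckdb.connect()
con.register("entity", arrow_table)
result = con.execute(
    "SELECT some_column, COUNT(*) AS cnt "
    "FROM entity "
    "GROUP BY some_column "
    "ORDER BY cnt DESC"
).fetch_arrow_table()
print(result)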

However, when using this method over large delta tables it was super slow; it eventually worked, but it would not be acceptable for real-time API requests. The size of those delta tables was simply too much for an in-process query engine to both load and process in a timely manner.

HANDLING LARGE DELTA TABLES

For the large tables, the only practical solution was to hand the queries off over the wire to Spark for execution. Given the data being targeted in the lakes was already in the “gold” zone, these tables were already cataloged and queryable via Spark SQL, so it was the natural choice. Some initial testing was done with this; it worked, but it would ultimately force the access pattern of the API for these kinds of target tables to be asynchronous.

MEH, NOT GREAT

While both of these options could work, we really didn’t like having to work directly with two different underlying chains of libraries depending on which table in any particular data lake was being targeted. We’d much prefer to simply create a query, send it off to a single library, give it some kind of “execution preference/hint”, and let that library take care of the rest. At the end of the day we could write that abstraction layer ourselves, but we’d rather avoid that. That is when we came across the Fugue Project.

FUGUE PROJECT

As stated by the Fugue project themselves:

“Fugue is a unified interface for distributed computing that lets users execute Python, Pandas, and SQL code on Spark, Dask, and Ray with minimal rewrites.”

What is omitted from that statement is the capability of executing in-process via DuckDB. The two key abilities of Fugue are an API of functions that can take your function and execute it over all of these distributed backends, and a SQL interface (ANSI compatible) with extensions to do the same using a SQL “workflow” syntax; i.e. load a dataframe from any INPUT, execute some SQL against that dataframe, then do something with the OUTPUT.
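
As a rough sketch of what that workflow syntax feels like (the paths and column names here are made up, and the exact import/API surface varies a bit between Fugue versions):

from fugue_sql import fsql

# LOAD an input, run ANSI SQL against it, then SAVE the result; the same
# statement can run on different backends just by changing the engine
workflow = """
df = LOAD "/data/my_lake/project/some/path/entity_export.parquet"

SELECT some_column, COUNT(*) AS cnt
  FROM df
 GROUP BY some_column

SAVE OVERWRITE "/tmp/query_result.parquet"
"""

fsql(workflow).run("duckdb")   # in-process execution via DuckDB
# fsql(workflow).run(spark)    # ...or hand it a SparkSession for distributed execution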

So now we could build something more like the following: a thin API layer that generates Fugue SQL statements and hands them to a single Fugue interface, which in turn executes them either in-process via DuckDB or remotely via Spark, depending on the target table.

We did a little bit of prototyping with this model and it worked pretty well. We could focus our efforts on producing valid Fugue SQL workflow statements, along with some small functions to permit Fugue to get at the raw delta tables, and let Fugue do the rest. Based on which data lake entity/table was being targeted, we could simply switch which Fugue engine type to utilize (duckdb or spark) and Fugue would handle the rest. By doing this we were then able to focus more on all the logic sitting above, such as the API itself, security, and creating the queries themselves.
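
The per-table engine switch itself is nothing more than configuration; a hypothetical sketch of that dispatch (the lake/table names and mapping are made up):

# Map each lake/entity to an execution preference; small tables stay
# in-process with DuckDB, large ones get pushed to Spark
ENGINE_BY_ENTITY = {
    ("lake_a", "customers"): "duckdb",
    ("lake_a", "transactions"): "spark",
}

def pick_engine(lake: str, entity: str, spark_session=None):
    choice = ENGINE_BY_ENTITY.get((lake, entity), "duckdb")
    return spark_session if choice == "spark" else "duckdb"

# fsql(workflow).run(pick_engine("lake_a", "transactions", spark_session=spark))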

CHANGE OF PLANS

First off, we made a decision that all executed queries, regardless of local execution or remote execution via Spark, would be treated as asynchronous. The caller would make a request and get an immediate response containing a result_id that could be used to fetch the result at a later time when the query was finished (or to check its status). We had to do this given any query could be executed over Spark OR local IO, and it was good design to hide that from callers and keep the API contract consistent whether the target was a small or large table.

Secondly, we realized that we now had a bit more power available to us and we wanted to expose that query capability all the way to the callers. We did not want to re-invent the wheel by creating yet another API abstraction to let users express what to get, which columns to return, how to sort, paging, etc. Instead we would let them pass raw ANSI SQL queries that targeted only the table/entity they were requesting. (Yes, I can already hear readers screaming “SQL injection! OWASP! SQL injection!”… and yes, we addressed that; read on.)

Hence the requirements for the API changed a bit:

  • Get a schema for any target entity in the data lake
  • Query data for any single entity (delta table) in the lake and return a result_id that can be used to check the processing status as well as to fetch the completed results. Callers can provide any valid ANSI SQL query that targets only the named table/entity and nothing else; no joins across data lake entities.
  • Get the status for any given result_id
  • Retrieve the results for any completed result_id, specifying an offset and limit. Query results would be persisted and retained until expired.
  • API clients need to be restricted to only the parts of a given lake they have access to; however, row-level security was not necessary.

ENGINE IMPLEMENTATION

So how did the implementation evolve?

The core goal of the system was to ultimately craft a generated Fugue SQL workflow statement that consisted of these 3 key parts (a sketch follows the list):

  • optionally create a dataframe from a Fugue SQL INPUT
  • execute the SQL query against that dataframe
  • optionally write the resulting dataframe to a Fugue SQL OUTPUT
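
A rough sketch of what such a generated statement could look like as a whole (the paths and the caller's query are illustrative placeholders; in the real system the INPUT/OUTPUT pieces came from configurable builders and could be omitted, e.g. when querying a Hive-cataloged table over Spark):

# 1) optional INPUT, 2) caller-supplied ANSI SQL, 3) optional OUTPUT
input_path = "/data/my_lake/project/some/path/entity_export.parquet"
output_path = "/data/results/123e4567.parquet"
caller_sql = "SELECT col_a, col_b FROM df WHERE col_a > 100"

fugue_sql = f'''
df = LOAD "{input_path}"

{caller_sql}

SAVE OVERWRITE "{output_path}"
'''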

Fugue SQL INPUT/OUTPUT statements are Fugue SQL extensions that let you load resources into, and out of, dataframes in the local process. You can load natively from the local filesystem via path references, or provide custom function callouts that supply the data as PyArrow tables. We took the latter approach, creating custom Python functions that could load the data from local filesystem locations or from ABFSS URIs to Delta tables in Azure storage accounts, using the methods previously described above (with DuckDB as the engine). This was then further abstracted away as IOStatementBuilders (with a number of implementations) in our project, which could be configured per data lake entity/table to permit mixing/matching of input sources and output targets. Which Fugue engine to use per table was also configurable (Spark or DuckDB). For example, you could configure any particular data lake/table to read a table from local IO, execute the query, and then output it to some other storage account via ABFSS URIs (duckdb engine), or execute the query against a Hive table and write to DBFS (bound to a storage account for results under the covers) with the spark engine.
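
To give a feel for that abstraction, here is a hypothetical, much-simplified shape of such a builder; the real IOStatementBuilder interface in the project is not shown here:

from abc import ABC, abstractmethod
from typing import Optional

class IOStatementBuilder(ABC):
    @abstractmethod
    def input_statement(self, entity_path: str) -> Optional[str]:
        """Return a Fugue SQL INPUT fragment, or None if not needed (e.g. Hive/Spark)."""

    @abstractmethod
    def output_statement(self, result_uri: str) -> Optional[str]:
        """Return a Fugue SQL OUTPUT fragment, or None if results are returned directly."""

class LocalParquetIO(IOStatementBuilder):
    def input_statement(self, entity_path: str) -> Optional[str]:
        return f'df = LOAD "{entity_path}"'

    def output_statement(self, result_uri: str) -> Optional[str]:
        return f'SAVE OVERWRITE "{result_uri}"'

class HiveTableIO(IOStatementBuilder):
    def input_statement(self, entity_path: str) -> Optional[str]:
        return None  # the Spark SQL query references schema.tablename directly

    def output_statement(self, result_uri: str) -> Optional[str]:
        return f'SAVE OVERWRITE "{result_uri}"'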

Note that the INPUT statements could be optional. Why? Because we created the capability to configure the engine of execution differently per data lake target. If execution was targeted at Spark, we had no need for the INPUT statement, as the input dataframe was inherent in the SQL query itself since it referenced a Hive-cataloged schema.tablename. Likewise, the OUTPUT might be optional, as the query being executed might be a retrieval of a previously stored set of query results (by result_id); in that case we had no need to re-store the retrieval query’s results, and instead just returned them directly to the caller.

We broke the implementation down into the following areas:

  • API contract definition
  • authorization and security
  • SQL query parsing, whitelisting enforcement and rewriting
  • Fugue IO statement builders for different INPUTs/OUTPUTs, plus SQL query enrichment; for example, we would inject a row_number on all query result dataframes before persistence to permit paging of the results on retrieval (see the sketch after this list)
  • Database for query results metadata and auditing
  • Data lake configuration to drive behavior changes per lake/table as well as permit mixing/matching INPUTs/OUTPUTs for different lake table sources and where their query results would be persisted.
  • Query result expiration and purging engine
  • Schema service implementation
  • Whitelisting engine for restricted columns per lake/table driven by data lake config
  • Data Lake replication processes for mirroring target tables to local or more proximal network storage for faster access.
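
A minimal sketch of the row_number enrichment idea mentioned above (the wrapping approach and column name are illustrative; the Spark engine would also want an explicit ORDER BY in the window spec):

# Wrap the caller's query so every persisted result row carries a stable
# row_number, enabling offset/limit paging when results are fetched later
def enrich_with_row_number(caller_sql: str) -> str:
    return (
        "SELECT *, ROW_NUMBER() OVER () AS row_number "
        f"FROM ({caller_sql}) AS q"
    )

print(enrich_with_row_number("SELECT col_a, col_b FROM df WHERE col_a > 100"))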

CLIENT API FLOW

The API specification was pretty straightforward, and it could naturally grow as a data lake evolved and new paths were added over time, since the structure of the data lake was reflected in the API paths themselves. Any part of an exposed data lake path could be aliased with pointers (in configuration) to other locations to further limit what could be queried.

  • Client acquires a JWT that identifies it
  • Client POSTs a SQL query for the entity they wish to egress data for at /<data-lake-id>/<data-lake-project-id>/[path/to]/<entity>. The query executes asynchronously and returns a result that can be tracked by its result_id
  • Client checks the result status via a GET against /<data-lake-id>/<data-lake-project-id>/[path/to]/<entity>/<result-id>/status (SUBMITTED, PROCESSING, FAILED, COMPLETED)
  • Once COMPLETED the client can retrieve their results by issuing one or more GETs for /<data-lake-id>/<data-lake-project-id>/[path/to]/<entity>/<result-id>/results?offset=[n]&limit=[n]
  • Query results are no longer available after their expiration time and are purged.

GET SCHEMA · POST QUERY · GET STATUS · GET RESULTS
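
A hypothetical walk-through of those calls using Python's requests library (the host, the schema endpoint path, payload field names, and response shapes are assumptions for illustration, not the actual contract):

import time
import requests

BASE = "https://datalake-api.example.com/my-lake/my-project/some/path/customers"
HEADERS = {"Authorization": "Bearer <jwt>"}

# GET SCHEMA for the target entity (exact path assumed)
schema = requests.get(f"{BASE}/schema", headers=HEADERS).json()

# POST QUERY: submit the SQL, get back a result_id immediately
submit = requests.post(
    BASE, headers=HEADERS,
    json={"sql": "SELECT id, name FROM customers WHERE region = 'EMEA'"},
)
result_id = submit.json()["result_id"]

# GET STATUS: poll until COMPLETED (or FAILED)
while True:
    status = requests.get(f"{BASE}/{result_id}/status", headers=HEADERS).json()["status"]
    if status in ("COMPLETED", "FAILED"):
        break
    time.sleep(5)

# GET RESULTS: page through the persisted results until they expire
page = requests.get(
    f"{BASE}/{result_id}/results", headers=HEADERS,
    params={"offset": 0, "limit": 100},
)
print(page.json())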

SECURITY

There was a fair amount of configuration behind the scenes that enforced which clients could access which data lakes, data lake projects, folders within those hierarchies, and end tables. Security was enforced both within the service engine and at bindings at the driver level, utilizing different underlying identities when accessing different data lakes (or locations within them), as well as separate Spark clusters for different security contexts.

Regarding the potential for SQL injection, this was locked down in various ways. Much of it was inherently mitigated via the underlying identities being utilized for lower-level operations, but another large part was restricted via the usage of tools like SQLGlot. SQLGlot can parse any SQL statement into a traversable graph that can then be analyzed for exceptions against a set of rules defining which kinds of SQL constructs are whitelisted. For example, reject INSERT and permit SELECT. This extended into analysis of targeted table names (even via aliasing) to ensure no cross joins were possible. SQLGlot made this quite easy, and all statements ran through multiple passes before submission to any execution engine.
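
As a flavor of what that kind of check looks like with SQLGlot (a minimal sketch; the real rule set ran multiple passes and was considerably more involved):

import sqlglot
from sqlglot import expressions as exp

def validate_query(sql: str, allowed_table: str) -> None:
    tree = sqlglot.parse_one(sql)

    # reject anything that is not a plain SELECT (INSERT, UPDATE, DROP, ...)
    if not isinstance(tree, exp.Select):
        raise ValueError("only SELECT statements are permitted")

    # every table referenced (even behind an alias) must be the whitelisted entity
    referenced = {t.name for t in tree.find_all(exp.Table)}
    if referenced - {allowed_table}:
        raise ValueError(f"query references non-whitelisted tables: {referenced}")

validate_query("SELECT id, name FROM customers WHERE region = 'EMEA'", "customers")
# validate_query("DROP TABLE customers", "customers")  # raises ValueError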

SUMMARY

In summary, this project opened up a world of possibilities, and there is more work going into the platform as more use-cases emerge. The Fugue Project is definitely a pretty cool piece of work that is worth a look if you do any work in this space. The number of possible use cases it could help solve is quite large. Kudos to the Fugue project team members and the community as well; they are quite helpful and responsive in their Slack channels.

https://github.com/fugue-project/fugue

https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html

originally posted at: https://bitsofinfo.wordpress.com/2023/08/14/data-lake-rest-api-delta-lake-fugue-spark/
