How to Build Spark Lineage for Data Lakes


When a data pipeline breaks, data engineers need to quickly understand where the break happened and what has been impacted. Data downtime is costly.

Without data lineage (a map of how assets are related and how data moves across its lifecycle), data engineers might as well perform their incident triage and root cause analysis blindfolded.

Building data lineage for SQL is a much different process than building Spark lineage.

To retrieve data using SQL, a user writes and executes a query, which is then typically stored in a log. These SQL queries contain all the breadcrumbs needed to trace which columns or fields from certain tables are feeding other tables downstream.

For instance, we can look at this SQL query, which displays the results of at-bats for a baseball team’s players…

Image courtesy of Monte Carlo.

… and we can understand the connections between the above player, bat, and bat_outcome tables.

You can see the downstream field-to-field relationships in the resulting “at-bat outcome” table from the SELECT statements, and the table-to-field dependencies in the non-SELECT statements.

Metadata from the data warehouse/lake and from the BI tool of record can then be used to map the dependencies between the tables and dashboards.

Parsing all this manually to build end-to-end lineage is possible, but it is cumbersome.

It also becomes outdated almost the moment it is mapped, as your environment continues to ingest more data and you continue to layer on additional solutions.

The way we automate it is by using a homegrown data collector to pull our customers’ SQL logs from their data warehouse or lake and stream that data to various points in our data pipelines. We leverage the open source ANTLR parser, which we heavily customized for multiple dialects of SQL, in a Java-based Lambda function to comb through the query logs and generate lineage data.
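
To make that concrete, here is a toy sketch, not our ANTLR-based parser, of how table-level lineage breadcrumbs can be pulled out of a logged query. The table names (player, bat, bat_outcome) follow the baseball example above; a real parser also has to handle aliases, subqueries, CTEs, and dialect differences.

import re

# Toy illustration only: a naive, regex-based stand-in for the ANTLR-based parsing
# described above. The query and table names follow the hypothetical baseball schema.
query_log = ["""
    CREATE TABLE at_bat_outcome AS
    SELECT p.player_name, b.at_bat_number, o.outcome_description
    FROM bat b
    JOIN player p ON b.player_id = p.player_id
    JOIN bat_outcome o ON b.bat_outcome_id = o.bat_outcome_id
"""]

def table_level_edges(sql):
    """Return (source_table, target_table) pairs found in one logged query."""
    target = re.search(r"CREATE\s+TABLE\s+(\w+)", sql, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
    return [(source, target.group(1)) for source in sources] if target else []

for sql in query_log:
    print(table_level_edges(sql))
    # [('bat', 'at_bat_outcome'), ('player', 'at_bat_outcome'), ('bat_outcome', 'at_bat_outcome')]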

The back-end architecture of our field-level SQL lineage solution looks something like this:

Image courtesy of Monte Carlo.

Simple? No. Easy compared to Spark lineage? Absolutely.

How We Solved End-to-End Spark Lineage

Apache Spark doesn’t quite work the same way. Spark supports several different programming interfaces that can create jobs, such as Scala, Python, or R.

Regardless of the programming interface that’s used, it gets interpreted and compiled into Spark commands. Behind the scenes, there is no such thing as a concise query, or a log of all those queries.

Below are examples from Databricks notebooks in Python, Scala, and R that all do the same thing: load a CSV file into a Spark DataFrame.

Python

%python
data = (spark.read.format('csv')
  .option('header', 'true')
  .option('inferSchema', 'true')
  .load('/data/input.csv'))

Scala

%scala
val data = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/cfritz/input.csv")

R

%r
data <- read.df("/data/input.csv",
                source = "csv",
                header = "true",
                inferSchema = "true")

After Spark interprets the programmatic code and compiles the commands, it creates an execution graph (a DAG or Directed Acyclic Graph) of all the sequential steps to read data from the source(s), perform a series of transformations, and write it to an output location. 

That makes the DAG the equivalent of a SQL execution plan. Integrating with it is the holy grail of Spark lineage because it contains all the information needed for how data moves through the data lake and how everything is connected.
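
You can ask Spark to print that plan for yourself. A minimal sketch, assuming a Databricks notebook where the spark session is predefined; the path and column name are placeholders:

%python
# Even this short chain is compiled into parsed, analyzed, optimized, and physical
# plans before anything runs. Path and column name are placeholders.
df = (spark.read.format('csv')
  .option('header', 'true')
  .load('/data/input.csv'))

df.select('player_id').distinct().explain(True)   # prints the extended query plan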

Spark has an internal framework of QueryExecutionListeners, which you can configure to listen for the events where a command gets executed and then pass that command to the listener.

For example, below is the source code for the listener implementation used by an open-sourced listening agent.

package za.co.absa.spline.harvester.listener

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import za.co.absa.spline.harvester.SparkLineageInitializer

class SplineQueryExecutionListener extends QueryExecutionListener {

  private val maybeListener: Option[QueryExecutionListener] = {
    val sparkSession = SparkSession.getActiveSession
      .orElse(SparkSession.getDefaultSession)
      .getOrElse(throw new IllegalStateException("Session is unexpectedly missing. Spline cannot be initialized."))

    new SparkLineageInitializer(sparkSession).createListener(isCodelessInit = true)
  }

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    maybeListener.foreach(_.onSuccess(funcName, qe, durationNs))
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    maybeListener.foreach(_.onFailure(funcName, qe, exception))
  }
}
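
A listener like this is normally wired in through Spark configuration rather than application code. Here is a minimal sketch of that registration from PySpark; the Spline dispatcher property and producer URL are assumptions that depend on your Spline deployment:

from pyspark.sql import SparkSession

# Register the Spline listener via standard Spark configuration ("codeless init").
# The dispatcher property and producer URL below are placeholders for your setup.
spark = (SparkSession.builder
  .config("spark.sql.queryExecutionListeners",
          "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
  .config("spark.spline.lineageDispatcher.http.producer.url",
          "https://lineage.example.com/producer")
  .getOrCreate())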

Instead of building a series of listeners from scratch, we decided to take advantage of that open source technology, Spline. 

Smart developers had already invested several years into building a Spark agent to listen for these events, capture the lineage metadata, and transform it into graph format which we could then receive via a REST API (other options for receiving this data are available as well). 

How Monte Carlo collects information to construct our lineage graph. Spark lineage is the metadata detailing the journey data takes across its lifecycle, including source, creator, transformations, and related data sets.

Once we have that representation of the execution plan, we send it to the Integration Gateway and then a normalizer which converts it into Monte Carlo’s internal representation of a lineage event. 

From there, it’s integrated with other sources of lineage and metadata to provide a single end-to-end view for each customer.
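
As a rough illustration of what a normalizer like that does (the payload shape is simplified and LineageEvent is a hypothetical stand-in, not Monte Carlo’s actual internal model):

import json
from dataclasses import dataclass

@dataclass
class LineageEvent:            # hypothetical internal representation
    sources: list
    destination: str

def normalize_plan(plan: dict) -> LineageEvent:
    """Flatten a simplified, Spline-style execution plan payload into a lineage event."""
    reads = plan["operations"]["reads"]
    sources = [src for op in reads for src in op["inputSources"]]
    return LineageEvent(sources=sources,
                        destination=plan["operations"]["write"]["outputSource"])

payload = json.loads("""
{"operations": {"reads": [{"inputSources": ["s3://lake/bat", "s3://lake/player"]}],
                "write": {"outputSource": "s3://lake/at_bat_outcome"}}}
""")
print(normalize_plan(payload))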

It’s a really elegant solution… here’s why it doesn’t work.

The Challenges of Spark Lineage

What makes Spark difficult from a lineage perspective is what makes it great as a framework for processing large amounts of unstructured data: namely, how extensible it is.

You can run Spark jobs across solutions like AWS Glue, EMR, and Databricks. In fact, there are multiple ways you can run Spark jobs in Databricks alone.

Configuring our Spark lineage solution—specifically how you add JAR files to Spark’s runtime classpath—would vary depending on how and where our customers ran their Spark jobs across these solutions and what combinations of Scala and Spark versions those solutions leveraged.
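
As one example of those permutations, here is a sketch, with placeholder path and version, of attaching the agent JAR and listener through session configuration. Databricks cluster libraries, EMR, and Glue each handle this same step differently, which is where the configuration matrix comes from.

from pyspark.sql import SparkSession

# One permutation only, with a placeholder path and version: put the agent JAR on
# the classpath via Spark configuration. Other platforms use cluster libraries or
# job parameters instead.
spark = (SparkSession.builder
  .config("spark.jars", "/dbfs/jars/spline-agent-bundle_2.12-x.y.z.jar")
  .config("spark.sql.queryExecutionListeners",
          "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
  .getOrCreate())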

At Monte Carlo, we strongly emphasize ease-of-use and time-to-value. When you find yourself inserting a table like this into the draft documentation, it may be a sign to re-evaluate the solution.

The second challenge is that, like SQL statements, the vocabulary of Spark commands is ever expanding. But, since it is a newer framework than SQL, it’s growing at a slightly faster rate.

Every time a new command is introduced, code has to be written to extract the lineage metadata from that command. As a result, there were gaps in Spline’s parsing capabilities with commands that weren’t yet supported. 

Unfortunately, many of these gaps needed to be filled for our customers’ use cases. For example, a large biotech company needed coverage for when it utilized the Spark MERGE command which, just like the SQL statement does with tables, combines two dataframes by inserting what’s new from the incoming dataframe and updating any existing records it finds.

For example, taking our simplistic baseball tables from before, this is how the new Spark MERGE command could be used to add new at-bats, update previously existing at-bats with corrected data, or maybe even delete at-bats that are so old we don’t care about them any more.

MERGE INTO bat
USING bat_stage
ON bat.player_id = bat_stage.player_id
  AND bat.opponent_id = bat_stage.opponent_id
  AND bat.date = bat_stage.date
  AND bat.at_bat_number = bat_stage.at_bat_number

WHEN MATCHED THEN
  UPDATE SET
    bat.bat_outcome_id = bat_stage.bat_outcome_id
WHEN NOT MATCHED THEN
  INSERT (
    player_id,
    opponent_id,
    date,
    at_bat_number,
    bat_outcome_id
  ) VALUES (
    bat_stage.player_id,
    bat_stage.opponent_id,
    bat_stage.date,
    bat_stage.at_bat_number,
    bat_stage.bat_outcome_id
  )
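
The same merge can also be issued through the DataFrame API, which is one reason it surfaces as a Spark command rather than a logged warehouse query. Below is a sketch using the Delta Lake Python API, assuming bat and bat_stage are Delta tables with the columns above:

%python
from delta.tables import DeltaTable

# Sketch only: merge bat_stage into bat via the Delta Lake Python API, assuming
# both are Delta tables with the columns used in the SQL example above.
bat = DeltaTable.forName(spark, "bat")
bat_stage = spark.table("bat_stage")

(bat.alias("bat")
  .merge(bat_stage.alias("bat_stage"),
         "bat.player_id = bat_stage.player_id AND "
         "bat.opponent_id = bat_stage.opponent_id AND "
         "bat.date = bat_stage.date AND "
         "bat.at_bat_number = bat_stage.at_bat_number")
  .whenMatchedUpdate(set={"bat_outcome_id": "bat_stage.bat_outcome_id"})
  .whenNotMatchedInsertAll()
  .execute())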

It’s a relatively new command and Spline doesn’t support it. Additionally, Databricks has developed their own implementation of the MERGE statement into which there is no public visibility.

These are big challenges sure, but they also have solutions. 

We could ensure there is more client hand holding for Spark lineage configuration. We could hire and deploy an army of Scala ninjas to contribute support for new commands back to the Spline agent. We could even get cheeky and reverse engineer how to derive lineage from Databricks’ MERGE command implementation.

A good engineer can build solutions for hard problems. A great engineer takes a step back and asks, “Is the juice worth the squeeze? Is there a better way this can be done?”

Oftentimes buying or integrating with off-the-shelf solutions is not only more time efficient, but it prevents your team from accruing technical debt. So we went in another direction.

Partnering with Databricks

During the beta-testing of our Spark lineage solution, we found the primary use case for virtually every customer was for lineage within Databricks.

We found a better solution was to integrate with Databricks data lineage via the Unity Catalog. Some of the features in Databricks’ lineage solution include:

  • Automated run-time lineage: Unity Catalog automatically captures lineage generated by operations executed in Databricks. This helps data teams save significant time compared to manually tagging the data to create a lineage graph.
  • Support for all workloads: Lineage is not limited to just SQL. It works across all workloads in any language supported by Databricks: Python, SQL, R, and Scala. This empowers all personas—data analysts, data scientists, ML experts—to augment their tools with data intelligence and context surrounding the data, resulting in better insights.
  • Lineage at column level granularity: The Unity Catalog captures data lineage for tables, views, and columns. This information is displayed in real-time, enabling data teams to have a granular view of how data flows both upstream and downstream from a particular table or column in the lakehouse with just a few clicks.
  • Lineage for notebooks, workflows, and dashboards: Unity Catalog can also capture lineage associated with non-data entities, such as notebooks, workflows, and dashboards. This helps with end-to-end visibility into how data is used in your organization. As a result, you can answer key questions like, “if I deprecate this column, who is impacted?”
  • Built-in security: Lineage graphs in Unity Catalog are privilege-aware and share the same permission model as Unity Catalog. If users do not have access to a table, they will not be able to explore the lineage associated with the table, adding an additional layer of security for privacy considerations.
  • Easily exportable via REST API: Lineage can be visualized in the Data Explorer in near real-time, and retrieved via REST API to support integrations with our catalog partners, as sketched after this list.
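
To illustrate that last point, here is a minimal sketch of pulling table lineage over the REST API. The endpoint path and request shape reflect the Databricks data lineage API as we understand it, so treat the details as assumptions and check the current documentation:

import requests

# Sketch only: fetch upstream/downstream table lineage from Unity Catalog.
# Host, token, and table name are placeholders; verify the endpoint against
# the current Databricks REST API documentation.
host = "https://<your-workspace>.cloud.databricks.com"
token = "<personal-access-token>"

resp = requests.get(
    f"{host}/api/2.0/lineage-tracking/table-lineage",
    headers={"Authorization": f"Bearer {token}"},
    json={"table_name": "main.baseball.bat", "include_entity_lineage": True},
)
resp.raise_for_status()
print(resp.json())   # upstream and downstream tables, notebooks, jobs, etc.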

I hope our experience has helped shed some light on the intricacies of both automated lineage and Spark lineage.
