4. Data Ingestion
What is data ingestion?
Data ingestion is the process of importing large, assorted data files from multiple sources into a single cloud-based storage medium (a data warehouse, data mart or database) where the data can be accessed and analyzed. A simple data ingestion pipeline consumes data from a point of origin, cleans it up a bit, then writes it to a destination.
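As a minimal sketch of the consume, clean, write pattern described above (this is not the project's pipeline; the function name ingest and the sample columns are assumptions made for illustration), the following reads CSV text from an in-memory source, trims whitespace, drops empty rows, and writes the result to an in-memory destination:

```python
import csv
import io


def ingest(source, destination):
    """Consume CSV rows from `source`, clean them, and write them to `destination`.

    Returns the number of rows written. Hypothetical helper for illustration only.
    """
    reader = csv.DictReader(source)
    writer = None
    written = 0
    for row in reader:
        # Clean: trim stray whitespace from column names and values.
        cleaned = {
            key.strip(): (value or "").strip()
            for key, value in row.items()
            if key is not None
        }
        # Clean: skip rows in which every field is empty.
        if not any(cleaned.values()):
            continue
        if writer is None:
            # Create the writer lazily so the header uses the cleaned column names.
            writer = csv.DictWriter(
                destination, fieldnames=list(cleaned), lineterminator="\n"
            )
            writer.writeheader()
        writer.writerow(cleaned)
        written += 1
    return written


# Point of origin: messy CSV text (sample values are made up).
raw = io.StringIO("site, carbon\n A1 , 2.5 \n,\nB2,3.1\n")
# Destination: an in-memory buffer standing in for real storage.
out = io.StringIO()
count = ingest(raw, out)
print(count)
print(out.getvalue())
```

In a real pipeline the source and destination would be files, APIs or tables rather than in-memory buffers, but the three stages stay the same.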
The sync.sh file
In the Initial setup chapter, we showed how to install the Databricks extension in VS Code. The purpose of the extension is to sync your work in VS Code with the Databricks File System (DBFS).
Another way to sync your expectations work in VS Code with DBFS is to run the sync.sh file. This file is found at the path /~NIP-Lakehouse-Data/dab/sync.sh. Running it syncs the expectations in your various files to the .ide folder in your Databricks workspace account. It is preferable to run this file after all changes have been merged on GitHub and pulled to your local repository, because by default the sync updates DBFS from your most recent branch.
Run it with the command source sync.sh.
If all is well, the script displays the progress of the sync.
You can find the name of the remote Databricks workspace folder that the Databricks extension syncs to under Sync Destination in the Databricks configuration menu in VS Code.
The expectations synced from VS Code appear in a path similar to the one in your local repository, that is, under the ~development/gx_development folder. For example, here is the path to the expectations files in DBFS: /Workspace/Users/<your-email>/.ide/<sync-destination-name>/development/gx_development.
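To make the path pattern concrete, here is a small sketch that assembles the synced expectations folder from its two variable parts. The function name synced_expectations_path and the sample email and destination values are hypothetical; only the fixed path segments come from the example path given above.

```python
import posixpath


def synced_expectations_path(email, sync_destination):
    """Build the workspace folder that holds the synced expectations files.

    Hypothetical helper; the fixed segments follow the example path in the text.
    """
    return posixpath.join(
        "/Workspace/Users",
        email,             # your Databricks account email
        ".ide",
        sync_destination,  # the Sync Destination name shown in VS Code
        "development",
        "gx_development",
    )


# Sample values are placeholders, not real accounts or destinations.
example_path = synced_expectations_path("jane.doe@example.org", "my-sync-destination")
print(example_path)
```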
Initialize a cluster
In your Azure Databricks workspace, there is a Compute tab. This tab is where you manage your Databricks compute resources. You need to create a cluster to perform analytical operations in Databricks.
Put simply, the cluster executes all your Databricks workflows. Therefore, to run your data ingestion process in Databricks, you need to start your cluster.
Creating the checks
Before performing the data ingestion process, we need to make DBFS aware of the checks that we designed. In DBFS, unless you have been instructed otherwise, there is a file named gx_dev_example_db_ui.py. Its full path is /Workspace/Users/<email>/.ide/<destination>/development/gx_dev_example_db_ui. This file handles the creation of the checks that we specify in each Great Expectations .py file.
Ensure you replace the user_name value with your email, and the survey_abbr value with the feature layer you are working on.
Set is_dev to False. If it is left as True, the ingested files will land in bronze with a dev_ prefix.
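The effect of the is_dev flag can be pictured with the sketch below. It is only an illustration of the prefixing behaviour described above: the function bronze_table_name and the way the name is assembled from survey_abbr and a sublayer name are assumptions, not the project's actual naming code.

```python
def bronze_table_name(survey_abbr, sublayer, is_dev):
    """Return an illustrative bronze table name, prefixed with dev_ in dev mode.

    Hypothetical helper; the real naming logic lives in the project code.
    """
    base = f"{survey_abbr}_{sublayer}"
    return f"dev_{base}" if is_dev else base


# With is_dev left as True, the name carries the dev_ prefix.
dev_name = bronze_table_name("cpp_soil_carbon", "survey", is_dev=True)
# With is_dev set to False, the name is used as-is.
prod_name = bronze_table_name("cpp_soil_carbon", "survey", is_dev=False)
print(dev_name)
print(prod_name)
```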
Taking the cpp_soil_carbon cell as an example, we can create the checks for this particular feature layer using the format below:
from gx_development.survey123_cpp_soil_carbon_great_expectations import *
dev_gx_cpp_soil_carbon_sublayer_survey()
dev_gx_cpp_soil_carbon_sublayer_soil_carbon_metrics()
dev_gx_cpp_soil_carbon_subtable_bulk_density_metrics()
dev_gx_cpp_soil_carbon_subtable_fine_root_metrics()
This can be done either directly in DBFS, or in VS Code followed by another sync.
Run this cell; all the checks should run to completion without errors.
The Data ingestion pipeline
The data ingestion pipeline organizes data into three levels: bronze, silver and gold.
- Bronze tables have raw data ingested from various sources (RDBMS data, JSON files, IoT data, etc.).
- Silver tables will give a more refined view of our data. We can join fields from various bronze tables to improve streaming records or update account statuses based on recent activity.
- Gold tables give business-level aggregates often used for dashboarding and reporting. This would include aggregations such as weekly sales per store, daily active website users, or gross revenue per quarter by the department.
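The three levels can be illustrated with a plain-Python sketch. This is not Databricks or Delta Lake code; the sample records and the function names to_silver and to_gold are made up for illustration. Raw string records stand in for bronze, typed and de-duplicated records for silver, and a per-store sales total for gold.

```python
from collections import defaultdict

# Bronze: raw records exactly as they arrived (all values are strings).
bronze = [
    {"order_id": "1", "store": "north", "amount": "10.50"},
    {"order_id": "2", "store": "south", "amount": "4.00"},
    {"order_id": "2", "store": "south", "amount": "4.00"},  # duplicate delivery
    {"order_id": "3", "store": "north", "amount": "n/a"},   # unparseable amount
]


def to_silver(rows):
    """Refine bronze rows: parse types, drop bad rows, remove duplicates."""
    seen = set()
    silver_rows = []
    for row in rows:
        try:
            amount = float(row["amount"])
        except ValueError:
            continue  # skip rows whose amount cannot be parsed
        if row["order_id"] in seen:
            continue  # skip repeated deliveries of the same order
        seen.add(row["order_id"])
        silver_rows.append(
            {"order_id": int(row["order_id"]), "store": row["store"], "amount": amount}
        )
    return silver_rows


def to_gold(rows):
    """Aggregate silver rows into a business-level total per store."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["store"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
print(silver)
print(gold)
```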
The data ingestion process described in this chapter takes the data as far as the bronze stage.
Importing the Survey123 file
Data ingestion is executed in the file called prod_example_db_ui, found at this path: /Workspace/Users/<email>/.ide/<sync-destination>/development/prod_example_db_ui.
In the cell with the import statement from nip_lakehouse.common import Task, replace the user_name value with your email address. For the survey_abbr and survey_id fields, insert the survey abbreviation and ID values, respectively, from the expectations file of the particular feature layer.
Below is an example for the cpp_additional feature layer.
Survey123IngestionLandingTask
The two cells under this header push the data to the container found at this path in your Azure resources: nsiitechdevadlslanding>Containers>Survey123.
If you are ingesting a new file and have inserted its abbreviation and ID in the previous section, the data will appear as a new folder in this Blob storage container.
Once the cell containing ingestion_task.launch() runs successfully, you may proceed to the following section.
Survey123LandingBronzeTask
The purpose of this section is to push data to the bronze stage.
Once the second cell of this section, containing bronze_task.launch(), runs successfully, the ingested data appears in the Catalog.
To access the Catalog, click the Catalog tab on the left. In the Catalog Explorer interface that appears, the sublayers and subtables of your feature layer should be listed under the bronze folder of ns-ii-tech-dev-catalog.
Here are further notes on what the LandingBronzeTask does, extracted from the file at ~NIP-Lakehouse-Data/dab/README.md:
- We check the latest schema_edit_date from the log table.
- The pipeline streams data from folder_name into bronze with Auto Loader, after the data passes the checks defined with Great Expectations. If the data is valid, processing continues.
- Unix time columns are converted to a human-readable format.
- The data is merged with previously ingested data: the data in “bronze_storage_account/survey123/folder_name/recent” is updated, and the data is appended to the historical folder “bronze_storage_account/survey123/folder_name/history”.
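The last three notes can be sketched in plain Python, without Spark or Auto Loader. Everything named here is an assumption made for illustration: the helper names, the objectid key, the created column, the choice to treat Unix time as epoch milliseconds, and the rule that a batch is rejected as a whole if any record fails validation. The schema_edit_date check is not modelled.

```python
from datetime import datetime, timezone


def to_readable(epoch_ms):
    """Convert Unix time in milliseconds (assumed unit) to a readable UTC string."""
    moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def is_valid(record):
    """Stand-in for the Great Expectations checks: require a key and a timestamp."""
    return record.get("objectid") is not None and record.get("created") is not None


def land_to_bronze(batch, recent, history):
    """Validate a batch, convert its time column, then update recent and history.

    `recent` maps objectid to the latest version of each record (the merge);
    `history` is an append-only list of every accepted record.
    Returns the number of records accepted.
    """
    if not all(is_valid(record) for record in batch):
        return 0  # assumption: an invalid batch is not processed further
    for record in batch:
        row = dict(record, created=to_readable(record["created"]))
        recent[row["objectid"]] = row  # newer version replaces the older one
        history.append(row)            # history keeps every version
    return len(batch)


# Previously ingested data (sample values are made up).
first = {"objectid": 1, "created": "1970-01-01 00:00:00", "ph": 6.0}
recent = {1: first}
history = [first]

# New batch: an update to record 1 and a brand-new record 2.
batch = [
    {"objectid": 1, "created": 86_400_000, "ph": 6.4},
    {"objectid": 2, "created": 0, "ph": 5.9},
]
accepted = land_to_bronze(batch, recent, history)
print(accepted, len(recent), len(history))
```

After the call, recent holds one row per objectid with the updated values, while history has grown by one row per accepted record.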