Data engineer framework for Azure Databricks
Dette framework er perfekt til hurtigt at komme i gang med Azure Databricks.
Det kræver ingen installation, blot at du uploader koden til et Azure DevOps repository og importere repositoriet i Azure Databricks.
Det kræver ej heller nogen installation af eksterne libraries for de clusters som benytter frameworket.
Frameworket understøtter Unity Catalog og Delta Tables.
Connectionstring, keys, passwords etc. vedligeholdes i Databricks secrets
Frameworket indeholder templates for:
Opdatering af dimensioner og fact tabeller (append eller overwrite).
Import af data fra Azure Data Lake (gen2).
Import af data fra Azure SQL server.
Import af data fra Azure Active Directory.
Export af data til Azure Data Lake (gen2).
Export af data til Azure SQL server.
Logning af alle notebooks som afvikles.
Mail afsendelse hvis en notebook fejler.
Meta data opsamling for kataloger, databaser, tabeller og kolloner.
Dagligt/Ugenligt maintenance job for at sikre optimal performance.
Frameworket sikre en kæmpe besparelse i udviklingstiden og derudover sikre den en ensartethed i koden på tværs af udvikler.
Frameworket er skrevet i python og kan nemt tilrettes og udvides så det passer til alle behov.
I de nedenstående eksempler, kan du se hvor simpelt det bliver at arbejde med Databricks ved brug af mit framework.
Eksempel: Import fra Azure Data Lake til en Delta Table
from framework.extract.azureDataLakeGen2CsvToDelta import azureDataLakeGen2CsvToDelta
azureDataLakeGen2CsvToDelta(
sourceAuthenticationScopeName='datalake_demo'
, sourceContainer='databricks'
, sourceFileNameAndPath='/extract/customer.csv'
, sourceHeader=True
, sourceQuotation=''
, targetTable='20_raw.demo.customer'
, targetOverwrite=True
, failOnError=True
)
Eksempel: Import fra Azure Data SQL Server til en Delta Table
from framework.extract.azureSqlDbQueryToDelta import azureSqlDbQueryToDelta
sourceQuery="""(
SELECT(
*
FROM
[dbo].[customer]
"""
azureSqlDbQueryToDelta(
sourceAuthenticationScopeName='sqlServer_demo'
, sourceDatabaseName = "sqldb-dwh-demo"
, sourceQuery=sourceQuery
, targetTable='20_raw.demo.customer'
, targetOverwrite=True
, failOnError=True
)
Eksempel: Udfør en spark SQL Statement
from framework.dataTransformation.executeSqlStatement import executeSqlStatement
sqlStatement="""(
SELECT(
*
FROM
20_raw.demo.customer
"""
executeSqlStatement(
sqlStatement=sqlStatement
, failOnError=True
)
Eksempel: Send en email
from framework.utility.sendMail import sendMail
sendMail(
mailMessageHeader="my header"
, mailMessageBody="my body"
, mailToAddress="somebody@mail.com"
)
Eksempel: Import af users fra Azure Active Directory
from framework.extract.azureActiveDirectoryUsersToDelta import azureActiveDirectoryUsersToDelta
azureActiveDirectoryUsersToDelta(
targetTable='20_raw.azureActiveDirectory.user'
, failOnError=True
)
Eksempel: Maintence på alle Delta Tabeller
from framework.utility.maintenanceOnDeltaTables import maintenanceOnDeltaTables
catalogList = [
"20_raw"
, "50_prepared"
, "70_delivery"
, "99_utility"
]
maintenanceOnDeltaTables(
catalogList
)
# Denne kode vil udføre en "vacuum" kommando på alle tabeller i de angivet kataloger!