
Workflow

Create, run, and manage Databricks Jobs | Databricks on AWS

Type - Notebook
Source - GitHub
Git - https://github.com/cakedefi/cake-data-eng
Git reference - main
Path - silver_layer/product_revenue_bifurcation
Cluster - PROD Data Team Cluster
-> Save
Add Schedules & Triggers
Add Notification
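
The same job can also be created programmatically with the Jobs API 2.1. A minimal sketch in Python (host, token, and cluster ID are placeholders; the repo, branch, and notebook path come from the steps above):

import requests

DATABRICKS_HOST = "https://<workspace>.cloud.databricks.com"  # placeholder
DATABRICKS_TOKEN = "<token>"                                  # placeholder
EXISTING_CLUSTER_ID = "<prod-data-team-cluster-id>"           # placeholder

# Job definition mirroring the UI steps: one notebook task sourced from Git.
job_spec = {
    "name": "product_revenue_bifurcation",
    "git_source": {
        "git_url": "https://github.com/cakedefi/cake-data-eng",
        "git_provider": "gitHub",
        "git_branch": "main",
    },
    "tasks": [
        {
            "task_key": "product_revenue_bifurcation",
            "notebook_task": {
                "notebook_path": "silver_layer/product_revenue_bifurcation",
                "source": "GIT",
            },
            "existing_cluster_id": EXISTING_CLUSTER_ID,
        }
    ],
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json=job_spec,
)
print(response.json())  # returns the new job_id on success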


Orchestrate Databricks jobs with Apache Airflow | Databricks on AWS
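
A minimal Airflow DAG that triggers an existing Databricks job, assuming the apache-airflow-providers-databricks package is installed and an Airflow connection named databricks_default holds the workspace host and token (the job_id is a placeholder):

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Trigger an already-deployed Databricks job once a day.
with DAG(
    dag_id="trigger_databricks_job",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_job = DatabricksRunNowOperator(
        task_id="run_product_revenue_bifurcation",
        databricks_conn_id="databricks_default",
        job_id=123,  # placeholder: the Databricks job_id to run
    )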

Trigger jobs when new files arrive | Databricks on AWS
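
File arrival triggers are part of the job settings rather than a schedule. A hedged sketch of the relevant fragment of a Jobs API 2.1 payload (field names as documented for file arrival triggers; the S3 location is a placeholder):

# Fragment of a jobs/create or jobs/reset payload enabling a file arrival trigger.
trigger_settings = {
    "trigger": {
        "pause_status": "UNPAUSED",
        "file_arrival": {
            "url": "s3://my-bucket/landing/",  # external location to watch (placeholder)
        },
    }
}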

Run a Databricks notebook from another notebook | Databricks on AWS

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

dbutils.notebook.run(i['nb_path'], i['timeout'], i['args']).split(';')
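
dbutils.notebook.exit() can only return a single string, which is why the second snippet splits on ';' — the child notebook is assumed to pack several values into one delimited string. A small sketch (paths and values are illustrative):

# Child notebook (e.g. /Shared/child_notebook): return several values as one
# string, since dbutils.notebook.exit() accepts only a single string.
dbutils.notebook.exit("2024-01-01;silver;succeeded")

# Parent notebook: run the child with a 60-second timeout, pass widget
# arguments, and unpack the returned string.
result = dbutils.notebook.run(
    "/Shared/child_notebook",
    60,
    {"run_date": "2024-01-01", "layer": "silver"},
)
run_date, layer, status = result.split(";")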

Share code between Databricks notebooks | Databricks on AWS

Share information between tasks in a Databricks job | Databricks on AWS

dbutils.jobs.taskValues.set(key = 'name', value = 'Some User')
dbutils.jobs.taskValues.set(key = "age", value = 30)

dbutils.jobs.taskValues.get(taskKey = "Get_user_data", key = "age", default = 42, debugValue = 0)
dbutils.jobs.taskValues.get(taskKey = "Get_user_data", key = "name", default = "Jane Doe")
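
Task values only flow between tasks of the same job: set() runs in an upstream task (here assumed to be named Get_user_data) and get() in a downstream task. default is returned when the upstream task did not set the key, while debugValue is returned when the notebook is run interactively outside a job. A small sketch of the downstream task:

# Downstream task in the same job: read values written by the "Get_user_data" task.
name = dbutils.jobs.taskValues.get(
    taskKey="Get_user_data",
    key="name",
    default="Jane Doe",      # used in a job run if the upstream task did not set the key
    debugValue="Test User",  # used when the notebook runs outside a job
)
age = dbutils.jobs.taskValues.get(taskKey="Get_user_data", key="age", default=42, debugValue=0)

print(f"{name} is {age} years old")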

Others

DBX

dbx (Databricks CLI eXtensions) is a CLI tool for development and advanced management of Databricks workflows.

GitHub - databrickslabs/dbx: 🧱 Databricks CLI eXtensions - aka dbx is a CLI tool for development and advanced Databricks workflows management.

dbx | github

pip install dbx

dbx init -p "cicd_tool=GitHub Actions" -p "cloud=AWS" -p "project_name=charming-aurora" -p "profile=DEFAULT" --no-input

cd charming-aurora

pip install -e ".[local,test]"

pytest tests/unit --cov

pyjaws

PyJaws enables declaring Databricks Jobs and Workflows as Python code, allowing for:

  • Code Linting
  • Formatting
  • Parameter Validation
  • Modularity and reusability

pip install pyjaws

export DATABRICKS_HOST="https://dbc-1ae8e7ed-1d80.cloud.databricks.com"
export DATABRICKS_TOKEN="<token>"
echo $DATABRICKS_HOST
echo $DATABRICKS_TOKEN

pyjaws create <folder_name>
pyjaws create .

from pyjaws.api.base import (
    Cluster,
    Runtime,
    Workflow
)
from pyjaws.api.tasks import PythonWheelTask, NotebookTask

cluster = Cluster(
    job_cluster_key = "ai_cluster",
    spark_version = Runtime.DBR_13_ML,
    num_workers = 2,
    node_type_id = "r3.xlarge",
    cluster_log_conf = {
        "dbfs": {
            "destination": "dbfs:/home/cluster_log"
        }
    }
)

# Create a Task object.
ingest_task = PythonWheelTask(
    key = "ingest",
    cluster = cluster,
    entrypoint = "iot",
    task_name = "ingest",
    parameters = [
        "my_parameter_value",
        "--output-table", "my_table"
    ],
    package_name = 'abc'
)

transform_task = PythonWheelTask(
    key = "transform",
    cluster = cluster,
    entrypoint = "iot",
    task_name = "transform",
    dependencies = [ingest_task],
    parameters = [
        "my_parameter_value2",
        "--input-table", "my_table",
        "--output-table", "output_table"
    ],
    package_name = 'abc'
)

next_task = NotebookTask(
    key = "next",
    cluster = cluster,
    entrypoint = "iot",
    task_name = "next",
    dependencies = [ingest_task],
    source = "GIT",
    notebook_path = "/test_repo_pipeline",
    parameters = [
        "my_parameter_value2",
        "--input-table", "my_table",
        "--output-table", "output_table"
    ],
    package_name = 'abc'
)


# Create a Workflow object to define dependencies
# between previously defined tasks.
workflow = Workflow(
    name = "my_workflow",
    tasks = [ingest_task, transform_task, next_task]
)

# Render the workflow definition in a notebook.
display(workflow)

  • Make sure to set package_name, otherwise you get pydantic.error_wrappers.ValidationError
  • Change node_type_id, since the example node_type_id is not available in the workspace
  • Add a NotebookTask for running notebooks
  • Set source to "GIT" for notebooks sourced from a Git repo

GitHub - rafaelpierre/pyjaws: PyJaws: A Pythonic Way to Define Databricks Jobs and Workflows

pyjaws/pyjaws/pyjaws/api/tasks.py at main · rafaelpierre/pyjaws · GitHub