A closer look at Databricks
June 10, 2024
Intro
You hear a lot about Databricks these days.
It's a data lakehouse platform where you can run Spark for big data processing.
⚠️
This post is a work in progress.
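To make this concrete: in a Databricks notebook a SparkSession is already available as spark, so you can query tables with plain SQL right away. A minimal sketch (the table name is a placeholder):

# In a Databricks notebook, `spark` is predefined
df = spark.sql("SELECT * FROM my_catalog.my_schema.my_table LIMIT 5")  # placeholder table
df.show(truncate=False)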
The Data Catalogue 📌
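A natural first step is to explore what is registered in the catalog straight from a notebook. A sketch using Spark SQL statements (this assumes Databricks with Unity Catalog; the catalog and schema names are placeholders):

spark.sql("SHOW CATALOGS").show()
spark.sql("SHOW SCHEMAS IN my_catalog").show()           # placeholder catalog
spark.sql("SHOW TABLES IN my_catalog.my_schema").show()  # placeholder schema

The snippet below is a small aside: it uses the tabulate library to pretty-print small result sets on the driver.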
from tabulate import tabulate
# Data as a list of lists
data = [
    ["Alice", 24, "Engineer"],
    ["Bob", 30, "Data Scientist"],
    ["Charlie", 27, "Designer"],
]
# Headers for the columns
headers = ["Name", "Age", "Occupation"]
# Print a simple table
print(tabulate(data, headers=headers))
# Print a table with a different format (e.g., Markdown)
print("\n--- Markdown Table ---")
print(tabulate(data, headers=headers, tablefmt="pipe"))
The library also offers many options for customizing alignment, number formatting, and handling missing values, making it a flexible and powerful tool for data presentation.
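For example, a quick sketch of those options (floatfmt, missingval and colalign are all tabulate keyword arguments):

from tabulate import tabulate

rows = [["Alice", 24.5, None], ["Bob", None, "Data Scientist"]]
print(tabulate(
    rows,
    headers=["Name", "Score", "Occupation"],
    tablefmt="pipe",
    floatfmt=".1f",                      # one decimal place for floats
    missingval="-",                      # render None as a dash
    colalign=("left", "right", "left"),  # per-column alignment
))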
FAQ
PySpark Recap
Interesting queries to get started with PySpark:
# Create a Spark DataFrame with an id and a timestamp string
df = spark.createDataFrame(
    data=[("1", "2019-06-24 12:01:19.000")],
    schema=["id", "input_timestamp"])
df.show(5, truncate=False)
A related trick that often comes up: converting an epoch-milliseconds column into a proper timestamp.
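A minimal sketch, assuming the raw value is in milliseconds (the DataFrame and column names are made up):

from pyspark.sql import functions as F

events = spark.createDataFrame([("1", 1561377679000)], ["id", "ts_ms"])

# Divide by 1000 to get seconds, then cast the double to a timestamp
events = events.withColumn("ts", (F.col("ts_ms") / 1000).cast("timestamp"))
events.show(truncate=False)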
# Left join and keep only the ids with no matching mac (a manual anti-join)
from pyspark.sql.functions import col

qoe.join(nmd,
         qoe.id == nmd.mac,
         'left')\
    .select('id', 'mac')\
    .filter(col("mac").isNull())
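The same result can be expressed more directly with a left anti join, which keeps only the left-side rows that have no match (a sketch reusing the qoe and nmd DataFrames from above):

# left_anti keeps qoe rows whose id never matches nmd.mac,
# and only returns the left side's columns
qoe.join(nmd, qoe.id == nmd.mac, 'left_anti').show(5, truncate=False)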
from pyspark.sql.functions import count

cm_stats_renamed.filter(cm_stats_renamed.ModelName == "CH7465LG")\
    .groupBy("ModelName", "MTA_LineStatus")\
    .agg(count("SerialNumber").alias("SerialNumberCount"))\
    .orderBy("SerialNumberCount", ascending=False)\
    .limit(30).toPandas().style.hide(axis="index")  # hide_index() was removed in pandas 2.0
# Alternatively, skip pandas: .show(30, truncate=False)
from pyspark.sql.functions import countDistinct, date_format

nr.filter(nr.locationId == "something")\
    .withColumn("year_month_day", date_format(nr["createdAt"], "yyyy-MM-dd"))\
    .groupBy("year_month_day")\
    .agg(countDistinct("nodeId").alias("Distinct_Node_Count"))\
    .orderBy("Distinct_Node_Count", ascending=False)\
    .limit(30).toPandas().style.hide(axis="index")
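On large tables an exact distinct count can get expensive, since it forces a shuffle of all distinct values. approx_count_distinct (HyperLogLog-based, with a configurable relative error) is a cheaper alternative; a sketch reusing the same nr DataFrame:

from pyspark.sql.functions import approx_count_distinct, date_format

nr.withColumn("year_month_day", date_format(nr["createdAt"], "yyyy-MM-dd"))\
    .groupBy("year_month_day")\
    .agg(approx_count_distinct("nodeId", rsd=0.05).alias("Approx_Node_Count"))\
    .show(5)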
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split

def process_cable_modem_data(country_code):
    # Initialize (or reuse) the Spark session
    spark = SparkSession.builder.appName("CableModemDataProcessing").getOrCreate()

    # Load the dimension table for the given country code
    path = f"hdfs://123.45.67.89:9820/delta/refined_tables/{country_code}/dimensions/your_dimension_table/"
    dim_df = spark.read.format("delta").load(path)

    # Add the node_id_prefix column (everything before the first dot)
    dim_df = dim_df.withColumn('node_id_prefix', split(dim_df['node_id'], r'\.')[0])

    # Compare node_id_prefix and site_Id; otherwise(False) turns null comparisons into False
    dim_df = dim_df.withColumn('is_same', when(col('node_id_prefix') == col('site_Id'), True).otherwise(False))

    # Keep only the rows where the prefix does not match site_Id
    filtered_df = dim_df.filter(~col('is_same'))

    # Show a sample of the mismatches
    filtered_df.select('node_id', 'node_id_prefix', 'site_Id', 'is_same').distinct().show(5, truncate=False)

# Example usage
process_cable_modem_data("CH")