A closer look to DataBricks
A closer look to DataBricks
June 10, 2024
ℹ️
This Post is WIP
The Data Catalogue 📌
FAQ
Spark Recap
### CREATES A SPARK DATAFRAME
df=spark.createDataFrame(
data = [ ("1","2019-06-24 12:01:19.000")],
schema=["id","input_timestamp"])
df.show(5, truncate=False)
#.withColumn("ts", f.expr("to_timestamp(CAST(ts / 1000 AS INT))"))
#join and filter the result
qoe.join(nmd,
qoe.id == nmd.mac,
'left')\
.select('id','mac')\
.filter(col("mac").isNull())
from pyspark.sql.functions import count
cm_stats_renamed.filter(cm_stats_renamed.ModelName == "CH7465LG")\
.groupBy("ModelName", "MTA_LineStatus")\
.agg(count("SerialNumber").alias("SerialNumberCount"))\
.orderBy("SerialNumberCount", ascending=False)\
.limit(30).toPandas().style.hide_index()
#.show(30, truncate=False)
nr.filter(nr.locationId == "something")\
.withColumn("year_month_day", date_format(nr["createdAt"], "yyyy-MM-dd"))\
.groupBy("year_month_day")\
.agg(count("nodeId").alias("Node_Distctinct_Counts"))\
.orderBy("Node_Distctinct_Counts", ascending=False)\
.limit(30).toPandas().style.hide_index()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split
def process_cable_modem_data(country_code):
# Initialize Spark session
spark = SparkSession.builder.appName("CableModemDataProcessing").getOrCreate()
# Load the DataFrame based on the country code
path = f"hdfs://172.24.80.93:9820/delta/refined_tables/{country_code}/dimensions/dim_cable_modem/"
dim_cable_modem_df = spark.read.format("delta").load(path)
# Add the node_id_prefix column
dim_cable_modem_df = dim_cable_modem_df.withColumn('node_id_prefix', split(dim_cable_modem_df['node_id'], '\.')[0])
# Compare node_id_prefix and site_Id, and create a new column 'is_same'
dim_cable_modem_df = dim_cable_modem_df.withColumn('is_same', when(col('node_id_prefix') == col('site_Id'), True).otherwise(False))
# Filter the DataFrame to keep only the rows where 'is_same' is False
filtered_df = dim_cable_modem_df.filter(col('is_same') == False)
# Show the result
filtered_df.select('node_id', 'node_id_prefix', 'site_Id', 'is_same').distinct().show(5, truncate=False)
# Example usage
process_cable_modem_data("CH")