Getting started with PySpark

October 17, 2022

The PySpark Project

When talking about PySpark, we are talking about Big Data and Python 🐍.

WHY PySpark?

🐍 PySpark Overview:

  • PySpark is an interface for Apache Spark in Python.
  • It allows you to write Spark applications using Python and provides the PySpark shell to analyze data in a distributed environment.

📊 PySpark vs. pandas:

  • PySpark is a library for working with large datasets in a distributed computing environment.
  • pandas, on the other hand, is a library for working with smaller, tabular datasets on a single machine.

🔍 Choosing Between PySpark and pandas:

  • If the data is small enough to be processed by pandas, PySpark might not be necessary.
  • PySpark becomes useful when dealing with large datasets that don’t fit into memory on a single machine, enabling distributed computation.
  • However, even when the data fits in memory, a heavy computation that can be parallelized may still run faster with PySpark (see the interop sketch below).
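As a minimal sketch of how the two libraries interoperate (assuming an existing SparkSession and purely illustrative data), you can promote a pandas DataFrame to a Spark DataFrame for distributed work and bring a small result back with toPandas():

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PandasInterop").getOrCreate()

# Small, local data lives comfortably in pandas
pdf = pd.DataFrame({"city": ["Jerez", "London"], "population": [213_000, 8_982_000]})

# Promote it to a distributed Spark DataFrame when the workload grows
sdf = spark.createDataFrame(pdf)
sdf.show()

# Collect a (small!) result back to pandas for local analysis or plotting
result_pdf = sdf.filter(sdf.population > 1_000_000).toPandas()
print(result_pdf)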

PySpark - Useful Code Snippets

How to create a Spark DataFrame

Data = [("Jerez","Yosua","Dr.Yosu"),
    ("London","John","CEO"),
    ("Roberta","Storm","Manager")
  ]

schema = ["Region","Nick","Job"]
df = spark.createDataFrame(data=Data, schema = schema)

df.printSchema()
df.show(truncate=False)

How to use SQL in Spark - SparkSQL

If you are already comfortable with SQL, a good starting point is to combine that knowledge with the power of Spark.

PySpark lets you write plain SQL queries, and Spark translates them into the same optimized execution plan as the equivalent DataFrame code.

Let's view a few examples:
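A minimal sketch, reusing the df created above: register the DataFrame as a temporary view and query it with plain SQL.

# Register the DataFrame from the previous snippet as a temporary view
df.createOrReplaceTempView("people")

# Query it with plain SQL - Spark runs this through the same optimizer as the DataFrame API
managers = spark.sql("SELECT Region, Nick FROM people WHERE Job = 'Manager'")
managers.show(truncate=False)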

PySpark with Trino

You can use PySpark to query Trino (formerly PrestoSQL) by configuring Spark to connect to Trino as an external data source:

from pyspark.sql import SparkSession

# Start a SparkSession with the Trino JDBC driver on the classpath
spark = SparkSession.builder \
    .appName("TrinoQueryExample") \
    .config("spark.jars", "/path/to/trino-jdbc-driver.jar") \
    .getOrCreate()

# Sanity check that the session works
spark.sql("SELECT 'Hello, world - You are ready to use PySpark!'").show(truncate=False)

# `df` is a DataFrame loaded from Trino via the JDBC reader (see the next snippet)
df.createOrReplaceTempView("trino_table")
result = spark.sql("SELECT * FROM trino_table WHERE column_name = 'some_value'")
result.show()

# spark.stop()  # stop the session when you are done

And with Python-style DataFrame logic:

# Read a Trino table through Spark's JDBC data source
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:trino://trino-server:port") \
    .option("dbtable", "your_table_name") \
    .option("user", "your_username") \
    .option("password", "your_password") \
    .load()

# Filter and project using the DataFrame API instead of SQL
filtered_df = df.filter(df["column_name"] == "some_value")
result = filtered_df.select("column_name1", "column_name2")

result.show()

Cool Stuff to do with PySpark

Time Travel with Spark and Apache Iceberg

Time-traveling allows accessing data from a specific point in the past.

While the approach varies across formats, the core principle remains the same. Snapshots stored in logs enable time-traveling capabilities.

To achieve time-traveling, every state of data must be stored at a given time using snapshots. Parallel computing engines are then utilized to retrieve the desired snapshots from logs. Different technologies may use varying terminology for this mechanism.

Now that we have a brief idea about time-traveling, let’s check how to use it with a popular data lake table format: Apache Iceberg.

🌊 Apache Iceberg: A Universal Table Format

  • Iceberg is an open table format for data lakes that is not tied to any specific execution engine.
  • It can be used with many batch and streaming engines.
  • Originated at Netflix.

📚 Key Terms in Iceberg - Apache Iceberg uses several key terms to structure data in its format:

  • Snapshot: Represents the state of a table at a specific time. It lists all data files that constitute the table’s contents at the snapshot’s time.
  • Manifest List: A metadata file that lists the manifests forming a table snapshot. It stores information about each manifest file’s contents to optimize metadata operations.
  • Manifest File: A metadata file that lists a subset of data files in a snapshot. It includes partition tuple, column-level stats, and summary information for efficient scan planning.
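A minimal sketch of how these layers surface in practice (assuming a Spark session with an Iceberg catalog configured, here hypothetically named my_catalog, and a hypothetical table db.my_table): Iceberg exposes snapshots, history, and manifests as queryable metadata tables.

# List the snapshots of an Iceberg table (one row per snapshot, with its snapshot_id and commit time)
spark.sql("SELECT * FROM my_catalog.db.my_table.snapshots").show(truncate=False)

# The table history links snapshots into a timeline you can travel along
spark.sql("SELECT * FROM my_catalog.db.my_table.history").show(truncate=False)

# Manifests back each snapshot and drive efficient scan planning
spark.sql("SELECT * FROM my_catalog.db.my_table.manifests").show(truncate=False)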

How to Apply Time Travel

  • With timestamp:

# time travel to October 26, 1986 at 01:21:00 (epoch milliseconds)
df = spark.read \
    .option("as-of-timestamp", "499162860000") \
    .format("iceberg") \
    .load("path/to/table")

  • With snapshot ID:

# time travel to the snapshot with ID 10963874102873
df = spark.read \
    .option("snapshot-id", 10963874102873) \
    .format("iceberg") \
    .load("path/to/table")

FAQ

Where to learn more about Data Engineering?

You can browse S3 with a GUI thanks to: https://github.com/mickael-kerjean/filestash

PySpark FAQ

Why is PySpark called lazy? 📌

PySpark is considered “lazy” because it does not execute any code until it absolutely has to.

This means that when you call a transformation on a PySpark DataFrame or RDD, it does not actually compute the result until you call an action.

This allows Spark to optimize the execution plan by looking at all of the transformations that you have specified, and deciding the most efficient way to execute them.

It also allows Spark to delay execution until the result is actually needed, rather than executing every single transformation as soon as it is specified.
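A minimal sketch of this behaviour (assuming an existing SparkSession named spark): the transformations only build up a plan, and nothing runs until the action at the end.

df = spark.range(1_000_000)                        # lazy: just a plan, no data is materialized yet
doubled = df.withColumn("double", df["id"] * 2)    # lazy transformation
filtered = doubled.filter(doubled["double"] > 10)  # still lazy - only the plan grows

# The action below triggers the whole optimized plan to execute
print(filtered.count())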

What to use, Spark or Pandas? What’s the difference? 📌

The choice between using Spark or Pandas depends on the type and size of data you are dealing with.

For small datasets, Pandas is usually the better option as it provides a more intuitive and user-friendly interface. However, for larger datasets, Spark provides much better performance and scalability.

Spark also offers a range of features, such as distributed processing, in-memory computing, streaming, and machine learning algorithms, that are not available with Pandas.

The main difference between the two is that Pandas is designed to work with tabular data, while Spark is designed to work with both structured and unstructured data.

What is data redistribution? 📌

Data redistribution is the process of transferring data from one system or location to another. This can be done between different databases, platforms, or locations.

The purpose of data redistribution is to improve performance, enhance scalability, or reduce cost.

Data redistribution is often used to move data between a production system and a test system or between different servers or clusters, to spread the load evenly or to make sure that the data is available in multiple locations in case of system failure or disaster recovery.

What is a partition? 📌

Partitions in Apache Spark are logical divisions of data stored on a node in the cluster.

They provide a way to split up large datasets into smaller, more manageable chunks that can be processed in parallel.

By default, Spark uses a Hash Partitioner, which uses a hash function to determine which partition a particular row of data should be assigned to.

Spark also supports Range Partitioning, which allows for data to be divided into partitions based on a range of values.
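As a minimal sketch (assuming an existing SparkSession named spark), you can inspect and change how a DataFrame is partitioned:

df = spark.range(1_000_000)

# How many partitions does the DataFrame currently have?
print(df.rdd.getNumPartitions())

# Hash-style redistribution into 8 partitions by the "id" column
repartitioned = df.repartition(8, "id")

# Range partitioning: rows are placed in partitions based on sorted ranges of "id"
range_partitioned = df.repartitionByRange(8, "id")

print(repartitioned.rdd.getNumPartitions(), range_partitioned.rdd.getNumPartitions())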

What does doing a GroupBy before partitioning mean? 📌

Doing a group by before partitioning means that the data is grouped together before being divided into partitions.

This can be useful when performing certain calculations, as it allows for more efficient processing of the data. For example, if you have a table of data with multiple columns and you want to sum up values in one of the columns, you can group by that column before partitioning the data so that the sum for each group is only calculated once.
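A minimal sketch of that idea (assuming an existing SparkSession named spark and purely illustrative column names): aggregate per key first, then redistribute the much smaller aggregated result by the same key.

from pyspark.sql import functions as F

orders = spark.createDataFrame(
    [("ES", 10.0), ("ES", 5.0), ("UK", 7.5), ("UK", 2.5)],
    ["country", "amount"],
)

# Aggregate first: one row per country instead of one row per order
totals = orders.groupBy("country").agg(F.sum("amount").alias("total_amount"))

# Then redistribute the (smaller) aggregated result by the same key
totals = totals.repartition("country")
totals.show()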

What is Data Skew? 📌

Data skew is an uneven distribution of data in a dataset, which can be caused by a variety of factors including transformations such as joins, groupBy, and orderBy.

Data skew can have a variety of implications, such as increased lock contention, reduced database concurrency, and decreased performance.

Data skew can be measured using the skewness coefficient, which is a measure of the asymmetry of the data in a dataset.
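As a hedged sketch of how you might spot skew in practice (assuming an existing DataFrame df): count how many rows land in each Spark partition; a very uneven distribution is a symptom of skew.

from pyspark.sql.functions import spark_partition_id

# Rows per partition - wildly uneven counts point to data skew
df.groupBy(spark_partition_id().alias("partition")).count().orderBy("partition").show()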

Interesting Big Data Tools

What is Trino? ⏬

Trino is an open-source distributed SQL query engine designed to handle large datasets distributed across one or more heterogeneous data sources.

  • It is known for its speed, scalability, and ability to query data from a variety of sources, including:
    • Data warehouses: connects to popular data warehouses like Amazon Redshift, Google BigQuery, and Snowflake.
    • Data lakes: It can also query data stored in data lakes, such as those in HDFS, Amazon S3, and Google Cloud Storage.
    • NoSQL databases: Databases like Cassandra and MongoDB.
    • Streaming data sources: It can even query streaming data sources like Kafka and Kinesis.

What is Hadoop? ⏬

Apache Hadoop is an open-source framework that facilitates storing and processing large datasets across clusters of computers. It provides a distributed computing platform for handling data processing tasks that are too large or complex for a single machine.

  • Hadoop itself is a collection of software utilities that work together. Some key components include:
    • YARN (Yet Another Resource Negotiator): Manages computing resources within the cluster, allocating them to running applications.
    • MapReduce: A programming model for processing large datasets in parallel across multiple nodes in the cluster.
    • Hadoop Distributed File System (HDFS): The primary storage system used by Hadoop applications to store large datasets reliably across multiple machines.
More about DBT - Data Infrastructure ⏬

DBT - dbt™ is a SQL-first transformation workflow that lets teams quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, CI/CD, and documentation.

DBT (Data Build Tool) is a tool used in data analytics that provides a framework for transforming data in your warehouse more effectively. It allows users to write SQL scripts, manage them, and run them in different environments, making it a powerful tool for data transformation.

DBT has become an important piece of the modern data infrastructure landscape.

The tool is particularly appreciated for its ability to “unbundle” the data pipeline, meaning users can focus on writing SQL scripts without needing to learn complex tools.

Among its benefits, DBT is open-source and customizable, offers automatic version control, requires no specific skillset beyond SQL knowledge, has built-in testing capabilities, and is well-documented with a growing community.

However, it also has some drawbacks. The SQL-based logic can be complex and hard to read, it offers limited debugging functionality, and it only handles the transformation layer, requiring additional components for a complete data pipeline.

In conclusion, while DBT is not a replacement for traditional data pipeline tools, it is a complementary tool that can simplify the data transformation process and has become a popular choice in the data infrastructure landscape.

DBT enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.


What is ODH - Open Data Hub? ⏬

Open Data Hub (ODH) is an open-source project that provides an end-to-end AI/ML platform on top of OpenShift Container Platform which can be used to build, deploy, and manage intelligent applications. It is a blueprint for building an AI-as-a-Service (AIaaS) platform using OpenShift and Kubernetes, Ceph, Apache Kafka, Seldon, Argo, Prometheus, Grafana, and JupyterHub, among other technologies.

Open Data Hub is designed to be a meta-project, meaning it is a project of projects. It brings together several open-source projects, each of which has a specific role in the data and AI/ML ecosystem, and integrates them into a coherent AI/ML platform with easy-to-use interfaces.

The main components of Open Data Hub include:

  • Data storage and management tools, such as Ceph for distributed storage and Apache Kafka for data streaming.
  • Data exploration and analysis tools, such as JupyterHub for interactive data analysis and exploration.
  • Model training and serving tools, such as Seldon for serving machine learning models and Argo for managing model training workflows.
  • Monitoring tools, such as Prometheus for monitoring and Grafana for visualization.

Open Data Hub is designed to be flexible and customizable, allowing you to pick and choose the components that best fit your data and AI/ML needs.
