PySpark (Python Spark)

Introduction

PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.

--(Apache Spark, 2022)

PySpark is a commonly used tool for running Python applications on Spark. Spark acts as the execution engine, distributing the work and carrying out each step in the most efficient way it can. In the sample Python application that you will create, the goal is to show how Spark can take over as applications grow larger and require more resources.

Creating and Manipulating Dataframes

Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop input files or by transforming other Datasets. Since Python is a dynamically typed programming language, the data does not need to be strongly typed, so in Python every Dataset is called a dataframe to be consistent with [pandas][-pd]. To create and manipulate dataframes using PySpark, follow these steps:

Load Data

Download the data files and move them to a location that the PySpark shell can reach.
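
If you do not already have a file to work with, the sketch below writes a small sample data.txt into the current working directory. The file name matches the later steps; the variable name and line contents are only placeholders.

# Write a small placeholder data.txt for the steps that follow.
sample_lines = [
    "Apache Spark is a unified analytics engine",
    "PySpark exposes the Spark API to Python",
    "a plain line of text with no keyword",
]

with open('data.txt', 'w') as f:
    f.write('\n'.join(sample_lines))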

Enter PySpark Shell

Enter the PySpark shell through either a container or a local or remote installation.
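
Inside the interactive shell, a SparkSession is already available as the variable spark. If you are working from a standalone Python script instead, here is a minimal sketch of creating the session yourself; the application name is only illustrative.

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the pyspark shell defines this as `spark` for you.
spark = SparkSession.builder.appName('pyspark-sample').getOrCreate()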

Generate Dataframe from Data

Generate a dataframe from the sample file by running the following Spark command.

textFile = spark.read.text('data.txt')
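
spark.read.text returns a dataframe with a single string column named value, one row per line of the file. A quick way to confirm this before going further:

# Each line of data.txt becomes one row in the single string column 'value'.
textFile.printSchema()   # root |-- value: string (nullable = true)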

Access the Dataframe and Transform It

You can access the dataframe's values directly and, if desired, transform it into a new dataframe. First, determine the number of rows in the dataframe with the following Spark command.

textFile.count()

Peek at a Subset of the Dataframe

You can capture the first row of data with the following Spark command.

textFile.first()

Capture a Subset of the Dataframe

Next, run the following command to transform this dataframe into a new one containing only a subset of its rows.

linesWithSpark = textFile.filter(textFile.value.contains('Spark'))

Chain Commands Together

Spark also offers the option to chain commands together. For example, the following statement filters and counts in a single expression.

textFile.filter(textFile.value.contains('Spark')).count()

Use SQL

Because Spark has a [SQL engine][-sql], it's possible to import the SQL functions into PySpark and start performing queries. Register the dataframe as a temporary view first so it can be referenced by name in SQL.

from pyspark.sql.functions import *

textFile.createOrReplaceTempView('data')
spark.sql('SELECT * FROM data')
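
As a small follow-up sketch, the earlier filter can also be expressed in SQL against the same temporary view; the column alias here is only illustrative.

# Count the lines containing 'Spark' using SQL instead of the dataframe API.
spark.sql("SELECT count(*) AS linesWithSpark FROM data WHERE value LIKE '%Spark%'").show()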

Complex Queries

After importing the SQL functions, run the following PySpark command to find the row with the most words and return the number of words in that row:

(textFile
    .select(size(split(textFile.value, r"\s+")).name("numWords"))
    .agg(max(col("numWords")))
    .collect()
)
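
A related sketch, not part of the original steps, combines the same split with explode and groupBy to count how often each word appears across the whole file; the wordCounts name is only illustrative.

from pyspark.sql.functions import explode, split

# One row per word, then count the occurrences of each word.
wordCounts = (textFile
    .select(explode(split(textFile.value, r"\s+")).alias("word"))
    .groupBy("word")
    .count()
)

wordCounts.collect()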

Caching

Caching is useful when a large dataframe is going to be accessed frequently. These dataframes are considered hot, so it's ideal to keep them in memory to improve speed. To mark a dataframe for caching, use the following command.

linesWithSpark.cache()
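
As a quick usage note: the first action after cache() computes the rows and stores them in memory, later actions reuse that cached copy, and unpersist() releases the memory once the dataframe is no longer hot.

linesWithSpark.count()      # first action materializes the cache
linesWithSpark.count()      # served from the in-memory cache

linesWithSpark.unpersist()  # release the cached data when finished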

References

Web Links

Note Links