Documentation for Spark Notebook Examples

Estimating Pi Using PySpark
This program uses the Monte Carlo method to estimate the value of Pi. It demonstrates how to use PySpark to parallelize computations.
from pyspark.sql import SparkSession
import random
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Estimate Pi") \
    .getOrCreate()
# Function to check if a point is inside the unit circle
def inside_circle(_):
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    return x * x + y * y < 1
# Function to estimate Pi
def estimate_pi(num_samples):
    count_inside = spark.sparkContext.parallelize(range(num_samples)) \
        .filter(inside_circle) \
        .count()
    pi_estimate = 4 * count_inside / num_samples
    return pi_estimate
# Run estimation
num_samples = 100000
pi_value = estimate_pi(num_samples)
print(f"Estimated value of Pi: {pi_value}")
# Stop the Spark session
spark.stop()
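To see the Monte Carlo error shrink, you can repeat the estimate with increasing sample counts. The sketch below is self-contained and reuses the same inside_circle logic; the sample sizes are illustrative:
import math
import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Estimate Pi Convergence").getOrCreate()

def inside_circle(_):
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    return x * x + y * y < 1

# Larger sample counts should move the estimate closer to math.pi
for n in (1000, 10000, 100000, 1000000):
    count = spark.sparkContext.parallelize(range(n)).filter(inside_circle).count()
    estimate = 4 * count / n
    print(f"n={n}: estimate={estimate:.5f}, error={abs(estimate - math.pi):.5f}")

spark.stop()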

Creating and Displaying a DataFrame
This example showcases creating a Spark DataFrame using a list of Row
objects and displaying it.
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()
# Create a DataFrame
df = spark.createDataFrame([
    Row(a=1, b=2.0, c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3.0, c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()
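Beyond show(), it is often useful to inspect the schema Spark inferred and to select or filter columns. A short sketch continuing with the df created above:
# Inspect the schema Spark inferred from the Row objects
df.printSchema()

# Select specific columns and filter rows with the DataFrame API
df.select("a", "c").show()
df.filter(df.a > 1).show()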

Complex Operations with PySpark
This example demonstrates joining DataFrames, applying User-Defined Functions (UDFs), and executing SQL queries.
Data Preparation and Joining:
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder \
    .appName("Complex Spark Example") \
    .master("yarn") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()
# Sample data
employee_data = [
    Row(emp_id=1, name='John Doe', age=30, dept_id=1, salary=60000),
    Row(emp_id=2, name='Jane Smith', age=25, dept_id=2, salary=70000),
    Row(emp_id=3, name='Sam Brown', age=45, dept_id=1, salary=80000),
]
department_data = [
    Row(dept_id=1, dept_name='HR'),
    Row(dept_id=2, dept_name='Engineering'),
]
employee_df = spark.createDataFrame(employee_data)
department_df = spark.createDataFrame(department_data)
joined_df = employee_df.join(department_df, on='dept_id', how='inner')
joined_df.show()
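The how= argument controls the join type. For instance, a left join would keep every employee even if no matching department existed; a minimal sketch using the DataFrames created above:
# Keep all employees; department columns become null when there is no match
left_joined_df = employee_df.join(department_df, on='dept_id', how='left')
left_joined_df.show()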
Using UDFs:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
def salary_category(salary):
    if salary < 70000:
        return 'Low'
    elif 70000 <= salary < 100000:
        return 'Medium'
    else:
        return 'High'
salary_category_udf = spark.udf.register("salary_category", salary_category, StringType())
categorized_df = joined_df.withColumn("salary_category", salary_category_udf(col("salary")))
categorized_df.select("name", "dept_name", "salary", "salary_category").show()
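Python UDFs add serialization overhead, so the same categorization can also be written with built-in column expressions that Spark can optimize. A sketch equivalent to the UDF above:
from pyspark.sql.functions import when, col

# Same categorization using native expressions instead of a Python UDF
categorized_native_df = joined_df.withColumn(
    "salary_category",
    when(col("salary") < 70000, "Low")
    .when(col("salary") < 100000, "Medium")
    .otherwise("High")
)
categorized_native_df.select("name", "dept_name", "salary", "salary_category").show()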
SQL Queries:
categorized_df.createOrReplaceTempView("employee_view")
avg_salary_query = spark.sql("""
SELECT dept_name, AVG(salary) AS avg_salary
FROM employee_view
GROUP BY dept_name
""")
avg_salary_query.show()
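Because salary_category was registered with spark.udf.register, it can also be called directly from SQL against the same view; for example:
# The UDF registered earlier is callable from SQL as well
categorized_sql_df = spark.sql("""
    SELECT name, dept_name, salary, salary_category(salary) AS category
    FROM employee_view
    ORDER BY salary DESC
""")
categorized_sql_df.show()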

Performing Word Count
A simple word count example demonstrating Spark RDD transformations.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Word Count Example") \
    .master("local[*]") \
    .getOrCreate()
# Sample data
data = ["hello world", "hello spark", "spark is awesome", "hello world again"]
# Word count using RDD
rdd = spark.sparkContext.parallelize(data)
word_counts = rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
# Display results
result = word_counts.collect()
for word, count in result:
    print(f"{word}: {count}")
spark.stop()
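For real data, the in-memory list can be replaced with a text file read via textFile. The sketch below follows the same transformation chain; the input path is a hypothetical placeholder:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Word Count From File") \
    .master("local[*]") \
    .getOrCreate()

# Hypothetical input path -- replace with a real local file or HDFS location
rdd = spark.sparkContext.textFile("/tmp/words.txt")
word_counts = rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

for word, count in word_counts.collect():
    print(f"{word}: {count}")

spark.stop()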

Using ODP’s Spark Version
This example points the notebook at the Spark distribution shipped with ODP by setting SPARK_HOME and initializing findspark before creating a SparkContext, then reuses the Monte Carlo Pi estimation.
import os
import findspark
# Set environment variables for Spark
os.environ['SPARK_HOME'] = '/usr/odp/3.2.3.2-3/spark3'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
# Initialize findspark
findspark.init('/usr/odp/3.2.3.2-3/spark3')
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext.getOrCreate()
import random
# Define the number of samples
num_samples = 100000
# Function to check if a point is inside the unit circle
def inside_unit_circle(_):
    return (random.random() ** 2 + random.random() ** 2) < 1
# Perform the estimation by running a parallel computation
rdd = sc.parallelize(range(num_samples))
count = rdd.map(inside_unit_circle).filter(lambda x: x).count()
# Estimate the value of Pi
pi_estimate = 4.0 * count / num_samples
print("Estimated value of Pi:", pi_estimate)
# Stop SparkContext after use (optional in a notebook environment)
# sc.stop()
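As a quick sanity check that the ODP-provided installation was picked up (rather than another Spark on the machine), you can print the version and SPARK_HOME reported by the running context:
# Sanity check: both should point at the ODP Spark under /usr/odp/3.2.3.2-3/spark3
print("Spark version:", sc.version)
print("SPARK_HOME:", os.environ.get('SPARK_HOME'))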
Submit the Job to a Cluster (Optional)
If you’re using a cluster manager like YARN with JupyterHub, you need to adjust the configurations accordingly:
from pyspark import SparkConf, SparkContext
conf = SparkConf() \
    .setAppName("Pi Estimation") \
    .setMaster("yarn") \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2")
sc = SparkContext(conf=conf)
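The YARN-backed context can then run the same estimation, reusing num_samples and inside_unit_circle from above (stop any previously created SparkContext first, since only one can be active per JVM). A brief sketch:
# Reuse the Monte Carlo estimation from above on the YARN-backed context
rdd = sc.parallelize(range(num_samples))
count = rdd.filter(inside_unit_circle).count()
print("Estimated value of Pi:", 4.0 * count / num_samples)

sc.stop()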

Show Hive Tables in Spark
This example lists the tables visible to Spark, using the Hive metastore when Hive support is enabled.
import os
import findspark
# Set environment variables for Spark
os.environ['SPARK_HOME'] = '/usr/odp/3.2.3.2-3/spark3'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
# Initialize findspark
findspark.init('/usr/odp/3.2.3.2-3/spark3')
from pyspark.sql import SparkSession
# Initialize SparkSession
# enableHiveSupport() is optional and only needed if you query Hive tables
spark = SparkSession.builder \
    .appName("ShowTablesExample") \
    .enableHiveSupport() \
    .getOrCreate()
# Optional: Switch to a specific database
# Uncomment and replace 'my_database' with your database name
# spark.sql("USE my_database")
# Execute the SHOW TABLES command
tables_df = spark.sql("SHOW TABLES")
# Display the tables
tables_df.show()
# Stop SparkSession after use (optional in a notebook environment)
# spark.stop()
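Once the tables are listed, a specific table can be loaded as a DataFrame with spark.table or an equivalent SQL query. The database and table names below are hypothetical placeholders:
# Hypothetical database and table names -- replace with entries from the list above
hive_df = spark.table("my_database.my_table")
hive_df.printSchema()
hive_df.show(5)

# Equivalent query through SQL
spark.sql("SELECT * FROM my_database.my_table LIMIT 5").show()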
How It Works:
- Environment Setup: Configures the necessary Spark environment variables.
- SparkSession: Initializes a SparkSession with Hive support (if needed).
- Switch Database (Optional): You can switch to a specific database if you want to list tables from it.
- Show Tables: Executes the SQL command SHOW TABLES, which returns a DataFrame of tables.
- Display Output: Displays the list of tables in the output.
Expected Output: a DataFrame listing the tables in the selected database, typically with tableName and isTemporary columns alongside the database (or namespace) column.

Notes
- Dependencies: Ensure Spark is correctly set up and configured for local or yarn mode, depending on the example.
- UDFs: Register user-defined functions (UDFs) as required for custom transformations.
- SQL Queries: Use createOrReplaceTempView to run SQL queries on DataFrames.
- Data Source: Replace hardcoded data with external sources such as HDFS, databases, or files for real-world applications (see the sketch after this list).
- Resource Configurations: Tune spark.executor.memory and spark.driver.memory based on the cluster size and workload requirements.
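As a minimal illustration of swapping in an external data source, the sketch below reads a CSV file into a DataFrame and queries it through a temporary view, assuming an active SparkSession named spark as in the examples above; the file path is a hypothetical placeholder:
# Hypothetical CSV path -- replace with a real local or HDFS location
employees_df = spark.read.csv("/data/employees.csv", header=True, inferSchema=True)
employees_df.createOrReplaceTempView("employees")
spark.sql("SELECT COUNT(*) AS n FROM employees").show()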