## Solutions Guide

In [None]:
import os
import pandas as pd

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build a Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("TitanicAssessmentExtended")
    .getOrCreate()
)

# Print the Spark version so the runtime in use is visible in the output
print("Spark version:", spark.version)


Explanation:

 We import pandas and the pyspark.sql modules, then create (or reuse) a Spark session named "TitanicAssessmentExtended".
 Printing spark.version confirms which version of Spark is running.

In [None]:
# Relative path to the Titanic CSV file
titanic_csv_path = os.path.join("..", "data", "titanic.csv")

# 2.1 Load the CSV into a pandas DataFrame
pd_df = pd.read_csv(filepath_or_buffer=titanic_csv_path)

# Report (rows, columns), then render the first five rows
print("pd_df shape:", pd_df.shape)
display(pd_df.head(n=5))


We use pd.read_csv(...) to read the Titanic data into a pd.DataFrame.
.shape gives the (rows, columns).
.head() shows the top few rows.

In [None]:
# 2.2 Load the same CSV into a Spark DataFrame.
# header: the first row holds the column names; inferSchema: let Spark detect column types.
spark_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(titanic_csv_path)
)

# Inspect the inferred schema, the row count and a five-row sample
spark_df.printSchema()
print("spark_df count:", spark_df.count())
spark_df.show(n=5)


We specify header=True so Spark knows the first row is column headers, and inferSchema=True so it automatically detects column types.
.printSchema() reveals the inferred schema.
.count() and .show() let us see row counts and sample rows.

In [None]:
# Split the pandas data into two column subsets; both keep PassengerId
pd_part1 = pd_df.loc[:, ["PassengerId", "Name", "Sex", "Age"]]
pd_part2 = pd_df.loc[:, ["PassengerId", "Fare", "Survived", "Pclass"]]

# Preview the first rows of each subset
display(pd_part1.head())
display(pd_part2.head())


In [None]:
# Build the same two column subsets from the Spark DataFrame; both keep PassengerId
spark_part1 = spark_df.select(["PassengerId", "Name", "Sex", "Age"])
spark_part2 = spark_df.select(["PassengerId", "Fare", "Survived", "Pclass"])

# Preview five rows of each subset
spark_part1.show(n=5)
spark_part2.show(n=5)


In [None]:
# Merge the two pandas subsets back together on their shared PassengerId key
pd_merged = pd.merge(pd_part1, pd_part2, on="PassengerId", how="inner")

# Report the merged shape and preview the first rows
print("pd_merged shape:", pd_merged.shape)
display(pd_merged.head())


on="PassengerId" merges the two tables on the PassengerId key.
how="inner" keeps only rows whose PassengerId appears in both subsets; because both subsets come from the same DataFrame, every row should match.

In [None]:
# Join the two Spark subsets back together on PassengerId (inner join)
spark_merged = spark_part1.join(
    spark_part2,
    on=["PassengerId"],
    how="inner",
)

# Check the result: row count, a five-row sample and the joined schema
print("spark_merged count:", spark_merged.count())
spark_merged.show(n=5)
spark_merged.printSchema()


The Spark equivalent of the pandas merge is spark_part1.join(spark_part2, on="PassengerId", how="inner").
spark_merged.show(5) and .printSchema() confirm the result of the join.

In [None]:
# Data cleaning: drop every row that has a missing Age or Fare
pd_merged_clean = pd_merged.dropna(how="any", subset=["Age", "Fare"])

# Compare the shapes before and after dropping
print("Before dropna:", pd_merged.shape)
print("After dropna:", pd_merged_clean.shape)

# Last expression of the cell: rendered as the cell output
pd_merged_clean.head()

In [None]:
# Spark data cleaning: drop every row that has a null Age or Fare
spark_merged_clean = spark_merged.dropna(how="any", subset=["Age", "Fare"])

# Compare row counts before and after dropping, then preview the cleaned rows
print("spark_merged count BEFORE dropna:", spark_merged.count())
print("spark_merged_clean count AFTER dropna:", spark_merged_clean.count())
spark_merged_clean.show(n=5)


In [None]:
# Basic aggregation: mean Fare for each passenger class
pd_avg_fare = (
    pd_merged_clean
    .groupby("Pclass")["Fare"]
    .mean()
)

# Last expression of the cell: rendered as the cell output
pd_avg_fare

In [None]:
# Spark survival rate by Sex and Pclass: the average of Survived within each group
spark_survival_rate = spark_merged_clean.groupBy(["Sex", "Pclass"]).agg(
    F.avg("Survived").alias("survival_rate")
)

spark_survival_rate.show()


In [None]:
# Persist the cleaned Spark DataFrame as parquet, replacing any previous output at that path
(
    spark_merged_clean.write
    .mode("overwrite")
    .parquet("../titanic_merged_clean.parquet")
)

In [None]:
# Load the parquet output back into a new Spark DataFrame
spark_parquet_df = spark.read.parquet(
    "../titanic_merged_clean.parquet"
)

# Check the row count and preview five rows of the reloaded data
print("spark_parquet_df count:", spark_parquet_df.count())
spark_parquet_df.show(n=5)


In [None]:
# Bonus - expose the cleaned DataFrame to Spark SQL.
# The view is registered under the name "titanic_merged"; every SQL query
# below must use that name (the Python variable name is not visible to SQL).
spark_merged_clean.createOrReplaceTempView("titanic_merged")

# Passenger count and average age per passenger class
result_df = spark.sql(
    """
    SELECT Pclass,
           COUNT(*) AS passenger_count,
           AVG(Age) AS avg_age
    FROM titanic_merged
    GROUP BY Pclass
    ORDER BY Pclass
    """
)
result_df.show()

# Correlation between Fare and Survived, computed two ways.

# 1) DataFrame API: Pearson correlation
correlation1 = spark_merged_clean.stat.corr("Fare", "Survived", "pearson")

print("Pearson correlation between Fare and Survived:", correlation1)

# 2) Spark SQL: sample covariance divided by the product of the sample
#    standard deviations. Queries the "titanic_merged" view registered above.
correlation2 = spark.sql(
    """
    SELECT
        covar_samp(Fare, Survived) / (stddev_samp(Fare) * stddev_samp(Survived)) AS correlation
    FROM titanic_merged
    """
)
# Display the SQL result so it can be compared with correlation1
correlation2.show()


In [None]:
from sentence_transformers import SentenceTransformer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Load the pre-trained MiniLM sentence transformer model
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')


def compute_embedding(text):
    """Encode ``text`` with the MiniLM model and return the vector as a list.

    Args:
        text: The string to embed, or None when the input column value is NULL.

    Returns:
        The result of ``model.encode(text)`` converted with ``.tolist()``,
        or None when ``text`` is None.
    """
    # A NULL column value reaches a Python UDF as None; pass it through as a
    # null result rather than handing None to model.encode.
    if text is None:
        return None
    return model.encode(text).tolist()


# Wrap the function as a Spark UDF that returns an array of floats
embedding_udf = udf(compute_embedding, ArrayType(FloatType()))

# Add a 'mini-lm-vectors' column holding the embedding of each passenger's Name
df_with_embeddings = spark_merged_clean.withColumn('mini-lm-vectors', embedding_udf(spark_merged_clean['Name']))

# Show the first row of the result
df_with_embeddings.head()

