{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Solutions Guide" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import pandas as pd\n", "\n", "# PySpark imports\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql import functions as F\n", "from pyspark.sql.types import *\n", "\n", "# Create or get Spark session\n", "spark = SparkSession.builder \\\n", " .appName(\"TitanicAssessmentExtended\") \\\n", " .getOrCreate()\n", "\n", "print(\"Spark version:\", spark.version)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Explanation:\n", "\n", " We import pandas, pyspark.sql modules, and create a Spark session named \"TitanicAssessmentExtended\".\n", " Checking spark.version helps confirm which version of Spark is running." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Read in data \n", "titanic_csv_path = os.path.join(\"..\", \"data\", \"titanic.csv\")\n", "\n", "# 2.1 Read into a Pandas DataFrame\n", "pd_df = pd.read_csv(titanic_csv_path)\n", "\n", "print(\"pd_df shape:\", pd_df.shape)\n", "display(pd_df.head())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use pd.read_csv(...) to read the Titanic data into a pd.DataFrame.\n", ".shape gives the (rows, columns).\n", ".head() shows the top few rows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 2.2 Read into a Spark DataFrame\n", "spark_df = spark.read.csv(titanic_csv_path, header=True, inferSchema=True)\n", "\n", "spark_df.printSchema()\n", "print(\"spark_df count:\", spark_df.count())\n", "spark_df.show(5)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We specify header=True so Spark knows the first row is column headers, and inferSchema=True so it automatically detects column types.\n", ".printSchema() reveals the inferred schema.\n", ".count() and .show() let us see row counts and sample rows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Split data into subsets\n", "\n", "pd_part1 = pd_df[[\"PassengerId\", \"Name\", \"Sex\", \"Age\"]]\n", "pd_part2 = pd_df[[\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\"]]\n", "\n", "display(pd_part1.head())\n", "display(pd_part2.head())\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark_part1 = spark_df.select(\"PassengerId\", \"Name\", \"Sex\", \"Age\")\n", "spark_part2 = spark_df.select(\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\")\n", "\n", "spark_part1.show(5)\n", "spark_part2.show(5)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Merging/Joining split dataframes \n", "\n", "pd_merged = pd_part1.merge(pd_part2, on=\"PassengerId\", how=\"inner\")\n", "print(\"pd_merged shape:\", pd_merged.shape)\n", "display(pd_merged.head())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "on=\"PassengerId\" merges the two tables by the PassengerId key.\n", "how=\"inner\" ensures rows only appear if they exist in both subsets (should be all matching in this case)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Join in spark\n", "\n", "spark_merged = spark_part1.join(spark_part2, on=\"PassengerId\", how=\"inner\")\n", "print(\"spark_merged count:\", spark_merged.count())\n", "spark_merged.show(5)\n", "spark_merged.printSchema()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Spark uses .join(df2, on=\"PassengerId\", how=\"inner\").\n", "spark_merged.show(5) and .printSchema() confirm the merge result." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Data cleaning\n", "\n", "pd_merged_clean = pd_merged.dropna(subset=[\"Age\", \"Fare\"])\n", "print(\"Before dropna:\", pd_merged.shape)\n", "print(\"After dropna:\", pd_merged_clean.shape)\n", "pd_merged_clean.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Spark data cleaning\n", "spark_merged_clean = spark_merged.dropna(subset=[\"Age\", \"Fare\"])\n", "print(\"spark_merged count BEFORE dropna:\", spark_merged.count())\n", "print(\"spark_merged_clean count AFTER dropna:\", spark_merged_clean.count())\n", "spark_merged_clean.show(5)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Basic aggregations\n", "\n", "pd_avg_fare = pd_merged_clean.groupby(\"Pclass\")[\"Fare\"].mean()\n", "pd_avg_fare" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Spark survival rate by sex and pclass\n", "\n", "spark_survival_rate = (\n", " spark_merged_clean\n", " .groupBy(\"Sex\", \"Pclass\")\n", " .agg(F.avg(\"Survived\").alias(\"survival_rate\"))\n", ")\n", "spark_survival_rate.show()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Write spark df to parquet\n", "\n", "spark_merged_clean.write.mode(\"overwrite\").parquet(\"../titanic_merged_clean.parquet\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Read parquet back in\n", "\n", "spark_parquet_df = spark.read.parquet(\"../titanic_merged_clean.parquet\")\n", "print(\"spark_parquet_df count:\", spark_parquet_df.count())\n", "spark_parquet_df.show(5)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Bonus - create a temp view/query\n", "\n", "spark_merged_clean.createOrReplaceTempView(\"titanic_merged\")\n", "\n", "result_df = spark.sql(\n", " \"\"\"\n", " SELECT Pclass,\n", " COUNT(*) AS passenger_count,\n", " AVG(Age) AS avg_age\n", " FROM titanic_merged\n", " GROUP BY Pclass\n", " ORDER BY Pclass\n", " \"\"\")\n", "result_df.show()\n", "\n", "#Correlation between Fare and Survival\n", "# Compute the Pearson correlation between Fare and Survived\n", "\n", "correlation1 = spark_merged_clean.stat.corr(\"Fare\", \"Survived\", \"pearson\")\n", "\n", "print(\"Pearson correlation between Fare and Survived:\", correlation1)\n", "\n", "correlation2 = spark.sql(\n", " '''\n", " SELECT\n", " covar_samp(Fare, Survived) / (stddev_samp(Fare)*stddev_samp(Survived)) as correlation\n", " FROM titanic_merged_clean\n", " '''\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sentence_transformers import SentenceTransformer\n", "from pyspark.sql.functions import udf\n", "from pyspark.sql.types import ArrayType, FloatType\n", "\n", "# Load the pre-trained MiniLM sentence transformer model\n", "model = 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sentence_transformers import SentenceTransformer\n", "from pyspark.sql.functions import udf\n", "from pyspark.sql.types import ArrayType, FloatType\n", "\n", "# Load the pre-trained MiniLM sentence transformer model\n", "model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')\n", "\n", "# Define a UDF to compute the embeddings\n", "# (the model object is serialized with the UDF and shipped to each executor)\n", "def compute_embedding(text):\n", "    if text is None:  # guard against missing names\n", "        return None\n", "    return model.encode(text).tolist()\n", "\n", "# Register the UDF in Spark\n", "embedding_udf = udf(compute_embedding, ArrayType(FloatType()))\n", "\n", "# Apply the UDF to compute embeddings for each passenger name\n", "# (underscores in the column name keep it easy to reference in Spark SQL)\n", "df_with_embeddings = spark_merged_clean.withColumn('mini_lm_vectors', embedding_udf(spark_merged_clean['Name']))\n", "\n", "# Show the result\n", "df_with_embeddings.select('Name', 'mini_lm_vectors').show(5)\n" ] } ], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 2 }