{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "92a57b81-9b11-47e9-905f-d8b0f5210b36", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "# Chapter 3: Function Junction - Data manipulation with PySpark\n", "\n", "\n", "## Clean data\n", "In data science, `garbage in, garbage out` (GIGO) is the concept that flawed, biased or poor quality information or input produces a result or output of similar quality.\n", "To improve the analysis quality, we need data cleaning, the process to turn garbage into gold, it is composed of identifying, correcting, or removing errors and inconsistencies in data to improve its quality and usability. \n", "\n", "\n", "\n", "Let's start with a Dataframe containing bad values:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "tags": [ "remove-cell" ] }, "outputs": [ ], "source": [ "!pip install pyspark==4.0.0.dev2" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "tags": [ "remove-cell" ] }, "outputs": [ ], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession \\\n", " .builder \\\n", " .appName(\"Data Loading and Storage Example\") \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "010edc0f-b8ff-4ca7-890b-312cfd86aee0", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+----+------+-----+\n", "| age|height| NAME|\n", "+----+------+-----+\n", "| 10| 80.0|Alice|\n", "| 10| 80.0|Alice|\n", "| 5| NaN| BOB|\n", "|NULL| NULL| Tom|\n", "|NULL| NaN| NULL|\n", "| 9| 78.9| josh|\n", "| 18|1802.3| bush|\n", "| 7| 75.3|jerry|\n", "+----+------+-----+\n", "\n" ] } ], "source": [ "from pyspark.sql import Row\n", "\n", "df = spark.createDataFrame([\n", " Row(age=10, height=80.0, NAME=\"Alice\"),\n", " Row(age=10, height=80.0, NAME=\"Alice\"),\n", " Row(age=5, height=float(\"nan\"), NAME=\"BOB\"),\n", " Row(age=None, height=None, NAME=\"Tom\"),\n", " Row(age=None, height=float(\"nan\"), NAME=None),\n", " Row(age=9, height=78.9, NAME=\"josh\"),\n", " Row(age=18, height=1802.3, NAME=\"bush\"),\n", " Row(age=7, height=75.3, NAME=\"jerry\"),\n", "])\n", "\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "d69ab969-8377-449b-a8a5-3c2e900298eb", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "\n", "### Rename columns\n", "At first glance, we find that column `NAME` is upper case.\n", "For consistency, we can use `DataFrame.withColumnRenamed` to rename columns." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "d682268d-dc62-47a2-be2e-b26af4b6bf0d", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+------+-----+\n", "| age|height| name|\n", "+----+------+-----+\n", "| 10| 80.0|Alice|\n", "| 10| 80.0|Alice|\n", "| 5| NaN| BOB|\n", "|NULL| NULL| Tom|\n", "|NULL| NaN| NULL|\n", "| 9| 78.9| josh|\n", "| 18|1802.3| bush|\n", "| 7| 75.3|jerry|\n", "+----+------+-----+\n", "\n" ] } ], "source": [ "df2 = df.withColumnRenamed(\"NAME\", \"name\")\n", "\n", "df2.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "34599ede-9280-48bb-b968-1bdda9d22d8e", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Drop null values\n", "\n", "Then we can notice that there are two kinds of missing data:\n", "\n", "- the `NULL` values in all three columns;\n", "- the `NaN` values which means `Not a Number` for a numeric column;\n", "\n", "The records without a valid `name` are likely useless, so let's drop them first. There are a group of functions in `DataFrameNaFunctions` for missing value handling, we can use `DataFrame.na.drop` or `DataFrame.dropna` to omit rows with `NULL` or `NaN` values.\n", "\n", "After the step `df2.na.drop(subset=\"name\")`, invalid record `(age=None, height=NaN, name=None)` is discarded." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "bc46e7b4-c8ec-47cb-8934-9d7fde49e426", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+------+-----+\n", "| age|height| name|\n", "+----+------+-----+\n", "| 10| 80.0|Alice|\n", "| 10| 80.0|Alice|\n", "| 5| NaN| BOB|\n", "|NULL| NULL| Tom|\n", "| 9| 78.9| josh|\n", "| 18|1802.3| bush|\n", "| 7| 75.3|jerry|\n", "+----+------+-----+\n", "\n" ] } ], "source": [ "df3 = df2.na.drop(subset=\"name\")\n", "\n", "df3.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "5c2b010e-f591-4ccd-a23b-8f68cc54e395", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Fill values\n", "\n", "For the remaining missing values, we can use `DataFrame.na.fill` or `DataFrame.fillna` to fill them.\n", "\n", "With a `Dict` input `{'age': 10, 'height': 80.1}`, we can specify the values for columns `age` and `height` together." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "9aac3291-4e70-435c-a665-59beed8ef3b1", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 10| 80.0|Alice|\n", "| 10| 80.0|Alice|\n", "| 5| 80.1| BOB|\n", "| 10| 80.1| Tom|\n", "| 9| 78.9| josh|\n", "| 18|1802.3| bush|\n", "| 7| 75.3|jerry|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "df4 = df3.na.fill({'age': 10, 'height': 80.1})\n", "\n", "df4.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "7c7ef34d-3403-4d5f-96a2-56f823e30277", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Remove outliers\n", "\n", "After above steps, all missing values are dropped or filled.\n", "However, we can find that `height=1802.3` seems unreasonable, to remove this kind of outliers, we can filter the DataFrame with a valid range like `(65, 85)`." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "09308454-3cea-4dd2-b0eb-01d93abd0488", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 10| 80.0|Alice|\n", "| 10| 80.0|Alice|\n", "| 5| 80.1| BOB|\n", "| 10| 80.1| Tom|\n", "| 9| 78.9| josh|\n", "| 7| 75.3|jerry|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "df5 = df4.where(df4.height.between(65, 85))\n", "\n", "df5.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "509b8ebe-52ba-4b8f-9473-018c2a8b1273", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Remove duplicates\n", "\n", "Now, all invalid records have been handled. But we notice that record `(age=10, height=80.0, name=Alice)` has been duplicated. To remove such duplicates, we can simply apply `DataFrame.distinct`." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "c5cb2e96-f194-46ab-9489-efe2fe14190d", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 10| 80.0|Alice|\n", "| 5| 80.1| BOB|\n", "| 10| 80.1| Tom|\n", "| 9| 78.9| josh|\n", "| 7| 75.3|jerry|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "df6 = df5.distinct()\n", "\n", "df6.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "287a5e17-bf51-423b-9cb1-1f0eb0663658", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### String manipulation\n", "\n", "Column `name` contains both lower case and upper case letters. We can apply `lower()` function to convert all letters to lower case. 
\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "80a9cb72-1d37-407a-9161-85ea78ee4b73", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 10| 80.0|alice|\n", "| 5| 80.1| bob|\n", "| 10| 80.1| tom|\n", "| 9| 78.9| josh|\n", "| 7| 75.3|jerry|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as sf\n", "\n", "df7 = df6.withColumn(\"name\", sf.lower(\"name\"))\n", "df7.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "3b28e946-91be-4dd3-a4c3-720e2579a272", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "For more complicated string manipulations, we can also use `udf` to utilize Python's power functions." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "ff338521-4770-4b49-b064-0ed3cff12570", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 10| 80.0|Alice|\n", "| 5| 80.1| Bob|\n", "| 10| 80.1| Tom|\n", "| 9| 78.9| Josh|\n", "| 7| 75.3|Jerry|\n", "+---+------+-----+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql import functions as sf\n", "\n", "capitalize = sf.udf(lambda s: s.capitalize())\n", "\n", "df8 = df6.withColumn(\"name\", capitalize(\"name\"))\n", "df8.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "5279bf5b-6c37-48e7-bbd9-b3207820bb95", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Reorder columns\n", "\n", "After above process, the data is clean and we want to reorder the columns before saving the DataFrame to some storage. You can refer to previous chapter `Load and Behold: Data loading, storage, file formats` for more details.\n", "\n", "Normally, we use `DataFrame.select` for this purpose." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f316c501-6f97-4772-82a8-568fd59f04ff", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+\n", "| name|age|height|\n", "+-----+---+------+\n", "|alice| 10| 80.0|\n", "| bob| 5| 80.1|\n", "| tom| 10| 80.1|\n", "| josh| 9| 78.9|\n", "|jerry| 7| 75.3|\n", "+-----+---+------+\n", "\n" ] } ], "source": [ "df9 = df7.select(\"name\", \"age\", \"height\")\n", "\n", "df9.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "55a8d1de-f53a-4a73-a7c0-8dd8376f2dd5", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Transform data\n", "\n", "The main part of a data engineering project is transformation. We create new dataframes from old ones." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b8dc6227-05c9-4c0a-90da-e3f377c9468b", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Select columns with select()\n", "\n", "The input table may contains hundreds of columns, but for a specific project we likly are interested only in a small subset of them.\n", "\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "402e442a-b04e-492b-a1b7-376185ea9f50", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n", "| id|col_0|col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9|col_10|col_11|col_12|col_13|col_14|col_15|col_16|col_17|col_18|col_19|\n", "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n", "| 0| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 1| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 2| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 3| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 4| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 5| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 6| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 7| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 8| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "| 9| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|\n", "+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as sf\n", "df = spark.range(10)\n", "\n", "for i in range(20):\n", " df = df.withColumn(f\"col_{i}\", sf.lit(i))\n", "\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { 
"application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "585efeeb-f935-4bc9-9d72-83d4e2cbe946", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "\n", "We create a DataFrame with 21 columns via a `for` loop, then we only select 4 columns by `select`. Columns `id`, `col_2` and `col_3` are directly selected from previous DataFrame, while column `sqrt_col_4_plus_5` is generated by the math functions.\n", "\n", "We have hundreds of functions for column manipulation in `pyspark.sql.function` and `pyspark.sql.Column`." ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "dde46ecc-a43f-4c83-823c-4ba010291e2c", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+-----+-----------------+\n", "| id|col_2|col_3|sqrt_col_4_plus_5|\n", "+---+-----+-----+-----------------+\n", "| 0| 2| 3| 3.0|\n", "| 1| 2| 3| 3.0|\n", "| 2| 2| 3| 3.0|\n", "| 3| 2| 3| 3.0|\n", "| 4| 2| 3| 3.0|\n", "| 5| 2| 3| 3.0|\n", "| 6| 2| 3| 3.0|\n", "| 7| 2| 3| 3.0|\n", "| 8| 2| 3| 3.0|\n", "| 9| 2| 3| 3.0|\n", "+---+-----+-----+-----------------+\n", "\n" ] } ], "source": [ "\n", "df2 = df.select(\"id\", \"col_2\", \"col_3\", sf.sqrt(sf.col(\"col_4\") + sf.col(\"col_5\")).alias(\"sqrt_col_4_plus_5\"))\n", "\n", "df2.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "5dea6b73-186e-4bdc-a4f1-918b4e74ef75", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Filter rows with where()\n", "\n", "The input table may be super huge and contains billions of rows, and we may also be interested in only a small subset.\n", "\n", "We can use `where` or `filter` with sepcified conditions to filter the rows.\n", "\n", "For example, we can select rows with odd `id` values." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "2bf40a39-5a42-49af-8a3b-afbb766bbdc9", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+-----+-----------------+\n", "| id|col_2|col_3|sqrt_col_4_plus_5|\n", "+---+-----+-----+-----------------+\n", "| 1| 2| 3| 3.0|\n", "| 3| 2| 3| 3.0|\n", "| 5| 2| 3| 3.0|\n", "| 7| 2| 3| 3.0|\n", "| 9| 2| 3| 3.0|\n", "+---+-----+-----+-----------------+\n", "\n" ] } ], "source": [ "df3 = df2.where(sf.col(\"id\") % 2 == 1)\n", "\n", "df3.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "6e34ecd8-8c7a-44b1-9f7e-d4b1cc40a2b7", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Summarizing data\n", "\n", "In data analysis, we normally end up with summarizing data to a chart or table." 
] }, { "cell_type": "code", "execution_count": 15, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1fbde87c-c5f7-4102-a41d-9c81c63d750b", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-----+\n", "| incomes| NAME|\n", "+--------------------+-----+\n", "|[123.0, 456.0, 78...|Alice|\n", "| [234.0, 567.0]| BOB|\n", "|[100.0, 200.0, 10...| Tom|\n", "| [79.0, 128.0]| josh|\n", "|[123.0, 145.0, 17...| bush|\n", "|[111.0, 187.0, 45...|jerry|\n", "+--------------------+-----+\n", "\n" ] } ], "source": [ "from pyspark.sql import Row\n", "\n", "df = spark.createDataFrame([\n", " Row(incomes=[123.0, 456.0, 789.0], NAME=\"Alice\"),\n", " Row(incomes=[234.0, 567.0], NAME=\"BOB\"),\n", " Row(incomes=[100.0, 200.0, 100.0], NAME=\"Tom\"),\n", " Row(incomes=[79.0, 128.0], NAME=\"josh\"),\n", " Row(incomes=[123.0, 145.0, 178.0], NAME=\"bush\"),\n", " Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], NAME=\"jerry\"),\n", "])\n", "\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "aa593364-0f87-48d1-969e-7620c8c3ff85", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "For example, given the income per month, we want to find the average income for each name." ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "107e5b59-0d5c-4539-b481-c7895115bb5d", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-----------------------------------+\n", "|name |incomes |\n", "+-----+-----------------------------------+\n", "|alice|[123.0, 456.0, 789.0] |\n", "|bob |[234.0, 567.0] |\n", "|tom |[100.0, 200.0, 100.0] |\n", "|josh |[79.0, 128.0] |\n", "|bush |[123.0, 145.0, 178.0] |\n", "|jerry|[111.0, 187.0, 451.0, 188.0, 199.0]|\n", "+-----+-----------------------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as sf\n", "\n", "df2 = df.select(sf.lower(\"NAME\").alias(\"name\"), \"incomes\")\n", "\n", "df2.show(truncate=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "3ee9f0d6-2d77-43b5-8632-2f60004b8bb4", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Reshape data using explode()\n", "\n", "To make the data easier for aggregation, we can use `explode()` function to reshape the data" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1c0fb5ea-10aa-4f7a-be15-6881a04f3485", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+------+\n", "| name|income|\n", "+-----+------+\n", "|alice| 123.0|\n", "|alice| 456.0|\n", "|alice| 789.0|\n", "| bob| 234.0|\n", "| bob| 567.0|\n", "| tom| 100.0|\n", "| tom| 200.0|\n", "| tom| 100.0|\n", "| josh| 79.0|\n", "| josh| 128.0|\n", "| bush| 123.0|\n", "| bush| 145.0|\n", "| bush| 178.0|\n", "|jerry| 111.0|\n", 
"|jerry| 187.0|\n", "|jerry| 451.0|\n", "|jerry| 188.0|\n", "|jerry| 199.0|\n", "+-----+------+\n", "\n" ] } ], "source": [ "df3 = df2.select(\"name\", sf.explode(\"incomes\").alias(\"income\"))\n", "\n", "df3.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "21590f9b-7e64-406d-9d55-7f0ac47b594c", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Summarizing via groupBy() and agg()\n", "\n", "Then we normally use `DataFrame.groupBy(...).agg(...)` to aggreate the data. To compute the average income, we can apply aggration function `avg`" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "31dece1d-4d2a-4f85-bb4e-76d3f82d8ca0", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+------------------+\n", "| name| avg_income|\n", "+-----+------------------+\n", "|alice| 456.0|\n", "| bob| 400.5|\n", "| tom|133.33333333333334|\n", "| josh| 103.5|\n", "| bush|148.66666666666666|\n", "|jerry| 227.2|\n", "+-----+------------------+\n", "\n" ] } ], "source": [ "df4 = df3.groupBy(\"name\").agg(sf.avg(\"income\").alias(\"avg_income\"))\n", "\n", "df4.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "0938c629-4494-4614-8eac-3fabb0eb1547", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "### Orderby\n", "\n", "For final analysis, we normally want to order the data. In this case, we can order the data by `name`." ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "7e54b022-cb52-4fe2-a79d-259be277d705", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+------------------+\n", "| name| avg_income|\n", "+-----+------------------+\n", "|alice| 456.0|\n", "| bob| 400.5|\n", "| bush|148.66666666666666|\n", "|jerry| 227.2|\n", "| josh| 103.5|\n", "| tom|133.33333333333334|\n", "+-----+------------------+\n", "\n" ] } ], "source": [ "df5 = df4.orderBy(\"name\")\n", "\n", "df5.show()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "dc6e8724-1363-4d55-a98c-87ad11efb787", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## When DataFrames Collide: The Art of Joining\n", "\n", "When dealing with multiple dataframe, we likely need to combine them together in some way. The most frequently used approach is joining.\n", "\n", "For example, given the `incomes` data and `height` data, we can use `DataFrame.join` to join them together by `name`.\n", "\n", "We can see that only `alice`, `josh` and `bush` are in the final results, because they appear in both DataFrames." 
] }, { "cell_type": "code", "execution_count": 20, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "2a5e52e6-fc57-4315-b649-f79828269449", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "from pyspark.sql import Row\n", "\n", "df1 = spark.createDataFrame([\n", " Row(age=10, height=80.0, name=\"alice\"),\n", " Row(age=9, height=78.9, name=\"josh\"),\n", " Row(age=18, height=82.3, name=\"bush\"),\n", " Row(age=7, height=75.3, name=\"tom\"),\n", "])\n", "\n", "df2 = spark.createDataFrame([\n", " Row(incomes=[123.0, 456.0, 789.0], name=\"alice\"),\n", " Row(incomes=[234.0, 567.0], name=\"bob\"),\n", " Row(incomes=[79.0, 128.0], name=\"josh\"),\n", " Row(incomes=[123.0, 145.0, 178.0], name=\"bush\"),\n", " Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], name=\"jerry\"),\n", "])" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "bb635cb3-74b4-40f3-a8bd-b74fca020f95", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+---------------------+\n", "|name |age|height|incomes |\n", "+-----+---+------+---------------------+\n", "|alice|10 |80.0 |[123.0, 456.0, 789.0]|\n", "|bush |18 |82.3 |[123.0, 145.0, 178.0]|\n", "|josh |9 |78.9 |[79.0, 128.0] |\n", "+-----+---+------+---------------------+\n", "\n" ] } ], "source": [ "df3 = df1.join(df2, on=\"name\")\n", "\n", "df3.show(truncate=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "98cefd42-a358-4d12-87f2-41cc41aa98b7", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "There are seven join methods:\n", "- `INNER`\n", "- `LEFT`\n", "- `RIGHT`\n", "- `FULL`\n", "- `CROSS`\n", "- `LEFTSEMI`\n", "- `LEFTANTI`\n", "\n", "And the default one is `INNER`.\n", "\n", "Let's take `LEFT` join as another example. A left join includes all of the records from the first (left) of two tables, even if there are no matching values for records in the second (right) table.\n", "\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "cdc282ab-ed1d-4964-9ebd-a5770de93cc3", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+---------------------+\n", "|name |age|height|incomes |\n", "+-----+---+------+---------------------+\n", "|alice|10 |80.0 |[123.0, 456.0, 789.0]|\n", "|josh |9 |78.9 |[79.0, 128.0] |\n", "|bush |18 |82.3 |[123.0, 145.0, 178.0]|\n", "|tom |7 |75.3 |NULL |\n", "+-----+---+------+---------------------+\n", "\n" ] } ], "source": [ "df4 = df1.join(df2, on=\"name\", how=\"left\")\n", "\n", "df4.show(truncate=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "70257613-f432-48e6-bc58-b521fde9b77a", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "And a `RIGHT` join keeps all of the records from the right table." 
] }, { "cell_type": "code", "execution_count": 23, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "846429dc-ea7e-484e-ad3d-82e625348f69", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----+------+-----------------------------------+\n", "|name |age |height|incomes |\n", "+-----+----+------+-----------------------------------+\n", "|alice|10 |80.0 |[123.0, 456.0, 789.0] |\n", "|bob |NULL|NULL |[234.0, 567.0] |\n", "|josh |9 |78.9 |[79.0, 128.0] |\n", "|bush |18 |82.3 |[123.0, 145.0, 178.0] |\n", "|jerry|NULL|NULL |[111.0, 187.0, 451.0, 188.0, 199.0]|\n", "+-----+----+------+-----------------------------------+\n", "\n" ] } ], "source": [ "df5 = df1.join(df2, on=\"name\", how=\"right\")\n", "\n", "df5.show(truncate=False)" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "environmentMetadata": { "base_environment": "", "client": "1" }, "language": "python", "notebookMetadata": { "pythonIndentUnit": 2 }, "notebookName": "PythonCookbook", "widgets": {} }, "kernelspec": { "display_name": "", "language": "python", "name": "" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" } }, "nbformat": 4, "nbformat_minor": 0 }