SQL & PySpark Equivalent


I am a Tech Enthusiast with 13+ years of experience in IT as a Consultant, Corporate Trainer, and Mentor, including 12+ years of training and mentoring in Software Engineering, Data Engineering, Test Automation and Data Science. I have trained more than 10,000 IT Professionals and conducted more than 500 training sessions in the areas of Software Development, Data Engineering, Cloud, Data Analysis, Data Visualizations, Artificial Intelligence and Machine Learning. I am interested in writing blogs, sharing technical knowledge, solving technical issues, and reading and learning new subjects.

PySpark snippets that use `col` assume `from pyspark.sql.functions import col`.

| Concept | SQL | Spark / PySpark |
| --- | --- | --- |
| SELECT | `SELECT column(s) FROM table;`<br>`SELECT * FROM table;` | `df.select("column(s)")`<br>`df.select("*")` |
| DISTINCT | `SELECT DISTINCT column(s) FROM table;` | `df.select("column(s)").distinct()` |
| WHERE | `SELECT column(s) FROM table WHERE condition;` | `df.filter(condition).select("column(s)")` |
| ORDER BY | `SELECT column(s) FROM table ORDER BY column(s);` | `df.sort("column(s)").select("column(s)")` |
| LIMIT | `SELECT column(s) FROM table LIMIT n;` | `df.limit(n).select("column(s)")` |
| COUNT | `SELECT COUNT(*) FROM table;` | `df.count()` |
| SUM | `SELECT SUM(column) FROM table;` | `from pyspark.sql.functions import sum`<br>`df.agg(sum("column"))` |
| AVG | `SELECT AVG(column) FROM table;` | `from pyspark.sql.functions import avg`<br>`df.agg(avg("column"))` |
| MAX / MIN | `SELECT MAX(column) FROM table;` | `from pyspark.sql.functions import max, min`<br>`df.agg(max("column"), min("column"))` |
| String Length | `SELECT LEN(string) FROM table;` | `from pyspark.sql.functions import length`<br>`df.select(length(col("string")))` |
| Convert to Uppercase | `SELECT UPPER(string) FROM table;` | `from pyspark.sql.functions import upper`<br>`df.select(upper(col("string")))` |
| Convert to Lowercase | `SELECT LOWER(string) FROM table;` | `from pyspark.sql.functions import lower`<br>`df.select(lower(col("string")))` |
| Concatenate Strings | `SELECT CONCAT(string1, string2) FROM table;` | `from pyspark.sql.functions import concat`<br>`df.select(concat(col("string1"), col("string2")))` |
| Trim String | `SELECT TRIM(string) FROM table;` | `from pyspark.sql.functions import trim`<br>`df.select(trim(col("string")))` |
| Substring | `SELECT SUBSTRING(string, start, length) FROM table;` | `from pyspark.sql.functions import substring`<br>`df.select(substring(col("string"), start, length))` |
| CURDATE, NOW, CURTIME | `SELECT CURDATE() FROM table;` | `from pyspark.sql.functions import current_date`<br>`df.select(current_date())`<br>For `NOW()`, use `current_timestamp()` from the same module. |
| CAST, CONVERT | `SELECT CAST(column AS datatype) FROM table;` | `df.select(col("column").cast("datatype"))` |
| IF | `SELECT IF(condition, value1, value2) FROM table;` | `from pyspark.sql.functions import when`<br>`df.select(when(condition, value1).otherwise(value2))`<br>`otherwise` is a method on the column returned by `when`, so it is not imported. |
| COALESCE | `SELECT COALESCE(column1, column2, column3) FROM table;` | `from pyspark.sql.functions import coalesce`<br>`df.select(coalesce("column1", "column2", "column3"))` |
| JOIN | `SELECT * FROM table1 JOIN table2 ON table1.column = table2.column;` | `df1.join(df2, "column")` |
| GROUP BY | `GROUP BY column(s)` | `df.groupBy("column(s)")` |
| PIVOT | `PIVOT (agg_function(column) FOR pivot_column IN (values))` | `df.groupBy("group_column(s)").pivot("pivot_column").agg(agg_function("column"))`<br>The argument to `pivot` is the column whose values become the new columns; `groupBy` takes the columns that stay as rows. |
| Logical Operators | `SELECT column FROM table WHERE column1 = value AND column2 > value;` | `df.filter((col("column1") == value) & (col("column2") > value))` |
| IS NULL, IS NOT NULL | `SELECT column FROM table WHERE column IS NULL;` | `df.filter(col("column").isNull()).select("column")`<br>Use `isNotNull()` for `IS NOT NULL`. |
| IN | `SELECT column FROM table WHERE column IN (value1, value2, value3);` | `df.filter(col("column").isin(value1, value2, value3)).select("column")` |
| LIKE | `SELECT column FROM table WHERE column LIKE 'value%';` | `df.filter(col("column").like("value%"))` |
| BETWEEN | `SELECT column FROM table WHERE column BETWEEN value1 AND value2;` | `df.filter((col("column") >= value1) & (col("column") <= value2)).select("column")` |
| UNION, UNION ALL | `SELECT column FROM table1 UNION SELECT column FROM table2;`<br>`SELECT column FROM table1 UNION ALL SELECT column FROM table2;` | UNION: `df1.select("column").union(df2.select("column")).distinct()`<br>UNION ALL: `df1.select("column").union(df2.select("column"))`<br>`union` keeps duplicates and matches columns by position; `unionAll` is an alias for `union`. |
| RANK, DENSE_RANK, ROW_NUMBER | `SELECT column, RANK() OVER (ORDER BY column) AS rank FROM table;` | `from pyspark.sql import Window`<br>`from pyspark.sql.functions import rank`<br>`df.select("column", rank().over(Window.orderBy("column")).alias("rank"))`<br>`dense_rank()` and `row_number()` are used the same way. |
| CTE | `WITH cte1 AS (SELECT * FROM table1) SELECT * FROM cte1 WHERE condition;` | `df.createOrReplaceTempView("cte1")`<br>`df_cte1 = spark.sql("SELECT * FROM cte1 WHERE condition")`<br>`df_cte1.show()`<br>or chain DataFrame operations: `df.filter(condition1).filter(condition2)` |
| Datatypes | `INT`: integer values<br>`BIGINT`: large integer values<br>`FLOAT`: floating point values<br>`DOUBLE`: double precision floating point values<br>`CHAR`: fixed-length character strings<br>`VARCHAR`: variable-length character strings<br>`DATE`: date values<br>`TIMESTAMP`: timestamp values | In PySpark, the data types are similar but are represented as classes in `pyspark.sql.types`.<br>`IntegerType`: integer values<br>`LongType`: long integer values<br>`FloatType`: floating point values<br>`DoubleType`: double precision floating point values<br>`StringType`: character strings<br>`TimestampType`: timestamp values<br>`DateType`: date values |
| Create Table | `CREATE TABLE table_name (column_name data_type constraint);` | `df.write.format("parquet").saveAsTable("table_name")` |
| Create Table with Columns definition | `CREATE TABLE table_name (column_name data_type [constraints], column_name data_type [constraints], ...);` | `from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType`<br>`schema = StructType([StructField("id", IntegerType(), True), StructField("name", StringType(), False), StructField("age", IntegerType(), True), StructField("salary", DecimalType(10, 2), True)])`<br>`df = spark.createDataFrame([], schema)` |
| Create Table with Primary Key | `CREATE TABLE table_name (column_name data_type PRIMARY KEY, ...);`<br>If the table already exists: `ALTER TABLE table_name ADD PRIMARY KEY (column_name);` | In PySpark or HiveQL, primary key constraints are not enforced directly. However, you can use the `dropDuplicates()` method to remove duplicate rows based on one or more columns.<br>`df = df.dropDuplicates(["id"])` |
| Create Table with Auto Increment constraint | `CREATE TABLE table_name (id INT AUTO_INCREMENT, name VARCHAR(255), PRIMARY KEY (id));` | Not natively supported by the DataFrame API, but there are several ways to achieve similar functionality.<br>`from pyspark.sql.functions import monotonically_increasing_id`<br>`df = df.withColumn("id", monotonically_increasing_id() + start_value)`<br>The generated IDs are unique and increasing, but not guaranteed to be consecutive. |
| Adding a column | `ALTER TABLE table_name ADD column_name datatype;` | `from pyspark.sql.functions import lit`<br>`df = df.withColumn("column_name", lit(None).cast("datatype"))` |
| Modifying a column | `ALTER TABLE table_name MODIFY column_name datatype;` | `df = df.withColumn("column_name", df["column_name"].cast("datatype"))` |
| Dropping a column | `ALTER TABLE table_name DROP COLUMN column_name;` | `df = df.drop("column_name")` |
| Rename a column | `ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;`<br>In MySQL: `ALTER TABLE employees CHANGE COLUMN first_name first_name_new VARCHAR(255);` | `df = df.withColumnRenamed("existing_column", "new_column")` |


Naveen P.N's Tech Blog
