# SQL & PySpark Equivalent

| Concept | SQL | Spark / PySpark |
| --- | --- | --- |
| SELECT | SELECT column(s) FROM table;  
  
SELECT \* FROM table; | df.select("column(s)")  
  
df.select("\*") |
| DISTINCT | SELECT DISTINCT column(s) FROM table; | df.select("column(s)").distinct() |
| WHERE | SELECT column(s) FROM table WHERE condition; | df.filter(condition).select("column(s)") |
| ORDER BY | SELECT column(s) FROM table ORDER BY column(s); | df.sort("column(s)")  
.select("column(s)") |
| LIMIT | SELECT column(s) FROM table LIMIT n; | df.limit(n).select("column(s)") |
| COUNT | SELECT COUNT(\*) FROM table; | df.count() |
| SUM | SELECT SUM(column) FROM table; | from pyspark.sql.functions import sum  
  
df.agg(sum("column")) |
| AVG | SELECT AVG(column) FROM table; | from pyspark.sql.functions import avg

df.agg(avg("column")) |
| MAX / MIN | SELECT MAX(column), MIN(column) FROM table; | from pyspark.sql.functions import max, min

df.agg(max("column"), min("column")) |
| String Length | SELECT LEN(string) FROM table; | from pyspark.sql.functions import col, length

df.select(length(col("string"))) |
| Convert to Uppercase | SELECT UPPER(string) FROM table | from pyspark.sql.functions import col, upper

df.select(upper(col("string"))) |
| Convert to Lowercase | SELECT LOWER(string) FROM table | from pyspark.sql.functions import col, lower

df.select(lower(col("string"))) |
| Concatenate Strings | SELECT CONCAT(string1, string2) FROM table | from pyspark.sql.functions import col, concat

df.select(concat(col("string1"), col("string2"))) |
| Trim String | SELECT TRIM(string) FROM table | from pyspark.sql.functions import col, trim

df.select(trim(col("string"))) |
| Substring | SELECT SUBSTRING(string, start, length) FROM table | from pyspark.sql.functions import col, substring

df.select(substring(col("string"), start, length)) |
| CURDATE, NOW, CURTIME | SELECT CURDATE() FROM table; | from pyspark.sql.functions import current\_date

df.select(current\_date()) |
| CAST, CONVERT | SELECT CAST(column AS datatype) FROM table | df.select(col("column").cast("datatype")) |
| IF | SELECT IF(condition, value1, value2) FROM table | from pyspark.sql.functions import when

df.select(when(condition, value1)  
.otherwise(value2)) |
| COALESCE | SELECT COALESCE(column1, column2, column3) FROM table; | from pyspark.sql.functions import coalesce

df.select(coalesce("column1", "column2", "column3")) |
| JOIN | JOIN table1 ON table1.column = table2.column | df1.join(df2, "column") |
| GROUP BY | GROUP BY column(s) | df.groupBy("column(s)") |
| PIVOT | PIVOT (agg\_function(column) FOR pivot\_column IN (values)) | df.groupBy("group\_column(s)")  
.pivot("pivot\_column").agg(agg\_function("column")) |
| Logical Operators | SELECT column FROM table WHERE column1 = value AND column2 &gt; value | df.filter((col("column1") == value) & (col("column2") &gt; value)) |
| IS NULL, IS NOT NULL | SELECT column FROM table WHERE column IS NULL | df.filter(col("column").isNull())  
.select("column") |
| IN | SELECT column FROM table WHERE column IN (value1, value2, value3) | df.filter(col("column")  
.isin(value1, value2, value3))  
.select("column") |
| LIKE | SELECT column FROM table WHERE column LIKE 'value%' | df.filter(col("column").like("value%")) |
| BETWEEN | SELECT column FROM table WHERE column BETWEEN value1 AND value2 | df.filter((col("column") &gt;= value1) & (col("column") &lt;= value2))  
.select("column") |
| UNION, UNION ALL | SELECT column FROM table1 UNION SELECT column FROM table2 | df1.select("column").union(df2.select("column")).distinct() (UNION) or df1.select("column").union(df2.select("column")) (UNION ALL; unionAll() is an alias of union()) |
| RANK, DENSE\_RANK, ROW\_NUMBER | SELECT column, RANK() OVER (ORDER BY column) as rank FROM table | from pyspark.sql import Window  
from pyspark.sql.functions import rank

df.select("column", rank().over(Window.orderBy("column"))  
.alias("rank")) |
| CTE | WITH cte1 AS (SELECT \* FROM table1)  
SELECT \* FROM cte1 WHERE condition | df.createOrReplaceTempView("cte1"); df\_cte1 = spark.sql("SELECT \* FROM cte1 WHERE condition"); df\_cte1.show() or df.filter(condition1).filter(condition2) |
| Datatypes | INT: for integer values; BIGINT: for large integer values; FLOAT: for floating point values; DOUBLE: for double precision floating point values; CHAR: for fixed-length character strings; VARCHAR: for variable-length character strings; DATE: for date values; TIMESTAMP: for timestamp values | In PySpark, the data types are similar, but are represented differently.

IntegerType: for integer values; LongType: for long integer values; FloatType: for floating point values; DoubleType: for double precision floating point values; StringType: for character strings; TimestampType: for timestamp values; DateType: for date values |
| Create Table | CREATE TABLE table\_name (column\_name data\_type constraint); | df.write.format("parquet")  
.saveAsTable("table\_name") |
| Create Table with Columns definition | CREATE TABLE table\_name( column\_name data\_type \[constraints\], column\_name data\_type \[constraints\], ...); | from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

schema = StructType(\[ StructField("id", IntegerType(), True), StructField("name", StringType(), False), StructField("age", IntegerType(), True), StructField("salary", DecimalType(10,2), True)\])

df = spark.createDataFrame(\[\], schema) |
| Create Table with Primary Key | CREATE TABLE table\_name( column\_name data\_type PRIMARY KEY, ...);

If the table already exists: ALTER TABLE table\_name ADD PRIMARY KEY (column\_name); | In PySpark or HiveQL, primary key constraints are not enforced directly. However, you can use the dropDuplicates() method to remove duplicate rows based on one or more columns.

df = df.dropDuplicates(\["id"\]) |
| Create Table with Auto Increment constraint | CREATE TABLE table\_name( id INT AUTO\_INCREMENT, name VARCHAR(255), PRIMARY KEY (id)); | Not natively supported by the DataFrame API, but there are several ways to achieve the same functionality.

from pyspark.sql.functions import monotonically\_increasing\_id; df = df.withColumn("id", monotonically\_increasing\_id() + start\_value) (the generated IDs are unique and increasing, but not consecutive) |
| Adding a column | ALTER TABLE table\_name ADD column\_name datatype; | from pyspark.sql.functions import lit; df = df.withColumn("column\_name", lit(None).cast("datatype")) |
| Modifying a column | ALTER TABLE table\_name MODIFY column\_name datatype; | df = df.withColumn("column\_name", df\["column\_name"\].cast("datatype")) |
| Dropping a column | ALTER TABLE table\_name DROP COLUMN column\_name; | df = df.drop("column\_name") |
| Rename a column | ALTER TABLE table\_name RENAME COLUMN old\_column\_name TO new\_column\_name;

In MySQL: ALTER TABLE employees CHANGE COLUMN first\_name first\_name\_new VARCHAR(255); | df = df.withColumnRenamed("existing\_column", "new\_column") |
