Streaming Deduplication and Quality Enforcement

This program demonstrates a real-time data pipeline using Spark Structured Streaming to handle deduplication and data quality enforcement on streaming data from CSV files.
Objective
The program achieves the following:
Ingest data from a directory containing CSV files.
Deduplicate records based on a unique identifier (event_id) and keep the latest record within a specified time window.
Enforce data quality rules by filtering and handling invalid or missing data.
Output processed data to the console for debugging and verification.
Code Walkthrough
Reading Streams
Spark Session Initialization
The program starts by initializing a SparkSession, which is the entry point for Spark Structured Streaming applications.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Streaming Deduplication and Quality Enforcement") \
    .master("local") \
    .getOrCreate()
Schema Definition
A custom schema is defined to specify the structure of the incoming CSV files:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType

CUSTOM_SCHEMA = StructType([
    StructField("event_id", StringType(), True),      # Unique event identifier
    StructField("name", StringType(), True),          # Name of the entity
    StructField("timestamp", TimestampType(), True),  # Event timestamp
    StructField("value", IntegerType(), True)         # Numeric value for the event
])
Reading Streaming Data
The program reads streaming data from a folder (../resources/dataset/data_quality/input) using the CSV format:
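streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "true") \
    .schema(CUSTOM_SCHEMA) \
    .load("../resources/dataset/data_quality/input")
# header=true skips the column-name row that the sample input files start with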
Deduplication Using Time-Based Windowing
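Deduplication is achieved by grouping records by a 10-minute time window and event_id, then reducing each group to a single row that carries the latest timestamp. Because the window is part of the grouping key, the same event_id arriving in two different windows still produces two output rows.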
Add Watermark and Grouping
streaming_df.withWatermark("timestamp", "10 minutes").groupBy(
    window(col("timestamp"), "10 minutes"),  # Group by 10-minute time windows
    col("event_id")                          # Group by event_id
)
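To make the role of the watermark more concrete, here is a simplified pure-Python model of how a 10-minute watermark trails the largest event time seen so far and decides when a window stops accepting late records. This is only a sketch, not Spark's implementation: the helper names window_bounds, advance_watermark and is_dropped are hypothetical, and the exact boundary condition Spark applies when a window closes is an assumption here.

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)   # mirrors withWatermark("timestamp", "10 minutes")
WINDOW = timedelta(minutes=10)  # mirrors window(col("timestamp"), "10 minutes")


def window_bounds(ts):
    """Return (start, end) of the 10-minute tumbling window that contains ts."""
    start = ts.replace(minute=ts.minute - ts.minute % 10, second=0, microsecond=0)
    return start, start + WINDOW


def advance_watermark(current, batch_timestamps):
    """After a micro-batch, the watermark is the max event time minus the delay.
    It never moves backwards."""
    candidate = max(batch_timestamps) - DELAY
    if current is None or candidate > current:
        return candidate
    return current


def is_dropped(ts, watermark):
    """Assumption: a record is too late once the watermark has reached
    the end of the window the record belongs to."""
    if watermark is None:
        return False
    _, end = window_bounds(ts)
    return end <= watermark


late = datetime(2024, 12, 10, 10, 3)  # belongs to window [10:00, 10:10)

# First micro-batch: max event time 10:05, so the watermark is 09:55
watermark_1 = advance_watermark(None, [datetime(2024, 12, 10, 10, 0),
                                       datetime(2024, 12, 10, 10, 5)])
print(watermark_1, is_dropped(late, watermark_1))

# Second micro-batch: max event time 10:25, so the watermark moves to 10:15
watermark_2 = advance_watermark(watermark_1, [datetime(2024, 12, 10, 10, 25)])
print(watermark_2, is_dropped(late, watermark_2))
```

In this model the 10:03 record is still accepted after the first batch, but is treated as too late once an event at 10:25 has pushed the watermark past the end of its window.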
Aggregate Functions
.agg(
    max("timestamp").alias("latest_timestamp"),    # Latest timestamp per group
    max_by("value", "timestamp").alias("value"),   # Value of the latest record (max_by needs Spark 3.3+)
    first("name", ignorenulls=True).alias("name")  # First non-null name
)
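The intended effect of the grouping and aggregation can be checked without a cluster. The following is a minimal pure-Python sketch, assuming the goal is to keep the entire row that carries the latest timestamp for each (window, event_id) pair. The function names window_start and latest_per_window are hypothetical, and the rows are the sample input used later in this post.

```python
from datetime import datetime

# (event_id, name, timestamp, value), taken from the sample input
rows = [
    ("1", "John", datetime(2024, 12, 10, 10, 0), 50),
    ("1", "John", datetime(2024, 12, 10, 10, 5), 60),
    ("2", "Alice", datetime(2024, 12, 10, 10, 3), 30),
    ("3", None, datetime(2024, 12, 10, 10, 10), -10),
]


def window_start(ts):
    """Start of the 10-minute tumbling window that contains ts."""
    return ts.replace(minute=ts.minute - ts.minute % 10, second=0, microsecond=0)


def latest_per_window(rows):
    """Keep one row per (window, event_id): the one with the latest timestamp."""
    latest = {}
    for row in rows:
        event_id, _name, ts, _value = row
        key = (window_start(ts), event_id)
        if key not in latest or ts > latest[key][2]:
            latest[key] = row
    # Sort by (window start, event_id) for a stable, readable result
    return [latest[key] for key in sorted(latest)]


deduplicated = latest_per_window(rows)
for row in deduplicated:
    print(row)
```

The two rows for event_id 1 fall in the same 10:00 window, so only the 10:05 record survives. Event 3 falls in the next window and is still present at this stage, because data quality rules are applied in a later step.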
Data Quality Enforcement
Data quality is enforced in two steps: rows whose value is null or not positive are dropped, and null names are replaced with "Unknown":
quality_enforced_df = deduplicated_df \
    .filter((col("value").isNotNull()) & (col("value") > 0)) \
    .fillna({"name": "Unknown"})  # Replace null names
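The two rules are easy to reason about on plain Python dictionaries. The sketch below assumes a null column value is represented as None; the function name enforce_quality is hypothetical, and the third sample row (a missing name with a valid value) is made up purely to show the fill step.

```python
def enforce_quality(rows):
    """Drop rows with a missing or non-positive value, then default missing names."""
    cleaned = []
    for row in rows:
        value = row["value"]
        if value is None or value <= 0:
            continue  # same effect as the isNotNull() and > 0 filter
        name = row["name"] if row["name"] is not None else "Unknown"
        cleaned.append({**row, "name": name})
    return cleaned


sample = [
    {"event_id": "1", "name": "John", "value": 60},
    {"event_id": "3", "name": None, "value": -10},  # dropped: value is not positive
    {"event_id": "4", "name": None, "value": 5},    # hypothetical row: name gets filled
    {"event_id": "5", "name": "Eve", "value": None},  # hypothetical row: dropped, value missing
]

cleaned = enforce_quality(sample)
for row in cleaned:
    print(row)
```

The order of the steps matters little here, but filtering first means the fill only runs on rows that will actually be emitted.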
Writing to the Console
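The processed data is written to the console using the writeStream method. Complete output mode reprints the entire result table on every trigger. In this mode Spark has to keep all aggregation state, so the watermark is not used to discard old windows: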
quality_enforced_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start() \
    .awaitTermination()
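The difference between the complete and update output modes can be illustrated with a small pure-Python model. This is a sketch under simplifying assumptions, not Spark code: run_triggers is a hypothetical helper, each batch is a list of (event_id, latest value) pairs, and any key that receives data in a trigger counts as updated.

```python
def run_triggers(batches, mode):
    """Return what a sink would receive after each trigger for the given mode."""
    table = {}    # the running result table
    emitted = []
    for batch in batches:
        touched = {}
        for key, value in batch:
            table[key] = value
            touched[key] = value
        if mode == "complete":
            emitted.append(dict(table))  # whole result table every time
        else:
            emitted.append(touched)      # only the rows touched in this trigger
    return emitted


batches = [
    [("1", 50), ("2", 30)],  # first trigger
    [("1", 60)],             # second trigger updates event 1 only
]

complete_output = run_triggers(batches, "complete")
update_output = run_triggers(batches, "update")
print(complete_output)
print(update_output)
```

With complete mode the second trigger prints both events again, while the update model prints only event 1. For debugging on the console, complete mode is convenient because each printout shows the full current state.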
Sample Input and Output
Input
event_id,name,timestamp,value
1,John,2024-12-10 10:00:00,50
1,John,2024-12-10 10:05:00,60
2,Alice,2024-12-10 10:03:00,30
3,,2024-12-10 10:10:00,-10
Output
+--------+-------------------+-----+-------+
|event_id|timestamp          |value|name   |
+--------+-------------------+-----+-------+
|1       |2024-12-10 10:05:00|60   |John   |
|2       |2024-12-10 10:03:00|30   |Alice  |
+--------+-------------------+-----+-------+
Complete Code
from pyspark.sql import SparkSession
# Note: max below is Spark's aggregate function and shadows Python's builtin max
from pyspark.sql.functions import col, first, max, max_by, window
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType

if __name__ == '__main__':
    # Create Spark session
    spark = SparkSession.builder \
        .appName("Streaming Deduplication and Quality Enforcement") \
        .master("local") \
        .getOrCreate()

    # Define custom schema
    CUSTOM_SCHEMA = StructType([
        StructField("event_id", StringType(), True),      # Unique event identifier
        StructField("name", StringType(), True),          # Name of the entity
        StructField("timestamp", TimestampType(), True),  # Event timestamp
        StructField("value", IntegerType(), True)         # Numeric value for the event
    ])

    # Source: Read streaming data from CSV files (header=true skips the column-name row)
    streaming_df = spark.readStream \
        .format("csv") \
        .option("header", "true") \
        .schema(CUSTOM_SCHEMA) \
        .load("../resources/dataset/data_quality/input")

    # Deduplicate using time-based windowing
    deduplicated_df = streaming_df.withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes"),  # Group by time window
            col("event_id")                          # Group by event_id
        ) \
        .agg(
            max("timestamp").alias("latest_timestamp"),    # Keep the latest timestamp
            max_by("value", "timestamp").alias("value"),   # Keep the value of the latest record (Spark 3.3+)
            first("name", ignorenulls=True).alias("name")  # Keep the first non-null name
        ) \
        .select(
            col("event_id"),
            col("latest_timestamp").alias("timestamp"),
            col("value"),
            col("name")
        )

    # Enforce data quality rules
    quality_enforced_df = deduplicated_df \
        .filter((col("value").isNotNull()) & (col("value") > 0)) \
        .fillna({"name": "Unknown"})  # Replace null names

    # Sink: Write the results to the console
    quality_enforced_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .start() \
        .awaitTermination()


