Streaming Deduplication and Quality Enforcement


I am a tech enthusiast with 13+ years of experience in IT as a Consultant, Corporate Trainer and Mentor, including 12+ years of training and mentoring in Software Engineering, Data Engineering, Test Automation and Data Science. I have trained more than 10,000 IT professionals and conducted more than 500 training sessions in Software Development, Data Engineering, Cloud, Data Analysis, Data Visualizations, Artificial Intelligence and Machine Learning. I am interested in writing blogs, sharing technical knowledge, solving technical issues, and reading and learning new subjects.

This program demonstrates a real-time data pipeline using Spark Structured Streaming to handle deduplication and data quality enforcement on streaming data from CSV files.

Objective

The program achieves the following:

  1. Ingest data from a directory containing CSV files.

  2. Deduplicate records based on unique identifiers (event_id) and keep the latest record within a specified time window.

  3. Enforce data quality rules by filtering and handling invalid or missing data.

  4. Output processed data to the console for debugging and verification.

Code Walkthrough

Reading Streams

Spark Session Initialization

The program starts by initializing a SparkSession, which is the entry point for Spark Structured Streaming applications.

spark = SparkSession.builder \
    .appName("Streaming Deduplication and Quality Enforcement") \
    .master("local") \
    .getOrCreate()

Schema Definition

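A custom schema describes the structure of the incoming CSV files. File-based streaming sources need an explicit schema by default, because Spark does not infer one unless spark.sql.streaming.schemaInference is enabled: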

CUSTOM_SCHEMA = StructType([
    StructField("event_id", StringType(), True),    # Unique event identifier
    StructField("name", StringType(), True),       # Name of the entity
    StructField("timestamp", TimestampType(), True),  # Event timestamp
    StructField("value", IntegerType(), True)      # Numeric value for the event
])

Reading Streaming Data

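The program watches the folder ../resources/dataset/data_quality/input and processes each new CSV file that appears there. The sample files start with a header line, so the header option is set to skip that line instead of reading it as data, and timestampFormat states how the timestamp column is written:

streaming_df = spark.readStream \
    .format("csv") \
    .schema(CUSTOM_SCHEMA) \
    .option("header", "true") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .load("../resources/dataset/data_quality/input")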
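Because the file source applies CUSTOM_SCHEMA instead of inferring types, a field that does not parse as its declared type becomes null under the CSV reader's default PERMISSIVE mode. The plain-Python sketch below models that behaviour for a single line; it is not Spark code, and `parse_row`, `to_int`, `to_timestamp` and `TS_FORMAT` are hypothetical names. It also shows why the CSV header option matters: if it is not set, the header line is read as a data row whose timestamp and value are null.

```python
import csv
import io
from datetime import datetime

# Plain-Python model (not Spark code) of applying CUSTOM_SCHEMA to one CSV line
# in PERMISSIVE mode: a field that does not parse as its declared type becomes
# None instead of failing the query.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # equivalent of timestampFormat "yyyy-MM-dd HH:mm:ss"


def to_int(text):
    """IntegerType field: None for empty or non-numeric text."""
    try:
        return int(text)
    except ValueError:
        return None


def to_timestamp(text):
    """TimestampType field: None if the text does not match TS_FORMAT."""
    try:
        return datetime.strptime(text, TS_FORMAT)
    except ValueError:
        return None


def parse_row(line):
    """Hypothetical helper: map one CSV line onto event_id, name, timestamp, value."""
    event_id, name, timestamp, value = next(csv.reader(io.StringIO(line)))
    return {
        "event_id": event_id or None,  # empty fields are read as null by default
        "name": name or None,
        "timestamp": to_timestamp(timestamp),
        "value": to_int(value),
    }


print(parse_row("3,,2024-12-10 10:10:00,-10"))     # name is None, value is -10
print(parse_row("event_id,name,timestamp,value"))  # header read as data: timestamp and value are None
```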

Deduplication Using Time-Based Windowing

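Deduplication groups records by a 10-minute event-time window and event_id, and keeps only the record with the latest timestamp in each group. Duplicates are therefore merged only when they fall into the same window: two copies of an event stamped 10:08 and 10:12 land in different windows, and both survive.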

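Add Watermark and Grouping

withWatermark tells Spark how late data may arrive: events more than 10 minutes older than the latest event time seen so far may be ignored, and state for windows that end before that point can be dropped. Each record is grouped by the 10-minute tumbling window that contains its timestamp and by its event_id:

grouped_df = streaming_df.withWatermark("timestamp", "10 minutes").groupBy(
    window(col("timestamp"), "10 minutes"),  # Group by 10-minute time windows
    col("event_id")                          # Group by event_id
)

Aggregate Functions

To keep the latest record, rather than combining fields taken from different records, the aggregation takes the maximum of a struct whose first field is the timestamp. Spark compares structs field by field, so the largest struct is the record with the latest timestamp (ties are broken by value, then name). The struct's fields are then unpacked into columns:

deduplicated_df = grouped_df.agg(
    max(struct(col("timestamp"), col("value"), col("name"))).alias("latest")  # Latest record per group
).select(
    col("event_id"),
    col("latest.timestamp").alias("timestamp"),
    col("latest.value").alias("value"),
    col("latest.name").alias("name")
)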
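The window() and withWatermark() calls can be modelled in a few lines of plain Python. The sketch below assumes, as Spark does for tumbling windows without a start offset, that windows are aligned to the Unix epoch, and it treats naive timestamps as UTC. `window_for` and `watermark` are hypothetical helpers, not Spark APIs. It also shows a limitation of window-based deduplication: copies of one event at 10:08 and 10:12 end up in different groups.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # window duration used in the query
DELAY = timedelta(minutes=10)   # watermark delay threshold used in the query
EPOCH = datetime(1970, 1, 1)    # naive timestamps are treated as UTC here


def window_for(ts):
    """Hypothetical helper: the [start, end) tumbling window that contains ts."""
    start = ts - (ts - EPOCH) % WINDOW  # align window starts to the epoch
    return start, start + WINDOW


def watermark(event_times):
    """Hypothetical helper: latest event time seen so far minus the delay threshold."""
    return max(event_times) - DELAY


first_copy = datetime(2024, 12, 10, 10, 8)
second_copy = datetime(2024, 12, 10, 10, 12)
print(window_for(first_copy))   # 10:00 to 10:10
print(window_for(second_copy))  # 10:10 to 10:20: a different group, so not merged
print(watermark([first_copy, second_copy]))  # 10:02
```

With the watermark at 10:02, the 10:00-10:10 window is still open. Its state becomes eligible for removal only once the watermark catches up with the window's end at 10:10, that is, after events stamped around 10:20 arrive.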
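Computing max("timestamp") and max("value") as independent aggregates can pair fields from different records, because the largest value need not belong to the latest event. The usual Spark idiom for keeping a whole record is to take the max of a struct whose first field is the timestamp. The plain-Python sketch below contrasts the two behaviours on hypothetical rows in which the value drops over time (in the sample data the values happen to rise, so both approaches give the same answer there).

```python
from datetime import datetime

# Hypothetical rows for one (window, event_id) group; the value drops over time
rows = [
    {"timestamp": datetime(2024, 12, 10, 10, 0), "value": 50, "name": "John"},
    {"timestamp": datetime(2024, 12, 10, 10, 5), "value": 20, "name": "John"},
]

# Per-column maxima, like max("timestamp") and max("value") computed independently:
# the result pairs the 10:05 timestamp with the 10:00 row's value
per_column = {
    "timestamp": max(r["timestamp"] for r in rows),
    "value": max(r["value"] for r in rows),
}

# Whole-record selection by latest timestamp, the behaviour max(struct(timestamp, ...)) gives
latest = max(rows, key=lambda r: r["timestamp"])

print(per_column)  # timestamp 10:05 with value 50
print(latest)      # the 10:05 row with value 20
```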

Data Quality Enforcement

Data quality is enforced on the deduplicated rows with two rules: rows whose value is null or not positive are dropped, and a missing name is replaced with "Unknown":

quality_enforced_df = deduplicated_df \
    .filter((col("value").isNotNull()) & (col("value") > 0)) \
    .fillna({"name": "Unknown"})  # Replace null names
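Because these rules run after deduplication, they only ever see the latest record of each event: if that record is invalid, the event disappears even when an earlier copy was valid. The plain-Python sketch below models the filter and fillna steps on the deduplicated rows for the sample input; `enforce_quality` is a hypothetical name, and event 4 is a made-up row added to exercise the name rule.

```python
# Hypothetical model of the two rules: drop rows whose value is null or <= 0,
# then replace a missing name with "Unknown" (mirrors filter(...) and fillna(...)).
def enforce_quality(rows):
    kept = []
    for row in rows:
        if row["value"] is None or row["value"] <= 0:
            continue  # fails the value rule, so the row is dropped
        fixed = dict(row)  # copy so the input rows are left unchanged
        if fixed["name"] is None:
            fixed["name"] = "Unknown"
        kept.append(fixed)
    return kept


deduplicated = [
    {"event_id": "1", "value": 60, "name": "John"},
    {"event_id": "2", "value": 30, "name": "Alice"},
    {"event_id": "3", "value": -10, "name": None},
    {"event_id": "4", "value": 5, "name": None},  # hypothetical extra row
]
for row in enforce_quality(deduplicated):
    print(row)
```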

Writing to the Console

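The processed data is written to the console with writeStream. In complete mode the sink reprints the entire result table after every micro-batch, and Spark keeps the state of every window, so the watermark does not drop anything. For a long-running job, update mode lets the watermark discard state for windows that can no longer change: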

quality_enforced_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start() \
    .awaitTermination()
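The difference between the two output modes can be modelled without Spark. In the plain-Python sketch below, the sample data is split into two hypothetical micro-batches; `run_batches` is a hypothetical helper, and its dictionary stands in for Spark's state store. Complete mode prints every group after each batch, whereas update mode prints only the groups that received rows in that batch.

```python
from datetime import datetime


def run_batches(batches, mode):
    """Hypothetical model of console output for a latest-record-per-group aggregation.

    mode="complete": print every group after each batch.
    mode="update": print only groups that received rows in that batch.
    """
    state = {}  # (10-minute window start, event_id) -> latest row so far
    printed = []
    for batch in batches:
        touched = set()
        for row in batch:
            ts = row["timestamp"]
            key = (ts.replace(minute=ts.minute - ts.minute % 10, second=0), row["event_id"])
            if key not in state or ts > state[key]["timestamp"]:
                state[key] = row
            touched.add(key)
        keys = sorted(state) if mode == "complete" else sorted(touched)
        printed.append([state[k] for k in keys])
    return printed


batch_0 = [
    {"event_id": "1", "timestamp": datetime(2024, 12, 10, 10, 0), "value": 50},
    {"event_id": "2", "timestamp": datetime(2024, 12, 10, 10, 3), "value": 30},
]
batch_1 = [{"event_id": "1", "timestamp": datetime(2024, 12, 10, 10, 5), "value": 60}]

for mode in ("complete", "update"):
    print(mode, run_batches([batch_0, batch_1], mode)[1])  # what the second batch prints
```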

Sample Data

Input

event_id,name,timestamp,value
1,John,2024-12-10 10:00:00,50
1,John,2024-12-10 10:05:00,60
2,Alice,2024-12-10 10:03:00,30
3,,2024-12-10 10:10:00,-10

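Output

The console sink prints the result table below (row order may vary). Event 1's two records fall into the same 10:00-10:10 window and collapse into the latest one (10:05, value 60). Event 3 is dropped by the quality filter because its value is negative, so its missing name is never filled in.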

+--------+-------------------+-----+-------+
|event_id|timestamp          |value|name   |
+--------+-------------------+-----+-------+
|1       |2024-12-10 10:05:00|60   |John   |
|2       |2024-12-10 10:03:00|30   |Alice  |
+--------+-------------------+-----+-------+
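As a cross-check of the expected result, the plain-Python sketch below replays the sample file through the same three steps (parse, keep the latest record per 10-minute window and event_id, apply the quality rules) without Spark. `replay` and `SAMPLE` are hypothetical names; the sketch models the query's semantics for a single batch and is not a substitute for running it.

```python
import csv
import io
from datetime import datetime

SAMPLE = """event_id,name,timestamp,value
1,John,2024-12-10 10:00:00,50
1,John,2024-12-10 10:05:00,60
2,Alice,2024-12-10 10:03:00,30
3,,2024-12-10 10:10:00,-10
"""


def replay(text):
    latest = {}  # (10-minute window start, event_id) -> latest row
    for rec in csv.DictReader(io.StringIO(text)):  # header row is consumed, like header=true
        ts = datetime.strptime(rec["timestamp"], "%Y-%m-%d %H:%M:%S")
        row = {"event_id": rec["event_id"], "timestamp": ts,
               "value": int(rec["value"]) if rec["value"] else None,
               "name": rec["name"] or None}
        key = (ts.replace(minute=ts.minute - ts.minute % 10, second=0), row["event_id"])
        if key not in latest or ts > latest[key]["timestamp"]:
            latest[key] = row  # keep the record with the latest timestamp
    result = []
    for key in sorted(latest):
        row = latest[key]
        if row["value"] is not None and row["value"] > 0:  # quality rule on value
            result.append((row["event_id"], row["timestamp"], row["value"],
                           row["name"] or "Unknown"))  # quality rule on name
    return result


for r in replay(SAMPLE):
    print(r)
```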

Complete Code

from pyspark.sql import SparkSession
# Explicit imports; note that Spark's max shadows Python's built-in max in this module
from pyspark.sql.functions import col, max, struct, window
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType

if __name__ == '__main__':
    # Create Spark session
    spark = SparkSession.builder \
        .appName("Streaming Deduplication and Quality Enforcement") \
        .master("local") \
        .getOrCreate()

    # Define custom schema
    CUSTOM_SCHEMA = StructType([
        StructField("event_id", StringType(), True),      # Unique event identifier
        StructField("name", StringType(), True),          # Name of the entity
        StructField("timestamp", TimestampType(), True),  # Event timestamp
        StructField("value", IntegerType(), True)         # Numeric value for the event
    ])

    # Source: Read streaming data from CSV files (each file starts with a header line)
    streaming_df = spark.readStream \
        .format("csv") \
        .schema(CUSTOM_SCHEMA) \
        .option("header", "true") \
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
        .load("../resources/dataset/data_quality/input")

    # Deduplicate: one row per (10-minute window, event_id), keeping the latest record
    deduplicated_df = streaming_df.withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes"),  # Group by time window
            col("event_id")                          # Group by event_id
        ) \
        .agg(
            # Structs compare field by field, so the max struct is the latest record
            max(struct(col("timestamp"), col("value"), col("name"))).alias("latest")
        ) \
        .select(
            col("event_id"),
            col("latest.timestamp").alias("timestamp"),
            col("latest.value").alias("value"),
            col("latest.name").alias("name")
        )

    # Enforce data quality rules
    quality_enforced_df = deduplicated_df \
        .filter((col("value").isNotNull()) & (col("value") > 0)) \
        .fillna({"name": "Unknown"})  # Replace null names

    # Sink: Write the results to the console
    quality_enforced_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .start() \
        .awaitTermination()
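While the query runs, it treats each new file in the input directory as new data for the next micro-batch. Spark's file source expects files to be placed in the directory atomically, and a common way to do that is to write the file elsewhere and then move it in with a rename. The sketch below does this with the Python standard library. `drop_csv` is a hypothetical helper, and it stages the file in the input directory's parent on the assumption that both are on the same filesystem, so that os.replace is an atomic rename.

```python
import os
import tempfile
import uuid


def drop_csv(input_dir, text):
    """Hypothetical helper: write text to a staging file, then rename it into input_dir."""
    # Assumption: the parent directory is on the same filesystem as input_dir
    staging_dir = os.path.dirname(os.path.abspath(input_dir))
    with tempfile.NamedTemporaryFile("w", dir=staging_dir, suffix=".tmp",
                                     delete=False) as tmp:
        tmp.write(text)
        staging_path = tmp.name
    # A unique file name, so the file source sees it as a new file
    target = os.path.join(input_dir, f"batch-{uuid.uuid4().hex}.csv")
    os.replace(staging_path, target)  # atomic rename within one filesystem on POSIX
    return target
```

For example, calling `drop_csv("../resources/dataset/data_quality/input", csv_text)` with a string that starts with the `event_id,name,timestamp,value` header would add one file for the running query to pick up.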

Naveen P.N's Tech Blog