Source code for sparkdq.checks.row_level.null_checks.null_check

from typing import List

from pydantic import Field
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from sparkdq.core.base_check import BaseRowCheck
from sparkdq.core.base_config import BaseRowCheckConfig
from sparkdq.core.severity import Severity
from sparkdq.exceptions import MissingColumnError
from sparkdq.plugin.check_config_registry import register_check_config


class NullCheck(BaseRowCheck):
    """
    Record-level validation check that identifies rows containing null values.

    Validates that the specified columns contain non-null values across all records,
    flagging any row in which at least one of these columns is null. This check is
    essential for enforcing data completeness requirements and identifying missing
    data issues that could impact downstream processing or analysis.

    The check uses OR-based failure logic: a null value in any of the configured
    columns marks the entire record as invalid.

    Attributes:
        columns (List[str]): Column names that must contain non-null values.
    """

    def __init__(self, check_id: str, columns: List[str], severity: Severity = Severity.CRITICAL):
        """
        Initialize the null value validation check with target columns and configuration.

        Args:
            check_id (str): Unique identifier for this check instance.
            columns (List[str]): Column names that must contain non-null values.
            severity (Severity, optional): Classification level for validation failures.
                Defaults to Severity.CRITICAL.
        """
        super().__init__(check_id=check_id, severity=severity)
        self.columns = columns

    def validate(self, df: DataFrame) -> DataFrame:
        """
        Execute the null value validation logic against the configured columns.

        Performs schema validation to ensure all target columns exist, then applies
        null detection logic with OR-based failure semantics. Records fail validation
        when any configured column contains null values.

        Args:
            df (DataFrame): The dataset to validate for null value compliance.

        Returns:
            DataFrame: Original dataset augmented with a boolean result column where
                True indicates validation failure (null values detected) and False
                indicates compliance with non-null requirements.

        Raises:
            MissingColumnError: When any configured column is not present in the
                dataset schema, indicating a configuration mismatch.
        """
        for column in self.columns:
            if column not in df.columns:
                raise MissingColumnError(column, df.columns)

        # Build a Spark array column where each element checks whether the
        # corresponding configured column is NULL.
        null_checks = F.array(*[F.col(c).isNull() for c in self.columns])

        # Reduce the array by OR-ing all null checks together.
        any_null_expr = F.aggregate(null_checks, F.lit(False), lambda acc, x: acc | x)

        return self.with_check_result_column(df, any_null_expr)
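
# --- Illustrative usage (not part of the module) ----------------------------
# A minimal sketch of how NullCheck flags rows, assuming a local SparkSession
# and that with_check_result_column (inherited from BaseRowCheck) appends a
# boolean result column to the returned DataFrame. The session, sample data,
# and column names below are hypothetical.
#
#     from pyspark.sql import SparkSession
#
#     spark = SparkSession.builder.master("local[1]").getOrCreate()
#     df = spark.createDataFrame(
#         [(1, "a"), (2, None), (None, None)],
#         ["id", "name"],
#     )
#     check = NullCheck(check_id="no_nulls", columns=["id", "name"])
#     result = check.validate(df)
#     # The second and third rows are flagged (at least one configured column
#     # is null); the first row passes because both "id" and "name" are set.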


@register_check_config(check_name="null-check")
class NullCheckConfig(BaseRowCheckConfig):
    """
    Configuration schema for null value validation checks.

    Defines the parameters required for configuring checks that enforce non-null
    value requirements. This configuration enables declarative check definition
    through external configuration sources while ensuring parameter validity.

    Attributes:
        columns (List[str]): Column names that must contain non-null values for
            records to pass validation.
    """

    check_class = NullCheck
    columns: List[str] = Field(..., description="The list of columns to check for null values")
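
# --- Illustrative configuration (not part of the module) --------------------
# NullCheckConfig is registered under the name "null-check", so it can be
# referenced from declarative (e.g. file-based) check definitions. A minimal
# sketch of direct construction is shown below with hypothetical column names;
# any fields inherited from BaseRowCheckConfig (such as a check identifier or
# severity) are omitted here and are assumptions about the base schema.
#
#     config = NullCheckConfig(columns=["order_id", "customer_id"])
#     assert config.check_class is NullCheck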