Source code for sparkdq.checks.row_level.null_checks.null_check
from typing import List

from pydantic import Field
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from sparkdq.core.base_check import BaseRowCheck
from sparkdq.core.base_config import BaseRowCheckConfig
from sparkdq.core.severity import Severity
from sparkdq.exceptions import MissingColumnError
from sparkdq.plugin.check_config_registry import register_check_config


class NullCheck(BaseRowCheck):
    """
    Row-level data quality check that flags null values in one or more
    specified columns.

    This check appends a boolean result column to the input DataFrame.
    A row is marked as failed (True) if **any** of the target columns are null.

    Attributes:
        columns (List[str]): Names of the columns to inspect for null values.
    """

    def __init__(self, check_id: str, columns: List[str], severity: Severity = Severity.CRITICAL):
        """
        Initialize a NullCheck instance.

        Args:
            check_id (str): Unique identifier for the check instance.
            columns (List[str]): Names of the columns to check for null values.
            severity (Severity, optional): Severity level of the check result.
                Defaults to Severity.CRITICAL.
        """
        super().__init__(check_id=check_id, severity=severity)
        self.columns = columns

    def validate(self, df: DataFrame) -> DataFrame:
        """
        Execute the null check on the given DataFrame.

        This method appends a new boolean column (named after `check_id`) that
        indicates for each row whether **any** of the target columns are null.

        Args:
            df (DataFrame): The input Spark DataFrame to validate.

        Returns:
            DataFrame: A new DataFrame with an additional boolean column where
                `True` indicates a null value (i.e. the check failed) and
                `False` means the row is valid.

        Raises:
            MissingColumnError: If any of the specified columns do not exist
                in the DataFrame.
        """
        for column in self.columns:
            if column not in df.columns:
                raise MissingColumnError(column, df.columns)

        # Build a Spark array column where each element checks whether the
        # corresponding target column is NULL for that row.
        null_checks = F.array(*[F.col(c).isNull() for c in self.columns])

        # Reduce the array by OR-ing all per-column null flags, so a row
        # fails as soon as any single target column is null.
        any_null_expr = F.aggregate(null_checks, F.lit(False), lambda acc, x: acc | x)

        return self.with_check_result_column(df, any_null_expr)
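
# Usage sketch for validate() (an illustrative comment, kept inert so the
# module stays import-safe; `spark` is an assumed active SparkSession, not
# defined in this file):
#
#     df = spark.createDataFrame([("a", None)], ["x", "y"])
#     checked = NullCheck(check_id="xy_not_null", columns=["x", "y"]).validate(df)
#     # `checked` carries an extra boolean column named after `check_id`;
#     # the row is flagged True because column `y` is null.
#     NullCheck(check_id="bad", columns=["missing"]).validate(df)  # raises MissingColumnError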


@register_check_config(check_name="null-check")
class NullCheckConfig(BaseRowCheckConfig):
    """
    Declarative configuration model for the NullCheck.

    Attributes:
        columns (List[str]): The names of the columns to check for null values.
            This is a required field and must match existing columns in the
            DataFrame.
    """

    check_class = NullCheck

    columns: List[str] = Field(..., description="The list of columns to check for null values")
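

# Minimal end-to-end sketch (illustrative, not part of the library source):
# runs the check against a tiny DataFrame. The result column name follows
# the docstring above, which states it is named after `check_id`. The
# `check_id` keyword on NullCheckConfig is assumed to be inherited from
# BaseRowCheckConfig and is not confirmed by this file.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("null-check-demo").getOrCreate()
    df = spark.createDataFrame([("a", None), (None, "b"), ("c", "d")], ["x", "y"])

    # Direct, programmatic usage of the check.
    check = NullCheck(check_id="xy_not_null", columns=["x", "y"])
    check.validate(df).show()
    # Expected: the first two rows are flagged True in the `xy_not_null`
    # column (each has at least one null), the third row False.

    # Declarative route: pydantic validates that the required `columns`
    # field is present before any check is constructed.
    config = NullCheckConfig(check_id="xy_not_null", columns=["x", "y"])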