Implementation
Place your checks in a dedicated package and import every config class in its __init__.py, so that the registration decorators run as soon as the package is imported:
# __init__.py
from .row_checks import MyRowCheckConfig
from .aggregate_checks import MyAggregateCheckConfig
Once the package is structured this way, register all checks with a single call before building a CheckSet:
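Row-Level Check
Row-level checks evaluate each row individually. Implement validate(df) and return the result of self.with_check_result_column(df, condition); rows where the condition evaluates to True are marked as failing.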
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from sparkdq.core import BaseRowCheck, BaseRowCheckConfig, Severity
from sparkdq.plugin import register_check_config
class PositiveValueCheck(BaseRowCheck):
    """Row-level check that flags rows whose ``column`` value is not strictly positive.

    A row fails when ``column <= 0``. NOTE(review): how null values are
    treated depends on ``with_check_result_column`` (not shown here) — confirm.
    """

    def __init__(self, check_id: str, column: str, severity: Severity = Severity.CRITICAL):
        super().__init__(check_id=check_id, severity=severity)
        # Name of the column whose values must be greater than zero.
        self.column = column

    def validate(self, df: DataFrame) -> DataFrame:
        """Return ``df`` annotated with this check's result column.

        Rows for which the failure condition evaluates to True are marked as failing.
        """
        non_positive = F.col(self.column) <= 0
        return self.with_check_result_column(df, non_positive)
@register_check_config(check_name="positive-value")
class PositiveValueCheckConfig(BaseRowCheckConfig):
    """Declarative configuration for :class:`PositiveValueCheck`.

    Registered under the check name ``"positive-value"``.
    """

    check_class = PositiveValueCheck

    # Column whose values must be strictly positive.
    column: str
Aggregate Check
Aggregate checks evaluate the dataset as a whole. Implement _evaluate_logic(df) and return an AggregateEvaluationResult with a passed flag and a metrics dictionary.
from pydantic import Field, model_validator
from pyspark.sql import DataFrame
from sparkdq.core import AggregateEvaluationResult, BaseAggregateCheck, BaseAggregateCheckConfig, Severity
from sparkdq.exceptions import InvalidCheckConfigurationError
from sparkdq.plugin import register_check_config
class RowCountMinCheck(BaseAggregateCheck):
    """Aggregate check that passes when the DataFrame has at least ``min_count`` rows."""

    def __init__(self, check_id: str, min_count: int, severity: Severity = Severity.CRITICAL):
        super().__init__(check_id=check_id, severity=severity)
        self.min_count = min_count

    def _evaluate_logic(self, df: DataFrame) -> AggregateEvaluationResult:
        """Count the rows of ``df`` and compare the count against ``self.min_count``.

        Returns an AggregateEvaluationResult whose metrics report both the
        observed row count and the configured minimum.
        """
        # DataFrame.count() is a Spark action: it executes the full plan.
        row_total = df.count()
        metrics = {"actual_row_count": row_total, "min_expected": self.min_count}
        return AggregateEvaluationResult(passed=row_total >= self.min_count, metrics=metrics)
@register_check_config(check_name="custom-row-count-min")
class RowCountMinCheckConfig(BaseAggregateCheckConfig):
    """Declarative configuration for :class:`RowCountMinCheck`.

    Registered under the check name ``"custom-row-count-min"``; the minimum
    row count is supplied through the ``min-count`` alias.
    """

    check_class = RowCountMinCheck

    # Minimum number of rows required; the validator below rejects values <= 0.
    min_count: int = Field(..., alias="min-count")

    @model_validator(mode="after")
    def validate_min(self) -> "RowCountMinCheckConfig":
        """Reject non-positive minimums after field validation.

        Raises:
            InvalidCheckConfigurationError: if ``min_count`` is zero or negative.
        """
        if self.min_count > 0:
            return self
        raise InvalidCheckConfigurationError(
            f"min_count ({self.min_count}) must be greater than 0"
        )
Your custom checks are now fully integrated into SparkDQ — validated, declarative, and indistinguishable from built-in checks at runtime.