Skip to content

sparkdq.management

The sparkdq.management subpackage provides high-level coordination components for managing checks within the framework.

management

CheckSet

Centralized registry and lifecycle manager for data quality checks.

Orchestrates the complete lifecycle of data quality checks from configuration to execution readiness, providing a unified interface for check registration, organization, and retrieval. The CheckSet abstracts the complexity of check instantiation and type management, enabling clean separation between check definition and execution logic.

This design supports both programmatic and declarative check registration patterns while maintaining type safety and providing convenient filtering capabilities for different execution contexts.

Source code in sparkdq/management/check_set.py
class CheckSet:
    """
    Centralized registry and lifecycle manager for data quality checks.

    Orchestrates the complete lifecycle of data quality checks from configuration
    to execution readiness, providing a unified interface for check registration,
    organization, and retrieval. The CheckSet abstracts the complexity of check
    instantiation and type management, enabling clean separation between check
    definition and execution logic.

    This design supports both programmatic and declarative check registration
    patterns while maintaining type safety and providing convenient filtering
    capabilities for different execution contexts.
    """

    def __init__(self) -> None:
        """Initialize an empty CheckSet ready for check registration."""
        self._checks: List[BaseCheck] = []

    def add_check(self, config: BaseCheckConfig) -> "CheckSet":
        """
        Register a single check from a validated configuration object.

        Instantiates the check from the provided configuration and adds it to
        the internal registry. The fluent interface enables method chaining
        for convenient multi-check registration.

        Args:
            config (BaseCheckConfig): Validated configuration object containing
                all parameters required for check instantiation.

        Returns:
            CheckSet: This instance to enable fluent method chaining.
        """
        self._checks.append(config.to_check())
        return self

    def add_checks_from_dicts(self, configs: List[Dict[str, Any]]) -> None:
        """
        Register multiple checks from raw configuration dictionaries.

        Processes a collection of configuration dictionaries through the CheckFactory,
        performing validation and instantiation for each check definition. This method
        enables bulk registration from external configuration sources such as JSON,
        YAML, or database records.

        Args:
            configs (List[Dict[str, Any]]): Collection of configuration dictionaries,
                each containing the parameters required for a specific check type.
        """
        self._checks.extend(CheckFactory.from_list(configs))

    def get_all(self) -> List[BaseCheck]:
        """
        Retrieve the complete collection of registered checks.

        Returns:
            List[BaseCheck]: All checks currently managed by this CheckSet,
                regardless of their specific type or implementation.
        """
        return self._checks

    def get_row_checks(self) -> List[BaseRowCheck]:
        """
        Retrieve only the record-level validation checks.

        Filters the registered checks to return only those that operate on
        individual records, enabling targeted execution for row-level
        validation scenarios.

        Returns:
            List[BaseRowCheck]: Collection of checks that validate individual
                records within the dataset.
        """
        return [check for check in self._checks if isinstance(check, BaseRowCheck)]

    def get_aggregate_checks(self) -> List[BaseAggregateCheck]:
        """
        Retrieve only the dataset-level validation checks.

        Filters the registered checks to return only those that evaluate
        global dataset properties, enabling targeted execution for
        aggregate-level validation scenarios.

        Returns:
            List[BaseAggregateCheck]: Collection of checks that validate
                dataset-wide properties and constraints.
        """
        return [check for check in self._checks if isinstance(check, BaseAggregateCheck)]

    def clear(self) -> None:
        """
        Remove all registered checks from this CheckSet.

        Clears the internal check registry, returning the CheckSet to its
        initial empty state. This operation is useful for resetting between
        validation runs or when reconfiguring the check collection.
        """
        self._checks.clear()

    def __repr__(self) -> str:
        """
        Generate a concise developer-oriented representation of this CheckSet.

        Returns:
            str: Summary representation indicating the total number of
                registered checks for debugging and logging purposes.
        """
        return f"<CheckSet (total checks: {len(self._checks)})>"

    def __str__(self) -> str:
        """
        Generate a detailed human-readable listing of all registered checks.

        Produces a formatted string containing the check identifiers and types
        for all registered checks, providing comprehensive visibility into the
        current CheckSet configuration.

        Returns:
            str: Formatted listing of all registered checks with their
                identifiers and implementation types.
        """
        if not self._checks:
            return "CheckSet: No checks registered."

        lines = ["CheckSet:"]
        for check in self._checks:
            check_id = getattr(check, "check_id", "unknown-id")
            check_type = type(check).__name__
            lines.append(f"  - {check_id} ({check_type})")
        return "\n".join(lines)

__init__

__init__() -> None

Initialize an empty CheckSet ready for check registration.

Source code in sparkdq/management/check_set.py
def __init__(self) -> None:
    """Initialize an empty CheckSet ready for check registration."""
    self._checks: List[BaseCheck] = []

add_check

add_check(config: BaseCheckConfig) -> CheckSet

Register a single check from a validated configuration object.

Instantiates the check from the provided configuration and adds it to the internal registry. The fluent interface enables method chaining for convenient multi-check registration.

Parameters:

Name Type Description Default
config BaseCheckConfig

Validated configuration object containing all parameters required for check instantiation.

required

Returns:

Name Type Description
CheckSet CheckSet

This instance to enable fluent method chaining.

Source code in sparkdq/management/check_set.py
def add_check(self, config: BaseCheckConfig) -> "CheckSet":
    """
    Register a single check from a validated configuration object.

    Instantiates the check from the provided configuration and adds it to
    the internal registry. The fluent interface enables method chaining
    for convenient multi-check registration.

    Args:
        config (BaseCheckConfig): Validated configuration object containing
            all parameters required for check instantiation.

    Returns:
        CheckSet: This instance to enable fluent method chaining.
    """
    self._checks.append(config.to_check())
    return self

add_checks_from_dicts

add_checks_from_dicts(
    configs: List[Dict[str, Any]],
) -> None

Register multiple checks from raw configuration dictionaries.

Processes a collection of configuration dictionaries through the CheckFactory, performing validation and instantiation for each check definition. This method enables bulk registration from external configuration sources such as JSON, YAML, or database records.

Parameters:

Name Type Description Default
configs List[Dict[str, Any]]

Collection of configuration dictionaries, each containing the parameters required for a specific check type.

required
Source code in sparkdq/management/check_set.py
def add_checks_from_dicts(self, configs: List[Dict[str, Any]]) -> None:
    """
    Register multiple checks from raw configuration dictionaries.

    Processes a collection of configuration dictionaries through the CheckFactory,
    performing validation and instantiation for each check definition. This method
    enables bulk registration from external configuration sources such as JSON,
    YAML, or database records.

    Args:
        configs (List[Dict[str, Any]]): Collection of configuration dictionaries,
            each containing the parameters required for a specific check type.
    """
    self._checks.extend(CheckFactory.from_list(configs))

get_all

get_all() -> List[BaseCheck]

Retrieve the complete collection of registered checks.

Returns:

Type Description
List[BaseCheck]

List[BaseCheck]: All checks currently managed by this CheckSet, regardless of their specific type or implementation.

Source code in sparkdq/management/check_set.py
def get_all(self) -> List[BaseCheck]:
    """
    Retrieve the complete collection of registered checks.

    Returns:
        List[BaseCheck]: All checks currently managed by this CheckSet,
            regardless of their specific type or implementation.
    """
    return self._checks

get_row_checks

get_row_checks() -> List[BaseRowCheck]

Retrieve only the record-level validation checks.

Filters the registered checks to return only those that operate on individual records, enabling targeted execution for row-level validation scenarios.

Returns:

Type Description
List[BaseRowCheck]

List[BaseRowCheck]: Collection of checks that validate individual records within the dataset.

Source code in sparkdq/management/check_set.py
def get_row_checks(self) -> List[BaseRowCheck]:
    """
    Retrieve only the record-level validation checks.

    Filters the registered checks to return only those that operate on
    individual records, enabling targeted execution for row-level
    validation scenarios.

    Returns:
        List[BaseRowCheck]: Collection of checks that validate individual
            records within the dataset.
    """
    return [check for check in self._checks if isinstance(check, BaseRowCheck)]

get_aggregate_checks

get_aggregate_checks() -> List[BaseAggregateCheck]

Retrieve only the dataset-level validation checks.

Filters the registered checks to return only those that evaluate global dataset properties, enabling targeted execution for aggregate-level validation scenarios.

Returns:

Type Description
List[BaseAggregateCheck]

List[BaseAggregateCheck]: Collection of checks that validate dataset-wide properties and constraints.

Source code in sparkdq/management/check_set.py
def get_aggregate_checks(self) -> List[BaseAggregateCheck]:
    """
    Retrieve only the dataset-level validation checks.

    Filters the registered checks to return only those that evaluate
    global dataset properties, enabling targeted execution for
    aggregate-level validation scenarios.

    Returns:
        List[BaseAggregateCheck]: Collection of checks that validate
            dataset-wide properties and constraints.
    """
    return [check for check in self._checks if isinstance(check, BaseAggregateCheck)]

clear

clear() -> None

Remove all registered checks from this CheckSet.

Clears the internal check registry, returning the CheckSet to its initial empty state. This operation is useful for resetting between validation runs or when reconfiguring the check collection.

Source code in sparkdq/management/check_set.py
def clear(self) -> None:
    """
    Remove all registered checks from this CheckSet.

    Clears the internal check registry, returning the CheckSet to its
    initial empty state. This operation is useful for resetting between
    validation runs or when reconfiguring the check collection.
    """
    self._checks.clear()

__repr__

__repr__() -> str

Generate a concise developer-oriented representation of this CheckSet.

Returns:

Name Type Description
str str

Summary representation indicating the total number of registered checks for debugging and logging purposes.

Source code in sparkdq/management/check_set.py
def __repr__(self) -> str:
    """
    Generate a concise developer-oriented representation of this CheckSet.

    Returns:
        str: Summary representation indicating the total number of
            registered checks for debugging and logging purposes.
    """
    return f"<CheckSet (total checks: {len(self._checks)})>"

__str__

__str__() -> str

Generate a detailed human-readable listing of all registered checks.

Produces a formatted string containing the check identifiers and types for all registered checks, providing comprehensive visibility into the current CheckSet configuration.

Returns:

Name Type Description
str str

Formatted listing of all registered checks with their identifiers and implementation types.

Source code in sparkdq/management/check_set.py
def __str__(self) -> str:
    """
    Generate a detailed human-readable listing of all registered checks.

    Produces a formatted string containing the check identifiers and types
    for all registered checks, providing comprehensive visibility into the
    current CheckSet configuration.

    Returns:
        str: Formatted listing of all registered checks with their
            identifiers and implementation types.
    """
    if not self._checks:
        return "CheckSet: No checks registered."

    lines = ["CheckSet:"]
    for check in self._checks:
        check_id = getattr(check, "check_id", "unknown-id")
        check_type = type(check).__name__
        lines.append(f"  - {check_id} ({check_type})")
    return "\n".join(lines)