diff --git a/singer_sdk/contrib/filesystem/stream.py b/singer_sdk/contrib/filesystem/stream.py index f17b2c1719..17575227ce 100644 --- a/singer_sdk/contrib/filesystem/stream.py +++ b/singer_sdk/contrib/filesystem/stream.py @@ -3,7 +3,6 @@ from __future__ import annotations import abc -import functools import sys import typing as t @@ -66,6 +65,7 @@ def __init__( self.filesystem = filesystem super().__init__(tap, schema=None, name=name) + self._schema = self._get_full_schema() # TODO(edgarrmondragon): Make this None if the filesystem does not support it. self.replication_key = SDC_META_MODIFIED_AT @@ -89,12 +89,6 @@ def _get_full_schema(self) -> dict[str, t.Any]: schema["properties"].update(self.SDC_PROPERTIES) return schema - @functools.cached_property - @override - def schema(self) -> dict[str, t.Any]: - """Return the schema for the stream.""" - return self._get_full_schema() - @override def get_records( self, diff --git a/singer_sdk/schema/source.py b/singer_sdk/schema/source.py index bf9056605a..f5e8581933 100644 --- a/singer_sdk/schema/source.py +++ b/singer_sdk/schema/source.py @@ -29,6 +29,11 @@ else: from typing_extensions import assert_never +if sys.version_info >= (3, 11): + from typing import Self # noqa: ICN003 +else: + from typing_extensions import Self + if sys.version_info >= (3, 12): from typing import override # noqa: ICN003 else: @@ -74,6 +79,7 @@ def preprocess_schema( Returns: The preprocessed schema. """ + ... class SchemaSource(ABC, t.Generic[_TKey]): @@ -186,7 +192,7 @@ class MyStream(Stream): def __init__( self, - schema_source: SchemaSource[_TKey], + schema_source: SchemaSource[_TKey] | None = None, *, key: _TKey | None = None, ) -> None: @@ -200,18 +206,37 @@ def __init__( self.schema_source = schema_source self.key = key + @t.overload + def __get__(self, obj: None, objtype: type[Stream] | None = ...) -> Self: ... + @t.overload + def __get__(self, obj: Stream, objtype: type[Stream] | None = ...) -> Schema: ... @t.final - def __get__(self, obj: Stream, objtype: type[Stream]) -> Schema: + def __get__( + self, + obj: Stream | None, + objtype: type[Stream] | None = None, + ) -> Schema | Self: """Get the schema from the schema source. Args: - obj: The object to get the schema from. - objtype: The type of the object to get the schema from. + obj: The stream instance, or ``None`` when accessed from the class. + objtype: The stream class. Returns: - A JSON schema dictionary. + A JSON schema dictionary, or the descriptor itself for class-level access. + """ + if obj is None: + return self + return self.get_stream_schema(obj, objtype or type(obj)) + + def __set__(self, obj: Stream, _value: object) -> None: + """Raise AttributeError; schema is read-only on instances. + + Raises: + AttributeError: Always. """ - return self.get_stream_schema(obj, objtype) + msg = f"'schema' is read-only on {type(obj).__name__}" + raise AttributeError(msg) def get_stream_schema( self, @@ -226,13 +251,60 @@ def get_stream_schema( Returns: A JSON schema dictionary. + + Raises: + DiscoveryError: If no schema source is configured and this method is not + overridden. """ + if self.schema_source is None: + msg = ( + f"No schema source configured for {type(self).__name__!r}; " + "either pass a schema_source or override get_stream_schema()." + ) + raise DiscoveryError(msg) return self.schema_source.get_schema( self.key or stream.name, # type: ignore[arg-type] # ty:ignore[invalid-argument-type] key_properties=stream.primary_keys, ) +class _InstanceSchemaDescriptor(StreamSchema[str]): + """Fallback descriptor placed on the base ``Stream`` class. + + When no class-level ``schema`` is declared on a stream subclass, this + descriptor is found in the MRO and forwards the attribute access to the + instance's ``_schema`` attribute (set by ``Stream.__init__`` or by the + catalog-loading path). + """ + + def __init__(self) -> None: + """Initialize with no schema source.""" + super().__init__() + + @override + def get_stream_schema( + self, + stream: _TStream, + stream_class: type[_TStream], + ) -> Schema: + """Return the schema stored on the stream instance. + + Args: + stream: The stream instance. + stream_class: The stream class (unused). + + Returns: + The JSON schema dictionary. + + Raises: + DiscoveryError: If no schema has been set on the instance. + """ + if stream._schema is None: # noqa: SLF001 + msg = f"The schema for stream '{stream.name}' was not provided" + raise DiscoveryError(msg) + return stream._schema # noqa: SLF001 + + def _load_yaml(content: bytes) -> dict[str, t.Any]: import yaml # noqa: PLC0415 diff --git a/singer_sdk/sql/stream.py b/singer_sdk/sql/stream.py index e87210434d..801d038359 100644 --- a/singer_sdk/sql/stream.py +++ b/singer_sdk/sql/stream.py @@ -4,7 +4,6 @@ import abc import typing as t -from functools import cached_property import sqlalchemy as sa @@ -28,8 +27,6 @@ class SQLStream(Stream, abc.ABC): """Base class for SQLAlchemy-based streams.""" connector_class = SQLConnector - _cached_schema: dict | None = None - supports_nulls_first: bool = False """Whether the database supports the NULLS FIRST/LAST syntax.""" @@ -53,7 +50,7 @@ def __init__( self.catalog_entry = catalog_entry super().__init__( tap=tap, - schema=self.schema, + schema=CatalogEntry.from_dict(catalog_entry).schema, name=self.tap_stream_id, ) @@ -86,17 +83,6 @@ def metadata(self) -> MetadataMapping: """ return self._singer_catalog_entry.metadata - @cached_property - def schema(self) -> dict: - """Return metadata object (dict) as specified in the Singer spec. - - Metadata from an input catalog will override standard metadata. - - Returns: - The schema object. - """ - return self._singer_catalog_entry.schema.to_dict() - @property def tap_stream_id(self) -> str: """Return the unique ID used by the tap to identify this stream. diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 6477342af5..56db628ad5 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -19,7 +19,6 @@ from singer_sdk.exceptions import ( AbortedSyncFailedException, AbortedSyncPausedException, - DiscoveryError, InvalidReplicationKeyException, InvalidStreamSortException, MaxRecordsLimitException, @@ -35,6 +34,7 @@ from singer_sdk.helpers._util import utc_now from singer_sdk.helpers.conform import TypeConformanceLevel from singer_sdk.mapper import RemoveRecordTransform, SameRecordTransform +from singer_sdk.schema.source import _InstanceSchemaDescriptor from singer_sdk.singerlib.catalog import ( REPLICATION_FULL_TABLE, REPLICATION_INCREMENTAL, @@ -47,6 +47,7 @@ from singer_sdk.helpers._batch import BaseBatchFileEncoding from singer_sdk.helpers._compat import Traversable from singer_sdk.mapper import StreamMap + from singer_sdk.schema.source import StreamSchema from singer_sdk.singerlib.catalog import StreamMetadata from singer_sdk.tap_base import Tap @@ -102,6 +103,15 @@ class Stream(abc.ABC): # noqa: PLR0904 selected_by_default: bool = True """Whether this stream is selected by default in the catalog.""" + schema: t.ClassVar[StreamSchema[t.Any]] = _InstanceSchemaDescriptor() + """JSON schema for this stream. + + Can be set at the class level to a :class:`~singer_sdk.StreamSchema` descriptor + (e.g. ``StreamSchema(SchemaDirectory("schemas"))``) or to a plain + ``dict[str, Any]`` JSON Schema object. When neither is provided the schema + must be supplied via the *schema* constructor argument. + """ + def __init__( self, tap: Tap, @@ -514,21 +524,6 @@ def schema_filepath(self) -> Path | Traversable | None: """ return self._schema_filepath - @property - def schema(self) -> dict: - """Get schema. - - Returns: - JSON Schema dictionary for this stream. - - Raises: - DiscoveryError: If the schema was not provided. - """ - if self._schema is None: - msg = f"The schema for stream '{self.name}' was not provided" - raise DiscoveryError(msg) - return self._schema - @property def primary_keys(self) -> t.Sequence[str]: """Get primary keys.