Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions singer_sdk/contrib/filesystem/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

import abc
import functools
import sys
import typing as t

Expand Down Expand Up @@ -66,6 +65,7 @@ def __init__(
self.filesystem = filesystem

super().__init__(tap, schema=None, name=name)
self._schema = self._get_full_schema()

# TODO(edgarrmondragon): Make this None if the filesystem does not support it.
self.replication_key = SDC_META_MODIFIED_AT
Expand All @@ -89,12 +89,6 @@ def _get_full_schema(self) -> dict[str, t.Any]:
schema["properties"].update(self.SDC_PROPERTIES)
return schema

@functools.cached_property
@override
def schema(self) -> dict[str, t.Any]:
    """Return the full JSON schema for this stream.

    The value is whatever ``_get_full_schema()`` produces. Because this is a
    ``cached_property``, the result is stored on the instance after the first
    access and reused afterwards.
    """
    full_schema = self._get_full_schema()
    return full_schema

@override
def get_records(
self,
Expand Down
84 changes: 78 additions & 6 deletions singer_sdk/schema/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
else:
from typing_extensions import assert_never

if sys.version_info >= (3, 11):
from typing import Self # noqa: ICN003
else:
from typing_extensions import Self

if sys.version_info >= (3, 12):
from typing import override # noqa: ICN003
else:
Expand Down Expand Up @@ -74,6 +79,7 @@ def preprocess_schema(
Returns:
The preprocessed schema.
"""
...


class SchemaSource(ABC, t.Generic[_TKey]):
Expand Down Expand Up @@ -186,7 +192,7 @@ class MyStream(Stream):

def __init__(
self,
schema_source: SchemaSource[_TKey],
schema_source: SchemaSource[_TKey] | None = None,
*,
key: _TKey | None = None,
) -> None:
Expand All @@ -200,18 +206,37 @@ def __init__(
self.schema_source = schema_source
self.key = key

@t.overload
def __get__(self, obj: None, objtype: type[Stream] | None = ...) -> Self: ...
@t.overload
def __get__(self, obj: Stream, objtype: type[Stream] | None = ...) -> Schema: ...
@t.final
def __get__(
    self,
    obj: Stream | None,
    objtype: type[Stream] | None = None,
) -> Schema | Self:
    """Get the schema from the schema source.

    Args:
        obj: The stream instance, or ``None`` when accessed from the class.
        objtype: The stream class.

    Returns:
        A JSON schema dictionary, or the descriptor itself for class-level access.
    """
    if obj is None:
        # Class-level access (e.g. ``MyStream.schema``): hand back the
        # descriptor itself instead of trying to resolve a schema.
        return self
    # ``objtype`` may be omitted when ``__get__`` is invoked directly, so fall
    # back to the runtime type of the instance.
    return self.get_stream_schema(obj, objtype or type(obj))

def __set__(self, obj: Stream, _value: object) -> None:
    """Raise AttributeError; schema is read-only on instances.

    Defining ``__set__`` makes this a data descriptor, so an assignment such
    as ``stream.schema = ...`` is routed here rather than silently storing a
    value in the instance ``__dict__``.

    Args:
        obj: The stream instance the assignment was attempted on.
        _value: The value being assigned (ignored).

    Raises:
        AttributeError: Always.
    """
    msg = f"'schema' is read-only on {type(obj).__name__}"
    raise AttributeError(msg)

def get_stream_schema(
self,
Expand All @@ -226,13 +251,60 @@ def get_stream_schema(

Returns:
A JSON schema dictionary.

Raises:
DiscoveryError: If no schema source is configured and this method is not
overridden.
"""
if self.schema_source is None:
msg = (
f"No schema source configured for {type(self).__name__!r}; "
"either pass a schema_source or override get_stream_schema()."
)
raise DiscoveryError(msg)
return self.schema_source.get_schema(
self.key or stream.name, # type: ignore[arg-type] # ty:ignore[invalid-argument-type]
key_properties=stream.primary_keys,
)


class _InstanceSchemaDescriptor(StreamSchema[str]):
    """Descriptor that serves a stream's schema from its ``_schema`` attribute.

    This is the fallback installed on the base ``Stream`` class. If a stream
    subclass does not declare its own class-level ``schema``, attribute lookup
    reaches this descriptor through the MRO, and the access is answered with
    the instance's ``_schema`` value (set by ``Stream.__init__`` or by the
    catalog-loading path).
    """

    def __init__(self) -> None:
        """Create the descriptor without a schema source."""
        super().__init__()

    @override
    def get_stream_schema(
        self,
        stream: _TStream,
        stream_class: type[_TStream],
    ) -> Schema:
        """Look up the schema held by the given stream instance.

        Args:
            stream: The stream instance.
            stream_class: The stream class (unused).

        Returns:
            The JSON schema dictionary.

        Raises:
            DiscoveryError: If no schema has been set on the instance.
        """
        instance_schema = stream._schema  # noqa: SLF001
        if instance_schema is not None:
            return instance_schema

        msg = f"The schema for stream '{stream.name}' was not provided"
        raise DiscoveryError(msg)


def _load_yaml(content: bytes) -> dict[str, t.Any]:
import yaml # noqa: PLC0415

Expand Down
16 changes: 1 addition & 15 deletions singer_sdk/sql/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import abc
import typing as t
from functools import cached_property

import sqlalchemy as sa

Expand All @@ -28,8 +27,6 @@ class SQLStream(Stream, abc.ABC):
"""Base class for SQLAlchemy-based streams."""

connector_class = SQLConnector
_cached_schema: dict | None = None

supports_nulls_first: bool = False
"""Whether the database supports the NULLS FIRST/LAST syntax."""

Expand All @@ -53,7 +50,7 @@ def __init__(
self.catalog_entry = catalog_entry
super().__init__(
tap=tap,
schema=self.schema,
schema=CatalogEntry.from_dict(catalog_entry).schema,
name=self.tap_stream_id,
)

Expand Down Expand Up @@ -86,17 +83,6 @@ def metadata(self) -> MetadataMapping:
"""
return self._singer_catalog_entry.metadata

@cached_property
def schema(self) -> dict:
    """Return the stream's JSON schema as a plain dictionary.

    The schema is read from the stream's Singer catalog entry
    (``_singer_catalog_entry``) and converted with ``to_dict()``. Because
    this is a ``cached_property``, the result is stored on the instance after
    the first access.

    Returns:
        The schema object.
    """
    catalog_schema = self._singer_catalog_entry.schema
    return catalog_schema.to_dict()

@property
def tap_stream_id(self) -> str:
"""Return the unique ID used by the tap to identify this stream.
Expand Down
27 changes: 11 additions & 16 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from singer_sdk.exceptions import (
AbortedSyncFailedException,
AbortedSyncPausedException,
DiscoveryError,
InvalidReplicationKeyException,
InvalidStreamSortException,
MaxRecordsLimitException,
Expand All @@ -35,6 +34,7 @@
from singer_sdk.helpers._util import utc_now
from singer_sdk.helpers.conform import TypeConformanceLevel
from singer_sdk.mapper import RemoveRecordTransform, SameRecordTransform
from singer_sdk.schema.source import _InstanceSchemaDescriptor
from singer_sdk.singerlib.catalog import (
REPLICATION_FULL_TABLE,
REPLICATION_INCREMENTAL,
Expand All @@ -47,6 +47,7 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._compat import Traversable
from singer_sdk.mapper import StreamMap
from singer_sdk.schema.source import StreamSchema
from singer_sdk.singerlib.catalog import StreamMetadata
from singer_sdk.tap_base import Tap

Expand Down Expand Up @@ -102,6 +103,15 @@
selected_by_default: bool = True
"""Whether this stream is selected by default in the catalog."""

schema: t.ClassVar[StreamSchema[t.Any]] = _InstanceSchemaDescriptor()

Check warning on line 106 in singer_sdk/streams/core.py

View workflow job for this annotation

GitHub Actions / Check API Changes

Stream.schema

Attribute value was changed: `None` -> `_InstanceSchemaDescriptor()`
"""JSON schema for this stream.

Can be set at the class level to a :class:`~singer_sdk.StreamSchema` descriptor
(e.g. ``StreamSchema(SchemaDirectory("schemas"))``) or to a plain
``dict[str, Any]`` JSON Schema object. When neither is provided the schema
must be supplied via the *schema* constructor argument.
"""

def __init__(
self,
tap: Tap,
Expand Down Expand Up @@ -514,21 +524,6 @@
"""
return self._schema_filepath

@property
def schema(self) -> dict:
    """JSON Schema for this stream.

    Returns:
        The JSON Schema dictionary stored on the stream.

    Raises:
        DiscoveryError: If no schema was provided for the stream.
    """
    stream_schema = self._schema
    if stream_schema is not None:
        return stream_schema

    msg = f"The schema for stream '{self.name}' was not provided"
    raise DiscoveryError(msg)

@property
def primary_keys(self) -> t.Sequence[str]:
"""Get primary keys.
Expand Down
Loading