singer_sdk.BatchSink¶
- class singer_sdk.BatchSink[source]¶
Base class for batched record writers.
This class supports both legacy and modern loading patterns:
- Legacy pattern (deprecated but supported):
Override process_batch(context: dict) to load context["records"]
- Modern pattern (recommended):
Override load_batch(batch: BatchContext) for type-safe, clear loading
- Example (modern pattern):
class MyBatchSink(BatchSink):
    def load_batch(self, batch: BatchContext) -> None:
        # Type-safe access to batch data
        self.api.bulk_insert(batch.records)
- Example (legacy pattern - still works):
class MyLegacySink(BatchSink):
    def process_batch(self, context: dict) -> None:
        # Old style - will show a deprecation warning
        self.api.bulk_insert(context["records"])
- __init__(*args, batch_strategy=None, **kwargs)[source]¶
Initialize the batch sink.
- Parameters:
*args (t.Any) – Positional arguments to pass to parent Sink.
batch_strategy (BatchStrategy | None) – Optional batch strategy for custom batching behavior. If not provided, uses default record-count based batching.
**kwargs (t.Any) – Keyword arguments to pass to parent Sink.
- Return type:
None
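Since the constructor forwards its remaining arguments to the parent Sink, the shape of `__init__` can be pictured with a toy sketch (ToySink, RecordCountStrategy, and max_records are illustrative stand-ins, not real singer_sdk classes):

```python
from dataclasses import dataclass


@dataclass
class RecordCountStrategy:
    """Hypothetical stand-in for the default record-count based batching."""
    max_records: int = 100


class ToySink:
    """Illustrative parent; the real parent is singer_sdk's Sink."""
    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name


class ToyBatchSink(ToySink):
    def __init__(self, *args, batch_strategy=None, **kwargs) -> None:
        # Peel off batch_strategy; everything else goes to the parent Sink
        super().__init__(*args, **kwargs)
        # Fall back to default record-count based batching if none is given
        self.batch_strategy = batch_strategy or RecordCountStrategy()


default_sink = ToyBatchSink("users")
custom_sink = ToyBatchSink("users", batch_strategy=RecordCountStrategy(max_records=5))
```

The keyword-only `batch_strategy` parameter keeps the positional signature compatible with the parent Sink.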
- load_batch(batch)[source]¶
Load an accumulated batch of records to the target.
Modern pattern (recommended): Override this method for type-safe, clear batch loading.
This method provides a cleaner, more maintainable alternative to
process_batch(). It receives a strongly-typed BatchContext instead of a generic dict, providing IDE autocomplete and type safety.
Example
class MyBatchSink(BatchSink):
    def load_batch(self, batch: BatchContext) -> None:
        # Type-safe access to batch data
        records = batch.records  # IDE knows this is list[dict]
        batch_id = batch.batch_id  # IDE knows this is str
        self.api.bulk_insert(records)
- Parameters:
batch (BatchContext) – The batch context containing records and metadata.
- Return type:
None
Note
If you override this method, you do NOT need to override
process_batch(). The framework will automatically call this method instead.
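The routing described in the note can be pictured with a minimal sketch. MiniBatchSink and MiniBatchContext are simplified stand-ins for the SDK classes, and the override check (comparing against the base implementation) is an assumption about how such routing could work, not the SDK's actual mechanism:

```python
class MiniBatchContext:
    """Simplified stand-in for BatchContext."""
    def __init__(self, batch_id: str, records: list) -> None:
        self.batch_id = batch_id
        self.records = records


class MiniBatchSink:
    def load_batch(self, batch: MiniBatchContext) -> None:
        raise NotImplementedError

    def process_batch(self, context: dict) -> None:
        # If the subclass overrode load_batch, route there with a typed context
        if type(self).load_batch is not MiniBatchSink.load_batch:
            batch = MiniBatchContext(
                context.get("batch_id", ""),
                context.get("records", []),
            )
            self.load_batch(batch)
        else:
            raise NotImplementedError("Override load_batch() or process_batch().")


class MyBatchSink(MiniBatchSink):
    def __init__(self) -> None:
        self.loaded: list = []

    def load_batch(self, batch: MiniBatchContext) -> None:
        self.loaded.extend(batch.records)


sink = MyBatchSink()
sink.process_batch({"batch_id": "abc", "records": [{"id": 1}, {"id": 2}]})
```

Because the routing lives in the base process_batch(), overriding only load_batch() is sufficient.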
- process_batch(context)[source]¶
Process a batch with the given batch context.
Legacy pattern (deprecated but supported): Override this method for dict-based batch loading.
Modern pattern (recommended): Override load_batch() instead for type-safe batch loading with BatchContext.
If you override load_batch(), you do NOT need to override this method. The framework will automatically route to your load_batch() implementation.
If process_record() is not overridden, the context["records"] list will contain all records from the given batch context.
If duplicates are merged, these can be tracked via tally_duplicate_merged().
- Parameters:
context (dict) – Stream partition or context dictionary.
- Raises:
NotImplementedError – If neither process_batch nor load_batch is overridden.
- Return type:
None
- process_record(record, context)[source]¶
Load the latest record from the stream.
Developers may either load to the context dict for staging (the default behavior for Batch types), or permanently write out to the target.
If this method is not overridden, the default implementation will create a context["records"] list and append all records for processing during process_batch().
If duplicates are merged, these can be tracked via tally_duplicate_merged().
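The default staging behavior can be sketched as follows (StagingSink is an illustrative stand-in; the real default lives in BatchSink):

```python
class StagingSink:
    def process_record(self, record: dict, context: dict) -> None:
        # Default behavior: stage each record on the context dict
        # for later processing in process_batch()
        context.setdefault("records", []).append(record)


sink = StagingSink()
ctx: dict = {}
for rec in ({"id": 1}, {"id": 2}):
    sink.process_record(rec, ctx)
```

After the loop, ctx["records"] holds both staged records, ready for the batch-level load.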
- start_batch(context)[source]¶
Start a new batch with the given context.
The SDK-generated context will contain batch_id (GUID string) and batch_start_time (datetime).
Developers may optionally override this method to add custom markers to the context dict and/or to initialize batch resources - such as initializing a local temp file to hold batch records before uploading.
- Parameters:
context (dict) – Stream partition or context dictionary.
- Return type:
None
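A sketch of how an override might build on the SDK-generated context keys while initializing a batch resource. The new_batch_context helper and the temp-file marker are illustrative assumptions, not SDK API:

```python
import tempfile
import uuid
from datetime import datetime, timezone


def new_batch_context() -> dict:
    # Mirrors the documented SDK-generated keys:
    # batch_id (GUID string) and batch_start_time (datetime)
    return {
        "batch_id": str(uuid.uuid4()),
        "batch_start_time": datetime.now(timezone.utc),
    }


class TempFileBatchSink:
    def start_batch(self, context: dict) -> None:
        # Custom marker: a local temp file to hold batch records
        # before uploading (hypothetical resource setup)
        handle = tempfile.NamedTemporaryFile("w+", suffix=".jsonl", delete=False)
        context["batch_file"] = handle.name
        handle.close()


ctx = new_batch_context()
TempFileBatchSink().start_batch(ctx)
```

The context dict travels through start_batch(), process_record(), and process_batch(), so resources attached here are visible for the whole batch lifecycle.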