Version: devel


StorageSchemaInfo Objects

class StorageSchemaInfo(NamedTuple)



def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StorageSchemaInfo"


Instantiates this class from a mapping whose keys are normalized according to the given naming convention.


Arguments:

  • normalized_doc - Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
  • naming_convention - Naming convention that was used to normalize keys

Returns:

  • StorageSchemaInfo - Instance of this class
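
A minimal sketch of the idea, not dlt's implementation: because the keys of the mapping were transformed by a naming convention, the same convention is applied to the canonical field names to look up each value. `SchemaInfoSketch`, its two fields, and the `to_upper` convention are hypothetical stand-ins chosen for illustration.

```python
from typing import Any, Callable, Dict, NamedTuple


class SchemaInfoSketch(NamedTuple):
    """Hypothetical stand-in for StorageSchemaInfo with two illustrative fields."""

    version: int
    schema_name: str

    @classmethod
    def from_normalized_mapping(
        cls, normalized_doc: Dict[str, Any], normalize: Callable[[str], str]
    ) -> "SchemaInfoSketch":
        # Normalize each canonical field name the same way the keys were
        # normalized, then use the result to look up the value.
        return cls(**{field: normalized_doc[normalize(field)] for field in cls._fields})


def to_upper(identifier: str) -> str:
    """Hypothetical naming convention that upper-cases identifiers."""
    return identifier.upper()


info = SchemaInfoSketch.from_normalized_mapping(
    {"VERSION": 3, "SCHEMA_NAME": "events"}, to_upper
)
```

The same pattern would apply to StateInfo, with a different set of fields.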

StateInfo Objects

class StateInfo()



def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StateInfo"


Instantiates this class from a mapping whose keys are normalized according to the given naming convention.


Arguments:

  • normalized_doc - Mapping with normalized keys (e.g. {Version: ..., PipelineName: ...})
  • naming_convention - Naming convention that was used to normalize keys

Returns:

  • StateInfo - Instance of this class

DestinationClientConfiguration Objects

class DestinationClientConfiguration(BaseConfiguration)



Which destination to load data to.


def fingerprint() -> str


Returns a destination fingerprint, which is a hash of selected configuration fields, e.g. the host in the case of a connection string.
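
As an illustration only, the sketch below hashes just the host part of a connection string, so that two configurations pointing at the same server share a fingerprint regardless of their secrets. The function name, the choice of SHA-256, and the 16-character truncation are assumptions, not dlt's actual algorithm.

```python
import hashlib
from urllib.parse import urlparse


def fingerprint_sketch(connection_string: str) -> str:
    """Hash only the host so that credentials do not affect the fingerprint."""
    host = urlparse(connection_string).hostname or ""
    # Truncation is an arbitrary choice made to keep the fingerprint short.
    return hashlib.sha256(host.encode("utf-8")).hexdigest()[:16]


a = fingerprint_sketch("postgresql://loader:secret1@db.example.com:5432/warehouse")
b = fingerprint_sketch("postgresql://loader:secret2@db.example.com:5432/warehouse")
c = fingerprint_sketch("postgresql://loader:secret1@other.example.com:5432/warehouse")
```

Here `a` and `b` are equal because only the password differs, while `c` differs because the host changed.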


def __str__() -> str


Returns a displayable destination location.


def credentials_type(
config: "DestinationClientConfiguration" = None
) -> Type[CredentialsConfiguration]


Figure out credentials type, using hint resolvers for dynamic types

For correct type resolution of filesystem, config should have bucket_url populated

DestinationClientDwhConfiguration Objects

class DestinationClientDwhConfiguration(DestinationClientConfiguration)


Configuration of a destination that supports datasets/schemas


Dataset name in the destination to load data to. For schemas other than the default schema, it is used as a dataset prefix.


Name of the default schema, used to derive the name of the effective dataset to load data to.


How to handle the replace disposition for this destination; can be classic or staging.


Layout for the staging dataset name, where %s is replaced with the dataset name. The placeholder is optional.


Whether to normalize the dataset name. Affects the staging dataset as well.


def normalize_dataset_name(schema: Schema) -> str


Builds the full db dataset (schema) name out of the configured dataset name and the schema name: {dataset_name}_{schema.name}. The resulting name is normalized.

If the default schema name is None or equals the schema name, the schema suffix is skipped.
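
The naming rule can be sketched as follows. This is a simplified illustration that assumes an underscore separator between the dataset name and the schema name and uses `str.lower` as a stand-in for the real naming convention; the function and parameter names are hypothetical.

```python
from typing import Callable, Optional


def dataset_name_sketch(
    dataset_name: str,
    schema_name: str,
    default_schema_name: Optional[str],
    normalize: Callable[[str], str] = str.lower,
) -> str:
    # The schema suffix is skipped when there is no default schema
    # or when this schema is the default one.
    if default_schema_name is None or schema_name == default_schema_name:
        return normalize(dataset_name)
    return normalize(f"{dataset_name}_{schema_name}")


default_case = dataset_name_sketch("Analytics", "events", "events")
no_default_case = dataset_name_sketch("Analytics", "events", None)
other_schema_case = dataset_name_sketch("Analytics", "billing", "events")
```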


def normalize_staging_dataset_name(schema: Schema) -> str


Builds staging dataset name out of dataset_name and staging_dataset_name_layout.
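
A small sketch of how a layout with an optional %s placeholder might be applied. The function name and the example layouts are assumptions for illustration; a layout without the placeholder is used verbatim.

```python
def staging_dataset_name_sketch(dataset_name: str, layout: str) -> str:
    # str.replace leaves the layout untouched when it has no %s placeholder,
    # which covers the "placeholder is optional" case.
    return layout.replace("%s", dataset_name)


with_placeholder = staging_dataset_name_sketch("analytics", "%s_staging")
without_placeholder = staging_dataset_name_sketch("analytics", "shared_staging")
```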

DestinationClientStagingConfiguration Objects

class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration)


Configuration of a staging destination, able to store files with desired layout at bucket_url.

Also supports datasets and can act as standalone destination.

DestinationClientDwhWithStagingConfiguration Objects

class DestinationClientDwhWithStagingConfiguration(


Configuration of a destination that can take data from staging destination


Configuration of the staging destination, if present; injected at runtime.


Whether dlt should truncate the tables on the staging destination before loading data.

LoadJob Objects

class LoadJob(ABC)


A stateful load job that represents one job file.


def job_id() -> str


The job id, which is derived from the file name and does not change during the job lifecycle.


def file_name() -> str


The name of the job file.


def state() -> TLoadJobState


Returns current state. Should poll external resource if necessary.


def exception() -> str


The exception associated with failed or retry states


def metrics() -> Optional[LoadJobMetrics]


Returns job execution metrics

RunnableLoadJob Objects

class RunnableLoadJob(LoadJob, ABC)


Represents a runnable job that loads a single file

Each job starts in the "running" state and ends in one of the terminal states: "retry", "failed" or "completed". Each job is uniquely identified by a file name. The file is guaranteed to exist in the "running" state; in a terminal state, the file may not be present. While the job is in the "running" state, the loader component periodically gets the state via the state() method. When a terminal state is reached, the load job is discarded and not called again. The exception() method is called to get error information in the "failed" and "retry" states.

The __init__ method is responsible for putting the job in the "running" state. It may raise LoadClientTerminalException or LoadClientTransientException to immediately transition the job into the "failed" or "retry" state, respectively.
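
The lifecycle described above can be sketched as a small state machine. This is an illustrative simplification, not the dlt class: `JobSketch` and the two exception classes are hypothetical, and mapping every non-terminal exception to "retry" is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Optional


class TerminalErrorSketch(Exception):
    """Hypothetical stand-in for a terminal (non-retryable) load error."""


class TransientErrorSketch(Exception):
    """Hypothetical stand-in for a transient (retryable) load error."""


class JobSketch(ABC):
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path
        self._state = "running"  # every job starts in the "running" state
        self._exception: Optional[BaseException] = None

    @abstractmethod
    def run(self) -> None:
        """User-implemented work; may raise."""

    def run_managed(self) -> None:
        # Wrapper that maps the outcome of run() to a terminal state.
        try:
            self.run()
            self._state = "completed"
        except TerminalErrorSketch as exc:
            self._state, self._exception = "failed", exc
        except Exception as exc:  # assumed: anything else is retryable
            self._state, self._exception = "retry", exc

    def state(self) -> str:
        return self._state

    def exception(self) -> str:
        return str(self._exception)


class OkJob(JobSketch):
    def run(self) -> None:
        pass


class BrokenJob(JobSketch):
    def run(self) -> None:
        raise TerminalErrorSketch("bad file format")


ok, broken = OkJob("a.jsonl"), BrokenJob("b.jsonl")
initial_state = ok.state()
ok.run_managed()
broken.run_managed()
```

Keeping the exception handling in the wrapper means a user-written run() only has to raise; it never sets the state itself.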


def __init__(file_path: str) -> None


File name is also a job id (or job id is deterministically derived) so it must be globally unique


def set_run_vars(load_id: str, schema: Schema,
load_table: TTableSchema) -> None


Called by the loader right before the job is run.


def run_managed(job_client: "JobClientBase") -> None


Wrapper around the user-implemented run method.


def run() -> None


Runs the actual job. This method is executed on a thread and should be implemented by the user. Exceptions are handled outside of this function.


def state() -> TLoadJobState


Returns current state. Should poll external resource if necessary.


def exception() -> str


The exception associated with failed or retry states

FollowupJobRequest Objects

class FollowupJobRequest()


Base class for follow up jobs that should be created


def new_file_path() -> str


Path to a newly created temporary job file. If empty, no followup job should be created

HasFollowupJobs Objects

class HasFollowupJobs()


Adds a trait that allows creating single or table chain followup jobs.


def create_followup_jobs(
final_state: TLoadJobState) -> List[FollowupJobRequest]


Returns a list of job requests for jobs that should be created. final_state is the state to which this job transitions.

JobClientBase Objects

class JobClientBase(ABC)



def initialize_storage(truncate_tables: Iterable[str] = None) -> None


Prepares storage for use, i.e. creates a database schema or file system folder. Truncates the requested tables.


def is_storage_initialized() -> bool


Returns whether storage is ready to be read from or written to.


def drop_storage() -> None


Brings storage back into an uninitialized state. Typically, data in storage is destroyed.


def update_stored_schema(
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None) -> Optional[TSchemaTables]


Updates storage to the current schema.

Implementations should not assume that expected_update is the exact difference between the destination state and self.schema. This is only the case if the destination has a single writer and no other processes modify the schema.


Arguments:

  • only_tables Sequence[str], optional - Updates only listed tables. Defaults to None.
  • expected_update TSchemaTables, optional - Update that is expected to be applied to the destination

Returns:

  • Optional[TSchemaTables] - Returns an update that was applied at the destination.
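
One way to avoid relying on expected_update is to diff the desired schema against what is actually stored. The sketch below does this over a toy representation (table name mapped to a set of column names); the function name and data shapes are hypothetical and far simpler than TSchemaTables.

```python
from typing import Dict, Iterable, Optional, Set

TablesSketch = Dict[str, Set[str]]  # table name -> column names (toy model)


def update_stored_schema_sketch(
    stored: TablesSketch,
    desired: TablesSketch,
    only_tables: Optional[Iterable[str]] = None,
) -> Optional[TablesSketch]:
    selected = set(only_tables) if only_tables is not None else None
    applied: TablesSketch = {}
    for table, columns in desired.items():
        if selected is not None and table not in selected:
            continue
        # Diff against what is really stored instead of trusting a
        # precomputed update that another writer may have invalidated.
        missing = columns - stored.get(table, set())
        if missing:
            stored.setdefault(table, set()).update(missing)
            applied[table] = missing
    return applied or None


stored = {"events": {"id"}}
desired = {"events": {"id", "ts"}, "users": {"id"}}
first = update_stored_schema_sketch(stored, desired, only_tables=["events"])
second = update_stored_schema_sketch(stored, desired, only_tables=["events"])
```

The second call returns None because the first call already applied the missing column.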


def create_load_job(table: TTableSchema,
file_path: str,
load_id: str,
restore: bool = False) -> LoadJob


Creates a load job for a particular table with content in file_path


def prepare_load_job_execution(job: RunnableLoadJob) -> None


Prepare the connected job client for the execution of a load job (used for query tags in sql clients)


def create_table_chain_completed_followup_jobs(
table_chain: Sequence[TTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None
) -> List[FollowupJobRequest]


Creates a list of followup jobs that should be executed after a table chain is completed


def complete_load(load_id: str) -> None


Marks the load package with load_id as completed in the destination. Before such a commit is done, the data with load_id is invalid.

WithStateSync Objects

class WithStateSync(ABC)



def get_stored_schema() -> Optional[StorageSchemaInfo]


Retrieves the newest schema from destination storage.


def get_stored_schema_by_hash(version_hash: str) -> StorageSchemaInfo


Retrieves the stored schema by hash.


def get_stored_state(pipeline_name: str) -> Optional[StateInfo]


Loads compressed state from destination storage

WithStagingDataset Objects

class WithStagingDataset(ABC)


Adds capability to use staging dataset and request it from the loader


def with_staging_dataset() -> ContextManager["JobClientBase"]


Executes job client methods on staging dataset
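
A context manager of this kind can be sketched as temporarily switching the client's dataset and restoring it on exit. `ClientSketch` and the "_staging" suffix are hypothetical; the real client may resolve the staging dataset name differently.

```python
from contextlib import contextmanager
from typing import Iterator


class ClientSketch:
    def __init__(self, dataset_name: str) -> None:
        self.dataset_name = dataset_name

    @contextmanager
    def with_staging_dataset(self) -> Iterator["ClientSketch"]:
        current = self.dataset_name
        self.dataset_name = current + "_staging"  # assumed naming
        try:
            yield self
        finally:
            # Always restore the regular dataset, even if the body raised.
            self.dataset_name = current


client = ClientSketch("analytics")
with client.with_staging_dataset() as staging_client:
    name_inside = staging_client.dataset_name
name_after = client.dataset_name
```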

SupportsStagingDestination Objects

class SupportsStagingDestination(ABC)


Adds capability to support a staging destination for the load


def should_load_data_to_staging_dataset_on_staging_destination(
table: TTableSchema) -> bool


If set to True and a staging destination is configured, the data will be loaded to the staging dataset on the staging destination instead of a regular dataset on the staging destination. Currently it is used by Athena Iceberg, which uses the staging dataset on the staging destination to copy data to Iceberg tables stored in the regular dataset on the staging destination. The default is to load data to the regular dataset on the staging destination, from where warehouses like Snowflake (which have their own storage) will copy data.


def should_truncate_table_before_load_on_staging_destination(
table: TTableSchema) -> bool


If set to True, data in the table will be truncated on the staging destination (regular dataset). This is the default behavior, which can be changed with a config flag. For Athena + Iceberg this setting is always False: Athena uses the regular dataset to store Iceberg tables and we avoid touching it. For Athena we truncate those tables only on the "replace" write disposition.

Destination Objects

class Destination(ABC, Generic[TDestinationConfig, TDestinationClient])


A destination factory that can be partially pre-configured with credentials and other config params.


Explicit config params, overriding any injected or default values.


Explicit capabilities params, overriding any default values for this destination


def spec() -> Type[TDestinationConfig]


A spec of destination configuration that also contains destination credentials


def capabilities(
config: Optional[TDestinationConfig] = None,
naming: Optional[NamingConvention] = None
) -> DestinationCapabilitiesContext


Destination capabilities, i.e. supported loader file formats, identifier name lengths, naming conventions, escape function etc. Explicit caps arguments passed to the factory init and stored in caps_params are applied.

If config is provided, it is used to adjust the capabilities; otherwise the explicit config composed just of config_params passed to the factory init is applied. If naming is provided, the case sensitivity and case folding are adjusted.


def destination_name() -> str


The destination name will either be explicitly set while creating the destination or will be taken from the type


def client_class() -> Type[TDestinationClient]


A job client class responsible for starting and resuming load jobs


def configuration(initial_config: TDestinationConfig,
accept_partial: bool = False) -> TDestinationConfig


Get a fully resolved destination config from the initial config


def client(schema: Schema,
initial_config: TDestinationConfig = None) -> TDestinationClient


Returns a configured instance of the destination's job client


def adjust_capabilities(
cls, caps: DestinationCapabilitiesContext, config: TDestinationConfig,
naming: Optional[NamingConvention]) -> DestinationCapabilitiesContext


Adjust the capabilities to match the case sensitivity as requested by naming convention.


def normalize_type(destination_type: str) -> str


Normalizes destination type string into a canonical form. Assumes that type names without dots correspond to built in destinations.
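
The dot rule can be illustrated with a short sketch. Using `dlt.destinations.` as the prefix for built-in destinations is an assumption based on the import path example on this page; the real method may apply further shortening rules.

```python
def normalize_type_sketch(destination_type: str) -> str:
    # A name without dots is assumed to refer to a built-in destination.
    if "." not in destination_type:
        return "dlt.destinations." + destination_type
    return destination_type


built_in = normalize_type_sketch("postgres")
custom = normalize_type_sketch("my_package.destinations.custom")
```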


def from_reference(
ref: TDestinationReferenceArg,
credentials: Optional[Any] = None,
destination_name: Optional[str] = None,
environment: Optional[str] = None,
**kwargs: Any
) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]


Instantiate destination from str reference. The ref can be a destination name or import path pointing to a destination class (e.g. dlt.destinations.postgres)
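
Resolving an import path to a class generally follows the pattern sketched below with the standard library's importlib. The helper name is hypothetical, and a stdlib class is used so the example runs without dlt installed; the real method also handles plain destination names, credentials and other arguments.

```python
import collections
import importlib
from typing import Any


def resolve_reference_sketch(ref: str) -> Any:
    # Split "package.module.ClassName" into a module path and an attribute.
    module_path, _, attr_name = ref.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


resolved = resolve_reference_sketch("collections.OrderedDict")
```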
