From d847c367993d11d48ab03f7c692cc266f6509edf Mon Sep 17 00:00:00 2001 From: Anas Khan <83116240+anxkhn@users.noreply.github.com> Date: Mon, 29 Jun 2026 12:25:54 +0530 Subject: [PATCH] fix(dlt): use timestamp macros for incremental model time filter The DLT-generated INCREMENTAL_BY_TIME_RANGE model casts its time column to a timestamp (TO_TIMESTAMP(...)) but filtered it with @start_ds AND @end_ds. Those categorical date macros both render at midnight, so on a single-day run any row past 00:00:00 was excluded, yielding an empty range. Use the inclusive timestamp macros @start_ts AND @end_ts instead, matching make_inclusive's documented contract for timestamp columns. Fixes #5689 Signed-off-by: Anas Khan <83116240+anxkhn@users.noreply.github.com> --- sqlmesh/integrations/dlt.py | 2 +- tests/cli/test_cli.py | 8 ++++---- tests/integrations/test_dlt.py | 21 +++++++++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) create mode 100644 tests/integrations/test_dlt.py diff --git a/sqlmesh/integrations/dlt.py b/sqlmesh/integrations/dlt.py index a2202bea02..e8c53e7efb 100644 --- a/sqlmesh/integrations/dlt.py +++ b/sqlmesh/integrations/dlt.py @@ -208,7 +208,7 @@ def generate_incremental_model( FROM {from_clause} WHERE - {time_column} BETWEEN @start_ds AND @end_ds + {time_column} BETWEEN @start_ts AND @end_ts """ diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 938f90cc74..7138cf4cc2 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -921,7 +921,7 @@ def test_dlt_filesystem_pipeline(tmp_path): FROM filesystem_pipeline_dataset.equipment as c WHERE - TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds + TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts """ with open(equipment_model_path) as file: @@ -1064,7 +1064,7 @@ def test_dlt_pipeline(runner, tmp_path): FROM sushi_dataset.sushi_types as c WHERE - TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds + TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts """ dlt_sushi_types_model_path = tmp_path / "models/incremental_sushi_types.sql" @@ -1095,7 +1095,7 @@ def test_dlt_pipeline(runner, tmp_path): FROM sushi_dataset._dlt_loads as c WHERE - TO_TIMESTAMP(CAST(c.load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds + TO_TIMESTAMP(CAST(c.load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts """ with open(dlt_loads_model_path) as file: @@ -1122,7 +1122,7 @@ def test_dlt_pipeline(runner, tmp_path): ON c._dlt_parent_id = p._dlt_id WHERE - TO_TIMESTAMP(CAST(p._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds + TO_TIMESTAMP(CAST(p._dlt_load_id AS DOUBLE)) BETWEEN @start_ts AND @end_ts """ with open(dlt_sushi_fillings_model_path) as file: diff --git a/tests/integrations/test_dlt.py b/tests/integrations/test_dlt.py new file mode 100644 index 0000000000..ca4f374acb --- /dev/null +++ b/tests/integrations/test_dlt.py @@ -0,0 +1,21 @@ +from sqlmesh.integrations.dlt import generate_incremental_model + + +def test_generate_incremental_model_filters_on_timestamp_macros() -> None: + # The DLT-generated model's time column is a timestamp + # (TO_TIMESTAMP(...)). It must therefore be filtered with the inclusive + # timestamp macros @start_ts/@end_ts, not the categorical date macros + # @start_ds/@end_ds, which both render midnight and exclude any rows past + # 00:00:00 on a single-day run. + model = generate_incremental_model( + "dataset_sqlmesh.incremental_equipment", + " CAST(c.item_id AS BIGINT) AS item_id", + "", + "dataset.equipment", + "duckdb", + "c._dlt_load_id", + ) + + assert "BETWEEN @start_ts AND @end_ts" in model + assert "@start_ds" not in model + assert "@end_ds" not in model