mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(data-warehouse): Dont overwrite delta table if resuming sync (#41441)
This commit is contained in:
@@ -122,7 +122,7 @@ class DeltaTableHelper:
|
||||
self,
|
||||
data: pa.Table,
|
||||
write_type: Literal["incremental", "full_refresh", "append"],
|
||||
chunk_index: int,
|
||||
should_overwrite_table: bool,
|
||||
primary_keys: Sequence[Any] | None,
|
||||
) -> deltalake.DeltaTable:
|
||||
delta_table = self.get_delta_table()
|
||||
@@ -130,7 +130,9 @@ class DeltaTableHelper:
|
||||
if delta_table:
|
||||
delta_table = self._evolve_delta_schema(data.schema)
|
||||
|
||||
self._logger.debug(f"write_to_deltalake: _is_first_sync = {self._is_first_sync}")
|
||||
self._logger.debug(
|
||||
f"write_to_deltalake: _is_first_sync = {self._is_first_sync}. should_overwrite_table = {should_overwrite_table}"
|
||||
)
|
||||
|
||||
use_partitioning = False
|
||||
if PARTITION_KEY in data.column_names:
|
||||
@@ -202,7 +204,7 @@ class DeltaTableHelper:
|
||||
):
|
||||
mode: Literal["error", "append", "overwrite", "ignore"] = "append"
|
||||
schema_mode: Literal["merge", "overwrite"] | None = "merge"
|
||||
if chunk_index == 0 or delta_table is None:
|
||||
if should_overwrite_table or delta_table is None:
|
||||
mode = "overwrite"
|
||||
schema_mode = "overwrite"
|
||||
|
||||
|
||||
@@ -151,7 +151,11 @@ class PipelineNonDLT(Generic[ResumableData]):
|
||||
row_count += py_table.num_rows
|
||||
|
||||
self._process_pa_table(
|
||||
pa_table=py_table, index=chunk_index, row_count=row_count, is_first_ever_sync=is_first_ever_sync
|
||||
pa_table=py_table,
|
||||
index=chunk_index,
|
||||
resuming_sync=should_resume,
|
||||
row_count=row_count,
|
||||
is_first_ever_sync=is_first_ever_sync,
|
||||
)
|
||||
|
||||
chunk_index += 1
|
||||
@@ -179,7 +183,11 @@ class PipelineNonDLT(Generic[ResumableData]):
|
||||
py_table = self._batcher.get_table()
|
||||
row_count += py_table.num_rows
|
||||
self._process_pa_table(
|
||||
pa_table=py_table, index=chunk_index, row_count=row_count, is_first_ever_sync=is_first_ever_sync
|
||||
pa_table=py_table,
|
||||
index=chunk_index,
|
||||
resuming_sync=should_resume,
|
||||
row_count=row_count,
|
||||
is_first_ever_sync=is_first_ever_sync,
|
||||
)
|
||||
|
||||
self._post_run_operations(row_count=row_count)
|
||||
@@ -200,7 +208,9 @@ class PipelineNonDLT(Generic[ResumableData]):
|
||||
pa_memory_pool.release_unused()
|
||||
gc.collect()
|
||||
|
||||
def _process_pa_table(self, pa_table: pa.Table, index: int, row_count: int, is_first_ever_sync: bool):
|
||||
def _process_pa_table(
|
||||
self, pa_table: pa.Table, index: int, resuming_sync: bool, row_count: int, is_first_ever_sync: bool
|
||||
):
|
||||
delta_table = self._delta_table_helper.get_delta_table()
|
||||
previous_file_uris = delta_table.file_uris() if delta_table else []
|
||||
|
||||
@@ -218,8 +228,13 @@ class PipelineNonDLT(Generic[ResumableData]):
|
||||
elif self._schema.is_append:
|
||||
write_type = "append"
|
||||
|
||||
should_overwrite_table = index == 0 and not resuming_sync
|
||||
|
||||
delta_table = self._delta_table_helper.write_to_deltalake(
|
||||
pa_table, write_type, index, self._resource.primary_keys
|
||||
pa_table,
|
||||
write_type,
|
||||
should_overwrite_table=should_overwrite_table,
|
||||
primary_keys=self._resource.primary_keys,
|
||||
)
|
||||
|
||||
self._internal_schema.add_pyarrow_table(pa_table)
|
||||
|
||||
Reference in New Issue
Block a user