@@ -44,10 +44,14 @@
 
 import pyiceberg.expressions.parser as parser
 from pyiceberg.expressions import (
+    AlwaysFalse,
     AlwaysTrue,
     And,
     BooleanExpression,
     EqualTo,
+    IsNull,
+    Or,
+    Reference,
 )
 from pyiceberg.expressions.visitors import (
     _InclusiveMetricsEvaluator,
@@ -117,6 +121,7 @@
     _OverwriteFiles,
 )
 from pyiceberg.table.update.spec import UpdateSpec
+from pyiceberg.transforms import IdentityTransform
 from pyiceberg.typedef import (
     EMPTY_DICT,
     IcebergBaseModel,
@@ -344,6 +349,48 @@ def _set_ref_snapshot(
 
         return updates, requirements
 
+    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
+        """Build a filter predicate matching any of the input partition records.
+
+        Args:
+            partition_records: A set of partition records to match
+        Returns:
+            A predicate matching any of the input partition records.
+        """
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos, partition_field in enumerate(partition_fields):
+                predicate = (
+                    EqualTo(Reference(partition_field), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_field))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
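A quick sketch of the predicate this helper produces. Assuming a hypothetical table partitioned by `identity(dt)` and `identity(category)`, two partition records, one with a null category, yield an expression equivalent to:

```python
from pyiceberg.expressions import And, EqualTo, IsNull, Or, Reference

# Hypothetical partition records: ("2024-01-01", None) and ("2024-01-02", "a").
# The AlwaysFalse()/AlwaysTrue() seeds are folded away by Or/And, leaving:
expr = Or(
    And(EqualTo(Reference("dt"), "2024-01-01"), IsNull(Reference("category"))),
    And(EqualTo(Reference("dt"), "2024-01-02"), EqualTo(Reference("category"), "a")),
)
```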
+    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _FastAppendFiles:
+        """Determine the append type based on table properties.
+
+        Args:
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        Returns:
+            Either a fast-append or a merge-append snapshot producer.
+        """
+        manifest_merge_enabled = property_as_bool(
+            self.table_metadata.properties,
+            TableProperties.MANIFEST_MERGE_ENABLED,
+            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
+        )
+        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
+        return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()
+
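The producer choice is driven entirely by the table property behind `TableProperties.MANIFEST_MERGE_ENABLED` (`commit.manifest-merge.enabled`). A minimal sketch of flipping it, with hypothetical catalog and table names:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")      # hypothetical catalog name
tbl = catalog.load_table("db.events")  # hypothetical table

# After this, appends on the table go through merge_append() instead of fast_append().
with tbl.transaction() as tx:
    tx.set_properties({"commit.manifest-merge.enabled": "true"})
```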
     def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
         """Create a new UpdateSchema to alter the columns of this table.
 
@@ -398,15 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
             self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
 
-        manifest_merge_enabled = property_as_bool(
-            self.table_metadata.properties,
-            TableProperties.MANIFEST_MERGE_ENABLED,
-            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
-        )
-        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
-        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
-
-        with append_method() as append_files:
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
@@ -415,6 +454,62 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for overwriting existing partitions with a PyArrow table.
+
+        The function detects partition values in the provided arrow table using the current
+        partition spec, and deletes the existing partitions matching these values. Finally,
+        the data in the arrow table is appended to the table.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if self.table_metadata.spec().is_unpartitioned():
+            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")
+
+        for field in self.table_metadata.spec().fields:
+            if not isinstance(field.transform, IdentityTransform):
+                raise ValueError(
+                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
+                )
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # If the dataframe has no data, there is nothing to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+
+        partitions_to_overwrite = {data_file.partition for data_file in data_files}
+        delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
+        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
+
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
+            append_files.commit_uuid = append_snapshot_commit_uuid
+            for data_file in data_files:
+                append_files.append_data_file(data_file)
+
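To make the flow concrete: for a hypothetical table partitioned by `identity(dt)`, writing rows for two days deletes exactly those two partitions and appends the new files, all inside the enclosing transaction:

```python
import pyarrow as pa

# 'tbl' is a hypothetical table partitioned by identity(dt).
df = pa.Table.from_pylist([
    {"dt": "2024-01-01", "value": 1},
    {"dt": "2024-01-02", "value": 2},
])

with tbl.transaction() as tx:
    # Only the 2024-01-01 and 2024-01-02 partitions are rewritten;
    # every other partition is left untouched.
    tx.dynamic_partition_overwrite(df)
```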
     def overwrite(
         self,
         df: pa.Table,
@@ -461,14 +556,14 @@ def overwrite(
 
         self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
-                    table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
+                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
                 for data_file in data_files:
-                    update_snapshot.append_data_file(data_file)
+                    append_files.append_data_file(data_file)
 
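Worth flagging for reviewers: `overwrite` previously hard-coded `fast_append`, so with this change it honors the manifest-merge property the same way `append` does. For comparison with the dynamic path, the explicit-filter equivalent for a single partition looks roughly like this (dataframe name and values hypothetical):

```python
from pyiceberg.expressions import EqualTo

# Explicitly replace one partition; dynamic_partition_overwrite derives
# this filter automatically from the data being written.
tbl.overwrite(df_day_one, overwrite_filter=EqualTo("dt", "2024-01-01"))
```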
     def delete(
         self,
@@ -552,9 +647,8 @@ def delete(
                 ))
 
         if len(replaced_files) > 0:
-            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
-                commit_uuid=commit_uuid
-            ) as overwrite_snapshot:
+            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
+                overwrite_snapshot.commit_uuid = commit_uuid
                 for original_data_file, replaced_data_files in replaced_files:
                     overwrite_snapshot.delete_data_file(original_data_file)
                     for replaced_data_file in replaced_data_files:
@@ -989,6 +1083,17 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """Shorthand for dynamically overwriting the table with a PyArrow table.
+
+        Old partitions are auto-detected and replaced with data files created for the input arrow table.
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        with self.transaction() as tx:
+            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties)
+
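And the table-level shorthand, which simply wraps the transactional path shown earlier (table name hypothetical):

```python
import pyarrow as pa

batch = pa.Table.from_pylist([{"dt": "2024-01-03", "value": 7}])

# Opens a transaction internally; rewrites only the partitions present in 'batch'.
tbl.dynamic_partition_overwrite(batch)
```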
     def overwrite(
         self,
         df: pa.Table,