diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 9efcd25fcb412..dcbb6f761eb88 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -442,7 +442,7 @@ pub trait PhysicalPlanNodeExt: Sized { proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { let plan_clone = Arc::clone(&plan); - let plan = plan.as_ref() as &dyn Any; + let plan = plan.as_ref(); if let Some(exec) = plan.downcast_ref::() { return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 2022857d4e59d..50c5fb4c0bbc6 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -89,8 +89,8 @@ use datafusion::physical_plan::windows::{ }; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, Partitioning, - PhysicalExpr, RangePartitioning, SendableRecordBatchStream, SplitPoint, Statistics, - displayable, + PhysicalExpr, PlanProperties, RangePartitioning, SendableRecordBatchStream, + SplitPoint, Statistics, displayable, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion::scalar::ScalarValue; @@ -229,6 +229,74 @@ fn roundtrip_empty() -> Result<()> { roundtrip_test(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } +#[derive(Debug)] +struct DowncastDelegatingExec { + inner: Arc, +} + +impl DowncastDelegatingExec { + fn new(inner: Arc) -> Self { + Self { inner } + } +} + +impl DisplayAs for DowncastDelegatingExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + self.inner.fmt_as(t, f) + } +} + +impl ExecutionPlan for DowncastDelegatingExec { + fn name(&self) -> &str { + self.inner.name() + } + + fn properties(&self) -> &Arc { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.inner.children() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + let inner = Arc::clone(&self.inner).with_new_children(children)?; + Ok(Arc::new(Self::new(inner))) + } + + fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> { + Some(self.inner.as_ref()) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.inner.execute(partition, context) + } +} + +#[test] +fn serialize_uses_downcast_delegate() -> Result<()> { + let inner: Arc = + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))); + let plan: Arc = Arc::new(DowncastDelegatingExec::new(inner)); + let codec = DefaultPhysicalExtensionCodec {}; + + let proto = PhysicalPlanNode::try_from_physical_plan(plan, &codec)?; + + assert!(matches!( + proto.physical_plan_type, + Some(protobuf::physical_plan_node::PhysicalPlanType::Empty(_)) + )); + + Ok(()) +} + #[test] fn roundtrip_date_time_interval() -> Result<()> { let schema = Schema::new(vec![