diff --git a/lib/dal/src/attribute/value.rs b/lib/dal/src/attribute/value.rs index 91ceef0aca..741556ea31 100644 --- a/lib/dal/src/attribute/value.rs +++ b/lib/dal/src/attribute/value.rs @@ -2630,7 +2630,11 @@ impl AttributeValue { ) -> AttributeValueResult> { Ok(ctx .workspace_snapshot()? - .find_edge(parent_attribute_value_id, child_attribute_value_id) + .find_edge( + parent_attribute_value_id, + child_attribute_value_id, + EdgeWeightKindDiscriminants::Contain, + ) .await? .and_then(|weight| match weight.kind() { EdgeWeightKind::Contain(key) => key.to_owned(), diff --git a/lib/dal/src/management/mod.rs b/lib/dal/src/management/mod.rs index fc6f5c1162..808f5db0c5 100644 --- a/lib/dal/src/management/mod.rs +++ b/lib/dal/src/management/mod.rs @@ -9,7 +9,6 @@ use veritech_client::{ManagementFuncStatus, ManagementResultSuccess}; use crate::component::frame::{Frame, FrameError}; use crate::diagram::view::{View, ViewId}; use crate::diagram::DiagramError; -use crate::WorkspaceSnapshotError; use crate::{ action::{ prototype::{ActionKind, ActionPrototype, ActionPrototypeError}, @@ -29,6 +28,7 @@ use crate::{ Func, FuncError, InputSocket, InputSocketId, OutputSocket, OutputSocketId, Prop, PropKind, Schema, SchemaError, SchemaId, SchemaVariantId, StandardModelError, WsEvent, WsEventError, }; +use crate::{EdgeWeightKind, WorkspaceSnapshotError}; pub mod prototype; @@ -396,7 +396,9 @@ struct PendingParent { #[derive(Clone, Debug)] enum PendingOperation { Connect(PendingConnect), + Manage(ComponentId), Parent(PendingParent), + RemoveConnection(PendingConnect), } impl<'a> ManagementOperator<'a> { @@ -649,6 +651,20 @@ impl<'a> ManagementOperator<'a> { Ok(()) } + async fn manage(&self, component_id: ComponentId) -> ManagementResult<()> { + let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await; + Component::add_manages_edge_to_component( + self.ctx, + self.manager_component_id, + component_id, + EdgeWeightKind::Manages, + ) + .await?; + drop(cycle_check_guard); + + Ok(()) + } + async fn creates(&mut self) -> ManagementResult> { // We take here to avoid holding on to an immutable ref to self throughout the loop let creates = self.operations.create.take(); @@ -712,16 +728,7 @@ impl<'a> ManagementOperator<'a> { parent: parent.to_owned(), })); } - - let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await; - Component::add_manages_edge_to_component( - self.ctx, - self.manager_component_id, - component_id, - crate::EdgeWeightKind::Manages, - ) - .await?; - drop(cycle_check_guard); + pending_operations.push(PendingOperation::Manage(component_id)); } } @@ -737,10 +744,12 @@ impl<'a> ManagementOperator<'a> { )) } - async fn updates(&mut self) -> ManagementResult<()> { + async fn updates(&mut self) -> ManagementResult> { + let mut pending = vec![]; + let updates = self.operations.update.take(); let Some(updates) = &updates else { - return Ok(()); + return Ok(pending); }; for (placeholder, operation) in updates { @@ -777,25 +786,34 @@ impl<'a> ManagementOperator<'a> { if let Some(update_conns) = &operation.connect { if let Some(remove_conns) = &update_conns.remove { for to_remove in remove_conns { - self.remove_connection(component_id, to_remove).await?; + pending.push(PendingOperation::RemoveConnection(PendingConnect { + from_component_id: component_id, + connection: to_remove.to_owned(), + })); } } if let Some(add_conns) = &update_conns.add { for to_add in add_conns { - self.create_connection(component_id, to_add).await?; + pending.push(PendingOperation::Connect(PendingConnect { + from_component_id: component_id, + connection: to_add.to_owned(), + })); } } } if let Some(new_parent) = &operation.parent { - self.set_parent(component_id, new_parent).await?; + pending.push(PendingOperation::Parent(PendingParent { + child_component_id: component_id, + parent: new_parent.to_owned(), + })); } self.updated_components.push(component_id); } - Ok(()) + Ok(pending) } async fn actions(&self) -> ManagementResult<()> { @@ -816,26 +834,19 @@ impl<'a> ManagementOperator<'a> { } pub async fn operate(&mut self) -> ManagementResult<()> { - let pending_operations = self.creates().await?; - self.updates().await?; - - // We have to execute these after the creation of the component, and - // after updates, so that they can reference other created components - // and so that we can ensure the updates have been applied - for pending_operation in pending_operations { - match pending_operation { - PendingOperation::Connect(pending_connect) => { - self.create_connection( - pending_connect.from_component_id, - &pending_connect.connection, - ) - .await? - } - PendingOperation::Parent(pending_parent) => { - self.set_parent(pending_parent.child_component_id, &pending_parent.parent) - .await? - } - } + let mut pending_operations = self.creates().await?; + pending_operations.extend(self.updates().await?); + + // Parents have to be set before component events are sent + for pending_parent in pending_operations + .iter() + .filter_map(|pending_op| match pending_op { + PendingOperation::Parent(pending_parent) => Some(pending_parent), + _ => None, + }) + { + self.set_parent(pending_parent.child_component_id, &pending_parent.parent) + .await? } for &created_id in &self.created_components { @@ -874,6 +885,30 @@ impl<'a> ManagementOperator<'a> { .await?; } + // Now, the rest of the pending ops can be executed, which need to have + // their wsevents sent *after* the component ws events (otherwise some + // will be discarded by the frontend, since it does not know about the + // newly created components until the above events are sent) + for pending_op in pending_operations { + match pending_op { + PendingOperation::Connect(pending_connect) => { + self.create_connection( + pending_connect.from_component_id, + &pending_connect.connection, + ) + .await?; + } + PendingOperation::RemoveConnection(remove) => { + self.remove_connection(remove.from_component_id, &remove.connection) + .await?; + } + PendingOperation::Manage(managed_id) => { + self.manage(managed_id).await?; + } + PendingOperation::Parent(_) => {} + } + } + self.actions().await?; Ok(()) diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index 6919754604..d144de868e 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -658,13 +658,13 @@ impl WorkspaceSnapshot { from_node_id: impl Into, edge_weight: EdgeWeight, to_node_id: impl Into, - ) -> WorkspaceSnapshotResult { + ) -> WorkspaceSnapshotResult<()> { let from_node_index = self .working_copy() .await .get_node_index_by_id(from_node_id)?; let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?; - Ok(if self.cycle_check().await { + if self.cycle_check().await { let self_clone = self.clone(); slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; @@ -675,7 +675,9 @@ impl WorkspaceSnapshot { self.working_copy_mut() .await .add_edge(from_node_index, edge_weight, to_node_index)? - }) + } + + Ok(()) } /// Add an edge to the graph, bypassing any cycle checks and using node @@ -685,13 +687,12 @@ impl WorkspaceSnapshot { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotResult { - let edge_index = - self.working_copy_mut() - .await - .add_edge(from_node_index, edge_weight, to_node_index)?; + ) -> WorkspaceSnapshotResult<()> { + self.working_copy_mut() + .await + .add_edge(from_node_index, edge_weight, to_node_index)?; - Ok(edge_index) + Ok(()) } pub async fn add_ordered_edge( @@ -699,19 +700,19 @@ impl WorkspaceSnapshot { from_node_id: impl Into, edge_weight: EdgeWeight, to_node_id: impl Into, - ) -> WorkspaceSnapshotResult { + ) -> WorkspaceSnapshotResult<()> { let from_node_index = self .working_copy() .await .get_node_index_by_id(from_node_id)?; let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?; - let (edge_index, _) = self.working_copy_mut().await.add_ordered_edge( + self.working_copy_mut().await.add_ordered_edge( from_node_index, edge_weight, to_node_index, )?; - Ok(edge_index) + Ok(()) } #[instrument( @@ -1232,6 +1233,7 @@ impl WorkspaceSnapshot { &self, from_id: impl Into, to_id: impl Into, + edge_weight_kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult> { let working_copy = self.working_copy().await; @@ -1239,7 +1241,7 @@ impl WorkspaceSnapshot { let to_idx = working_copy.get_node_index_by_id(to_id)?; Ok(working_copy - .find_edge(from_idx, to_idx) + .find_edge(from_idx, to_idx, edge_weight_kind) .map(ToOwned::to_owned)) } diff --git a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs index af42c597c0..705719ff7b 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs @@ -71,7 +71,7 @@ mod test { ContentAddress::Component(ContentHash::from("Component B")), )) .expect("Unable to add Component B"); - let _new_onto_root_component_edge_index = base_graph + base_graph .add_edge( base_graph.root(), EdgeWeight::new(EdgeWeightKind::new_use()), diff --git a/lib/dal/src/workspace_snapshot/graph/v4.rs b/lib/dal/src/workspace_snapshot/graph/v4.rs index a4db65e429..682bc8a899 100644 --- a/lib/dal/src/workspace_snapshot/graph/v4.rs +++ b/lib/dal/src/workspace_snapshot/graph/v4.rs @@ -263,19 +263,30 @@ impl WorkspaceSnapshotGraphV4 { edge_weight: EdgeWeight, to_node_index: NodeIndex, cycle_check: bool, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { if cycle_check { self.add_temp_edge_cycle_check(from_node_index, edge_weight.clone(), to_node_index)?; } self.touch_node(from_node_index); - // Add the new edge to the new version of the "from" node. - let edge_index = self + let discrim: EdgeWeightKindDiscriminants = edge_weight.kind().into(); + + if !self .graph - .update_edge(from_node_index, to_node_index, edge_weight); + .edges_directed(from_node_index, Direction::Outgoing) + // Only allow one edge of each weight kind between two nodes. This + // keeps "add_edge" idempotent, and guards against any places where + // we might add the same edge twice + .any(|edge_ref| { + edge_ref.target() == to_node_index && discrim == edge_ref.weight().kind().into() + }) + { + self.graph + .add_edge(from_node_index, to_node_index, edge_weight); + } - Ok(edge_index) + Ok(()) } fn add_temp_edge_cycle_check( @@ -286,7 +297,7 @@ impl WorkspaceSnapshotGraphV4 { ) -> WorkspaceSnapshotGraphResult<()> { let temp_edge = self .graph - .update_edge(from_node_index, to_node_index, edge_weight.clone()); + .add_edge(from_node_index, to_node_index, edge_weight.clone()); let would_create_a_cycle = !self.is_acyclic_directed(); self.graph.remove_edge(temp_edge); @@ -322,7 +333,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { self.add_edge_inner(from_node_index, edge_weight, to_node_index, true) } @@ -331,7 +342,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_id: Ulid, edge_weight: EdgeWeight, to_node_id: Ulid, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { let from_node_index = *self .node_index_by_id .get(&from_node_id) @@ -349,7 +360,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { // Temporarily add the edge to the existing tree to see if it would create a cycle. // Configured to run only in tests because it has a major perf impact otherwise #[cfg(test)] @@ -465,10 +476,16 @@ impl WorkspaceSnapshotGraphV4 { self.graph.neighbors_directed(node_index, direction) } - pub fn find_edge(&self, from_idx: NodeIndex, to_idx: NodeIndex) -> Option<&EdgeWeight> { + pub fn find_edge( + &self, + from_idx: NodeIndex, + to_idx: NodeIndex, + edge_kind: EdgeWeightKindDiscriminants, + ) -> Option<&EdgeWeight> { self.graph - .find_edge(from_idx, to_idx) - .and_then(|edge_idx| self.graph.edge_weight(edge_idx)) + .edges_connecting(from_idx, to_idx) + .find(|edge_ref| edge_kind == edge_ref.weight().kind().into()) + .map(|edge_ref| edge_ref.weight()) } pub fn edges_directed_for_edge_weight_kind( @@ -518,17 +535,17 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult<(EdgeIndex, Option)> { - let new_edge_index = self.add_edge(from_node_index, edge_weight, to_node_index)?; + ) -> WorkspaceSnapshotGraphResult<()> { + self.add_edge(from_node_index, edge_weight, to_node_index)?; // Find the ordering node of the "container" if there is one, and add the thing pointed to // by the `to_node_index` to the ordering. Also point the ordering node at the thing with // an `Ordinal` edge, so that Ordering nodes must be touched *after* the things they order // in a depth first search - let maybe_ordinal_edge_index = if let Some(container_ordering_node_index) = + if let Some(container_ordering_node_index) = self.ordering_node_index_for_container(from_node_index)? { - let ordinal_edge_index = self.add_edge( + self.add_edge( container_ordering_node_index, EdgeWeight::new(EdgeWeightKind::Ordinal), to_node_index, @@ -544,13 +561,9 @@ impl WorkspaceSnapshotGraphV4 { ordering_node_weight.push_to_order(element_id); self.touch_node(container_ordering_node_index); } + } - Some(ordinal_edge_index) - } else { - None - }; - - Ok((new_edge_index, maybe_ordinal_edge_index)) + Ok(()) } pub fn add_ordered_node( @@ -565,13 +578,13 @@ impl WorkspaceSnapshotGraphV4 { OrderingNodeWeight::new(ordering_node_id, ordering_node_lineage_id), ))?; - let edge_index = self.add_edge( + self.add_edge( new_node_index, EdgeWeight::new(EdgeWeightKind::Ordering), ordering_node_index, )?; - let (source, _) = self.edge_endpoints(edge_index)?; - Ok(source) + + Ok(new_node_index) } /// Remove any orphaned nodes from the graph, then recalculate the merkle diff --git a/lib/dal/tests/integration_test/management.rs b/lib/dal/tests/integration_test/management.rs index fddbf9e411..2d959864ba 100644 --- a/lib/dal/tests/integration_test/management.rs +++ b/lib/dal/tests/integration_test/management.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use dal::{ diagram::view::View, management::{ @@ -5,7 +7,7 @@ use dal::{ }, AttributeValue, Component, DalContext, SchemaId, }; -use dal_test::expected::ExpectView; +use dal_test::expected::{apply_change_set_to_base, ExpectView}; use dal_test::{ helpers::create_component_for_default_schema_name_in_default_view, test, SCHEMA_ID_SMALL_EVEN_LEGO, @@ -183,7 +185,7 @@ async fn create_component_of_other_schema(ctx: &DalContext) { } #[test] -async fn create_and_connect_to_self_as_children(ctx: &DalContext) { +async fn create_and_connect_to_self_as_children(ctx: &mut DalContext) { let small_odd_lego = create_component_for_default_schema_name_in_default_view( ctx, "small odd lego", @@ -245,21 +247,64 @@ async fn create_and_connect_to_self_as_children(ctx: &DalContext) { let components = Component::list(ctx).await.expect("get components"); assert_eq!(4, components.len()); - let children = Component::get_children_for_id(ctx, small_odd_lego.id()) + let workspace_snapshot = ctx.workspace_snapshot().expect("get snap"); + let edges = workspace_snapshot + .edges_directed(small_odd_lego.id(), petgraph::Direction::Outgoing) + .await + .expect("get edges"); + for (weight, _, tgt) in edges { + let target_id = workspace_snapshot + .get_node_weight(tgt) + .await + .expect("get target") + .id(); + println!("{:?} -> {}", weight.kind(), target_id); + } + + let children: HashSet<_> = Component::get_children_for_id(ctx, small_odd_lego.id()) .await - .expect("get frame children"); + .expect("get frame children") + .into_iter() + .collect(); assert_eq!(3, children.len()); + let managed: HashSet<_> = small_odd_lego + .get_managed(ctx) + .await + .expect("get managed") + .into_iter() + .collect(); + assert_eq!(children, managed); + let small_even_lego_schema_id: SchemaId = ulid::Ulid::from_string(SCHEMA_ID_SMALL_EVEN_LEGO) .expect("make ulid") .into(); - for child_id in children { + for &child_id in &children { let c = Component::get_by_id(ctx, child_id) .await .expect("get component"); let schema_id = c.schema(ctx).await.expect("get schema").id(); assert_eq!(small_even_lego_schema_id, schema_id); } + + // Ensure parallel edges make it through the rebase + apply_change_set_to_base(ctx).await; + + let children_base: HashSet<_> = Component::get_children_for_id(ctx, small_odd_lego.id()) + .await + .expect("get frame children") + .into_iter() + .collect(); + assert_eq!(3, children_base.len()); + let managed_base: HashSet<_> = small_odd_lego + .get_managed(ctx) + .await + .expect("get managed") + .into_iter() + .collect(); + + assert_eq!(children, children_base); + assert_eq!(children_base, managed_base); } #[test]