Skip to content

Commit

Permalink
Merge pull request #4979 from systeminit/fix/parent-and-management-edges
Browse files Browse the repository at this point in the history
fix: frames can manage children,  connection events sent after component events in management operator
  • Loading branch information
zacharyhamm authored Nov 15, 2024
2 parents 7310860 + 86a1326 commit e2b6a27
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 82 deletions.
6 changes: 5 additions & 1 deletion lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2630,7 +2630,11 @@ impl AttributeValue {
) -> AttributeValueResult<Option<String>> {
Ok(ctx
.workspace_snapshot()?
.find_edge(parent_attribute_value_id, child_attribute_value_id)
.find_edge(
parent_attribute_value_id,
child_attribute_value_id,
EdgeWeightKindDiscriminants::Contain,
)
.await?
.and_then(|weight| match weight.kind() {
EdgeWeightKind::Contain(key) => key.to_owned(),
Expand Down
109 changes: 72 additions & 37 deletions lib/dal/src/management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use veritech_client::{ManagementFuncStatus, ManagementResultSuccess};
use crate::component::frame::{Frame, FrameError};
use crate::diagram::view::{View, ViewId};
use crate::diagram::DiagramError;
use crate::WorkspaceSnapshotError;
use crate::{
action::{
prototype::{ActionKind, ActionPrototype, ActionPrototypeError},
Expand All @@ -29,6 +28,7 @@ use crate::{
Func, FuncError, InputSocket, InputSocketId, OutputSocket, OutputSocketId, Prop, PropKind,
Schema, SchemaError, SchemaId, SchemaVariantId, StandardModelError, WsEvent, WsEventError,
};
use crate::{EdgeWeightKind, WorkspaceSnapshotError};

pub mod prototype;

Expand Down Expand Up @@ -396,7 +396,9 @@ struct PendingParent {
#[derive(Clone, Debug)]
enum PendingOperation {
Connect(PendingConnect),
Manage(ComponentId),
Parent(PendingParent),
RemoveConnection(PendingConnect),
}

impl<'a> ManagementOperator<'a> {
Expand Down Expand Up @@ -649,6 +651,20 @@ impl<'a> ManagementOperator<'a> {
Ok(())
}

async fn manage(&self, component_id: ComponentId) -> ManagementResult<()> {
let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await;
Component::add_manages_edge_to_component(
self.ctx,
self.manager_component_id,
component_id,
EdgeWeightKind::Manages,
)
.await?;
drop(cycle_check_guard);

Ok(())
}

async fn creates(&mut self) -> ManagementResult<Vec<PendingOperation>> {
// We take here to avoid holding on to an immutable ref to self throughout the loop
let creates = self.operations.create.take();
Expand Down Expand Up @@ -712,16 +728,7 @@ impl<'a> ManagementOperator<'a> {
parent: parent.to_owned(),
}));
}

let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await;
Component::add_manages_edge_to_component(
self.ctx,
self.manager_component_id,
component_id,
crate::EdgeWeightKind::Manages,
)
.await?;
drop(cycle_check_guard);
pending_operations.push(PendingOperation::Manage(component_id));
}
}

Expand All @@ -737,10 +744,12 @@ impl<'a> ManagementOperator<'a> {
))
}

async fn updates(&mut self) -> ManagementResult<()> {
async fn updates(&mut self) -> ManagementResult<Vec<PendingOperation>> {
let mut pending = vec![];

let updates = self.operations.update.take();
let Some(updates) = &updates else {
return Ok(());
return Ok(pending);
};

for (placeholder, operation) in updates {
Expand Down Expand Up @@ -777,25 +786,34 @@ impl<'a> ManagementOperator<'a> {
if let Some(update_conns) = &operation.connect {
if let Some(remove_conns) = &update_conns.remove {
for to_remove in remove_conns {
self.remove_connection(component_id, to_remove).await?;
pending.push(PendingOperation::RemoveConnection(PendingConnect {
from_component_id: component_id,
connection: to_remove.to_owned(),
}));
}
}

if let Some(add_conns) = &update_conns.add {
for to_add in add_conns {
self.create_connection(component_id, to_add).await?;
pending.push(PendingOperation::Connect(PendingConnect {
from_component_id: component_id,
connection: to_add.to_owned(),
}));
}
}
}

if let Some(new_parent) = &operation.parent {
self.set_parent(component_id, new_parent).await?;
pending.push(PendingOperation::Parent(PendingParent {
child_component_id: component_id,
parent: new_parent.to_owned(),
}));
}

self.updated_components.push(component_id);
}

Ok(())
Ok(pending)
}

async fn actions(&self) -> ManagementResult<()> {
Expand All @@ -816,26 +834,19 @@ impl<'a> ManagementOperator<'a> {
}

pub async fn operate(&mut self) -> ManagementResult<()> {
let pending_operations = self.creates().await?;
self.updates().await?;

// We have to execute these after the creation of the component, and
// after updates, so that they can reference other created components
// and so that we can ensure the updates have been applied
for pending_operation in pending_operations {
match pending_operation {
PendingOperation::Connect(pending_connect) => {
self.create_connection(
pending_connect.from_component_id,
&pending_connect.connection,
)
.await?
}
PendingOperation::Parent(pending_parent) => {
self.set_parent(pending_parent.child_component_id, &pending_parent.parent)
.await?
}
}
let mut pending_operations = self.creates().await?;
pending_operations.extend(self.updates().await?);

// Parents have to be set before component events are sent
for pending_parent in pending_operations
.iter()
.filter_map(|pending_op| match pending_op {
PendingOperation::Parent(pending_parent) => Some(pending_parent),
_ => None,
})
{
self.set_parent(pending_parent.child_component_id, &pending_parent.parent)
.await?
}

for &created_id in &self.created_components {
Expand Down Expand Up @@ -874,6 +885,30 @@ impl<'a> ManagementOperator<'a> {
.await?;
}

// Now, the rest of the pending ops can be executed, which need to have
// their wsevents sent *after* the component ws events (otherwise some
// will be discarded by the frontend, since it does not know about the
// newly created components until the above events are sent)
for pending_op in pending_operations {
match pending_op {
PendingOperation::Connect(pending_connect) => {
self.create_connection(
pending_connect.from_component_id,
&pending_connect.connection,
)
.await?;
}
PendingOperation::RemoveConnection(remove) => {
self.remove_connection(remove.from_component_id, &remove.connection)
.await?;
}
PendingOperation::Manage(managed_id) => {
self.manage(managed_id).await?;
}
PendingOperation::Parent(_) => {}
}
}

self.actions().await?;

Ok(())
Expand Down
28 changes: 15 additions & 13 deletions lib/dal/src/workspace_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,13 +658,13 @@ impl WorkspaceSnapshot {
from_node_id: impl Into<Ulid>,
edge_weight: EdgeWeight,
to_node_id: impl Into<Ulid>,
) -> WorkspaceSnapshotResult<EdgeIndex> {
) -> WorkspaceSnapshotResult<()> {
let from_node_index = self
.working_copy()
.await
.get_node_index_by_id(from_node_id)?;
let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?;
Ok(if self.cycle_check().await {
if self.cycle_check().await {
let self_clone = self.clone();
slow_rt::spawn(async move {
let mut working_copy = self_clone.working_copy_mut().await;
Expand All @@ -675,7 +675,9 @@ impl WorkspaceSnapshot {
self.working_copy_mut()
.await
.add_edge(from_node_index, edge_weight, to_node_index)?
})
}

Ok(())
}

/// Add an edge to the graph, bypassing any cycle checks and using node
Expand All @@ -685,33 +687,32 @@ impl WorkspaceSnapshot {
from_node_index: NodeIndex,
edge_weight: EdgeWeight,
to_node_index: NodeIndex,
) -> WorkspaceSnapshotResult<EdgeIndex> {
let edge_index =
self.working_copy_mut()
.await
.add_edge(from_node_index, edge_weight, to_node_index)?;
) -> WorkspaceSnapshotResult<()> {
self.working_copy_mut()
.await
.add_edge(from_node_index, edge_weight, to_node_index)?;

Ok(edge_index)
Ok(())
}

pub async fn add_ordered_edge(
&self,
from_node_id: impl Into<Ulid>,
edge_weight: EdgeWeight,
to_node_id: impl Into<Ulid>,
) -> WorkspaceSnapshotResult<EdgeIndex> {
) -> WorkspaceSnapshotResult<()> {
let from_node_index = self
.working_copy()
.await
.get_node_index_by_id(from_node_id)?;
let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?;
let (edge_index, _) = self.working_copy_mut().await.add_ordered_edge(
self.working_copy_mut().await.add_ordered_edge(
from_node_index,
edge_weight,
to_node_index,
)?;

Ok(edge_index)
Ok(())
}

#[instrument(
Expand Down Expand Up @@ -1232,14 +1233,15 @@ impl WorkspaceSnapshot {
&self,
from_id: impl Into<Ulid>,
to_id: impl Into<Ulid>,
edge_weight_kind: EdgeWeightKindDiscriminants,
) -> WorkspaceSnapshotResult<Option<EdgeWeight>> {
let working_copy = self.working_copy().await;

let from_idx = working_copy.get_node_index_by_id(from_id)?;
let to_idx = working_copy.get_node_index_by_id(to_id)?;

Ok(working_copy
.find_edge(from_idx, to_idx)
.find_edge(from_idx, to_idx, edge_weight_kind)
.map(ToOwned::to_owned))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod test {
ContentAddress::Component(ContentHash::from("Component B")),
))
.expect("Unable to add Component B");
let _new_onto_root_component_edge_index = base_graph
base_graph
.add_edge(
base_graph.root(),
EdgeWeight::new(EdgeWeightKind::new_use()),
Expand Down
Loading

0 comments on commit e2b6a27

Please sign in to comment.