Skip to content

Commit

Permalink
Merge pull request #5048 from systeminit/zack/issue-ws-events-in-pare…
Browse files Browse the repository at this point in the history
…ntage-order

fix(dal): issue management component ws events in parentage order
  • Loading branch information
zacharyhamm authored Dec 2, 2024
2 parents f96e434 + 037d276 commit b2b4d5f
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 46 deletions.
40 changes: 40 additions & 0 deletions lib/dal-test/src/test_exclusive_schemas/legos/small.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,36 @@ pub(crate) async fn migrate_test_exclusive_schema_small_odd_lego(
create_and_connect_to_self_as_children_name,
)?;

let deeply_nested_children_code = r#"
async function main({ thisComponent, components }: Input): Promise<Output> {
const thisName = thisComponent.properties?.si?.name ?? "unknown";
const count = 10;
let create: { [key: string]: unknown } = {};
let prevName = "self";
for (let i = 0; i < count; i++) {
let name = `clone_${i}`;
create[name] = {
kind: "small odd lego",
properties: { si: { name, type: "configurationFrameDown" }, },
parent: prevName,
};
prevName = name;
}
return {
status: "ok",
ops: {
update: { self: { properties: { si: { type: "configurationFrameDown" } } } },
create,
}
}
}
"#;
let deeply_nested_children =
build_management_func(deeply_nested_children_code, "test:deeplyNestedChildren")?;

let fn_name = "test:deleteActionSmallLego";
let delete_action_func = build_action_func(delete_action_code, fn_name)?;

Expand Down Expand Up @@ -418,6 +448,15 @@ pub(crate) async fn migrate_test_exclusive_schema_small_odd_lego(
.func_unique_id(&create_and_connect_to_self_func.unique_id)
.build()?,
)
.management_func(
ManagementFuncSpec::builder()
.name("Deeply Nested Children")
.managed_schemas(Some(HashSet::from([
SCHEMA_ID_SMALL_EVEN_LEGO.to_string()
])))
.func_unique_id(&deeply_nested_children.unique_id)
.build()?,
)
.management_func(
ManagementFuncSpec::builder()
.name("Create and Connect to Self as Children")
Expand Down Expand Up @@ -446,6 +485,7 @@ pub(crate) async fn migrate_test_exclusive_schema_small_odd_lego(
.func(create_and_connect_from_self_func)
.func(create_and_connect_to_self_func)
.func(create_and_connect_to_self_as_children_func)
.func(deeply_nested_children)
.schema(small_lego_schema)
.build()?;

Expand Down
137 changes: 91 additions & 46 deletions lib/dal/src/management/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map, HashMap, VecDeque};
use std::collections::{hash_map, HashMap, HashSet, VecDeque};

use prototype::ManagementPrototypeExecution;
use serde::{Deserialize, Serialize};
Expand All @@ -7,6 +7,7 @@ use thiserror::Error;
use veritech_client::{ManagementFuncStatus, ManagementResultSuccess};

use crate::component::frame::{Frame, FrameError};
use crate::dependency_graph::DependencyGraph;
use crate::diagram::geometry::Geometry;
use crate::diagram::view::{View, ViewId};
use crate::diagram::{DiagramError, SummaryDiagramManagementEdge};
Expand Down Expand Up @@ -385,8 +386,8 @@ pub struct ManagementOperator<'a> {
socket_map: VariantSocketMap,
view_id: ViewId,
views: HashMap<String, ViewId>,
created_components: Vec<ComponentId>,
updated_components: Vec<ComponentId>,
created_components: HashSet<ComponentId>,
updated_components: HashSet<ComponentId>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -469,8 +470,8 @@ impl<'a> ManagementOperator<'a> {
socket_map: VariantSocketMap::new(),
view_id,
views,
created_components: vec![],
updated_components: vec![],
created_components: HashSet::new(),
updated_components: HashSet::new(),
})
}

Expand Down Expand Up @@ -684,7 +685,7 @@ impl<'a> ManagementOperator<'a> {
&self,
child_id: ComponentId,
parent_placeholder: &String,
) -> ManagementResult<()> {
) -> ManagementResult<ComponentId> {
let new_parent_id = self
.component_id_placeholders
.get(parent_placeholder)
Expand All @@ -695,7 +696,7 @@ impl<'a> ManagementOperator<'a> {

Frame::upsert_parent(self.ctx, child_id, new_parent_id).await?;

Ok(())
Ok(new_parent_id)
}

async fn manage(
Expand Down Expand Up @@ -754,7 +755,7 @@ impl<'a> ManagementOperator<'a> {

let component_id = component.id();

self.created_components.push(component_id);
self.created_components.insert(component_id);

self.component_id_placeholders
.insert(placeholder.to_owned(), component_id);
Expand Down Expand Up @@ -906,7 +907,7 @@ impl<'a> ManagementOperator<'a> {
}));
}

self.updated_components.push(component_id);
self.updated_components.insert(component_id);
}

Ok(pending)
Expand All @@ -929,8 +930,84 @@ impl<'a> ManagementOperator<'a> {
Ok(())
}

// Using the dep graph to ensure we send ws events for components in parent
// to child order, so that parents exist in the frontend before their
// children / parents are rendered as frames before their children report
// their parentage
async fn send_component_ws_events(
&mut self,
mut parentage_graph: DependencyGraph<ComponentId>,
) -> ManagementResult<()> {
loop {
let independent_ids = parentage_graph.independent_ids();
if independent_ids.is_empty() {
break;
}
for id in independent_ids {
if self.created_components.contains(&id) {
self.send_created_event(id).await?;
self.created_components.remove(&id);
} else if self.updated_components.contains(&id) {
self.send_updated_event(id).await?;
self.updated_components.remove(&id);
}
parentage_graph.remove_id(id);
}
}

for &created_id in &self.created_components {
self.send_created_event(created_id).await?;
}

for &updated_id in &self.updated_components {
self.send_updated_event(updated_id).await?;
}

Ok(())
}

async fn send_created_event(&self, id: ComponentId) -> ManagementResult<()> {
let component = Component::get_by_id(self.ctx, id).await?;
WsEvent::component_created(
self.ctx,
component
.into_frontend_type(
self.ctx,
Some(&component.geometry(self.ctx, self.view_id).await?),
Added,
&mut HashMap::new(),
)
.await?,
)
.await?
.publish_on_commit(self.ctx)
.await?;

Ok(())
}
async fn send_updated_event(&self, id: ComponentId) -> ManagementResult<()> {
let component = Component::get_by_id(self.ctx, id).await?;
WsEvent::component_updated(
self.ctx,
component
.into_frontend_type(
self.ctx,
Some(&component.geometry(self.ctx, self.view_id).await?),
component.change_status(self.ctx).await?,
&mut HashMap::new(),
)
.await?,
)
.await?
.publish_on_commit(self.ctx)
.await?;

Ok(())
}

pub async fn operate(&mut self) -> ManagementResult<()> {
let mut pending_operations = self.creates().await?;
let mut component_graph = DependencyGraph::new();
pending_operations.extend(self.updates().await?);

// Parents have to be set before component events are sent
Expand All @@ -941,45 +1018,13 @@ impl<'a> ManagementOperator<'a> {
_ => None,
})
{
self.set_parent(pending_parent.child_component_id, &pending_parent.parent)
.await?
}

for &created_id in &self.created_components {
let component = Component::get_by_id(self.ctx, created_id).await?;
WsEvent::component_created(
self.ctx,
component
.into_frontend_type(
self.ctx,
Some(&component.geometry(self.ctx, self.view_id).await?),
Added,
&mut HashMap::new(),
)
.await?,
)
.await?
.publish_on_commit(self.ctx)
.await?;
let parent_id = self
.set_parent(pending_parent.child_component_id, &pending_parent.parent)
.await?;
component_graph.id_depends_on(pending_parent.child_component_id, parent_id);
}

for &updated_id in &self.updated_components {
let component = Component::get_by_id(self.ctx, updated_id).await?;
WsEvent::component_updated(
self.ctx,
component
.into_frontend_type(
self.ctx,
Some(&component.geometry(self.ctx, self.view_id).await?),
component.change_status(self.ctx).await?,
&mut HashMap::new(),
)
.await?,
)
.await?
.publish_on_commit(self.ctx)
.await?;
}
self.send_component_ws_events(component_graph).await?;

// Now, the rest of the pending ops can be executed, which need to have
// their wsevents sent *after* the component ws events (otherwise some
Expand Down
76 changes: 76 additions & 0 deletions lib/dal/tests/integration_test/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,3 +776,79 @@ async fn execute_management_func(ctx: &DalContext) {

assert_eq!(Some(serde_json::json!("step")), two_value);
}

#[test]
async fn deeply_nested_children(ctx: &DalContext) {
let small_odd_lego = create_component_for_default_schema_name_in_default_view(
ctx,
"small odd lego",
"small odd lego",
)
.await
.expect("could not create component");
let variant = small_odd_lego
.schema_variant(ctx)
.await
.expect("get variant");
let management_prototype = ManagementPrototype::list_for_variant_id(ctx, variant.id())
.await
.expect("get prototypes")
.into_iter()
.find(|proto| proto.name() == "Deeply Nested Children")
.expect("could not find prototype");

let mut execution_result = management_prototype
.execute(ctx, small_odd_lego.id(), None)
.await
.expect("should execute management prototype func");

let result: ManagementFuncReturn = execution_result
.result
.take()
.expect("should have a result success")
.try_into()
.expect("should be a valid management func return");

assert_eq!(result.status, ManagementFuncStatus::Ok);

let operations = result.operations.expect("should have operations");

ManagementOperator::new(ctx, small_odd_lego.id(), operations, execution_result, None)
.await
.expect("should create operator")
.operate()
.await
.expect("should operate");

let mut component_names = vec![];

let mut current = small_odd_lego.id();
loop {
let children = Component::get_children_for_id(ctx, current)
.await
.expect("get children");

if children.is_empty() {
break;
}

let child_id = children[0];
current = child_id;
let name = Component::get_by_id(ctx, child_id)
.await
.expect("get comp")
.name(ctx)
.await
.expect("get name");

component_names.push(name);
}

assert_eq!(
vec![
"clone_0", "clone_1", "clone_2", "clone_3", "clone_4", "clone_5", "clone_6", "clone_7",
"clone_8", "clone_9",
],
component_names
)
}

0 comments on commit b2b4d5f

Please sign in to comment.