From 3c349e708d6204335e3118dace035ca4e60237b0 Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Thu, 12 Dec 2024 17:34:47 -0600 Subject: [PATCH 01/10] Start on time sync tool. --- .../ros2/sync/ROS2DistributedClock.java | 72 ++++++ .../ros2/sync/ROS2DistributedClockPeer.java | 11 + .../ros2/sync/ROS2DistributedClockTest.java | 17 ++ .../msg/DistributedClockMessage_.idl | 44 ++++ .../msg/dds/DistributedClockMessage.java | 215 ++++++++++++++++++ .../DistributedClockMessagePubSubType.java | 184 +++++++++++++++ .../msg/DistributedClockMessage.msg | 17 ++ .../msg/DistributedClockMessage.msg | 19 ++ 8 files changed, 579 insertions(+) create mode 100644 ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java create mode 100644 ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java create mode 100644 ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java create mode 100644 ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl create mode 100644 ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java create mode 100644 ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java create mode 100644 ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg create mode 100644 ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java new file mode 100644 index 00000000000..5b7a6cc9a0c --- /dev/null +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -0,0 +1,72 @@ +package us.ihmc.communication.ros2.sync; + +import ihmc_common_msgs.msg.dds.DistributedClockMessage; +import us.ihmc.commons.exception.DefaultExceptionHandler; +import us.ihmc.commons.exception.ExceptionTools; +import us.ihmc.commons.thread.RepeatingTaskThread; +import us.ihmc.communication.ROS2Tools; +import us.ihmc.pubsub.common.SampleInfo; +import us.ihmc.pubsub.common.Time; +import us.ihmc.ros2.ROS2NodeInterface; +import us.ihmc.ros2.ROS2Topic; + +import java.net.InetAddress; +import java.time.Instant; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Assumes symmetric network delay. + */ +public class ROS2DistributedClock +{ + private static final AtomicLong INDEX = new AtomicLong(); + private static final ROS2Topic TOPIC = ROS2Tools.IHMC_ROOT.withModule("distributed_clock").withType(DistributedClockMessage.class); + + private final String ourName; + private final HashMap peers = new HashMap<>(); + + private final SampleInfo receivedSampleInfo = new SampleInfo(); + private final DistributedClockMessage receivedClockMessage = new DistributedClockMessage(); + + private final RepeatingTaskThread requestThread = new RepeatingTaskThread(getClass().getSimpleName(), + this::requestThread, + DefaultExceptionHandler.MESSAGE_AND_STACKTRACE) + .setFrequencyLimit(1.0); + + public ROS2DistributedClock(ROS2NodeInterface ros2Node) + { + AtomicReference hostname = new AtomicReference<>(""); + ExceptionTools.handle(() -> hostname.set("_" + InetAddress.getLocalHost().getHostName()), DefaultExceptionHandler.PROCEED_SILENTLY); + + + ourName = "%s%s_%d_%d".formatted(ros2Node.getName(), hostname.get(), ProcessHandle.current().pid(), INDEX.getAndIncrement()); + + + ros2Node.createSubscription(TOPIC, subscriber -> + { + Instant now = Instant.now(); + + long epochSecond = now.getEpochSecond(); + int nano = now.getNano(); + + subscriber.takeNextData(receivedClockMessage, receivedSampleInfo); + + Time sourceTimestamp = receivedSampleInfo.getSourceTimestamp(); + long nanoseconds = sourceTimestamp.getNanoseconds(); + int seconds = sourceTimestamp.getSeconds(); + + + }); + + + + requestThread.start(); + } + + private void requestThread() + { + + } +} diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java new file mode 100644 index 00000000000..474e49fa1be --- /dev/null +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java @@ -0,0 +1,11 @@ +package us.ihmc.communication.ros2.sync; + +public class ROS2DistributedClockPeer +{ + private long epochSecondOffset = 0; + private int nanoOffset = 0; + + public ROS2DistributedClockPeer(String name) + { + } +} diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java new file mode 100644 index 00000000000..b7e58df23d7 --- /dev/null +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -0,0 +1,17 @@ +package us.ihmc.communication.ros2.sync; + +import org.junit.jupiter.api.Test; +import us.ihmc.communication.ROS2Tools; +import us.ihmc.pubsub.DomainFactory.PubSubImplementation; +import us.ihmc.ros2.ROS2Node; + +public class ROS2DistributedClockTest +{ + @Test + public void test() + { + ROS2Node ros2Node = ROS2Tools.createROS2Node(PubSubImplementation.INTRAPROCESS, "test"); + + new ROS2DistributedClock(ros2Node); + } +} diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl new file mode 100644 index 00000000000..e570611b681 --- /dev/null +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl @@ -0,0 +1,44 @@ +#ifndef __ihmc_common_msgs__msg__DistributedClockMessage__idl__ +#define __ihmc_common_msgs__msg__DistributedClockMessage__idl__ + +#include "ihmc_common_msgs/msg/./InstantMessage_.idl" +module ihmc_common_msgs +{ + module msg + { + module dds + { + + /** + * This message is a request-reply ping used to estimate the clock offset + * of another node to be used by data synchronization algorithms. + */ + @TypeCode(type="ihmc_common_msgs::msg::dds_::DistributedClockMessage_") + struct DistributedClockMessage + { + /** + * The requester's unique ID + */ + string requester_id; + /** + * The replier's unique ID + */ + string replier_id; + /** + * Monotonically increasing request number + */ + unsigned long long sequence_id; + /** + * The time at which the request is sent + */ + ihmc_common_msgs::msg::dds::InstantMessage request_send_time; + /** + * The time at which the reply is sent + */ + ihmc_common_msgs::msg::dds::InstantMessage reply_send_time; + }; + }; + }; +}; + +#endif diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java new file mode 100644 index 00000000000..d3880ab6113 --- /dev/null +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java @@ -0,0 +1,215 @@ +package ihmc_common_msgs.msg.dds; + +import us.ihmc.communication.packets.Packet; +import us.ihmc.euclid.interfaces.Settable; +import us.ihmc.euclid.interfaces.EpsilonComparable; +import java.util.function.Supplier; +import us.ihmc.pubsub.TopicDataType; + +/** + * This message is a request-reply ping used to estimate the clock offset + * of another node to be used by data synchronization algorithms. + */ +public class DistributedClockMessage extends Packet implements Settable, EpsilonComparable +{ + /** + * The requester's unique ID + */ + public java.lang.StringBuilder requester_id_; + /** + * The replier's unique ID + */ + public java.lang.StringBuilder replier_id_; + /** + * Monotonically increasing request number + */ + public long sequence_id_; + /** + * The time at which the request is sent + */ + public ihmc_common_msgs.msg.dds.InstantMessage request_send_time_; + /** + * The time at which the reply is sent + */ + public ihmc_common_msgs.msg.dds.InstantMessage reply_send_time_; + + public DistributedClockMessage() + { + requester_id_ = new java.lang.StringBuilder(255); + replier_id_ = new java.lang.StringBuilder(255); + request_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); + reply_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); + } + + public DistributedClockMessage(DistributedClockMessage other) + { + this(); + set(other); + } + + public void set(DistributedClockMessage other) + { + requester_id_.setLength(0); + requester_id_.append(other.requester_id_); + + replier_id_.setLength(0); + replier_id_.append(other.replier_id_); + + sequence_id_ = other.sequence_id_; + + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.request_send_time_, request_send_time_); + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.reply_send_time_, reply_send_time_); + } + + /** + * The requester's unique ID + */ + public void setRequesterId(java.lang.String requester_id) + { + requester_id_.setLength(0); + requester_id_.append(requester_id); + } + + /** + * The requester's unique ID + */ + public java.lang.String getRequesterIdAsString() + { + return getRequesterId().toString(); + } + /** + * The requester's unique ID + */ + public java.lang.StringBuilder getRequesterId() + { + return requester_id_; + } + + /** + * The replier's unique ID + */ + public void setReplierId(java.lang.String replier_id) + { + replier_id_.setLength(0); + replier_id_.append(replier_id); + } + + /** + * The replier's unique ID + */ + public java.lang.String getReplierIdAsString() + { + return getReplierId().toString(); + } + /** + * The replier's unique ID + */ + public java.lang.StringBuilder getReplierId() + { + return replier_id_; + } + + /** + * Monotonically increasing request number + */ + public void setSequenceId(long sequence_id) + { + sequence_id_ = sequence_id; + } + /** + * Monotonically increasing request number + */ + public long getSequenceId() + { + return sequence_id_; + } + + + /** + * The time at which the request is sent + */ + public ihmc_common_msgs.msg.dds.InstantMessage getRequestSendTime() + { + return request_send_time_; + } + + + /** + * The time at which the reply is sent + */ + public ihmc_common_msgs.msg.dds.InstantMessage getReplySendTime() + { + return reply_send_time_; + } + + + public static Supplier getPubSubType() + { + return DistributedClockMessagePubSubType::new; + } + + @Override + public Supplier getPubSubTypePacket() + { + return DistributedClockMessagePubSubType::new; + } + + @Override + public boolean epsilonEquals(DistributedClockMessage other, double epsilon) + { + if(other == null) return false; + if(other == this) return true; + + if (!us.ihmc.idl.IDLTools.epsilonEqualsStringBuilder(this.requester_id_, other.requester_id_, epsilon)) return false; + + if (!us.ihmc.idl.IDLTools.epsilonEqualsStringBuilder(this.replier_id_, other.replier_id_, epsilon)) return false; + + if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.sequence_id_, other.sequence_id_, epsilon)) return false; + + if (!this.request_send_time_.epsilonEquals(other.request_send_time_, epsilon)) return false; + if (!this.reply_send_time_.epsilonEquals(other.reply_send_time_, epsilon)) return false; + + return true; + } + + @Override + public boolean equals(Object other) + { + if(other == null) return false; + if(other == this) return true; + if(!(other instanceof DistributedClockMessage)) return false; + + DistributedClockMessage otherMyClass = (DistributedClockMessage) other; + + if (!us.ihmc.idl.IDLTools.equals(this.requester_id_, otherMyClass.requester_id_)) return false; + + if (!us.ihmc.idl.IDLTools.equals(this.replier_id_, otherMyClass.replier_id_)) return false; + + if(this.sequence_id_ != otherMyClass.sequence_id_) return false; + + if (!this.request_send_time_.equals(otherMyClass.request_send_time_)) return false; + if (!this.reply_send_time_.equals(otherMyClass.reply_send_time_)) return false; + + return true; + } + + @Override + public java.lang.String toString() + { + StringBuilder builder = new StringBuilder(); + + builder.append("DistributedClockMessage {"); + builder.append("requester_id="); + builder.append(this.requester_id_); builder.append(", "); + builder.append("replier_id="); + builder.append(this.replier_id_); builder.append(", "); + builder.append("sequence_id="); + builder.append(this.sequence_id_); builder.append(", "); + builder.append("request_send_time="); + builder.append(this.request_send_time_); builder.append(", "); + builder.append("reply_send_time="); + builder.append(this.reply_send_time_); + builder.append("}"); + return builder.toString(); + } +} diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java new file mode 100644 index 00000000000..edc6ec66593 --- /dev/null +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java @@ -0,0 +1,184 @@ +package ihmc_common_msgs.msg.dds; + +/** +* +* Topic data type of the struct "DistributedClockMessage" defined in "DistributedClockMessage_.idl". Use this class to provide the TopicDataType to a Participant. +* +* This file was automatically generated from DistributedClockMessage_.idl by us.ihmc.idl.generator.IDLGenerator. +* Do not update this file directly, edit DistributedClockMessage_.idl instead. +* +*/ +public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDataType +{ + public static final java.lang.String name = "ihmc_common_msgs::msg::dds_::DistributedClockMessage_"; + + @Override + public final java.lang.String getDefinitionChecksum() + { + return "dd81b48b59f9cf8127efe53e2aa6f59c2eebc719487f6051827e5fdc2d7a5ac3"; + } + + @Override + public final java.lang.String getDefinitionVersion() + { + return "local"; + } + + private final us.ihmc.idl.CDR serializeCDR = new us.ihmc.idl.CDR(); + private final us.ihmc.idl.CDR deserializeCDR = new us.ihmc.idl.CDR(); + + @Override + public void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.pubsub.common.SerializedPayload serializedPayload) throws java.io.IOException + { + serializeCDR.serialize(serializedPayload); + write(data, serializeCDR); + serializeCDR.finishSerialize(); + } + + @Override + public void deserialize(us.ihmc.pubsub.common.SerializedPayload serializedPayload, ihmc_common_msgs.msg.dds.DistributedClockMessage data) throws java.io.IOException + { + deserializeCDR.deserialize(serializedPayload); + read(data, deserializeCDR); + deserializeCDR.finishDeserialize(); + } + + public static int getMaxCdrSerializedSize() + { + return getMaxCdrSerializedSize(0); + } + + public static int getMaxCdrSerializedSize(int current_alignment) + { + int initial_alignment = current_alignment; + + current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + 255 + 1; + current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + 255 + 1; + current_alignment += 8 + us.ihmc.idl.CDR.alignment(current_alignment, 8); + + current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + + current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + + + return current_alignment - initial_alignment; + } + + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.DistributedClockMessage data) + { + return getCdrSerializedSize(data, 0); + } + + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, int current_alignment) + { + int initial_alignment = current_alignment; + + current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + data.getRequesterId().length() + 1; + + current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + data.getReplierId().length() + 1; + + current_alignment += 8 + us.ihmc.idl.CDR.alignment(current_alignment, 8); + + + current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getCdrSerializedSize(data.getRequestSendTime(), current_alignment); + + current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getCdrSerializedSize(data.getReplySendTime(), current_alignment); + + + return current_alignment - initial_alignment; + } + + public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + { + if(data.getRequesterId().length() <= 255) + cdr.write_type_d(data.getRequesterId());else + throw new RuntimeException("requester_id field exceeds the maximum length"); + + if(data.getReplierId().length() <= 255) + cdr.write_type_d(data.getReplierId());else + throw new RuntimeException("replier_id field exceeds the maximum length"); + + cdr.write_type_12(data.getSequenceId()); + + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getRequestSendTime(), cdr); + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getReplySendTime(), cdr); + } + + public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + { + cdr.read_type_d(data.getRequesterId()); + cdr.read_type_d(data.getReplierId()); + data.setSequenceId(cdr.read_type_12()); + + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getRequestSendTime(), cdr); + ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getReplySendTime(), cdr); + + } + + @Override + public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.InterchangeSerializer ser) + { + ser.write_type_d("requester_id", data.getRequesterId()); + ser.write_type_d("replier_id", data.getReplierId()); + ser.write_type_12("sequence_id", data.getSequenceId()); + ser.write_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); + + ser.write_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); + + } + + @Override + public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.DistributedClockMessage data) + { + ser.read_type_d("requester_id", data.getRequesterId()); + ser.read_type_d("replier_id", data.getReplierId()); + data.setSequenceId(ser.read_type_12("sequence_id")); + ser.read_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); + + ser.read_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); + + } + + public static void staticCopy(ihmc_common_msgs.msg.dds.DistributedClockMessage src, ihmc_common_msgs.msg.dds.DistributedClockMessage dest) + { + dest.set(src); + } + + @Override + public ihmc_common_msgs.msg.dds.DistributedClockMessage createData() + { + return new ihmc_common_msgs.msg.dds.DistributedClockMessage(); + } + @Override + public int getTypeSize() + { + return us.ihmc.idl.CDR.getTypeSize(getMaxCdrSerializedSize()); + } + + @Override + public java.lang.String getName() + { + return name; + } + + public void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + { + write(data, cdr); + } + + public void deserialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + { + read(data, cdr); + } + + public void copy(ihmc_common_msgs.msg.dds.DistributedClockMessage src, ihmc_common_msgs.msg.dds.DistributedClockMessage dest) + { + staticCopy(src, dest); + } + + @Override + public DistributedClockMessagePubSubType newInstance() + { + return new DistributedClockMessagePubSubType(); + } +} diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg new file mode 100644 index 00000000000..b6c991bdc78 --- /dev/null +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -0,0 +1,17 @@ +# This message is a request-reply ping used to estimate the clock offset +# of another node to be used by data synchronization algorithms. + +# The requester's unique ID +string requester_id + +# The replier's unique ID +string replier_id + +# Monotonically increasing request number +uint64 sequence_id + +# The time at which the request is sent +ihmc_common_msgs/InstantMessage request_send_time + +# The time at which the reply is sent +ihmc_common_msgs/InstantMessage reply_send_time diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg new file mode 100644 index 00000000000..2095d71f68b --- /dev/null +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -0,0 +1,19 @@ +# This message is a request-reply ping used to estimate the clock offset +# of another node to be used by data synchronization algorithms. + +# The requester's unique ID +string requester_id + +# The replier's unique ID +string replier_id + +# Monotonically increasing request number +uint64 sequence_id + +# The time at which the request is sent +ihmc_common_msgs/InstantMessage request_send_time + +# The time at which the reply is sent +ihmc_common_msgs/InstantMessage reply_send_time + + From 64adef457c0c8e2972a2bcf8d13064f0b1fca274 Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Thu, 12 Dec 2024 21:06:49 -0600 Subject: [PATCH 02/10] Update to ros2 1.1.3 API. --- .../ihmc/communication/ros2/sync/ROS2DistributedClock.java | 4 ++-- .../communication/ros2/sync/ROS2DistributedClockTest.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index 5b7a6cc9a0c..c25fbd636ff 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -7,7 +7,7 @@ import us.ihmc.communication.ROS2Tools; import us.ihmc.pubsub.common.SampleInfo; import us.ihmc.pubsub.common.Time; -import us.ihmc.ros2.ROS2NodeInterface; +import us.ihmc.ros2.ROS2Node; import us.ihmc.ros2.ROS2Topic; import java.net.InetAddress; @@ -35,7 +35,7 @@ public class ROS2DistributedClock DefaultExceptionHandler.MESSAGE_AND_STACKTRACE) .setFrequencyLimit(1.0); - public ROS2DistributedClock(ROS2NodeInterface ros2Node) + public ROS2DistributedClock(ROS2Node ros2Node) { AtomicReference hostname = new AtomicReference<>(""); ExceptionTools.handle(() -> hostname.set("_" + InetAddress.getLocalHost().getHostName()), DefaultExceptionHandler.PROCEED_SILENTLY); diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java index b7e58df23d7..da870883f79 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -1,16 +1,16 @@ package us.ihmc.communication.ros2.sync; import org.junit.jupiter.api.Test; -import us.ihmc.communication.ROS2Tools; -import us.ihmc.pubsub.DomainFactory.PubSubImplementation; import us.ihmc.ros2.ROS2Node; +import us.ihmc.ros2.ROS2NodeBuilder; +import us.ihmc.ros2.ROS2NodeBuilder.SpecialTransportMode; public class ROS2DistributedClockTest { @Test public void test() { - ROS2Node ros2Node = ROS2Tools.createROS2Node(PubSubImplementation.INTRAPROCESS, "test"); + ROS2Node ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); new ROS2DistributedClock(ros2Node); } From ada8993424e3df816c56a0e4905c4c68676b8012 Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Thu, 12 Dec 2024 22:40:38 -0600 Subject: [PATCH 03/10] Working on avoiding n^2 --- .../ros2/sync/ROS2DistributedClock.java | 69 +++++++++++++++---- .../ros2/sync/ROS2DistributedClockTest.java | 9 +++ .../msg/DistributedClockMessage_.idl | 2 +- .../msg/dds/DistributedClockMessage.java | 20 +++--- .../DistributedClockMessagePubSubType.java | 10 +-- .../msg/DistributedClockMessage.msg | 2 +- .../msg/DistributedClockMessage.msg | 2 +- 7 files changed, 83 insertions(+), 31 deletions(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index c25fbd636ff..e2cc98afdbd 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -5,10 +5,17 @@ import us.ihmc.commons.exception.ExceptionTools; import us.ihmc.commons.thread.RepeatingTaskThread; import us.ihmc.communication.ROS2Tools; -import us.ihmc.pubsub.common.SampleInfo; -import us.ihmc.pubsub.common.Time; +import us.ihmc.log.LogTools; +import us.ihmc.pubsub.common.Guid; +import us.ihmc.pubsub.common.Guid.Entity; +import us.ihmc.pubsub.common.MatchingInfo; +import us.ihmc.pubsub.common.MatchingInfo.MatchingStatus; +import us.ihmc.pubsub.impl.fastRTPS.FastRTPSPublisher; +import us.ihmc.pubsub.subscriber.Subscriber; import us.ihmc.ros2.ROS2Node; +import us.ihmc.ros2.ROS2Publisher; import us.ihmc.ros2.ROS2Topic; +import us.ihmc.ros2.SubscriptionMatchedListener; import java.net.InetAddress; import java.time.Instant; @@ -18,6 +25,8 @@ /** * Assumes symmetric network delay. + * + * TODO: Handle intraprocess mode? Are there only FastRTPS pub/subs now? */ public class ROS2DistributedClock { @@ -27,40 +36,71 @@ public class ROS2DistributedClock private final String ourName; private final HashMap peers = new HashMap<>(); - private final SampleInfo receivedSampleInfo = new SampleInfo(); private final DistributedClockMessage receivedClockMessage = new DistributedClockMessage(); +// private final Guid ourPublisherGuid; + private long requestNumber = 0; + private final DistributedClockMessage outgoingClockMessage = new DistributedClockMessage(); + private final ROS2Publisher publisher; private final RepeatingTaskThread requestThread = new RepeatingTaskThread(getClass().getSimpleName(), this::requestThread, DefaultExceptionHandler.MESSAGE_AND_STACKTRACE) - .setFrequencyLimit(1.0); + .setFrequencyLimit(5.0); public ROS2DistributedClock(ROS2Node ros2Node) { AtomicReference hostname = new AtomicReference<>(""); ExceptionTools.handle(() -> hostname.set("_" + InetAddress.getLocalHost().getHostName()), DefaultExceptionHandler.PROCEED_SILENTLY); - ourName = "%s%s_%d_%d".formatted(ros2Node.getName(), hostname.get(), ProcessHandle.current().pid(), INDEX.getAndIncrement()); + publisher = ros2Node.createPublisher(TOPIC); + +// if (publisher instanceof FastRTPSPublisher fastRTPSPublisher) +// { +// ourPublisherGuid = fastRTPSPublisher.getGuid(); +// } + ros2Node.createSubscription(TOPIC, subscriber -> { - Instant now = Instant.now(); + subscriber.takeNextData(receivedClockMessage, null); - long epochSecond = now.getEpochSecond(); - int nano = now.getNano(); + if (receivedClockMessage.getRequesterIdAsString().equals(ourName)) + { + Instant replyReceptionTime = Instant.now(); - subscriber.takeNextData(receivedClockMessage, receivedSampleInfo); + String peerName = receivedClockMessage.getReplierIdAsString(); + ROS2DistributedClockPeer peer = peers.get(peerName); - Time sourceTimestamp = receivedSampleInfo.getSourceTimestamp(); - long nanoseconds = sourceTimestamp.getNanoseconds(); - int seconds = sourceTimestamp.getSeconds(); + if (peer == null) + { + peer = new ROS2DistributedClockPeer(peerName); + peers.put(peerName, peer); + } +// peer. + } + else + { - }); + } + }, (subscriber, info) -> + { + + Guid guid = info.getGuid(); + Entity entity = guid.getEntity(); + String entityName = entity.toString(); + String prefix = guid.getGuidPrefix().toString(); + + MatchingStatus status = info.getStatus(); + + LogTools.info("ent: %s prefix: %s status: %s".formatted(entityName, prefix, status)); + + + }); requestThread.start(); } @@ -68,5 +108,8 @@ public ROS2DistributedClock(ROS2Node ros2Node) private void requestThread() { +// outgoingClockMessage.set + + ++requestNumber; } } diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java index da870883f79..9fb1c410db8 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -1,6 +1,7 @@ package us.ihmc.communication.ros2.sync; import org.junit.jupiter.api.Test; +import us.ihmc.commons.thread.ThreadTools; import us.ihmc.ros2.ROS2Node; import us.ihmc.ros2.ROS2NodeBuilder; import us.ihmc.ros2.ROS2NodeBuilder.SpecialTransportMode; @@ -13,5 +14,13 @@ public void test() ROS2Node ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); new ROS2DistributedClock(ros2Node); + new ROS2DistributedClock(ros2Node); + + ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); + + new ROS2DistributedClock(ros2Node); + new ROS2DistributedClock(ros2Node); + + ThreadTools.park(1.0); } } diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl index e570611b681..3da4578bbc8 100644 --- a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl @@ -27,7 +27,7 @@ module ihmc_common_msgs /** * Monotonically increasing request number */ - unsigned long long sequence_id; + unsigned long long request_number; /** * The time at which the request is sent */ diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java index d3880ab6113..e31730b2372 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java @@ -23,7 +23,7 @@ public class DistributedClockMessage extends Packet imp /** * Monotonically increasing request number */ - public long sequence_id_; + public long request_number_; /** * The time at which the request is sent */ @@ -55,7 +55,7 @@ public void set(DistributedClockMessage other) replier_id_.setLength(0); replier_id_.append(other.replier_id_); - sequence_id_ = other.sequence_id_; + request_number_ = other.request_number_; ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.request_send_time_, request_send_time_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.reply_send_time_, reply_send_time_); @@ -112,16 +112,16 @@ public java.lang.StringBuilder getReplierId() /** * Monotonically increasing request number */ - public void setSequenceId(long sequence_id) + public void setRequestNumber(long request_number) { - sequence_id_ = sequence_id; + request_number_ = request_number; } /** * Monotonically increasing request number */ - public long getSequenceId() + public long getRequestNumber() { - return sequence_id_; + return request_number_; } @@ -164,7 +164,7 @@ public boolean epsilonEquals(DistributedClockMessage other, double epsilon) if (!us.ihmc.idl.IDLTools.epsilonEqualsStringBuilder(this.replier_id_, other.replier_id_, epsilon)) return false; - if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.sequence_id_, other.sequence_id_, epsilon)) return false; + if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.request_number_, other.request_number_, epsilon)) return false; if (!this.request_send_time_.epsilonEquals(other.request_send_time_, epsilon)) return false; if (!this.reply_send_time_.epsilonEquals(other.reply_send_time_, epsilon)) return false; @@ -185,7 +185,7 @@ public boolean equals(Object other) if (!us.ihmc.idl.IDLTools.equals(this.replier_id_, otherMyClass.replier_id_)) return false; - if(this.sequence_id_ != otherMyClass.sequence_id_) return false; + if(this.request_number_ != otherMyClass.request_number_) return false; if (!this.request_send_time_.equals(otherMyClass.request_send_time_)) return false; if (!this.reply_send_time_.equals(otherMyClass.reply_send_time_)) return false; @@ -203,8 +203,8 @@ public java.lang.String toString() builder.append(this.requester_id_); builder.append(", "); builder.append("replier_id="); builder.append(this.replier_id_); builder.append(", "); - builder.append("sequence_id="); - builder.append(this.sequence_id_); builder.append(", "); + builder.append("request_number="); + builder.append(this.request_number_); builder.append(", "); builder.append("request_send_time="); builder.append(this.request_send_time_); builder.append(", "); builder.append("reply_send_time="); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java index edc6ec66593..d0fb7eb57ab 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java @@ -15,7 +15,7 @@ public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDa @Override public final java.lang.String getDefinitionChecksum() { - return "dd81b48b59f9cf8127efe53e2aa6f59c2eebc719487f6051827e5fdc2d7a5ac3"; + return "9b29df5decb710752845fe1e9451103d00a8c5892b8dadd4a3a7998bea11974a"; } @Override @@ -98,7 +98,7 @@ public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, cdr.write_type_d(data.getReplierId());else throw new RuntimeException("replier_id field exceeds the maximum length"); - cdr.write_type_12(data.getSequenceId()); + cdr.write_type_12(data.getRequestNumber()); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getRequestSendTime(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getReplySendTime(), cdr); @@ -108,7 +108,7 @@ public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, u { cdr.read_type_d(data.getRequesterId()); cdr.read_type_d(data.getReplierId()); - data.setSequenceId(cdr.read_type_12()); + data.setRequestNumber(cdr.read_type_12()); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getRequestSendTime(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getReplySendTime(), cdr); @@ -120,7 +120,7 @@ public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage dat { ser.write_type_d("requester_id", data.getRequesterId()); ser.write_type_d("replier_id", data.getReplierId()); - ser.write_type_12("sequence_id", data.getSequenceId()); + ser.write_type_12("request_number", data.getRequestNumber()); ser.write_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); ser.write_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); @@ -132,7 +132,7 @@ public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common { ser.read_type_d("requester_id", data.getRequesterId()); ser.read_type_d("replier_id", data.getReplierId()); - data.setSequenceId(ser.read_type_12("sequence_id")); + data.setRequestNumber(ser.read_type_12("request_number")); ser.read_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); ser.read_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg index b6c991bdc78..9eda2764e33 100644 --- a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -8,7 +8,7 @@ string requester_id string replier_id # Monotonically increasing request number -uint64 sequence_id +uint64 request_number # The time at which the request is sent ihmc_common_msgs/InstantMessage request_send_time diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg index 2095d71f68b..871e736a1cc 100644 --- a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -8,7 +8,7 @@ string requester_id string replier_id # Monotonically increasing request number -uint64 sequence_id +uint64 request_number # The time at which the request is sent ihmc_common_msgs/InstantMessage request_send_time From 98402760eb4bcfef57935d893636461c2f3c409b Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 15:05:22 -0600 Subject: [PATCH 04/10] Fully implement first draft of clock sync. --- .../communication/packets/MessageTools.java | 23 +++ .../ros2/sync/ROS2DistributedClock.java | 131 +++++++------- .../ros2/sync/ROS2DistributedClockPeer.java | 30 +++- .../msg/DistributedClockMessage_.idl | 13 +- .../ihmc_common_msgs/msg/GuidMessage_.idl | 30 ++++ .../msg/dds/DistributedClockMessage.java | 109 +++--------- .../DistributedClockMessagePubSubType.java | 47 ++--- .../ihmc_common_msgs/msg/dds/GuidMessage.java | 139 +++++++++++++++ .../msg/dds/GuidMessagePubSubType.java | 165 ++++++++++++++++++ .../msg/DistributedClockMessage.msg | 11 +- .../ihmc_common_msgs/msg/GuidMessage.msg | 7 + .../msg/DistributedClockMessage.msg | 11 +- .../ros1/ihmc_common_msgs/msg/GuidMessage.msg | 9 + 13 files changed, 521 insertions(+), 204 deletions(-) create mode 100644 ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/GuidMessage_.idl create mode 100644 ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessage.java create mode 100644 ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessagePubSubType.java create mode 100644 ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/GuidMessage.msg create mode 100644 ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/GuidMessage.msg diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/packets/MessageTools.java b/ihmc-communication/src/main/java/us/ihmc/communication/packets/MessageTools.java index d09aa202bc9..7858649b003 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/packets/MessageTools.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/packets/MessageTools.java @@ -17,6 +17,7 @@ import ihmc_common_msgs.msg.dds.ConvexPolytope3DMessage; import ihmc_common_msgs.msg.dds.Cylinder3DMessage; import ihmc_common_msgs.msg.dds.Ellipsoid3DMessage; +import ihmc_common_msgs.msg.dds.GuidMessage; import ihmc_common_msgs.msg.dds.InstantMessage; import ihmc_common_msgs.msg.dds.PoseListMessage; import ihmc_common_msgs.msg.dds.Ramp3DMessage; @@ -82,6 +83,9 @@ import us.ihmc.mecano.multiBodySystem.interfaces.RigidBodyReadOnly; import us.ihmc.mecano.spatial.interfaces.TwistReadOnly; import us.ihmc.pubsub.TopicDataType; +import us.ihmc.pubsub.common.Guid; +import us.ihmc.pubsub.common.Guid.Entity; +import us.ihmc.pubsub.common.Guid.GuidPrefix; import us.ihmc.pubsub.common.SerializedPayload; import us.ihmc.robotics.lidar.LidarScanParameters; import us.ihmc.robotics.math.QuaternionCalculus; @@ -1422,6 +1426,25 @@ public static UUID toUUID(UUIDMessage uuidMessage) return new UUID(uuidMessage.getMostSignificantBits(), uuidMessage.getLeastSignificantBits()); } + public static void toMessage(Guid guid, GuidMessage guidMessage) + { + System.arraycopy(guid.getGuidPrefix().getValue(), 0, guidMessage.getPrefix(), 0, GuidPrefix.size); + System.arraycopy(guid.getEntity().getValue(), 0, guidMessage.getEntity(), 0, Entity.size); + } + + public static void fromMessage(GuidMessage guidMessage, Guid guid) + { + System.arraycopy(guidMessage.getPrefix(), 0, guid.getGuidPrefix().getValue(), 0, GuidPrefix.size); + System.arraycopy(guidMessage.getEntity(), 0, guid.getEntity().getValue(), 0, Entity.size); + } + + public static Guid toGuid(GuidMessage guidMessage) + { + Guid guid = new Guid(); + fromMessage(guidMessage, guid); + return guid; + } + public static double calculateDelay(ImageMessage imageMessage) { return TimeTools.calculateDelay(imageMessage.getAcquisitionTime().getSecondsSinceEpoch(), imageMessage.getAcquisitionTime().getAdditionalNanos()); diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index e2cc98afdbd..2f72770edf5 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -2,114 +2,119 @@ import ihmc_common_msgs.msg.dds.DistributedClockMessage; import us.ihmc.commons.exception.DefaultExceptionHandler; -import us.ihmc.commons.exception.ExceptionTools; import us.ihmc.commons.thread.RepeatingTaskThread; import us.ihmc.communication.ROS2Tools; +import us.ihmc.communication.packets.MessageTools; import us.ihmc.log.LogTools; import us.ihmc.pubsub.common.Guid; -import us.ihmc.pubsub.common.Guid.Entity; -import us.ihmc.pubsub.common.MatchingInfo; -import us.ihmc.pubsub.common.MatchingInfo.MatchingStatus; -import us.ihmc.pubsub.impl.fastRTPS.FastRTPSPublisher; -import us.ihmc.pubsub.subscriber.Subscriber; import us.ihmc.ros2.ROS2Node; import us.ihmc.ros2.ROS2Publisher; import us.ihmc.ros2.ROS2Topic; -import us.ihmc.ros2.SubscriptionMatchedListener; -import java.net.InetAddress; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; /** * Assumes symmetric network delay. - * - * TODO: Handle intraprocess mode? Are there only FastRTPS pub/subs now? */ public class ROS2DistributedClock { - private static final AtomicLong INDEX = new AtomicLong(); private static final ROS2Topic TOPIC = ROS2Tools.IHMC_ROOT.withModule("distributed_clock").withType(DistributedClockMessage.class); - private final String ourName; - private final HashMap peers = new HashMap<>(); - - private final DistributedClockMessage receivedClockMessage = new DistributedClockMessage(); -// private final Guid ourPublisherGuid; - - private long requestNumber = 0; - private final DistributedClockMessage outgoingClockMessage = new DistributedClockMessage(); + private final HashMap peerMap = new HashMap<>(); + private final ArrayList peerList = new ArrayList<>(); + private int nextPeerToPing = 0; private final ROS2Publisher publisher; + private final Guid ourGuid; private final RepeatingTaskThread requestThread = new RepeatingTaskThread(getClass().getSimpleName(), this::requestThread, - DefaultExceptionHandler.MESSAGE_AND_STACKTRACE) - .setFrequencyLimit(5.0); + DefaultExceptionHandler.MESSAGE_AND_STACKTRACE); + private final DistributedClockMessage requestMessage = new DistributedClockMessage(); + private final DistributedClockMessage receivedMessage = new DistributedClockMessage(); + private final DistributedClockMessage replyMessage = new DistributedClockMessage(); + private final Guid receivedRequestTarget = new Guid(); + private final Guid receivedReplyTarget = new Guid(); public ROS2DistributedClock(ROS2Node ros2Node) { - AtomicReference hostname = new AtomicReference<>(""); - ExceptionTools.handle(() -> hostname.set("_" + InetAddress.getLocalHost().getHostName()), DefaultExceptionHandler.PROCEED_SILENTLY); - - ourName = "%s%s_%d_%d".formatted(ros2Node.getName(), hostname.get(), ProcessHandle.current().pid(), INDEX.getAndIncrement()); - - publisher = ros2Node.createPublisher(TOPIC); - -// if (publisher instanceof FastRTPSPublisher fastRTPSPublisher) -// { -// ourPublisherGuid = fastRTPSPublisher.getGuid(); -// } + ourGuid = publisher.getPublisher().getGuid(); ros2Node.createSubscription(TOPIC, subscriber -> { - subscriber.takeNextData(receivedClockMessage, null); - - if (receivedClockMessage.getRequesterIdAsString().equals(ourName)) - { - Instant replyReceptionTime = Instant.now(); + subscriber.takeNextData(receivedMessage, null); - String peerName = receivedClockMessage.getReplierIdAsString(); - ROS2DistributedClockPeer peer = peers.get(peerName); + MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget); + MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget); - if (peer == null) + if (receivedRequestTarget.equals(ourGuid)) // Reply + { + replyMessage.set(receivedMessage); + MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); + synchronized (publisher) { - peer = new ROS2DistributedClockPeer(peerName); - peers.put(peerName, peer); + publisher.publish(replyMessage); } - -// peer. } - else + else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate { - + ROS2DistributedClockPeer peer = peerMap.get(receivedReplyTarget); + if (peer != null) + { + peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), + Instant.now(), + MessageTools.toInstant(receivedMessage.getReplySendTime())); + } } }, (subscriber, info) -> { - Guid guid = info.getGuid(); - - Entity entity = guid.getEntity(); - String entityName = entity.toString(); - - String prefix = guid.getGuidPrefix().toString(); - - MatchingStatus status = info.getStatus(); - - LogTools.info("ent: %s prefix: %s status: %s".formatted(entityName, prefix, status)); - - + if (!guid.equals(publisher.getPublisher().getGuid())) // Exclude our publisher + { + LogTools.info("ent: %s prefix: %s status: %s".formatted(guid.getEntity(), guid.getGuidPrefix(), info.getStatus())); + switch (info.getStatus()) + { + case MATCHED_MATCHING -> + { + if (!peerMap.containsKey(guid)) + { + Guid guidCopy = new Guid(); + guidCopy.set(guid); + + LogTools.info("Setting up peer: %s", guidCopy); + peerMap.put(guidCopy, new ROS2DistributedClockPeer(guidCopy)); + } + } + case REMOVED_MATCHING -> + { + peerMap.remove(guid); + } + } + } }); + requestThread.setFrequencyLimit(5.0); requestThread.start(); } private void requestThread() { + if (!peerList.isEmpty()) + { + if (nextPeerToPing >= peerList.size()) + nextPeerToPing = 0; + + ROS2DistributedClockPeer peer = peerList.get(nextPeerToPing); + MessageTools.toMessage(Instant.now(), requestMessage.getRequestSendTime()); + MessageTools.toMessage(peer.getGuid(), requestMessage.getRequestTarget()); + MessageTools.toMessage(publisher.getPublisher().getGuid(), requestMessage.getReplyTarget()); + synchronized (publisher) + { + publisher.publish(requestMessage); + } -// outgoingClockMessage.set - - ++requestNumber; + ++nextPeerToPing; + } } } diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java index 474e49fa1be..20aecbd046b 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java @@ -1,11 +1,35 @@ package us.ihmc.communication.ros2.sync; +import us.ihmc.pubsub.common.Guid; + +import java.time.Duration; +import java.time.Instant; + public class ROS2DistributedClockPeer { - private long epochSecondOffset = 0; - private int nanoOffset = 0; + private final Guid guid; + private Duration peerClockOffset; + + public ROS2DistributedClockPeer(Guid guid) + { + this.guid = guid; + } + + public void update(Instant requestSendTime, Instant replyReceiveTime, Instant peerClockTime) + { + Duration roundTripTime = Duration.between(requestSendTime, replyReceiveTime); + Duration halfRoundTripTime = roundTripTime.dividedBy(2); + Instant peerNow = peerClockTime.plus(halfRoundTripTime); + peerClockOffset = Duration.between(replyReceiveTime, peerNow); + } + + public Instant convertPeerTimeToOurTime(Instant peerTime) + { + return peerTime.minus(peerClockOffset); + } - public ROS2DistributedClockPeer(String name) + public Guid getGuid() { + return guid; } } diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl index 3da4578bbc8..a174b941a84 100644 --- a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl @@ -1,6 +1,7 @@ #ifndef __ihmc_common_msgs__msg__DistributedClockMessage__idl__ #define __ihmc_common_msgs__msg__DistributedClockMessage__idl__ +#include "ihmc_common_msgs/msg/./GuidMessage_.idl" #include "ihmc_common_msgs/msg/./InstantMessage_.idl" module ihmc_common_msgs { @@ -17,17 +18,13 @@ module ihmc_common_msgs struct DistributedClockMessage { /** - * The requester's unique ID + * The guid of the publisher that's expected to send back a reply */ - string requester_id; + ihmc_common_msgs::msg::dds::GuidMessage request_target; /** - * The replier's unique ID + * The guid of the publisher associated with the requester */ - string replier_id; - /** - * Monotonically increasing request number - */ - unsigned long long request_number; + ihmc_common_msgs::msg::dds::GuidMessage reply_target; /** * The time at which the request is sent */ diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/GuidMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/GuidMessage_.idl new file mode 100644 index 00000000000..1fe7bebf6d0 --- /dev/null +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/GuidMessage_.idl @@ -0,0 +1,30 @@ +#ifndef __ihmc_common_msgs__msg__GuidMessage__idl__ +#define __ihmc_common_msgs__msg__GuidMessage__idl__ + +module ihmc_common_msgs +{ + module msg + { + module dds + { + + /** + * Represents a DDS-RTPS Guid + */ + @TypeCode(type="ihmc_common_msgs::msg::dds_::GuidMessage_") + struct GuidMessage + { + /** + * Prefix + */ + octet prefix[12]; + /** + * Entity + */ + octet entity[4]; + }; + }; + }; +}; + +#endif diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java index e31730b2372..4fb11dc8409 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java @@ -13,17 +13,13 @@ public class DistributedClockMessage extends Packet implements Settable, EpsilonComparable { /** - * The requester's unique ID + * The guid of the publisher that's expected to send back a reply */ - public java.lang.StringBuilder requester_id_; + public ihmc_common_msgs.msg.dds.GuidMessage request_target_; /** - * The replier's unique ID + * The guid of the publisher associated with the requester */ - public java.lang.StringBuilder replier_id_; - /** - * Monotonically increasing request number - */ - public long request_number_; + public ihmc_common_msgs.msg.dds.GuidMessage reply_target_; /** * The time at which the request is sent */ @@ -35,8 +31,8 @@ public class DistributedClockMessage extends Packet imp public DistributedClockMessage() { - requester_id_ = new java.lang.StringBuilder(255); - replier_id_ = new java.lang.StringBuilder(255); + request_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); + reply_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); request_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); reply_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); } @@ -49,79 +45,28 @@ public DistributedClockMessage(DistributedClockMessage other) public void set(DistributedClockMessage other) { - requester_id_.setLength(0); - requester_id_.append(other.requester_id_); - - replier_id_.setLength(0); - replier_id_.append(other.replier_id_); - - request_number_ = other.request_number_; - + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.request_target_, request_target_); + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.reply_target_, reply_target_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.request_send_time_, request_send_time_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.reply_send_time_, reply_send_time_); } - /** - * The requester's unique ID - */ - public void setRequesterId(java.lang.String requester_id) - { - requester_id_.setLength(0); - requester_id_.append(requester_id); - } - - /** - * The requester's unique ID - */ - public java.lang.String getRequesterIdAsString() - { - return getRequesterId().toString(); - } - /** - * The requester's unique ID - */ - public java.lang.StringBuilder getRequesterId() - { - return requester_id_; - } /** - * The replier's unique ID + * The guid of the publisher that's expected to send back a reply */ - public void setReplierId(java.lang.String replier_id) + public ihmc_common_msgs.msg.dds.GuidMessage getRequestTarget() { - replier_id_.setLength(0); - replier_id_.append(replier_id); + return request_target_; } - /** - * The replier's unique ID - */ - public java.lang.String getReplierIdAsString() - { - return getReplierId().toString(); - } - /** - * The replier's unique ID - */ - public java.lang.StringBuilder getReplierId() - { - return replier_id_; - } /** - * Monotonically increasing request number - */ - public void setRequestNumber(long request_number) - { - request_number_ = request_number; - } - /** - * Monotonically increasing request number + * The guid of the publisher associated with the requester */ - public long getRequestNumber() + public ihmc_common_msgs.msg.dds.GuidMessage getReplyTarget() { - return request_number_; + return reply_target_; } @@ -160,12 +105,8 @@ public boolean epsilonEquals(DistributedClockMessage other, double epsilon) if(other == null) return false; if(other == this) return true; - if (!us.ihmc.idl.IDLTools.epsilonEqualsStringBuilder(this.requester_id_, other.requester_id_, epsilon)) return false; - - if (!us.ihmc.idl.IDLTools.epsilonEqualsStringBuilder(this.replier_id_, other.replier_id_, epsilon)) return false; - - if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.request_number_, other.request_number_, epsilon)) return false; - + if (!this.request_target_.epsilonEquals(other.request_target_, epsilon)) return false; + if (!this.reply_target_.epsilonEquals(other.reply_target_, epsilon)) return false; if (!this.request_send_time_.epsilonEquals(other.request_send_time_, epsilon)) return false; if (!this.reply_send_time_.epsilonEquals(other.reply_send_time_, epsilon)) return false; @@ -181,12 +122,8 @@ public boolean equals(Object other) DistributedClockMessage otherMyClass = (DistributedClockMessage) other; - if (!us.ihmc.idl.IDLTools.equals(this.requester_id_, otherMyClass.requester_id_)) return false; - - if (!us.ihmc.idl.IDLTools.equals(this.replier_id_, otherMyClass.replier_id_)) return false; - - if(this.request_number_ != otherMyClass.request_number_) return false; - + if (!this.request_target_.equals(otherMyClass.request_target_)) return false; + if (!this.reply_target_.equals(otherMyClass.reply_target_)) return false; if (!this.request_send_time_.equals(otherMyClass.request_send_time_)) return false; if (!this.reply_send_time_.equals(otherMyClass.reply_send_time_)) return false; @@ -199,12 +136,10 @@ public java.lang.String toString() StringBuilder builder = new StringBuilder(); builder.append("DistributedClockMessage {"); - builder.append("requester_id="); - builder.append(this.requester_id_); builder.append(", "); - builder.append("replier_id="); - builder.append(this.replier_id_); builder.append(", "); - builder.append("request_number="); - builder.append(this.request_number_); builder.append(", "); + builder.append("request_target="); + builder.append(this.request_target_); builder.append(", "); + builder.append("reply_target="); + builder.append(this.reply_target_); builder.append(", "); builder.append("request_send_time="); builder.append(this.request_send_time_); builder.append(", "); builder.append("reply_send_time="); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java index d0fb7eb57ab..21e7535bef2 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java @@ -15,7 +15,7 @@ public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDa @Override public final java.lang.String getDefinitionChecksum() { - return "9b29df5decb710752845fe1e9451103d00a8c5892b8dadd4a3a7998bea11974a"; + return "8af4786a0f7a7ac2922fcced67bbc34bc8ddd2f51ce7c24e11b2f74aebcbfa6f"; } @Override @@ -52,9 +52,9 @@ public static int getMaxCdrSerializedSize(int current_alignment) { int initial_alignment = current_alignment; - current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + 255 + 1; - current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + 255 + 1; - current_alignment += 8 + us.ihmc.idl.CDR.alignment(current_alignment, 8); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getMaxCdrSerializedSize(current_alignment); @@ -73,12 +73,9 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute { int initial_alignment = current_alignment; - current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + data.getRequesterId().length() + 1; - - current_alignment += 4 + us.ihmc.idl.CDR.alignment(current_alignment, 4) + data.getReplierId().length() + 1; - - current_alignment += 8 + us.ihmc.idl.CDR.alignment(current_alignment, 8); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getRequestTarget(), current_alignment); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getReplyTarget(), current_alignment); current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getCdrSerializedSize(data.getRequestSendTime(), current_alignment); @@ -90,26 +87,16 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { - if(data.getRequesterId().length() <= 255) - cdr.write_type_d(data.getRequesterId());else - throw new RuntimeException("requester_id field exceeds the maximum length"); - - if(data.getReplierId().length() <= 255) - cdr.write_type_d(data.getReplierId());else - throw new RuntimeException("replier_id field exceeds the maximum length"); - - cdr.write_type_12(data.getRequestNumber()); - + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getRequestTarget(), cdr); + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getRequestSendTime(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getReplySendTime(), cdr); } public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { - cdr.read_type_d(data.getRequesterId()); - cdr.read_type_d(data.getReplierId()); - data.setRequestNumber(cdr.read_type_12()); - + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getRequestTarget(), cdr); + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getRequestSendTime(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getReplySendTime(), cdr); @@ -118,9 +105,10 @@ public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, u @Override public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.InterchangeSerializer ser) { - ser.write_type_d("requester_id", data.getRequesterId()); - ser.write_type_d("replier_id", data.getReplierId()); - ser.write_type_12("request_number", data.getRequestNumber()); + ser.write_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); + + ser.write_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); + ser.write_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); ser.write_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); @@ -130,9 +118,10 @@ public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage dat @Override public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.DistributedClockMessage data) { - ser.read_type_d("requester_id", data.getRequesterId()); - ser.read_type_d("replier_id", data.getReplierId()); - data.setRequestNumber(ser.read_type_12("request_number")); + ser.read_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); + + ser.read_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); + ser.read_type_a("request_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getRequestSendTime()); ser.read_type_a("reply_send_time", new ihmc_common_msgs.msg.dds.InstantMessagePubSubType(), data.getReplySendTime()); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessage.java new file mode 100644 index 00000000000..93616a3a20c --- /dev/null +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessage.java @@ -0,0 +1,139 @@ +package ihmc_common_msgs.msg.dds; + +import us.ihmc.communication.packets.Packet; +import us.ihmc.euclid.interfaces.Settable; +import us.ihmc.euclid.interfaces.EpsilonComparable; +import java.util.function.Supplier; +import us.ihmc.pubsub.TopicDataType; + +/** + * Represents a DDS-RTPS Guid + */ +public class GuidMessage extends Packet implements Settable, EpsilonComparable +{ + /** + * Prefix + */ + public byte[] prefix_; + /** + * Entity + */ + public byte[] entity_; + + public GuidMessage() + { + prefix_ = new byte[12]; + + entity_ = new byte[4]; + + } + + public GuidMessage(GuidMessage other) + { + this(); + set(other); + } + + public void set(GuidMessage other) + { + for(int i1 = 0; i1 < prefix_.length; ++i1) + { + prefix_[i1] = other.prefix_[i1]; + + } + + for(int i3 = 0; i3 < entity_.length; ++i3) + { + entity_[i3] = other.entity_[i3]; + + } + + } + + + /** + * Prefix + */ + public byte[] getPrefix() + { + return prefix_; + } + + + /** + * Entity + */ + public byte[] getEntity() + { + return entity_; + } + + + public static Supplier getPubSubType() + { + return GuidMessagePubSubType::new; + } + + @Override + public Supplier getPubSubTypePacket() + { + return GuidMessagePubSubType::new; + } + + @Override + public boolean epsilonEquals(GuidMessage other, double epsilon) + { + if(other == null) return false; + if(other == this) return true; + + for(int i5 = 0; i5 < prefix_.length; ++i5) + { + if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.prefix_[i5], other.prefix_[i5], epsilon)) return false; + } + + for(int i7 = 0; i7 < entity_.length; ++i7) + { + if (!us.ihmc.idl.IDLTools.epsilonEqualsPrimitive(this.entity_[i7], other.entity_[i7], epsilon)) return false; + } + + + return true; + } + + @Override + public boolean equals(Object other) + { + if(other == null) return false; + if(other == this) return true; + if(!(other instanceof GuidMessage)) return false; + + GuidMessage otherMyClass = (GuidMessage) other; + + for(int i9 = 0; i9 < prefix_.length; ++i9) + { + if(this.prefix_[i9] != otherMyClass.prefix_[i9]) return false; + + } + for(int i11 = 0; i11 < entity_.length; ++i11) + { + if(this.entity_[i11] != otherMyClass.entity_[i11]) return false; + + } + + return true; + } + + @Override + public java.lang.String toString() + { + StringBuilder builder = new StringBuilder(); + + builder.append("GuidMessage {"); + builder.append("prefix="); + builder.append(java.util.Arrays.toString(this.prefix_)); builder.append(", "); + builder.append("entity="); + builder.append(java.util.Arrays.toString(this.entity_)); + builder.append("}"); + return builder.toString(); + } +} diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessagePubSubType.java new file mode 100644 index 00000000000..4bdeea0bf5d --- /dev/null +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/GuidMessagePubSubType.java @@ -0,0 +1,165 @@ +package ihmc_common_msgs.msg.dds; + +/** +* +* Topic data type of the struct "GuidMessage" defined in "GuidMessage_.idl". Use this class to provide the TopicDataType to a Participant. +* +* This file was automatically generated from GuidMessage_.idl by us.ihmc.idl.generator.IDLGenerator. +* Do not update this file directly, edit GuidMessage_.idl instead. +* +*/ +public class GuidMessagePubSubType implements us.ihmc.pubsub.TopicDataType +{ + public static final java.lang.String name = "ihmc_common_msgs::msg::dds_::GuidMessage_"; + + @Override + public final java.lang.String getDefinitionChecksum() + { + return "5f61ddf5c243dbf8995931ed6448c975b957a0835684ac6064ec5e810f93607d"; + } + + @Override + public final java.lang.String getDefinitionVersion() + { + return "local"; + } + + private final us.ihmc.idl.CDR serializeCDR = new us.ihmc.idl.CDR(); + private final us.ihmc.idl.CDR deserializeCDR = new us.ihmc.idl.CDR(); + + @Override + public void serialize(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.pubsub.common.SerializedPayload serializedPayload) throws java.io.IOException + { + serializeCDR.serialize(serializedPayload); + write(data, serializeCDR); + serializeCDR.finishSerialize(); + } + + @Override + public void deserialize(us.ihmc.pubsub.common.SerializedPayload serializedPayload, ihmc_common_msgs.msg.dds.GuidMessage data) throws java.io.IOException + { + deserializeCDR.deserialize(serializedPayload); + read(data, deserializeCDR); + deserializeCDR.finishDeserialize(); + } + + public static int getMaxCdrSerializedSize() + { + return getMaxCdrSerializedSize(0); + } + + public static int getMaxCdrSerializedSize(int current_alignment) + { + int initial_alignment = current_alignment; + + current_alignment += ((12) * 1) + us.ihmc.idl.CDR.alignment(current_alignment, 1); + + current_alignment += ((4) * 1) + us.ihmc.idl.CDR.alignment(current_alignment, 1); + + + return current_alignment - initial_alignment; + } + + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.GuidMessage data) + { + return getCdrSerializedSize(data, 0); + } + + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.GuidMessage data, int current_alignment) + { + int initial_alignment = current_alignment; + + current_alignment += ((12) * 1) + us.ihmc.idl.CDR.alignment(current_alignment, 1); + current_alignment += ((4) * 1) + us.ihmc.idl.CDR.alignment(current_alignment, 1); + + return current_alignment - initial_alignment; + } + + public static void write(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.idl.CDR cdr) + { + for(int i0 = 0; i0 < data.getPrefix().length; ++i0) + { + cdr.write_type_9(data.getPrefix()[i0]); + } + + for(int i0 = 0; i0 < data.getEntity().length; ++i0) + { + cdr.write_type_9(data.getEntity()[i0]); + } + + } + + public static void read(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.idl.CDR cdr) + { + for(int i0 = 0; i0 < data.getPrefix().length; ++i0) + { + data.getPrefix()[i0] = cdr.read_type_9(); + + } + + for(int i0 = 0; i0 < data.getEntity().length; ++i0) + { + data.getEntity()[i0] = cdr.read_type_9(); + + } + + + } + + @Override + public final void serialize(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.idl.InterchangeSerializer ser) + { + ser.write_type_f("prefix", data.getPrefix()); + ser.write_type_f("entity", data.getEntity()); + } + + @Override + public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.GuidMessage data) + { + ser.read_type_f("prefix", data.getPrefix()); + ser.read_type_f("entity", data.getEntity()); + } + + public static void staticCopy(ihmc_common_msgs.msg.dds.GuidMessage src, ihmc_common_msgs.msg.dds.GuidMessage dest) + { + dest.set(src); + } + + @Override + public ihmc_common_msgs.msg.dds.GuidMessage createData() + { + return new ihmc_common_msgs.msg.dds.GuidMessage(); + } + @Override + public int getTypeSize() + { + return us.ihmc.idl.CDR.getTypeSize(getMaxCdrSerializedSize()); + } + + @Override + public java.lang.String getName() + { + return name; + } + + public void serialize(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.idl.CDR cdr) + { + write(data, cdr); + } + + public void deserialize(ihmc_common_msgs.msg.dds.GuidMessage data, us.ihmc.idl.CDR cdr) + { + read(data, cdr); + } + + public void copy(ihmc_common_msgs.msg.dds.GuidMessage src, ihmc_common_msgs.msg.dds.GuidMessage dest) + { + staticCopy(src, dest); + } + + @Override + public GuidMessagePubSubType newInstance() + { + return new GuidMessagePubSubType(); + } +} diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg index 9eda2764e33..8b5636a127d 100644 --- a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,14 +1,11 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. -# The requester's unique ID -string requester_id +# The guid of the publisher that's expected to send back a reply +ihmc_common_msgs/GuidMessage request_target -# The replier's unique ID -string replier_id - -# Monotonically increasing request number -uint64 request_number +# The guid of the publisher associated with the requester +ihmc_common_msgs/GuidMessage reply_target # The time at which the request is sent ihmc_common_msgs/InstantMessage request_send_time diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/GuidMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/GuidMessage.msg new file mode 100644 index 00000000000..427739a9e45 --- /dev/null +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/GuidMessage.msg @@ -0,0 +1,7 @@ +# Represents a DDS-RTPS Guid + +# Prefix +byte[12] prefix + +# Entity +byte[4] entity diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg index 871e736a1cc..ab49b5622ce 100644 --- a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,14 +1,11 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. -# The requester's unique ID -string requester_id +# The guid of the publisher that's expected to send back a reply +ihmc_common_msgs/GuidMessage request_target -# The replier's unique ID -string replier_id - -# Monotonically increasing request number -uint64 request_number +# The guid of the publisher associated with the requester +ihmc_common_msgs/GuidMessage reply_target # The time at which the request is sent ihmc_common_msgs/InstantMessage request_send_time diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/GuidMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/GuidMessage.msg new file mode 100644 index 00000000000..a367a4b165e --- /dev/null +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/GuidMessage.msg @@ -0,0 +1,9 @@ +# Represents a DDS-RTPS Guid + +# Prefix +int8[] prefix + +# Entity +int8[] entity + + From 9ea8daae2d4332f1369c0d9c2d8204f328dfecfb Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 16:07:34 -0600 Subject: [PATCH 05/10] Fix bugs, seems kinda working. --- .../ros2/sync/ROS2DistributedClock.java | 71 ++++++++++++------- .../ros2/sync/ROS2DistributedClockPeer.java | 2 + .../ros2/sync/ROS2DistributedClockTest.java | 21 ++++-- .../msg/DistributedClockMessage_.idl | 4 ++ .../msg/dds/DistributedClockMessage.java | 19 +++++ .../DistributedClockMessagePubSubType.java | 12 +++- .../msg/DistributedClockMessage.msg | 3 + .../msg/DistributedClockMessage.msg | 3 + 8 files changed, 104 insertions(+), 31 deletions(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index 2f72770edf5..d994c327c71 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -3,10 +3,12 @@ import ihmc_common_msgs.msg.dds.DistributedClockMessage; import us.ihmc.commons.exception.DefaultExceptionHandler; import us.ihmc.commons.thread.RepeatingTaskThread; +import us.ihmc.commons.thread.ThreadTools; import us.ihmc.communication.ROS2Tools; import us.ihmc.communication.packets.MessageTools; import us.ihmc.log.LogTools; import us.ihmc.pubsub.common.Guid; +import us.ihmc.pubsub.common.SampleInfo; import us.ihmc.ros2.ROS2Node; import us.ihmc.ros2.ROS2Publisher; import us.ihmc.ros2.ROS2Topic; @@ -14,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; /** * Assumes symmetric network delay. @@ -23,7 +26,7 @@ public class ROS2DistributedClock private static final ROS2Topic TOPIC = ROS2Tools.IHMC_ROOT.withModule("distributed_clock").withType(DistributedClockMessage.class); private final HashMap peerMap = new HashMap<>(); - private final ArrayList peerList = new ArrayList<>(); + private final List peerList = new ArrayList<>(); private int nextPeerToPing = 0; private final ROS2Publisher publisher; private final Guid ourGuid; @@ -32,7 +35,8 @@ public class ROS2DistributedClock DefaultExceptionHandler.MESSAGE_AND_STACKTRACE); private final DistributedClockMessage requestMessage = new DistributedClockMessage(); private final DistributedClockMessage receivedMessage = new DistributedClockMessage(); - private final DistributedClockMessage replyMessage = new DistributedClockMessage(); + private final SampleInfo sampleInfo = new SampleInfo(); + private final Guid receivedDestination = new Guid(); private final Guid receivedRequestTarget = new Guid(); private final Guid receivedReplyTarget = new Guid(); @@ -43,28 +47,38 @@ public ROS2DistributedClock(ROS2Node ros2Node) ros2Node.createSubscription(TOPIC, subscriber -> { - subscriber.takeNextData(receivedMessage, null); + subscriber.takeNextData(receivedMessage, sampleInfo); + MessageTools.fromMessage(receivedMessage.getDestinationTarget(), receivedDestination); - MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget); - MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget); - - if (receivedRequestTarget.equals(ourGuid)) // Reply + if (receivedDestination.equals(ourGuid)) { - replyMessage.set(receivedMessage); - MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); - synchronized (publisher) + MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget); + MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget); + + if (receivedRequestTarget.equals(ourGuid)) // Reply { - publisher.publish(replyMessage); + DistributedClockMessage replyMessage = new DistributedClockMessage(); + replyMessage.set(receivedMessage); + MessageTools.toMessage(receivedReplyTarget, replyMessage.getDestinationTarget()); + MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); + + ThreadTools.startAsDaemon(() -> + { + synchronized (publisher) + { + publisher.publish(replyMessage); + } + }, getClass().getSimpleName() + "PublishReply"); } - } - else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate - { - ROS2DistributedClockPeer peer = peerMap.get(receivedReplyTarget); - if (peer != null) + else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate { - peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), - Instant.now(), - MessageTools.toInstant(receivedMessage.getReplySendTime())); + ROS2DistributedClockPeer peer = peerMap.get(receivedRequestTarget); + if (peer != null) + { + peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), + Instant.now(), + MessageTools.toInstant(receivedMessage.getReplySendTime())); + } } } }, (subscriber, info) -> @@ -83,19 +97,22 @@ else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate guidCopy.set(guid); LogTools.info("Setting up peer: %s", guidCopy); - peerMap.put(guidCopy, new ROS2DistributedClockPeer(guidCopy)); + ROS2DistributedClockPeer peer = new ROS2DistributedClockPeer(guidCopy); + peerMap.put(guidCopy, peer); + peerList.add(peer); } } case REMOVED_MATCHING -> { peerMap.remove(guid); + peerList.removeIf(peer -> peer.getGuid().equals(guid)); } } } }); - requestThread.setFrequencyLimit(5.0); - requestThread.start(); + requestThread.setFrequencyLimit(.5); + requestThread.startRepeating(); } private void requestThread() @@ -106,9 +123,10 @@ private void requestThread() nextPeerToPing = 0; ROS2DistributedClockPeer peer = peerList.get(nextPeerToPing); - MessageTools.toMessage(Instant.now(), requestMessage.getRequestSendTime()); + MessageTools.toMessage(peer.getGuid(), requestMessage.getDestinationTarget()); MessageTools.toMessage(peer.getGuid(), requestMessage.getRequestTarget()); - MessageTools.toMessage(publisher.getPublisher().getGuid(), requestMessage.getReplyTarget()); + MessageTools.toMessage(ourGuid, requestMessage.getReplyTarget()); + MessageTools.toMessage(Instant.now(), requestMessage.getRequestSendTime()); synchronized (publisher) { publisher.publish(requestMessage); @@ -117,4 +135,9 @@ private void requestThread() ++nextPeerToPing; } } + + public List getPeerList() + { + return peerList; + } } diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java index 20aecbd046b..c2eccd9dfd5 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java @@ -1,5 +1,6 @@ package us.ihmc.communication.ros2.sync; +import us.ihmc.log.LogTools; import us.ihmc.pubsub.common.Guid; import java.time.Duration; @@ -21,6 +22,7 @@ public void update(Instant requestSendTime, Instant replyReceiveTime, Instant pe Duration halfRoundTripTime = roundTripTime.dividedBy(2); Instant peerNow = peerClockTime.plus(halfRoundTripTime); peerClockOffset = Duration.between(replyReceiveTime, peerNow); + LogTools.info("Offset: {}", peerClockOffset); } public Instant convertPeerTimeToOurTime(Instant peerTime) diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java index 9fb1c410db8..ec3f2028b66 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -13,14 +13,23 @@ public void test() { ROS2Node ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); - new ROS2DistributedClock(ros2Node); - new ROS2DistributedClock(ros2Node); + ROS2DistributedClock clock1 = new ROS2DistributedClock(ros2Node); + ROS2DistributedClock clock2 = new ROS2DistributedClock(ros2Node); - ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); + while (true) + { + ThreadTools.park(1.0); + } - new ROS2DistributedClock(ros2Node); - new ROS2DistributedClock(ros2Node); - ThreadTools.park(1.0); + + + +// ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); +// +// new ROS2DistributedClock(ros2Node); +// new ROS2DistributedClock(ros2Node); + +// ThreadTools.park(1.0); } } diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl index a174b941a84..9bbb1ec3c4c 100644 --- a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl @@ -17,6 +17,10 @@ module ihmc_common_msgs @TypeCode(type="ihmc_common_msgs::msg::dds_::DistributedClockMessage_") struct DistributedClockMessage { + /** + * The guid of destination of this message + */ + ihmc_common_msgs::msg::dds::GuidMessage destination_target; /** * The guid of the publisher that's expected to send back a reply */ diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java index 4fb11dc8409..106ed935b77 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java @@ -12,6 +12,10 @@ */ public class DistributedClockMessage extends Packet implements Settable, EpsilonComparable { + /** + * The guid of destination of this message + */ + public ihmc_common_msgs.msg.dds.GuidMessage destination_target_; /** * The guid of the publisher that's expected to send back a reply */ @@ -31,6 +35,7 @@ public class DistributedClockMessage extends Packet imp public DistributedClockMessage() { + destination_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); request_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); reply_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); request_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); @@ -45,6 +50,7 @@ public DistributedClockMessage(DistributedClockMessage other) public void set(DistributedClockMessage other) { + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.destination_target_, destination_target_); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.request_target_, request_target_); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.reply_target_, reply_target_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.request_send_time_, request_send_time_); @@ -52,6 +58,15 @@ public void set(DistributedClockMessage other) } + /** + * The guid of destination of this message + */ + public ihmc_common_msgs.msg.dds.GuidMessage getDestinationTarget() + { + return destination_target_; + } + + /** * The guid of the publisher that's expected to send back a reply */ @@ -105,6 +120,7 @@ public boolean epsilonEquals(DistributedClockMessage other, double epsilon) if(other == null) return false; if(other == this) return true; + if (!this.destination_target_.epsilonEquals(other.destination_target_, epsilon)) return false; if (!this.request_target_.epsilonEquals(other.request_target_, epsilon)) return false; if (!this.reply_target_.epsilonEquals(other.reply_target_, epsilon)) return false; if (!this.request_send_time_.epsilonEquals(other.request_send_time_, epsilon)) return false; @@ -122,6 +138,7 @@ public boolean equals(Object other) DistributedClockMessage otherMyClass = (DistributedClockMessage) other; + if (!this.destination_target_.equals(otherMyClass.destination_target_)) return false; if (!this.request_target_.equals(otherMyClass.request_target_)) return false; if (!this.reply_target_.equals(otherMyClass.reply_target_)) return false; if (!this.request_send_time_.equals(otherMyClass.request_send_time_)) return false; @@ -136,6 +153,8 @@ public java.lang.String toString() StringBuilder builder = new StringBuilder(); builder.append("DistributedClockMessage {"); + builder.append("destination_target="); + builder.append(this.destination_target_); builder.append(", "); builder.append("request_target="); builder.append(this.request_target_); builder.append(", "); builder.append("reply_target="); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java index 21e7535bef2..5ee1e4b9407 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java @@ -15,7 +15,7 @@ public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDa @Override public final java.lang.String getDefinitionChecksum() { - return "8af4786a0f7a7ac2922fcced67bbc34bc8ddd2f51ce7c24e11b2f74aebcbfa6f"; + return "0dbb6546a403e5f5df788c3e99df502f689c69f344be2246fc925536d220cc0e"; } @Override @@ -56,6 +56,8 @@ public static int getMaxCdrSerializedSize(int current_alignment) current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getMaxCdrSerializedSize(current_alignment); current_alignment += ihmc_common_msgs.msg.dds.InstantMessagePubSubType.getMaxCdrSerializedSize(current_alignment); @@ -73,6 +75,8 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute { int initial_alignment = current_alignment; + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getDestinationTarget(), current_alignment); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getRequestTarget(), current_alignment); current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getReplyTarget(), current_alignment); @@ -87,6 +91,7 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getDestinationTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getRequestTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getRequestSendTime(), cdr); @@ -95,6 +100,7 @@ public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getDestinationTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getRequestTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getRequestSendTime(), cdr); @@ -105,6 +111,8 @@ public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, u @Override public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.InterchangeSerializer ser) { + ser.write_type_a("destination_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getDestinationTarget()); + ser.write_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); ser.write_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); @@ -118,6 +126,8 @@ public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage dat @Override public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.DistributedClockMessage data) { + ser.read_type_a("destination_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getDestinationTarget()); + ser.read_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); ser.read_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg index 8b5636a127d..813b7d9f43b 100644 --- a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,6 +1,9 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. +# The guid of destination of this message +ihmc_common_msgs/GuidMessage destination_target + # The guid of the publisher that's expected to send back a reply ihmc_common_msgs/GuidMessage request_target diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg index ab49b5622ce..da7533c6eae 100644 --- a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,6 +1,9 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. +# The guid of destination of this message +ihmc_common_msgs/GuidMessage destination_target + # The guid of the publisher that's expected to send back a reply ihmc_common_msgs/GuidMessage request_target From 399bc57382f198c30d8963dc12409638d5f11ddf Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 16:33:27 -0600 Subject: [PATCH 06/10] Fix up. --- .../ros2/sync/ROS2DistributedClock.java | 10 +++--- .../ros2/sync/ROS2DistributedClockPeer.java | 7 ++-- .../ros2/sync/ROS2DistributedClockTest.java | 35 ++++++++++++------- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index d994c327c71..05fdfc83e8d 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -6,7 +6,6 @@ import us.ihmc.commons.thread.ThreadTools; import us.ihmc.communication.ROS2Tools; import us.ihmc.communication.packets.MessageTools; -import us.ihmc.log.LogTools; import us.ihmc.pubsub.common.Guid; import us.ihmc.pubsub.common.SampleInfo; import us.ihmc.ros2.ROS2Node; @@ -86,7 +85,6 @@ else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate Guid guid = info.getGuid(); if (!guid.equals(publisher.getPublisher().getGuid())) // Exclude our publisher { - LogTools.info("ent: %s prefix: %s status: %s".formatted(guid.getEntity(), guid.getGuidPrefix(), info.getStatus())); switch (info.getStatus()) { case MATCHED_MATCHING -> @@ -96,7 +94,6 @@ else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate Guid guidCopy = new Guid(); guidCopy.set(guid); - LogTools.info("Setting up peer: %s", guidCopy); ROS2DistributedClockPeer peer = new ROS2DistributedClockPeer(guidCopy); peerMap.put(guidCopy, peer); peerList.add(peer); @@ -111,7 +108,7 @@ else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate } }); - requestThread.setFrequencyLimit(.5); + requestThread.setFrequencyLimit(5.0); requestThread.startRepeating(); } @@ -136,6 +133,11 @@ private void requestThread() } } + public void destroy() + { + requestThread.kill(); + } + public List getPeerList() { return peerList; diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java index c2eccd9dfd5..3785827b245 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java @@ -1,6 +1,5 @@ package us.ihmc.communication.ros2.sync; -import us.ihmc.log.LogTools; import us.ihmc.pubsub.common.Guid; import java.time.Duration; @@ -22,7 +21,6 @@ public void update(Instant requestSendTime, Instant replyReceiveTime, Instant pe Duration halfRoundTripTime = roundTripTime.dividedBy(2); Instant peerNow = peerClockTime.plus(halfRoundTripTime); peerClockOffset = Duration.between(replyReceiveTime, peerNow); - LogTools.info("Offset: {}", peerClockOffset); } public Instant convertPeerTimeToOurTime(Instant peerTime) @@ -30,6 +28,11 @@ public Instant convertPeerTimeToOurTime(Instant peerTime) return peerTime.minus(peerClockOffset); } + public Duration getPeerClockOffset() + { + return peerClockOffset; + } + public Guid getGuid() { return guid; diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java index ec3f2028b66..128d3e1b100 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -1,35 +1,46 @@ package us.ihmc.communication.ros2.sync; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import us.ihmc.commons.thread.ThreadTools; +import us.ihmc.log.LogTools; +import us.ihmc.robotics.time.TimeTools; import us.ihmc.ros2.ROS2Node; import us.ihmc.ros2.ROS2NodeBuilder; import us.ihmc.ros2.ROS2NodeBuilder.SpecialTransportMode; +import java.time.Duration; + public class ROS2DistributedClockTest { @Test public void test() { - ROS2Node ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); + ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); + + ROS2DistributedClock clock1 = new ROS2DistributedClock(ros2Node1); - ROS2DistributedClock clock1 = new ROS2DistributedClock(ros2Node); - ROS2DistributedClock clock2 = new ROS2DistributedClock(ros2Node); + ROS2Node ros2Node2 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); - while (true) - { - ThreadTools.park(1.0); - } + ROS2DistributedClock clock2 = new ROS2DistributedClock(ros2Node2); + ThreadTools.park(1.0); + Assertions.assertEquals(1, clock1.getPeerList().size()); + Assertions.assertEquals(1, clock2.getPeerList().size()); + Duration offset0 = clock1.getPeerList().get(0).getPeerClockOffset(); + LogTools.info("Clock 1 offset: {}", offset0); + Duration offset1 = clock2.getPeerList().get(0).getPeerClockOffset(); + LogTools.info("Clock 2 offset: {}", offset1); + Assertions.assertTrue(TimeTools.toDoubleSeconds(offset0) < 0.1); + Assertions.assertTrue(TimeTools.toDoubleSeconds(offset1) < 0.1); -// ros2Node = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); -// -// new ROS2DistributedClock(ros2Node); -// new ROS2DistributedClock(ros2Node); + clock1.destroy(); + clock2.destroy(); -// ThreadTools.park(1.0); + ros2Node1.destroy(); + ros2Node2.destroy(); } } From 9470412d6c2dece3cae667518ba673e64a78bc70 Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 17:00:03 -0600 Subject: [PATCH 07/10] Fix up test more. --- .../ros2/sync/ROS2DistributedClockTest.java | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java index 128d3e1b100..fb4e4d516f2 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java @@ -10,37 +10,47 @@ import us.ihmc.ros2.ROS2NodeBuilder.SpecialTransportMode; import java.time.Duration; +import java.time.Instant; public class ROS2DistributedClockTest { @Test public void test() { - ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test"); + ROS2Node ros2Node0 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("distributed_clock_test0"); + ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("distributed_clock_test1"); - ROS2DistributedClock clock1 = new ROS2DistributedClock(ros2Node1); + ROS2DistributedClock[] clocks = new ROS2DistributedClock[3]; + clocks[0] = new ROS2DistributedClock(ros2Node0); + clocks[1] = new ROS2DistributedClock(ros2Node0); + clocks[2] = new ROS2DistributedClock(ros2Node1); - ROS2Node ros2Node2 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("test2"); + ThreadTools.park(0.5); - ROS2DistributedClock clock2 = new ROS2DistributedClock(ros2Node2); + for (int i = 0; i < 10; i++) + { + for (ROS2DistributedClock clock : clocks) + Assertions.assertEquals(clocks.length - 1, clock.getPeerList().size()); - ThreadTools.park(1.0); + for (ROS2DistributedClock clock : clocks) + { + Duration offset = clock.getPeerList().get(0).getPeerClockOffset(); + LogTools.info("Clock offset: {}", offset); + Assertions.assertTrue(TimeTools.toDoubleSeconds(offset) < 0.1); + } - Assertions.assertEquals(1, clock1.getPeerList().size()); - Assertions.assertEquals(1, clock2.getPeerList().size()); + for (ROS2DistributedClock clock : clocks) + { + LogTools.info("Converted peer time: {}", clock.getPeerList().get(0).convertPeerTimeToOurTime(Instant.now())); + } - Duration offset0 = clock1.getPeerList().get(0).getPeerClockOffset(); - LogTools.info("Clock 1 offset: {}", offset0); - Duration offset1 = clock2.getPeerList().get(0).getPeerClockOffset(); - LogTools.info("Clock 2 offset: {}", offset1); + ThreadTools.park(0.3); + } - Assertions.assertTrue(TimeTools.toDoubleSeconds(offset0) < 0.1); - Assertions.assertTrue(TimeTools.toDoubleSeconds(offset1) < 0.1); - - clock1.destroy(); - clock2.destroy(); + for (ROS2DistributedClock clock : clocks) + clock.destroy(); + ros2Node0.destroy(); ros2Node1.destroy(); - ros2Node2.destroy(); } } From bf7e5df2ec2c73e4722bdb8e07946d3de817aa23 Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 17:07:42 -0600 Subject: [PATCH 08/10] Simplify logic. --- .../ros2/sync/ROS2DistributedClock.java | 49 +++++++++---------- .../msg/DistributedClockMessage_.idl | 4 +- .../msg/dds/DistributedClockMessage.java | 32 +++++++----- .../DistributedClockMessagePubSubType.java | 19 +++---- .../msg/DistributedClockMessage.msg | 4 +- .../msg/DistributedClockMessage.msg | 4 +- 6 files changed, 58 insertions(+), 54 deletions(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index 05fdfc83e8d..58254a1350d 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -35,7 +35,6 @@ public class ROS2DistributedClock private final DistributedClockMessage requestMessage = new DistributedClockMessage(); private final DistributedClockMessage receivedMessage = new DistributedClockMessage(); private final SampleInfo sampleInfo = new SampleInfo(); - private final Guid receivedDestination = new Guid(); private final Guid receivedRequestTarget = new Guid(); private final Guid receivedReplyTarget = new Guid(); @@ -47,37 +46,33 @@ public ROS2DistributedClock(ROS2Node ros2Node) ros2Node.createSubscription(TOPIC, subscriber -> { subscriber.takeNextData(receivedMessage, sampleInfo); - MessageTools.fromMessage(receivedMessage.getDestinationTarget(), receivedDestination); - if (receivedDestination.equals(ourGuid)) - { - MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget); - MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget); + MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget); + MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget); - if (receivedRequestTarget.equals(ourGuid)) // Reply - { - DistributedClockMessage replyMessage = new DistributedClockMessage(); - replyMessage.set(receivedMessage); - MessageTools.toMessage(receivedReplyTarget, replyMessage.getDestinationTarget()); - MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); + if (receivedMessage.getIsRequest() && receivedRequestTarget.equals(ourGuid)) // Reply + { + DistributedClockMessage replyMessage = new DistributedClockMessage(); + replyMessage.set(receivedMessage); + replyMessage.setIsRequest(false); + MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); - ThreadTools.startAsDaemon(() -> - { - synchronized (publisher) - { - publisher.publish(replyMessage); - } - }, getClass().getSimpleName() + "PublishReply"); - } - else if (receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate + ThreadTools.startAsDaemon(() -> { - ROS2DistributedClockPeer peer = peerMap.get(receivedRequestTarget); - if (peer != null) + synchronized (publisher) { - peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), - Instant.now(), - MessageTools.toInstant(receivedMessage.getReplySendTime())); + publisher.publish(replyMessage); } + }, getClass().getSimpleName() + "PublishReply"); + } + else if (!receivedMessage.getIsRequest() && receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate + { + ROS2DistributedClockPeer peer = peerMap.get(receivedRequestTarget); + if (peer != null) + { + peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), + Instant.now(), + MessageTools.toInstant(receivedMessage.getReplySendTime())); } } }, (subscriber, info) -> @@ -120,7 +115,7 @@ private void requestThread() nextPeerToPing = 0; ROS2DistributedClockPeer peer = peerList.get(nextPeerToPing); - MessageTools.toMessage(peer.getGuid(), requestMessage.getDestinationTarget()); + requestMessage.setIsRequest(true); MessageTools.toMessage(peer.getGuid(), requestMessage.getRequestTarget()); MessageTools.toMessage(ourGuid, requestMessage.getReplyTarget()); MessageTools.toMessage(Instant.now(), requestMessage.getRequestSendTime()); diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl index 9bbb1ec3c4c..37aef40cdb5 100644 --- a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl @@ -18,9 +18,9 @@ module ihmc_common_msgs struct DistributedClockMessage { /** - * The guid of destination of this message + * If this is a request message, else it's a reply message */ - ihmc_common_msgs::msg::dds::GuidMessage destination_target; + boolean is_request; /** * The guid of the publisher that's expected to send back a reply */ diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java index 106ed935b77..ebcc8c5000d 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java @@ -13,9 +13,9 @@ public class DistributedClockMessage extends Packet implements Settable, EpsilonComparable { /** - * The guid of destination of this message + * If this is a request message, else it's a reply message */ - public ihmc_common_msgs.msg.dds.GuidMessage destination_target_; + public boolean is_request_; /** * The guid of the publisher that's expected to send back a reply */ @@ -35,7 +35,6 @@ public class DistributedClockMessage extends Packet imp public DistributedClockMessage() { - destination_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); request_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); reply_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); request_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); @@ -50,20 +49,27 @@ public DistributedClockMessage(DistributedClockMessage other) public void set(DistributedClockMessage other) { - ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.destination_target_, destination_target_); + is_request_ = other.is_request_; + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.request_target_, request_target_); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.staticCopy(other.reply_target_, reply_target_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.request_send_time_, request_send_time_); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.staticCopy(other.reply_send_time_, reply_send_time_); } - /** - * The guid of destination of this message + * If this is a request message, else it's a reply message + */ + public void setIsRequest(boolean is_request) + { + is_request_ = is_request; + } + /** + * If this is a request message, else it's a reply message */ - public ihmc_common_msgs.msg.dds.GuidMessage getDestinationTarget() + public boolean getIsRequest() { - return destination_target_; + return is_request_; } @@ -120,7 +126,8 @@ public boolean epsilonEquals(DistributedClockMessage other, double epsilon) if(other == null) return false; if(other == this) return true; - if (!this.destination_target_.epsilonEquals(other.destination_target_, epsilon)) return false; + if (!us.ihmc.idl.IDLTools.epsilonEqualsBoolean(this.is_request_, other.is_request_, epsilon)) return false; + if (!this.request_target_.epsilonEquals(other.request_target_, epsilon)) return false; if (!this.reply_target_.epsilonEquals(other.reply_target_, epsilon)) return false; if (!this.request_send_time_.epsilonEquals(other.request_send_time_, epsilon)) return false; @@ -138,7 +145,8 @@ public boolean equals(Object other) DistributedClockMessage otherMyClass = (DistributedClockMessage) other; - if (!this.destination_target_.equals(otherMyClass.destination_target_)) return false; + if(this.is_request_ != otherMyClass.is_request_) return false; + if (!this.request_target_.equals(otherMyClass.request_target_)) return false; if (!this.reply_target_.equals(otherMyClass.reply_target_)) return false; if (!this.request_send_time_.equals(otherMyClass.request_send_time_)) return false; @@ -153,8 +161,8 @@ public java.lang.String toString() StringBuilder builder = new StringBuilder(); builder.append("DistributedClockMessage {"); - builder.append("destination_target="); - builder.append(this.destination_target_); builder.append(", "); + builder.append("is_request="); + builder.append(this.is_request_); builder.append(", "); builder.append("request_target="); builder.append(this.request_target_); builder.append(", "); builder.append("reply_target="); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java index 5ee1e4b9407..30d5038581f 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java @@ -15,7 +15,7 @@ public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDa @Override public final java.lang.String getDefinitionChecksum() { - return "0dbb6546a403e5f5df788c3e99df502f689c69f344be2246fc925536d220cc0e"; + return "16d5f26ad9da80c8261bf1dc1800a8b91e7535ec399247617b9095f59ee71428"; } @Override @@ -52,7 +52,7 @@ public static int getMaxCdrSerializedSize(int current_alignment) { int initial_alignment = current_alignment; - current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); + current_alignment += 1 + us.ihmc.idl.CDR.alignment(current_alignment, 1); current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getMaxCdrSerializedSize(current_alignment); @@ -75,7 +75,8 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute { int initial_alignment = current_alignment; - current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getDestinationTarget(), current_alignment); + current_alignment += 1 + us.ihmc.idl.CDR.alignment(current_alignment, 1); + current_alignment += ihmc_common_msgs.msg.dds.GuidMessagePubSubType.getCdrSerializedSize(data.getRequestTarget(), current_alignment); @@ -91,7 +92,8 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { - ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getDestinationTarget(), cdr); + cdr.write_type_7(data.getIsRequest()); + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getRequestTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.write(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getRequestSendTime(), cdr); @@ -100,7 +102,8 @@ public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) { - ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getDestinationTarget(), cdr); + data.setIsRequest(cdr.read_type_7()); + ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getRequestTarget(), cdr); ihmc_common_msgs.msg.dds.GuidMessagePubSubType.read(data.getReplyTarget(), cdr); ihmc_common_msgs.msg.dds.InstantMessagePubSubType.read(data.getRequestSendTime(), cdr); @@ -111,8 +114,7 @@ public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, u @Override public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.InterchangeSerializer ser) { - ser.write_type_a("destination_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getDestinationTarget()); - + ser.write_type_7("is_request", data.getIsRequest()); ser.write_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); ser.write_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); @@ -126,8 +128,7 @@ public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage dat @Override public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.DistributedClockMessage data) { - ser.read_type_a("destination_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getDestinationTarget()); - + data.setIsRequest(ser.read_type_7("is_request")); ser.read_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); ser.read_type_a("reply_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getReplyTarget()); diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg index 813b7d9f43b..4da98d6c8fc 100644 --- a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,8 +1,8 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. -# The guid of destination of this message -ihmc_common_msgs/GuidMessage destination_target +# If this is a request message, else it's a reply message +bool is_request # The guid of the publisher that's expected to send back a reply ihmc_common_msgs/GuidMessage request_target diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg index da7533c6eae..a035fc68c86 100644 --- a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg +++ b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg @@ -1,8 +1,8 @@ # This message is a request-reply ping used to estimate the clock offset # of another node to be used by data synchronization algorithms. -# The guid of destination of this message -ihmc_common_msgs/GuidMessage destination_target +# If this is a request message, else it's a reply message +bool is_request # The guid of the publisher that's expected to send back a reply ihmc_common_msgs/GuidMessage request_target From e65051f0256a66a62a778608c8ed51c2cd071a9b Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 17:10:42 -0600 Subject: [PATCH 09/10] Add doc. --- .../ihmc/communication/ros2/sync/ROS2DistributedClock.java | 4 +++- .../communication/ros2/sync/ROS2DistributedClockPeer.java | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java index 58254a1350d..ab831f6d64c 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java @@ -18,7 +18,9 @@ import java.util.List; /** - * Assumes symmetric network delay. + * Sends out pings to other instances of this class on the network to estimate + * the clock offsets on those computers. This can be used to synchronize data + * modifications. We account for and assume a symmetric network delay. */ public class ROS2DistributedClock { diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java index 3785827b245..cd7b73a7d8c 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java @@ -5,6 +5,12 @@ import java.time.Duration; import java.time.Instant; +/** + * Keeps track of the clock offset of a peer. + * + * TODO: Add an alpha filter since the network latency and compute + * is noisy, but the clock rates are smooth. + */ public class ROS2DistributedClockPeer { private final Guid guid; From 9cb4b57249189e727804cc85cbc27c40377d1a5b Mon Sep 17 00:00:00 2001 From: Duncan Calvert Date: Fri, 13 Dec 2024 17:19:01 -0600 Subject: [PATCH 10/10] Rename eveything. --- ...java => ROS2PeerClockOffsetEstimator.java} | 29 ++++++------ ... => ROS2PeerClockOffsetEstimatorPeer.java} | 4 +- ... => ROS2PeerClockOffsetEstimatorTest.java} | 22 +++++----- ... PeerClockOffsetEstimatorPingMessage_.idl} | 8 ++-- ... PeerClockOffsetEstimatorPingMessage.java} | 22 +++++----- ...OffsetEstimatorPingMessagePubSubType.java} | 44 +++++++++---------- ...> PeerClockOffsetEstimatorPingMessage.msg} | 0 ...> PeerClockOffsetEstimatorPingMessage.msg} | 0 8 files changed, 65 insertions(+), 64 deletions(-) rename ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/{ROS2DistributedClock.java => ROS2PeerClockOffsetEstimator.java} (75%) rename ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/{ROS2DistributedClockPeer.java => ROS2PeerClockOffsetEstimatorPeer.java} (91%) rename ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/{ROS2DistributedClockTest.java => ROS2PeerClockOffsetEstimatorTest.java} (64%) rename ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/{DistributedClockMessage_.idl => PeerClockOffsetEstimatorPingMessage_.idl} (79%) rename ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/{DistributedClockMessage.java => PeerClockOffsetEstimatorPingMessage.java} (83%) rename ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/{DistributedClockMessagePubSubType.java => PeerClockOffsetEstimatorPingMessagePubSubType.java} (67%) rename ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/{DistributedClockMessage.msg => PeerClockOffsetEstimatorPingMessage.msg} (100%) rename ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/{DistributedClockMessage.msg => PeerClockOffsetEstimatorPingMessage.msg} (100%) diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimator.java similarity index 75% rename from ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java rename to ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimator.java index ab831f6d64c..d2856774e97 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClock.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimator.java @@ -1,6 +1,6 @@ package us.ihmc.communication.ros2.sync; -import ihmc_common_msgs.msg.dds.DistributedClockMessage; +import ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage; import us.ihmc.commons.exception.DefaultExceptionHandler; import us.ihmc.commons.thread.RepeatingTaskThread; import us.ihmc.commons.thread.ThreadTools; @@ -22,25 +22,26 @@ * the clock offsets on those computers. This can be used to synchronize data * modifications. We account for and assume a symmetric network delay. */ -public class ROS2DistributedClock +public class ROS2PeerClockOffsetEstimator { - private static final ROS2Topic TOPIC = ROS2Tools.IHMC_ROOT.withModule("distributed_clock").withType(DistributedClockMessage.class); + private static final ROS2Topic TOPIC = ROS2Tools.IHMC_ROOT.withModule("peer_clock_offset_estimator") + .withType(PeerClockOffsetEstimatorPingMessage.class); - private final HashMap peerMap = new HashMap<>(); - private final List peerList = new ArrayList<>(); + private final HashMap peerMap = new HashMap<>(); + private final List peerList = new ArrayList<>(); private int nextPeerToPing = 0; - private final ROS2Publisher publisher; + private final ROS2Publisher publisher; private final Guid ourGuid; private final RepeatingTaskThread requestThread = new RepeatingTaskThread(getClass().getSimpleName(), this::requestThread, DefaultExceptionHandler.MESSAGE_AND_STACKTRACE); - private final DistributedClockMessage requestMessage = new DistributedClockMessage(); - private final DistributedClockMessage receivedMessage = new DistributedClockMessage(); + private final PeerClockOffsetEstimatorPingMessage requestMessage = new PeerClockOffsetEstimatorPingMessage(); + private final PeerClockOffsetEstimatorPingMessage receivedMessage = new PeerClockOffsetEstimatorPingMessage(); private final SampleInfo sampleInfo = new SampleInfo(); private final Guid receivedRequestTarget = new Guid(); private final Guid receivedReplyTarget = new Guid(); - public ROS2DistributedClock(ROS2Node ros2Node) + public ROS2PeerClockOffsetEstimator(ROS2Node ros2Node) { publisher = ros2Node.createPublisher(TOPIC); ourGuid = publisher.getPublisher().getGuid(); @@ -54,7 +55,7 @@ public ROS2DistributedClock(ROS2Node ros2Node) if (receivedMessage.getIsRequest() && receivedRequestTarget.equals(ourGuid)) // Reply { - DistributedClockMessage replyMessage = new DistributedClockMessage(); + PeerClockOffsetEstimatorPingMessage replyMessage = new PeerClockOffsetEstimatorPingMessage(); replyMessage.set(receivedMessage); replyMessage.setIsRequest(false); MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime()); @@ -69,7 +70,7 @@ public ROS2DistributedClock(ROS2Node ros2Node) } else if (!receivedMessage.getIsRequest() && receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate { - ROS2DistributedClockPeer peer = peerMap.get(receivedRequestTarget); + ROS2PeerClockOffsetEstimatorPeer peer = peerMap.get(receivedRequestTarget); if (peer != null) { peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()), @@ -91,7 +92,7 @@ else if (!receivedMessage.getIsRequest() && receivedReplyTarget.equals(ourGuid)) Guid guidCopy = new Guid(); guidCopy.set(guid); - ROS2DistributedClockPeer peer = new ROS2DistributedClockPeer(guidCopy); + ROS2PeerClockOffsetEstimatorPeer peer = new ROS2PeerClockOffsetEstimatorPeer(guidCopy); peerMap.put(guidCopy, peer); peerList.add(peer); } @@ -116,7 +117,7 @@ private void requestThread() if (nextPeerToPing >= peerList.size()) nextPeerToPing = 0; - ROS2DistributedClockPeer peer = peerList.get(nextPeerToPing); + ROS2PeerClockOffsetEstimatorPeer peer = peerList.get(nextPeerToPing); requestMessage.setIsRequest(true); MessageTools.toMessage(peer.getGuid(), requestMessage.getRequestTarget()); MessageTools.toMessage(ourGuid, requestMessage.getReplyTarget()); @@ -135,7 +136,7 @@ public void destroy() requestThread.kill(); } - public List getPeerList() + public List getPeerList() { return peerList; } diff --git a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorPeer.java similarity index 91% rename from ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java rename to ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorPeer.java index cd7b73a7d8c..a8d9b4fbf65 100644 --- a/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockPeer.java +++ b/ihmc-communication/src/main/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorPeer.java @@ -11,12 +11,12 @@ * TODO: Add an alpha filter since the network latency and compute * is noisy, but the clock rates are smooth. */ -public class ROS2DistributedClockPeer +public class ROS2PeerClockOffsetEstimatorPeer { private final Guid guid; private Duration peerClockOffset; - public ROS2DistributedClockPeer(Guid guid) + public ROS2PeerClockOffsetEstimatorPeer(Guid guid) { this.guid = guid; } diff --git a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorTest.java similarity index 64% rename from ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java rename to ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorTest.java index fb4e4d516f2..2115ab34776 100644 --- a/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2DistributedClockTest.java +++ b/ihmc-communication/src/test/java/us/ihmc/communication/ros2/sync/ROS2PeerClockOffsetEstimatorTest.java @@ -12,34 +12,34 @@ import java.time.Duration; import java.time.Instant; -public class ROS2DistributedClockTest +public class ROS2PeerClockOffsetEstimatorTest { @Test public void test() { - ROS2Node ros2Node0 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("distributed_clock_test0"); - ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("distributed_clock_test1"); + ROS2Node ros2Node0 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("peer_clock_test0"); + ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("peer_clock_test1"); - ROS2DistributedClock[] clocks = new ROS2DistributedClock[3]; - clocks[0] = new ROS2DistributedClock(ros2Node0); - clocks[1] = new ROS2DistributedClock(ros2Node0); - clocks[2] = new ROS2DistributedClock(ros2Node1); + ROS2PeerClockOffsetEstimator[] clocks = new ROS2PeerClockOffsetEstimator[3]; + clocks[0] = new ROS2PeerClockOffsetEstimator(ros2Node0); + clocks[1] = new ROS2PeerClockOffsetEstimator(ros2Node0); + clocks[2] = new ROS2PeerClockOffsetEstimator(ros2Node1); ThreadTools.park(0.5); for (int i = 0; i < 10; i++) { - for (ROS2DistributedClock clock : clocks) + for (ROS2PeerClockOffsetEstimator clock : clocks) Assertions.assertEquals(clocks.length - 1, clock.getPeerList().size()); - for (ROS2DistributedClock clock : clocks) + for (ROS2PeerClockOffsetEstimator clock : clocks) { Duration offset = clock.getPeerList().get(0).getPeerClockOffset(); LogTools.info("Clock offset: {}", offset); Assertions.assertTrue(TimeTools.toDoubleSeconds(offset) < 0.1); } - for (ROS2DistributedClock clock : clocks) + for (ROS2PeerClockOffsetEstimator clock : clocks) { LogTools.info("Converted peer time: {}", clock.getPeerList().get(0).convertPeerTimeToOurTime(Instant.now())); } @@ -47,7 +47,7 @@ public void test() ThreadTools.park(0.3); } - for (ROS2DistributedClock clock : clocks) + for (ROS2PeerClockOffsetEstimator clock : clocks) clock.destroy(); ros2Node0.destroy(); diff --git a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage_.idl similarity index 79% rename from ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl rename to ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage_.idl index 37aef40cdb5..60c0f1a61b3 100644 --- a/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/DistributedClockMessage_.idl +++ b/ihmc-interfaces/src/main/generated-idl/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage_.idl @@ -1,5 +1,5 @@ -#ifndef __ihmc_common_msgs__msg__DistributedClockMessage__idl__ -#define __ihmc_common_msgs__msg__DistributedClockMessage__idl__ +#ifndef __ihmc_common_msgs__msg__PeerClockOffsetEstimatorPingMessage__idl__ +#define __ihmc_common_msgs__msg__PeerClockOffsetEstimatorPingMessage__idl__ #include "ihmc_common_msgs/msg/./GuidMessage_.idl" #include "ihmc_common_msgs/msg/./InstantMessage_.idl" @@ -14,8 +14,8 @@ module ihmc_common_msgs * This message is a request-reply ping used to estimate the clock offset * of another node to be used by data synchronization algorithms. */ - @TypeCode(type="ihmc_common_msgs::msg::dds_::DistributedClockMessage_") - struct DistributedClockMessage + @TypeCode(type="ihmc_common_msgs::msg::dds_::PeerClockOffsetEstimatorPingMessage_") + struct PeerClockOffsetEstimatorPingMessage { /** * If this is a request message, else it's a reply message diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessage.java similarity index 83% rename from ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java rename to ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessage.java index ebcc8c5000d..2f28591b13a 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessage.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessage.java @@ -10,7 +10,7 @@ * This message is a request-reply ping used to estimate the clock offset * of another node to be used by data synchronization algorithms. */ -public class DistributedClockMessage extends Packet implements Settable, EpsilonComparable +public class PeerClockOffsetEstimatorPingMessage extends Packet implements Settable, EpsilonComparable { /** * If this is a request message, else it's a reply message @@ -33,7 +33,7 @@ public class DistributedClockMessage extends Packet imp */ public ihmc_common_msgs.msg.dds.InstantMessage reply_send_time_; - public DistributedClockMessage() + public PeerClockOffsetEstimatorPingMessage() { request_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); reply_target_ = new ihmc_common_msgs.msg.dds.GuidMessage(); @@ -41,13 +41,13 @@ public DistributedClockMessage() reply_send_time_ = new ihmc_common_msgs.msg.dds.InstantMessage(); } - public DistributedClockMessage(DistributedClockMessage other) + public PeerClockOffsetEstimatorPingMessage(PeerClockOffsetEstimatorPingMessage other) { this(); set(other); } - public void set(DistributedClockMessage other) + public void set(PeerClockOffsetEstimatorPingMessage other) { is_request_ = other.is_request_; @@ -109,19 +109,19 @@ public ihmc_common_msgs.msg.dds.InstantMessage getReplySendTime() } - public static Supplier getPubSubType() + public static Supplier getPubSubType() { - return DistributedClockMessagePubSubType::new; + return PeerClockOffsetEstimatorPingMessagePubSubType::new; } @Override public Supplier getPubSubTypePacket() { - return DistributedClockMessagePubSubType::new; + return PeerClockOffsetEstimatorPingMessagePubSubType::new; } @Override - public boolean epsilonEquals(DistributedClockMessage other, double epsilon) + public boolean epsilonEquals(PeerClockOffsetEstimatorPingMessage other, double epsilon) { if(other == null) return false; if(other == this) return true; @@ -141,9 +141,9 @@ public boolean equals(Object other) { if(other == null) return false; if(other == this) return true; - if(!(other instanceof DistributedClockMessage)) return false; + if(!(other instanceof PeerClockOffsetEstimatorPingMessage)) return false; - DistributedClockMessage otherMyClass = (DistributedClockMessage) other; + PeerClockOffsetEstimatorPingMessage otherMyClass = (PeerClockOffsetEstimatorPingMessage) other; if(this.is_request_ != otherMyClass.is_request_) return false; @@ -160,7 +160,7 @@ public java.lang.String toString() { StringBuilder builder = new StringBuilder(); - builder.append("DistributedClockMessage {"); + builder.append("PeerClockOffsetEstimatorPingMessage {"); builder.append("is_request="); builder.append(this.is_request_); builder.append(", "); builder.append("request_target="); diff --git a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessagePubSubType.java similarity index 67% rename from ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java rename to ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessagePubSubType.java index 30d5038581f..793f94d10cf 100644 --- a/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/DistributedClockMessagePubSubType.java +++ b/ihmc-interfaces/src/main/generated-java/ihmc_common_msgs/msg/dds/PeerClockOffsetEstimatorPingMessagePubSubType.java @@ -2,20 +2,20 @@ /** * -* Topic data type of the struct "DistributedClockMessage" defined in "DistributedClockMessage_.idl". Use this class to provide the TopicDataType to a Participant. +* Topic data type of the struct "PeerClockOffsetEstimatorPingMessage" defined in "PeerClockOffsetEstimatorPingMessage_.idl". Use this class to provide the TopicDataType to a Participant. * -* This file was automatically generated from DistributedClockMessage_.idl by us.ihmc.idl.generator.IDLGenerator. -* Do not update this file directly, edit DistributedClockMessage_.idl instead. +* This file was automatically generated from PeerClockOffsetEstimatorPingMessage_.idl by us.ihmc.idl.generator.IDLGenerator. +* Do not update this file directly, edit PeerClockOffsetEstimatorPingMessage_.idl instead. * */ -public class DistributedClockMessagePubSubType implements us.ihmc.pubsub.TopicDataType +public class PeerClockOffsetEstimatorPingMessagePubSubType implements us.ihmc.pubsub.TopicDataType { - public static final java.lang.String name = "ihmc_common_msgs::msg::dds_::DistributedClockMessage_"; + public static final java.lang.String name = "ihmc_common_msgs::msg::dds_::PeerClockOffsetEstimatorPingMessage_"; @Override public final java.lang.String getDefinitionChecksum() { - return "16d5f26ad9da80c8261bf1dc1800a8b91e7535ec399247617b9095f59ee71428"; + return "d21275a34208cc47d453c10c1d2d01990ee182f4a62b2de9c99f880e6b2bf166"; } @Override @@ -28,7 +28,7 @@ public final java.lang.String getDefinitionVersion() private final us.ihmc.idl.CDR deserializeCDR = new us.ihmc.idl.CDR(); @Override - public void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.pubsub.common.SerializedPayload serializedPayload) throws java.io.IOException + public void serialize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.pubsub.common.SerializedPayload serializedPayload) throws java.io.IOException { serializeCDR.serialize(serializedPayload); write(data, serializeCDR); @@ -36,7 +36,7 @@ public void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us. } @Override - public void deserialize(us.ihmc.pubsub.common.SerializedPayload serializedPayload, ihmc_common_msgs.msg.dds.DistributedClockMessage data) throws java.io.IOException + public void deserialize(us.ihmc.pubsub.common.SerializedPayload serializedPayload, ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data) throws java.io.IOException { deserializeCDR.deserialize(serializedPayload); read(data, deserializeCDR); @@ -66,12 +66,12 @@ public static int getMaxCdrSerializedSize(int current_alignment) return current_alignment - initial_alignment; } - public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.DistributedClockMessage data) + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data) { return getCdrSerializedSize(data, 0); } - public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, int current_alignment) + public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, int current_alignment) { int initial_alignment = current_alignment; @@ -90,7 +90,7 @@ public final static int getCdrSerializedSize(ihmc_common_msgs.msg.dds.Distribute return current_alignment - initial_alignment; } - public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + public static void write(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.idl.CDR cdr) { cdr.write_type_7(data.getIsRequest()); @@ -100,7 +100,7 @@ public static void write(ihmc_common_msgs.msg.dds.DistributedClockMessage data, ihmc_common_msgs.msg.dds.InstantMessagePubSubType.write(data.getReplySendTime(), cdr); } - public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + public static void read(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.idl.CDR cdr) { data.setIsRequest(cdr.read_type_7()); @@ -112,7 +112,7 @@ public static void read(ihmc_common_msgs.msg.dds.DistributedClockMessage data, u } @Override - public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.InterchangeSerializer ser) + public final void serialize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.idl.InterchangeSerializer ser) { ser.write_type_7("is_request", data.getIsRequest()); ser.write_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); @@ -126,7 +126,7 @@ public final void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage dat } @Override - public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.DistributedClockMessage data) + public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data) { data.setIsRequest(ser.read_type_7("is_request")); ser.read_type_a("request_target", new ihmc_common_msgs.msg.dds.GuidMessagePubSubType(), data.getRequestTarget()); @@ -139,15 +139,15 @@ public final void deserialize(us.ihmc.idl.InterchangeSerializer ser, ihmc_common } - public static void staticCopy(ihmc_common_msgs.msg.dds.DistributedClockMessage src, ihmc_common_msgs.msg.dds.DistributedClockMessage dest) + public static void staticCopy(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage src, ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage dest) { dest.set(src); } @Override - public ihmc_common_msgs.msg.dds.DistributedClockMessage createData() + public ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage createData() { - return new ihmc_common_msgs.msg.dds.DistributedClockMessage(); + return new ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage(); } @Override public int getTypeSize() @@ -161,24 +161,24 @@ public java.lang.String getName() return name; } - public void serialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + public void serialize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.idl.CDR cdr) { write(data, cdr); } - public void deserialize(ihmc_common_msgs.msg.dds.DistributedClockMessage data, us.ihmc.idl.CDR cdr) + public void deserialize(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage data, us.ihmc.idl.CDR cdr) { read(data, cdr); } - public void copy(ihmc_common_msgs.msg.dds.DistributedClockMessage src, ihmc_common_msgs.msg.dds.DistributedClockMessage dest) + public void copy(ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage src, ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage dest) { staticCopy(src, dest); } @Override - public DistributedClockMessagePubSubType newInstance() + public PeerClockOffsetEstimatorPingMessagePubSubType newInstance() { - return new DistributedClockMessagePubSubType(); + return new PeerClockOffsetEstimatorPingMessagePubSubType(); } } diff --git a/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage.msg similarity index 100% rename from ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/DistributedClockMessage.msg rename to ihmc-interfaces/src/main/messages/ihmc_interfaces/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage.msg diff --git a/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg b/ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage.msg similarity index 100% rename from ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/DistributedClockMessage.msg rename to ihmc-interfaces/src/main/messages/ros1/ihmc_common_msgs/msg/PeerClockOffsetEstimatorPingMessage.msg