Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ROS 2 peer clock offset estimator #532

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ihmc_common_msgs.msg.dds.ConvexPolytope3DMessage;
import ihmc_common_msgs.msg.dds.Cylinder3DMessage;
import ihmc_common_msgs.msg.dds.Ellipsoid3DMessage;
import ihmc_common_msgs.msg.dds.GuidMessage;
import ihmc_common_msgs.msg.dds.InstantMessage;
import ihmc_common_msgs.msg.dds.PoseListMessage;
import ihmc_common_msgs.msg.dds.Ramp3DMessage;
Expand Down Expand Up @@ -82,6 +83,9 @@
import us.ihmc.mecano.multiBodySystem.interfaces.RigidBodyReadOnly;
import us.ihmc.mecano.spatial.interfaces.TwistReadOnly;
import us.ihmc.pubsub.TopicDataType;
import us.ihmc.pubsub.common.Guid;
import us.ihmc.pubsub.common.Guid.Entity;
import us.ihmc.pubsub.common.Guid.GuidPrefix;
import us.ihmc.pubsub.common.SerializedPayload;
import us.ihmc.robotics.lidar.LidarScanParameters;
import us.ihmc.robotics.math.QuaternionCalculus;
Expand Down Expand Up @@ -1422,6 +1426,25 @@ public static UUID toUUID(UUIDMessage uuidMessage)
return new UUID(uuidMessage.getMostSignificantBits(), uuidMessage.getLeastSignificantBits());
}

public static void toMessage(Guid guid, GuidMessage guidMessage)
{
System.arraycopy(guid.getGuidPrefix().getValue(), 0, guidMessage.getPrefix(), 0, GuidPrefix.size);
System.arraycopy(guid.getEntity().getValue(), 0, guidMessage.getEntity(), 0, Entity.size);
}

public static void fromMessage(GuidMessage guidMessage, Guid guid)
{
System.arraycopy(guidMessage.getPrefix(), 0, guid.getGuidPrefix().getValue(), 0, GuidPrefix.size);
System.arraycopy(guidMessage.getEntity(), 0, guid.getEntity().getValue(), 0, Entity.size);
}

public static Guid toGuid(GuidMessage guidMessage)
{
Guid guid = new Guid();
fromMessage(guidMessage, guid);
return guid;
}

public static double calculateDelay(ImageMessage imageMessage)
{
return TimeTools.calculateDelay(imageMessage.getAcquisitionTime().getSecondsSinceEpoch(), imageMessage.getAcquisitionTime().getAdditionalNanos());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package us.ihmc.communication.ros2.sync;

import ihmc_common_msgs.msg.dds.PeerClockOffsetEstimatorPingMessage;
import us.ihmc.commons.exception.DefaultExceptionHandler;
import us.ihmc.commons.thread.RepeatingTaskThread;
import us.ihmc.commons.thread.ThreadTools;
import us.ihmc.communication.ROS2Tools;
import us.ihmc.communication.packets.MessageTools;
import us.ihmc.pubsub.common.Guid;
import us.ihmc.pubsub.common.SampleInfo;
import us.ihmc.ros2.ROS2Node;
import us.ihmc.ros2.ROS2Publisher;
import us.ihmc.ros2.ROS2Topic;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
* Sends out pings to other instances of this class on the network to estimate
* the clock offsets on those computers. This can be used to synchronize data
* modifications. We account for and assume a symmetric network delay.
*/
public class ROS2PeerClockOffsetEstimator
{
private static final ROS2Topic<PeerClockOffsetEstimatorPingMessage> TOPIC = ROS2Tools.IHMC_ROOT.withModule("peer_clock_offset_estimator")
.withType(PeerClockOffsetEstimatorPingMessage.class);

private final HashMap<Guid, ROS2PeerClockOffsetEstimatorPeer> peerMap = new HashMap<>();
private final List<ROS2PeerClockOffsetEstimatorPeer> peerList = new ArrayList<>();
private int nextPeerToPing = 0;
private final ROS2Publisher<PeerClockOffsetEstimatorPingMessage> publisher;
private final Guid ourGuid;
private final RepeatingTaskThread requestThread = new RepeatingTaskThread(getClass().getSimpleName(),
this::requestThread,
DefaultExceptionHandler.MESSAGE_AND_STACKTRACE);
private final PeerClockOffsetEstimatorPingMessage requestMessage = new PeerClockOffsetEstimatorPingMessage();
private final PeerClockOffsetEstimatorPingMessage receivedMessage = new PeerClockOffsetEstimatorPingMessage();
private final SampleInfo sampleInfo = new SampleInfo();
private final Guid receivedRequestTarget = new Guid();
private final Guid receivedReplyTarget = new Guid();

public ROS2PeerClockOffsetEstimator(ROS2Node ros2Node)
{
publisher = ros2Node.createPublisher(TOPIC);
ourGuid = publisher.getPublisher().getGuid();

ros2Node.createSubscription(TOPIC, subscriber ->
{
subscriber.takeNextData(receivedMessage, sampleInfo);

MessageTools.fromMessage(receivedMessage.getRequestTarget(), receivedRequestTarget);
MessageTools.fromMessage(receivedMessage.getReplyTarget(), receivedReplyTarget);

if (receivedMessage.getIsRequest() && receivedRequestTarget.equals(ourGuid)) // Reply
{
PeerClockOffsetEstimatorPingMessage replyMessage = new PeerClockOffsetEstimatorPingMessage();
replyMessage.set(receivedMessage);
replyMessage.setIsRequest(false);
MessageTools.toMessage(Instant.now(), replyMessage.getReplySendTime());

ThreadTools.startAsDaemon(() ->
{
synchronized (publisher)
{
publisher.publish(replyMessage);
}
}, getClass().getSimpleName() + "PublishReply");
}
else if (!receivedMessage.getIsRequest() && receivedReplyTarget.equals(ourGuid)) // Update clock offset estimate
{
ROS2PeerClockOffsetEstimatorPeer peer = peerMap.get(receivedRequestTarget);
if (peer != null)
{
peer.update(MessageTools.toInstant(receivedMessage.getRequestSendTime()),
Instant.now(),
MessageTools.toInstant(receivedMessage.getReplySendTime()));
}
}
}, (subscriber, info) ->
{
Guid guid = info.getGuid();
if (!guid.equals(publisher.getPublisher().getGuid())) // Exclude our publisher
{
switch (info.getStatus())
{
case MATCHED_MATCHING ->
{
if (!peerMap.containsKey(guid))
{
Guid guidCopy = new Guid();
guidCopy.set(guid);

ROS2PeerClockOffsetEstimatorPeer peer = new ROS2PeerClockOffsetEstimatorPeer(guidCopy);
peerMap.put(guidCopy, peer);
peerList.add(peer);
}
}
case REMOVED_MATCHING ->
{
peerMap.remove(guid);
peerList.removeIf(peer -> peer.getGuid().equals(guid));
}
}
}
});

requestThread.setFrequencyLimit(5.0);
requestThread.startRepeating();
}

private void requestThread()
{
if (!peerList.isEmpty())
{
if (nextPeerToPing >= peerList.size())
nextPeerToPing = 0;

ROS2PeerClockOffsetEstimatorPeer peer = peerList.get(nextPeerToPing);
requestMessage.setIsRequest(true);
MessageTools.toMessage(peer.getGuid(), requestMessage.getRequestTarget());
MessageTools.toMessage(ourGuid, requestMessage.getReplyTarget());
MessageTools.toMessage(Instant.now(), requestMessage.getRequestSendTime());
synchronized (publisher)
{
publisher.publish(requestMessage);
}

++nextPeerToPing;
}
}

public void destroy()
{
requestThread.kill();
}

public List<ROS2PeerClockOffsetEstimatorPeer> getPeerList()
{
return peerList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package us.ihmc.communication.ros2.sync;

import us.ihmc.pubsub.common.Guid;

import java.time.Duration;
import java.time.Instant;

/**
* Keeps track of the clock offset of a peer.
*
* TODO: Add an alpha filter since the network latency and compute
* is noisy, but the clock rates are smooth.
*/
public class ROS2PeerClockOffsetEstimatorPeer
{
private final Guid guid;
private Duration peerClockOffset;

public ROS2PeerClockOffsetEstimatorPeer(Guid guid)
{
this.guid = guid;
}

public void update(Instant requestSendTime, Instant replyReceiveTime, Instant peerClockTime)
{
Duration roundTripTime = Duration.between(requestSendTime, replyReceiveTime);
Duration halfRoundTripTime = roundTripTime.dividedBy(2);
Instant peerNow = peerClockTime.plus(halfRoundTripTime);
peerClockOffset = Duration.between(replyReceiveTime, peerNow);
}

public Instant convertPeerTimeToOurTime(Instant peerTime)
{
return peerTime.minus(peerClockOffset);
}

public Duration getPeerClockOffset()
{
return peerClockOffset;
}

public Guid getGuid()
{
return guid;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package us.ihmc.communication.ros2.sync;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import us.ihmc.commons.thread.ThreadTools;
import us.ihmc.log.LogTools;
import us.ihmc.robotics.time.TimeTools;
import us.ihmc.ros2.ROS2Node;
import us.ihmc.ros2.ROS2NodeBuilder;
import us.ihmc.ros2.ROS2NodeBuilder.SpecialTransportMode;

import java.time.Duration;
import java.time.Instant;

public class ROS2PeerClockOffsetEstimatorTest
{
@Test
public void test()
{
ROS2Node ros2Node0 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("peer_clock_test0");
ROS2Node ros2Node1 = new ROS2NodeBuilder().specialTransportMode(SpecialTransportMode.INTRAPROCESS_ONLY).build("peer_clock_test1");

ROS2PeerClockOffsetEstimator[] clocks = new ROS2PeerClockOffsetEstimator[3];
clocks[0] = new ROS2PeerClockOffsetEstimator(ros2Node0);
clocks[1] = new ROS2PeerClockOffsetEstimator(ros2Node0);
clocks[2] = new ROS2PeerClockOffsetEstimator(ros2Node1);

ThreadTools.park(0.5);

for (int i = 0; i < 10; i++)
{
for (ROS2PeerClockOffsetEstimator clock : clocks)
Assertions.assertEquals(clocks.length - 1, clock.getPeerList().size());

for (ROS2PeerClockOffsetEstimator clock : clocks)
{
Duration offset = clock.getPeerList().get(0).getPeerClockOffset();
LogTools.info("Clock offset: {}", offset);
Assertions.assertTrue(TimeTools.toDoubleSeconds(offset) < 0.1);
}

for (ROS2PeerClockOffsetEstimator clock : clocks)
{
LogTools.info("Converted peer time: {}", clock.getPeerList().get(0).convertPeerTimeToOurTime(Instant.now()));
}

ThreadTools.park(0.3);
}

for (ROS2PeerClockOffsetEstimator clock : clocks)
clock.destroy();

ros2Node0.destroy();
ros2Node1.destroy();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef __ihmc_common_msgs__msg__GuidMessage__idl__
#define __ihmc_common_msgs__msg__GuidMessage__idl__

module ihmc_common_msgs
{
module msg
{
module dds
{

/**
* Represents a DDS-RTPS Guid
*/
@TypeCode(type="ihmc_common_msgs::msg::dds_::GuidMessage_")
struct GuidMessage
{
/**
* Prefix
*/
octet prefix[12];
/**
* Entity
*/
octet entity[4];
};
};
};
};

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef __ihmc_common_msgs__msg__PeerClockOffsetEstimatorPingMessage__idl__
#define __ihmc_common_msgs__msg__PeerClockOffsetEstimatorPingMessage__idl__

#include "ihmc_common_msgs/msg/./GuidMessage_.idl"
#include "ihmc_common_msgs/msg/./InstantMessage_.idl"
module ihmc_common_msgs
{
module msg
{
module dds
{

/**
* This message is a request-reply ping used to estimate the clock offset
* of another node to be used by data synchronization algorithms.
*/
@TypeCode(type="ihmc_common_msgs::msg::dds_::PeerClockOffsetEstimatorPingMessage_")
struct PeerClockOffsetEstimatorPingMessage
{
/**
* If this is a request message, else it's a reply message
*/
boolean is_request;
/**
* The guid of the publisher that's expected to send back a reply
*/
ihmc_common_msgs::msg::dds::GuidMessage request_target;
/**
* The guid of the publisher associated with the requester
*/
ihmc_common_msgs::msg::dds::GuidMessage reply_target;
/**
* The time at which the request is sent
*/
ihmc_common_msgs::msg::dds::InstantMessage request_send_time;
/**
* The time at which the reply is sent
*/
ihmc_common_msgs::msg::dds::InstantMessage reply_send_time;
};
};
};
};

#endif
Loading
Loading