Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11627. Support getBlock operation on short-circuit channel. #7456

Draft
wants to merge 5 commits into
base: HDDS-10685
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.DomainSocketFactory;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
Expand All @@ -41,6 +43,8 @@ public static void enableErrorInjection(ErrorInjector injector) {
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final boolean securityEnabled;
private boolean shortCircuitEnabled;
private DomainSocketFactory domainSocketFactory;

public XceiverClientCreator(ConfigurationSource conf) {
this(conf, null);
Expand All @@ -56,20 +60,36 @@ public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustMa
if (securityEnabled) {
Preconditions.checkNotNull(trustManager);
}
shortCircuitEnabled = conf.getObject(OzoneClientConfig.class).isShortCircuitEnabled();
if (shortCircuitEnabled) {
domainSocketFactory = DomainSocketFactory.getInstance(conf);
}
}

public boolean isSecurityEnabled() {
return securityEnabled;
}

public boolean isShortCircuitEnabled() {
return shortCircuitEnabled && domainSocketFactory.isServiceReady();
}

protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
return newClient(pipeline, null);
}

protected XceiverClientSpi newClient(Pipeline pipeline, DatanodeDetails dn) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
if (dn != null) {
client = new XceiverClientShortCircuit(pipeline, conf, dn);
} else {
client = new XceiverClientGrpc(pipeline, conf, trustManager);
}
break;
case EC:
client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
Expand Down Expand Up @@ -97,7 +117,14 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) throws IOException {
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

Expand All @@ -117,7 +144,10 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public void close() throws Exception {
// clients are not tracked, closing each client is the responsibility of users of this class
public void close() {
// clients are not tracked, closing each client is the responsibility of users of this classclass
if (domainSocketFactory != null) {
domainSocketFactory.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public interface XceiverClientFactory extends AutoCloseable {
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;
default XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException {
return acquireClientForReadData(pipeline, false);
}

/**
* Releases a read XceiverClientSpi after use.
Expand All @@ -73,10 +75,20 @@ void releaseClientForReadData(XceiverClientSpi client,
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware)
default XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
return acquireClient(pipeline, topologyAware, false);
}

XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException;

XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient,
boolean topologyAware);

default boolean isShortCircuitEnabled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
* how it works, and how it is integrated with the Ozone client.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
private static final Logger LOG =
public static final Logger LOG =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Logger LOG =
private static final Logger LOG =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XceiverClientGrpc.LOG is used in unit test TestShortCircuitChunkInputStream.

LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final ConfigurationSource config;
Expand Down Expand Up @@ -133,6 +133,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.trustManager = trustManager;
this.getBlockDNcache = new ConcurrentHashMap<>();
LOG.info("{} is created for pipeline {}", XceiverClientGrpc.class.getSimpleName(), pipeline);
}

/**
Expand Down Expand Up @@ -246,6 +247,10 @@ public synchronized void close() {
}
}

public boolean isClosed() {
return closed;
}

@Override
public Pipeline getPipeline() {
return pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.scm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.Config;
Expand All @@ -29,6 +31,9 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.DomainSocketFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.util.OzoneNetUtils;
import org.apache.hadoop.security.UserGroupInformation;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -38,6 +43,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hdds.DatanodeVersion.SHORT_CIRCUIT_READS;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
Expand All @@ -64,8 +70,8 @@ public class XceiverClientManager extends XceiverClientCreator {

private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;

private static XceiverClientMetrics metrics;
private final ConcurrentHashMap<String, DatanodeDetails> localDNCache;

/**
* Creates a new XceiverClientManager for non secured ozone cluster.
Expand Down Expand Up @@ -105,6 +111,7 @@ public void onRemoval(
}).build();

cacheMetrics = CacheMetrics.create(clientCache, this);
this.localDNCache = new ConcurrentHashMap<>();
}

@VisibleForTesting
Expand All @@ -117,17 +124,54 @@ public Cache<String, XceiverClientSpi> getClientCache() {
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
return acquireClient(pipeline, false, false);
}

/**
* Acquires a XceiverClientSpi connected to a container for read.
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline, false, allowShortCircuit);
}

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key.
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline,
boolean topologyAware) throws IOException {
boolean topologyAware, boolean allowShortCircuit) throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getNodes() != null);
Preconditions.checkArgument(!pipeline.getNodes().isEmpty(),
NO_REPLICA_FOUND);

synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline, topologyAware);
XceiverClientSpi info = getClient(pipeline, topologyAware, allowShortCircuit);
info.incrementReference();
return info;
}
Expand All @@ -141,7 +185,7 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
client.decrementReference();
if (invalidateClient) {
Pipeline pipeline = client.getPipeline();
String key = getPipelineCacheKey(pipeline, topologyAware);
String key = getPipelineCacheKey(pipeline, topologyAware, client instanceof XceiverClientShortCircuit);
XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
if (cachedClient == client) {
clientCache.invalidate(key);
Expand All @@ -150,24 +194,44 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
}
}

protected XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware)
protected XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException {
try {
// create different client different pipeline node based on
// network topology
String key = getPipelineCacheKey(pipeline, topologyAware);
return clientCache.get(key, () -> newClient(pipeline));
// create different client different pipeline node based on network topology
String key = getPipelineCacheKey(pipeline, topologyAware, allowShortCircuit);
return clientCache.get(key, () -> newClient(pipeline, localDNCache.get(key)));
} catch (Exception e) {
throw new IOException(
"Exception getting XceiverClient: " + e, e);
}
}

private String getPipelineCacheKey(Pipeline pipeline,
boolean topologyAware) {
String key = pipeline.getId().getId().toString() + pipeline.getType();
private String getPipelineCacheKey(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit) {
String key = pipeline.getId().getId().toString() + "-" + pipeline.getType();
boolean isEC = pipeline.getType() == HddsProtos.ReplicationType.EC;
if (topologyAware || isEC) {
DatanodeDetails localDN = null;

if ((!isEC) && allowShortCircuit && isShortCircuitEnabled()) {
int port = 0;
InetSocketAddress localAddr = null;
for (DatanodeDetails dn : pipeline.getNodes()) {
// read port from the data node, on failure use default configured port.
port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
InetSocketAddress addr = NetUtils.createSocketAddr(dn.getIpAddress(), port);
if (OzoneNetUtils.isAddressLocal(addr) &&
dn.getCurrentVersion() >= SHORT_CIRCUIT_READS.toProtoValue()) {
localAddr = addr;
localDN = dn;
break;
}
}
if (localAddr != null) {
// Find a local DN and short circuit read is enabled
key += "@" + localAddr.getHostName() + ":" + port + "/" + DomainSocketFactory.FEATURE_FLAG;
}
}

if (localDN == null && (topologyAware || isEC)) {
try {
DatanodeDetails closestNode = pipeline.getClosestNode();
// Pipeline cache key uses host:port suffix to handle
Expand All @@ -185,7 +249,7 @@ private String getPipelineCacheKey(Pipeline pipeline,
// Standalone port is chosen since all datanodes should have a
// standalone port regardless of version and this port should not
// have any collisions.
key += closestNode.getHostName() + closestNode.getPort(
key += closestNode.getHostName() + ":" + closestNode.getPort(
DatanodeDetails.Port.Name.STANDALONE);
} catch (IOException e) {
LOG.error("Failed to get closest node to create pipeline cache key:" +
Expand All @@ -197,12 +261,16 @@ private String getPipelineCacheKey(Pipeline pipeline,
// Append user short name to key to prevent a different user
// from using same instance of xceiverClient.
try {
key += UserGroupInformation.getCurrentUser().getShortUserName();
key = UserGroupInformation.getCurrentUser().getShortUserName() + "@" + key;
} catch (IOException e) {
LOG.error("Failed to get current user to create pipeline cache key:" +
e.getMessage());
}
}

if (localDN != null) {
localDNCache.put(key, localDN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move above log inside if condition.

}
return key;
}

Expand All @@ -211,12 +279,14 @@ private String getPipelineCacheKey(Pipeline pipeline,
*/
@Override
public void close() {
super.close();
//closing is done through RemovalListener
clientCache.invalidateAll();
clientCache.cleanUp();
if (LOG.isDebugEnabled()) {
LOG.debug("XceiverClient cache stats: {}", clientCache.stats());
}
localDNCache.clear();
cacheMetrics.unregister();

if (metrics != null) {
Expand Down
Loading