Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically switch jmx/sensor names based on dual read mode and source type #926

Merged
merged 4 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.44.0] - 2023-08-06
- dynamically switch jmx/sensor names based on dual read mode and source type

## [29.43.11] - 2023-08-01
- fix logging issues about observer host and dual read mode

Expand Down Expand Up @@ -5512,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master
[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0
[29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11
[29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10
[29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities");

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

// init connection
ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
loadBalancerComponentFactory = config.componentFactory;
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Checks and manages the global and per-service dual read state.
* Provides monitoring of the dual read data.
*
* The dual read state is broken down into global and per-service state. Per-service dual read
* mode has a higher priority. Only if per-service dual read mode is not defined, global
* dual read mode will be used.
Expand All @@ -52,6 +53,10 @@ public class DualReadStateManager
private final RateLimiter _rateLimiter;
// Stores global dual read mode
private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY;
private final Set<DualReadModeWatcher> _globalDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _serviceDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _clusterDualReadModeWatchers;

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
Expand All @@ -74,6 +79,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled
_dualReadModeProvider = dualReadModeProvider;
_executorService = executorService;
_rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL);
_globalDualReadModeWatchers = ConcurrentHashMap.newKeySet();
_serviceDualReadModeWatchers = new ConcurrentHashMap<>();
_clusterDualReadModeWatchers = new ConcurrentHashMap<>();
}

public void updateGlobal(DualReadModeProvider.DualReadMode mode)
Expand All @@ -82,6 +90,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode)
_dualReadMode = mode;
if (updated) {
LOG.info("Global dual read mode updated: {}", mode);
notifyGlobalWatchers(_dualReadMode);
}
}

Expand All @@ -90,6 +99,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for service {} updated: {}", service, mode);
notifyServiceWatchers(service, mode);
}
}

Expand All @@ -98,6 +108,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode);
notifyClusterWatchers(cluster, mode);
}
}

Expand Down Expand Up @@ -198,4 +209,75 @@ public DualReadModeProvider getDualReadModeProvider()
{
return _dualReadModeProvider;
}

bohhyang marked this conversation as resolved.
Show resolved Hide resolved
// Add watchers watching for global dual read mode. The watcher will be notified when the global dual read mode changes.
public void addGlobalWatcher(DualReadModeWatcher watcher)
{
_globalDualReadModeWatchers.add(watcher);
}

// Add watchers watching for dual read mode of a service. The watcher will be notified when the dual read mode changes.
public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

// Add watchers watching for dual read mode of a cluster. The watcher will be notified when the dual read mode changes.
public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

// Remove watchers for dual read mode of a service.
public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.get(serviceName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

// Remove watchers for dual read mode of a cluster.
public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.get(clusterName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

private void notifyGlobalWatchers(DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_globalDualReadModeWatchers, mode);
}

private void notifyServiceWatchers(String serviceName, DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_serviceDualReadModeWatchers.get(serviceName), mode);
}

private void notifyClusterWatchers(String clusterName, DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_clusterDualReadModeWatchers.get(clusterName), mode);
}

private static void notifyWatchers(Set<DualReadModeWatcher> watchers, DualReadModeProvider.DualReadMode mode)
{
if (watchers != null)
{
for (DualReadModeWatcher w : watchers)
{
w.onChanged(mode);
}
}
}

public interface DualReadModeWatcher
{
void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright (c) 2023 LinkedIn Corp.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.d2.jmx;

import com.linkedin.d2.balancer.LoadBalancerStateItem;
import com.linkedin.d2.balancer.dualread.DualReadModeProvider;
import com.linkedin.d2.balancer.dualread.DualReadStateManager;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.simple.ClusterInfoItem;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancer;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.discovery.stores.file.FileStore;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;


/**
* Manage d2 client jmx dual read mode watchers for different types of load balancing related properties.
*/
public interface D2ClientJmxDualReadModeWatcherManager
{

void updateWatcher(SimpleLoadBalancer balancer, BiConsumer<SimpleLoadBalancer, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(SimpleLoadBalancerState state, BiConsumer<SimpleLoadBalancerState, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy,
BiConsumer<LoadBalancerStrategy, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem,
BiConsumer<ClusterInfoItem, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String serviceName, LoadBalancerStateItem<ServiceProperties> serviceProperties,
BiConsumer<LoadBalancerStateItem<ServiceProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreUriProperties(FileStore<UriProperties> uriStore,
BiConsumer<FileStore<UriProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreClusterProperties(FileStore<ClusterProperties> clusterStore,
BiConsumer<FileStore<ClusterProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreServiceProperties(FileStore<ServiceProperties> serviceStore,
BiConsumer<FileStore<ServiceProperties>, DualReadModeProvider.DualReadMode> callback);

void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme);

void removeWatcherForClusterInfoItem(String clusterName);

void removeWatcherForServiceProperties(String serviceName);


final class D2ClientJmxDualReadModeWatcher<T> implements DualReadStateManager.DualReadModeWatcher
{
private T _latestJmxProperty;
private final BiConsumer<T, DualReadModeProvider.DualReadMode> _callback;

D2ClientJmxDualReadModeWatcher(T initialJmxProperty, BiConsumer<T, DualReadModeProvider.DualReadMode>callback)
{
_latestJmxProperty = initialJmxProperty;
_callback = callback;
}

public T getLatestJmxProperty()
{
return _latestJmxProperty;
}

public void setLatestJmxProperty(T latestJmxProperty)
{
_latestJmxProperty = latestJmxProperty;
}

@Override
public void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode)
{
_callback.accept(_latestJmxProperty, mode);
}
}
}
Loading
Loading