Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/tars-java-dev'
Browse files Browse the repository at this point in the history
# Conflicts:
#	docs/README.md
  • Loading branch information
LawlietLi committed Dec 1, 2017
1 parent e76328b commit 787dae1
Show file tree
Hide file tree
Showing 35 changed files with 962 additions and 37 deletions.
2 changes: 1 addition & 1 deletion java/core/client.pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>qq-cloud-central</groupId>
<artifactId>tars-parent</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<artifactId>tars-client</artifactId>
<name>${project.artifactId}</name>
Expand Down
2 changes: 1 addition & 1 deletion java/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>qq-cloud-central</groupId>
<artifactId>tars-parent</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<artifactId>tars-core</artifactId>
<name>${project.artifactId}</name>
Expand Down
2 changes: 1 addition & 1 deletion java/core/server.pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>qq-cloud-central</groupId>
<artifactId>tars-parent</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<artifactId>tars-server</artifactId>
<name>${project.artifactId}</name>
Expand Down
4 changes: 2 additions & 2 deletions java/core/src/main/java/com/qq/tars/client/Communicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public <T> T stringToProxy(Class<T> clazz, ServantProxyConfig servantProxyConfig
return stringToProxy(clazz, servantProxyConfig, null);
}

public <T> T stringToProxy(Class<T> clazz, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance) throws CommunicatorConfigException {
public <T> T stringToProxy(Class<T> clazz, ServantProxyConfig servantProxyConfig, LoadBalance<T> loadBalance) throws CommunicatorConfigException {
return stringToProxy(clazz, servantProxyConfig.getObjectName(), servantProxyConfig, loadBalance, null);
}

@SuppressWarnings("unchecked")
private <T> T stringToProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig,
LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) throws CommunicatorConfigException {
LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws CommunicatorConfigException {
if (!inited.get()) {
throw new CommunicatorConfigException("communicator uninitialized!");
}
Expand Down
15 changes: 11 additions & 4 deletions java/core/src/main/java/com/qq/tars/client/ObjectProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {
private ScheduledFuture<?> statReportFuture;
private ScheduledFuture<?> queryRefreshFuture;

private final Object refreshLock = new Object();

private final Random random = new Random(System.currentTimeMillis() / 1000);

public ObjectProxy(Class<T> api, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance,
Expand Down Expand Up @@ -89,7 +91,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return null;
}

Invoker<T> invoker = loadBalancer.select(protocolInvoker.getInvokers(), context);
Invoker invoker = loadBalancer.select(context);
return invoker.invoke(context);
} catch (Throwable e) {
if (ClientLogger.getLogger().isDebugEnabled()) {
Expand All @@ -111,9 +113,12 @@ public String getObjectName() {
}

public void refresh() {
registryStatReproter();
registryServantNodeRefresher();
protocolInvoker.refresh();
synchronized (refreshLock) {
registryStatReproter();
registryServantNodeRefresher();
protocolInvoker.refresh();
loadBalancer.refresh(protocolInvoker.getInvokers());
}
}

public void destroy() {
Expand All @@ -127,6 +132,8 @@ public ServantProxyConfig getConfig() {
}

private void initialize() {
loadBalancer.refresh(protocolInvoker.getInvokers());

if (StringUtils.isNotEmpty(this.servantProxyConfig.getLocator()) && !StringUtils.isEmpty(this.servantProxyConfig.getStat())) {
this.registryStatReproter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import java.lang.reflect.Constructor;

import com.qq.tars.client.cluster.DefaultLoadBalance;

import com.qq.tars.client.rpc.loadbalance.DefaultLoadBalance;
import com.qq.tars.client.rpc.tars.TarsProtocolInvoker;
import com.qq.tars.client.support.ServantCacheManager;
import com.qq.tars.client.util.ClientLogger;
Expand All @@ -42,7 +43,7 @@ public ObjectProxyFactory(Communicator communicator) {
}

public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, ServantProxyConfig servantProxyConfig,
LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException {
LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException {
if (servantProxyConfig == null) {
servantProxyConfig = createServantProxyConfig(objName);
} else {
Expand Down Expand Up @@ -77,8 +78,8 @@ private <T> ProtocolInvoker<T> createProtocolInvoker(Class<T> api, String objNam
return protocolInvoker;
}

private LoadBalance createLoadBalance(ServantProxyConfig servantProxyConfig) {
return new DefaultLoadBalance(servantProxyConfig);
private <T> LoadBalance<T> createLoadBalance(ServantProxyConfig servantProxyConfig) {
return new DefaultLoadBalance<T>(servantProxyConfig);
}

private <T> Codec createCodec(Class<T> api, ServantProxyConfig servantProxyConfig) throws ClientException {
Expand Down
30 changes: 30 additions & 0 deletions java/core/src/main/java/com/qq/tars/client/ServantProxyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public final class ServantProxyConfig {

private boolean directConnection = false;

private int minStaticWeightLimit = 10;
private int maxStaticWeightLimit = 100;

private int defaultConHashVirtualNodes = 100;

public ServantProxyConfig(String objectName) {
this(null, null, objectName);
}
Expand Down Expand Up @@ -269,6 +274,31 @@ public void setReportInterval(int reportInterval) {
this.reportInterval = reportInterval;
}

public int getMaxStaticWeightLimit() {
return maxStaticWeightLimit;
}

public void setMaxStaticWeightLimit(int maxStaticWeightLimit) {
this.maxStaticWeightLimit = maxStaticWeightLimit;
}


public int getMinStaticWeightLimit() {
return minStaticWeightLimit;
}

public void setMinStaticWeightLimit(int minStaticWeightLimit) {
this.minStaticWeightLimit = minStaticWeightLimit;
}

public int getDefaultConHashVirtualNodes() {
return defaultConHashVirtualNodes;
}

public void setDefaultConHashVirtualNodes(int defaultConHashVirtualNodes) {
this.defaultConHashVirtualNodes = defaultConHashVirtualNodes;
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import com.qq.tars.rpc.common.LoadBalance;
import com.qq.tars.rpc.common.exc.NoInvokerException;

public class DefaultLoadBalance implements LoadBalance {
@Deprecated
public class DefaultLoadBalance {

private final AtomicInteger sequence = new AtomicInteger();
private volatile ServantProxyConfig config;
Expand Down
13 changes: 13 additions & 0 deletions java/core/src/main/java/com/qq/tars/client/util/ParseTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ private static Url parse(String objectName, String content, ServantProxyConfig c

int active = 1;
String setDivision = null;
String enableAuth = "0";
int weightType = 0;
int weight = 0;
for (int i = 0; i < items.length; i++) {
if (items[i].equals("-h")) {
host = items[i + 1];
Expand All @@ -82,6 +85,12 @@ private static Url parse(String objectName, String content, ServantProxyConfig c
active = Integer.parseInt(items[i + 1]);
} else if (items[i].equals("-s")) {
setDivision = items[i + 1];
} else if (items[i].equals("-e")) {
enableAuth = items[i + 1];
} else if (items[i].equals("-v")) {
weightType = Integer.parseInt(items[i + 1]);
} else if (items[i].equals("-w")) {
weight = Integer.parseInt(items[i + 1]);
}
}
if (StringUtils.isEmpty(host) || port == -1) {
Expand All @@ -98,6 +107,10 @@ private static Url parse(String objectName, String content, ServantProxyConfig c
parameters.put(Constants.TARS_CLIENT_UDPMODE, Boolean.toString(items[0].toLowerCase().equals("udp")));
parameters.put(Constants.TARS_CLIENT_TCPNODELAY, Boolean.toString(conf.isTcpNoDelay()));
parameters.put(Constants.TARS_CLIENT_CHARSETNAME, conf.getCharsetName());
parameters.put(Constants.TARS_CLIENT_ENABLEAUTH, enableAuth);
parameters.put(Constants.TARS_CLIENT_WEIGHT_TYPE, String.valueOf(weightType));
parameters.put(Constants.TARS_CLIENT_WEIGHT, String.valueOf(weight));

return new Url(conf.getProtocol(), host, port, objectName, parameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ public interface Constants {
String TARS_CLIENT_CHARSETNAME = "charsetName";

String TARS_HASH = "tars_hash";
String TARS_CONSISTENT_HASH = "taf_consistent_hash";

String TARS_TUP_CLIENT = "tup_client";
String TARS_ONE_WAY_CLIENT = "one_way_client";
String TARS_NOT_CLIENT = "not_tars_client";
String TARS_CLIENT_ENABLEAUTH = "enableAuth";

String TARS_CLIENT_WEIGHT_TYPE = "weightType";
String TARS_CLIENT_WEIGHT = "weight";
String TARS_CLIENT_GRAYFLAG = "taf.framework.GrayFlag";
}
17 changes: 15 additions & 2 deletions java/core/src/main/java/com/qq/tars/rpc/common/LoadBalance.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@

import com.qq.tars.rpc.common.exc.NoInvokerException;

public interface LoadBalance {
public interface LoadBalance<T> {

<T> Invoker<T> select(Collection<Invoker<T>> invokers, InvokeContext context) throws NoInvokerException;
/**
* 根据负载均衡策略,挑选invoker
*
* @param invokeContext
* @return
* @throws NoInvokerException
*/
Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;

/**
* 通知invoker列表的更新
* @param invokers
*/
void refresh(Collection<Invoker<T>> invokers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class ServerVersion {

public static final String major = "1";
public static final String minor = "0";
public static final String build = "1";
public static final String build = "3";

public static String getVersion() {
return major + "." + minor + "." + build;
Expand Down
122 changes: 122 additions & 0 deletions java/core/src/main/java/com/qq/tars/server/apps/BaseAppContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.qq.tars.server.apps;

import com.qq.tars.rpc.protocol.ext.ExtendedServant;
import com.qq.tars.rpc.protocol.tars.support.AnalystManager;
import com.qq.tars.server.config.ConfigurationManager;
import com.qq.tars.server.config.ServantAdapterConfig;
import com.qq.tars.server.config.ServerConfig;
import com.qq.tars.server.core.*;
import com.qq.tars.server.core.AppContext;
import com.qq.tars.support.admin.AdminFServant;
import com.qq.tars.support.admin.impl.AdminFServantImpl;
import com.qq.tars.support.om.OmConstants;

import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public abstract class BaseAppContext implements AppContext {
protected File path = null;
boolean ready = true;

ConcurrentHashMap<String, ServantHomeSkeleton> skeletonMap = new ConcurrentHashMap<String, ServantHomeSkeleton>();
ConcurrentHashMap<String, Adapter> servantAdapterMap = new ConcurrentHashMap<String, Adapter>();

HashMap<String, String> contextParams = new HashMap<String, String>();

Set<AppContextListener> listeners = new HashSet<AppContextListener>(4);

BaseAppContext(File path) {
this.path = path;
}

void injectAdminServant() {
try {
String skeletonName = OmConstants.AdminServant;
ServantHomeSkeleton skeleton = new ServantHomeSkeleton(skeletonName, new AdminFServantImpl(), AdminFServant.class, null, null, -1);
skeleton.setAppContext(this);

ServerConfig serverCfg = ConfigurationManager.getInstance().getServerConfig();
ServantAdapterConfig config = serverCfg.getServantAdapterConfMap().get(OmConstants.AdminServant);
ServantAdapter servantAdapter = new ServantAdapter(config);
servantAdapter.bind(skeleton);
servantAdapterMap.put(skeletonName, servantAdapter);

skeletonMap.put(skeletonName, skeleton);
} catch (Exception e) {
System.err.println("init om service failed:context=[]");
e.printStackTrace();
}
}

void appServantStarted(AppService appService) {
for (AppContextListener listener : listeners) {
listener.appServantStarted(new DefaultAppServantEvent(appService));
}
}

void initServants() {
for (String skeletonName : skeletonMap.keySet()) {
ServantHomeSkeleton skeleton = skeletonMap.get(skeletonName);
Class<?> api = skeleton.getApiClass();
try {
if (api.isAssignableFrom(ExtendedServant.class)) {
continue;
}
AnalystManager.getInstance().registry(name(), api, skeleton.name());
} catch (Exception e) {
System.err.println("app[] init servant[" + api.getName() + "] failed");
e.printStackTrace();
}
}
}

void appContextStarted() {
for (AppContextListener listener : listeners) {
listener.appContextStarted(new DefaultAppContextEvent(this));
}
}

@Override
public String getInitParameter(String name) {
return contextParams.get(name);
}

@Override
public String name() {
return "";
}

@Override
public void stop() {
for (Adapter servantAdapter : servantAdapterMap.values()) {
servantAdapter.stop();
}
}

@Override
public ServantHomeSkeleton getCapHomeSkeleton(String homeName) {
if (!ready) {
throw new RuntimeException("The application isn't started.");
}
return skeletonMap.get(homeName);
}
}
Loading

0 comments on commit 787dae1

Please sign in to comment.