Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1865]build a rule of relationship between table and resource group #3300

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,13 @@
<scope>test</scope>
</dependency>

<!-- check regexp whether intersect -->
<!-- https://mvnrepository.com/artifact/dk.brics/automaton -->
<dependency>
<groupId>dk.brics</groupId>
<artifactId>automaton</artifactId>
<version>1.12-4</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
maxPlanningParallelism);
optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue);
});
tableService.notify(TableService.NotifyEvent.RESOURCE_GROUP_INSERT, resourceGroup);
}

@Override
Expand All @@ -327,6 +328,7 @@ public void deleteResourceGroup(String groupName) {
doAs(ResourceMapper.class, mapper -> mapper.deleteResourceGroup(groupName));
OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName);
optimizingQueue.dispose();
tableService.notify(TableService.NotifyEvent.RESOURCE_GROUP_DELETE, groupName);
} else {
throw new RuntimeException(
String.format(
Expand All @@ -341,6 +343,7 @@ public void updateResourceGroup(ResourceGroup resourceGroup) {
Optional.ofNullable(optimizingQueueByGroup.get(resourceGroup.getName()))
.ifPresent(queue -> queue.updateOptimizerGroup(resourceGroup));
doAs(ResourceMapper.class, mapper -> mapper.updateResourceGroup(resourceGroup));
tableService.notify(TableService.NotifyEvent.RESOURCE_GROUP_UPDATE, resourceGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.amoro.server.dashboard.controller;

import static org.apache.amoro.resource.ResourceGroup.RULE_SEPARATOR;
import static org.apache.amoro.resource.ResourceGroup.validateRule;

import dk.brics.automaton.Automaton;
import dk.brics.automaton.RegExp;
import io.javalin.http.Context;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
Expand All @@ -26,6 +31,7 @@
import org.apache.amoro.server.dashboard.model.OptimizerInstanceInfo;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
Expand All @@ -36,6 +42,7 @@
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -61,6 +68,87 @@ public class OptimizerGroupController {
private final TableService tableService;
private final DefaultOptimizingService optimizerManager;

private List<String> getInvalidateRules(String rules) {
return Arrays.stream(rules.split(RULE_SEPARATOR))
.filter(rule -> !validateRule(rule))
.collect(Collectors.toList());
}

/**
* rules: catalog.db.table, catalog\.db\.table,
*
* @param item
* @return {{catalog, db, table}, {catalog,db, table}}
*/
private List<String> splitRuleNamespace(String item) {
item = item.trim();
String separator = ResourceGroup.getSpaceSeparator(item);
return Arrays.asList(item.split(separator));
}

/**
* check whether rules are overlap
*
* @param one
* @param other
* @return
*/
private boolean regExpIntersect(String one, String other) {
RegExp regExp1 = new RegExp(one);
RegExp regExp2 = new RegExp(other);
Automaton automaton1 = regExp1.toAutomaton();
Automaton automaton2 = regExp2.toAutomaton();
Automaton intersection = automaton1.intersection(automaton2);
return !intersection.isEmpty();
}

/**
* @param newRules
* @param other
* @return
*/
private List<String> groupRuleOverlap(String newRules, String otherRules, String otherGroupName) {
List<String> result = new ArrayList<>();
Arrays.stream(newRules.split(RULE_SEPARATOR))
.forEach(
newRule -> {
List<String> newRuleSpace = splitRuleNamespace(newRule);
Arrays.stream(otherRules.split(RULE_SEPARATOR))
.forEach(
otherRule -> {
List<String> otherRuleSpace = splitRuleNamespace(otherRule);
// if there have some intersection, we return false;
if (regExpIntersect(newRuleSpace.get(2), otherRuleSpace.get(2))
&& regExpIntersect(newRuleSpace.get(1), otherRuleSpace.get(1))
&& regExpIntersect(newRuleSpace.get(0), otherRuleSpace.get(0))) {
result.add(
String.format("%s -> %s:%s", newRule, otherGroupName, otherRule));
}
});
});
return result;
}

/**
* check the rule whether is conflict with some other group
*
* @param newRules
* @return
*/
private List<List<String>> groupRuleOverlap(String newRules, String newName) {
if (newRules == null) {
return new ArrayList<>();
}
List<List<String>> result =
optimizerManager.listResourceGroups().stream()
.filter(item -> !Objects.equal(item.getName(), newName))
.filter(item -> item.getOptimizeGroupRule() != null)
.map(item -> groupRuleOverlap(newRules, item.getOptimizeGroupRule(), item.getName()))
.filter(olItem -> olItem.size() > 0)
.collect(Collectors.toList());
return result;
}

public OptimizerGroupController(
TableService tableService, DefaultOptimizingService optimizerManager) {
this.tableService = tableService;
Expand Down Expand Up @@ -276,8 +364,21 @@ public void createResourceGroup(Context ctx) {
}
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
ResourceGroup newGroup = builder.build();

String newRules = newGroup.getOptimizeGroupRule();
List<List<String>> intersectionMsg = new ArrayList<>();
// check whether the rules are conflict with other group
intersectionMsg = groupRuleOverlap(newRules, name);
if (intersectionMsg.size() > 0) {
StringBuilder sb = new StringBuilder();
sb.append("intersection rules:" + "\n");
intersectionMsg.stream().forEach(item -> sb.append(String.join(",", item)));
ctx.json(new ErrorResponse(sb.toString()));
} else {
optimizerManager.createResourceGroup(newGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}
}

/**
Expand All @@ -291,7 +392,22 @@ public void updateResourceGroup(Context ctx) {
Map<String, String> properties = (Map) map.get("properties");
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
ResourceGroup newGroup = builder.build();
// check if the new rules is conflict with others
String oldRules = optimizerManager.getResourceGroup(name).getOptimizeGroupRule();
String newRules = newGroup.getOptimizeGroupRule();
if (newRules != null && !Objects.equal(oldRules, newRules)) {
// only check the rule updated
List<List<String>> intersectionMsg = groupRuleOverlap(newRules, name);
if (intersectionMsg.size() > 0) {
StringBuilder sb = new StringBuilder();
sb.append("intersection rules:" + "\n");
intersectionMsg.stream().forEach(item -> sb.append(String.join(",", item)));
ctx.json(new ErrorResponse(sb.toString()));
return;
}
}
optimizerManager.updateResourceGroup(newGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.amoro.server.dashboard.controller;

import static org.apache.amoro.properties.CatalogMetaProperties.CATALOG_TYPE_HIVE;
import static org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_GROUP;

import io.javalin.http.Context;
import org.apache.amoro.Constants;
Expand Down Expand Up @@ -155,6 +156,12 @@ public void getTableDetail(Context ctx) {
if (tableRuntimeSummary != null) {
tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore());
}
// show the optimizeGroup in tableRuntime.
serverTableMeta
.getProperties()
.put(
SELF_OPTIMIZING_GROUP,
tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizerGroup());
} else {
tableSummary.setOptimizingStatus(OptimizingStatus.IDLE.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public interface CatalogMetaMapper {
column = "catalog_properties",
typeHandler = Map2StringConverter.class)
})
List<CatalogMeta> getCatalog(@Param("catalogName") String catalogName);
CatalogMeta getCatalog(@Param("catalogName") String catalogName);

@Insert(
"INSERT INTO "
Expand Down
Loading
Loading