Skip to content

Commit

Permalink
Added resource manager for tracking and planning resource usage within and across queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitin-Kashyap committed Jul 26, 2022
1 parent b3bb549 commit 74624f1
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.prestosql.Session;
import io.prestosql.SystemSessionProperties;
import io.prestosql.cost.CostCalculator;
import io.prestosql.cost.PlanCostEstimate;
import io.prestosql.cost.StatsCalculator;
import io.prestosql.cube.CubeManager;
import io.prestosql.dynamicfilter.DynamicFilterService;
Expand All @@ -43,6 +44,8 @@
import io.prestosql.operator.ForScheduler;
import io.prestosql.query.CachedSqlQueryExecution;
import io.prestosql.query.CachedSqlQueryExecutionPlan;
import io.prestosql.resourcemanager.QueryResourceManager;
import io.prestosql.resourcemanager.QueryResourceManagerService;
import io.prestosql.security.AccessControl;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.snapshot.MarkerAnnouncer;
Expand All @@ -57,6 +60,7 @@
import io.prestosql.spi.connector.StandardWarningCode;
import io.prestosql.spi.metadata.TableHandle;
import io.prestosql.spi.plan.PlanNode;
import io.prestosql.spi.plan.PlanNodeId;
import io.prestosql.spi.plan.PlanNodeIdAllocator;
import io.prestosql.spi.plan.ProjectNode;
import io.prestosql.spi.plan.Symbol;
Expand Down Expand Up @@ -111,6 +115,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -173,6 +178,7 @@ public class SqlQueryExecution
private final QueryRecoveryManager queryRecoveryManager;
private final WarningCollector warningCollector;
private final AtomicBoolean suspendedWithRecoveryManager = new AtomicBoolean();
private final QueryResourceManager queryResourceManager;

public SqlQueryExecution(
PreparedQuery preparedQuery,
Expand Down Expand Up @@ -203,7 +209,8 @@ public SqlQueryExecution(
DynamicFilterService dynamicFilterService,
HeuristicIndexerManager heuristicIndexerManager,
StateStoreProvider stateStoreProvider,
RecoveryUtils recoveryUtils)
RecoveryUtils recoveryUtils,
QueryResourceManagerService queryResourceManager)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
Expand Down Expand Up @@ -235,6 +242,7 @@ public SqlQueryExecution(

this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null");
this.queryResourceManager = requireNonNull(queryResourceManager, "queryResourceManager is null").createQueryResourceManager(stateMachine.getQueryId(), stateMachine.getSession());

// clear dynamic filter tasks and data created for this query
stateMachine.addStateChangeListener(state -> {
Expand Down Expand Up @@ -473,6 +481,26 @@ private void handleCrossRegionDynamicFilter(PlanRoot plan)
log.debug("queryId=%s, add columnToSymbolMapping into hazelcast success.", queryId + QUERY_COLUMN_NAME_TO_SYMBOL_MAPPING);
}

/**
 * Seeds the per-query resource limits from the cost estimate recorded for the given plan node.
 *
 * <p>The estimate is looked up in the cost map of the current query plan. Limits are applied
 * only when an estimate is present and its memory, CPU and network components are all finite
 * and non-negative; otherwise this method leaves the resource manager untouched instead of
 * failing the query.
 *
 * @param rootId id of the plan node whose cost estimate is used as the source of the limits
 */
private void setResourceLimitsFromEstimates(PlanNodeId rootId)
{
    PlanCostEstimate estimate = queryPlan.get().getStatsAndCosts().getCosts().get(rootId);
    if (estimate == null) {
        // No cost entry was recorded for this node, so there is nothing to derive limits from.
        return;
    }

    double maxMemory = estimate.getMaxMemory();
    double cpuCost = estimate.getCpuCost();
    double networkCost = estimate.getNetworkCost();
    if (!isUsableEstimate(maxMemory) || !isUsableEstimate(cpuCost) || !isUsableEstimate(networkCost)) {
        // DataSize and Duration reject NaN, infinite and negative values with an
        // IllegalArgumentException. NOTE(review): unknown costs are presumably reported
        // as NaN by PlanCostEstimate - confirm.
        return;
    }

    queryResourceManager.setResourceLimit(
            new DataSize(maxMemory, BYTE),
            // NOTE(review): the CPU cost is interpreted as milliseconds here - confirm the unit.
            new Duration(cpuCost, TimeUnit.MILLISECONDS),
            new DataSize(networkCost, BYTE));
}

/** Returns true when the value can safely be turned into a DataSize or Duration. */
private static boolean isUsableEstimate(double value)
{
    return Double.isFinite(value) && value >= 0;
}

/**
 * Reports the scheduler's current CPU time, memory reservation and internal network input
 * size to the query resource manager. Does nothing while no scheduler has been set.
 */
private void updateQueryResourceStats()
{
    SqlQueryScheduler activeScheduler = queryScheduler.get();
    if (activeScheduler == null) {
        return;
    }

    queryResourceManager.updateStats(
            activeScheduler.getTotalCpuTime(),
            DataSize.succinctBytes(activeScheduler.getTotalMemoryReservation()),
            activeScheduler.getBasicStageStats().getInternalNetworkInputDataSize());
}

@Override
public void start()
{
Expand All @@ -486,6 +514,7 @@ public void start()

// analyze query
PlanRoot plan = analyzeQuery();
setResourceLimitsFromEstimates(plan.getRoot().getFragment().getRoot().getId());

try {
handleCrossRegionDynamicFilter(plan);
Expand Down Expand Up @@ -583,7 +612,7 @@ private SqlQueryScheduler createResumeScheduler(OptionalLong snapshotId, PlanRoo
queryRecoveryManager,
// Require same number of tasks to be scheduled, but do not require it if starting from beginning
snapshotId.isPresent() ? queryScheduler.get().getStageTaskCounts() : null,
true);
true, queryResourceManager);
if (snapshotId.isPresent() && snapshotId.getAsLong() != 0) {
// Restore going to happen first, mark the restore state for all stages
scheduler.setResuming(snapshotId.getAsLong());
Expand Down Expand Up @@ -850,7 +879,8 @@ private void planDistribution(PlanRoot plan)
snapshotManager,
queryRecoveryManager,
null,
false);
false,
queryResourceManager);

queryScheduler.set(scheduler);

Expand Down Expand Up @@ -1085,6 +1115,8 @@ public static class SqlQueryExecutionFactory
private final StateStoreProvider stateStoreProvider;
private final RecoveryUtils recoveryUtils;

private final QueryResourceManagerService queryResourceManagerService;

@Inject
SqlQueryExecutionFactory(QueryManagerConfig config,
HetuConfig hetuConfig,
Expand All @@ -1111,7 +1143,8 @@ public static class SqlQueryExecutionFactory
DynamicFilterService dynamicFilterService,
HeuristicIndexerManager heuristicIndexerManager,
StateStoreProvider stateStoreProvider,
RecoveryUtils recoveryUtils)
RecoveryUtils recoveryUtils,
QueryResourceManagerService queryResourceManagerService)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
Expand Down Expand Up @@ -1139,6 +1172,7 @@ public static class SqlQueryExecutionFactory
this.heuristicIndexerManager = requireNonNull(heuristicIndexerManager, "heuristicIndexerManager is null");
this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null");
this.recoveryUtils = requireNonNull(recoveryUtils, "recoveryUtils is null");
this.queryResourceManagerService = requireNonNull(queryResourceManagerService, "queryResourceManagerService is null");
this.loadConfigToService(hetuConfig);
if (hetuConfig.isExecutionPlanCacheEnabled()) {
this.cache = Optional.of(CacheBuilder.newBuilder()
Expand Down Expand Up @@ -1199,7 +1233,8 @@ public QueryExecution createQueryExecution(
this.cache,
heuristicIndexerManager,
stateStoreProvider,
recoveryUtils);
recoveryUtils,
queryResourceManagerService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.Session;
import io.prestosql.SystemSessionProperties;
Expand All @@ -46,6 +47,7 @@
import io.prestosql.heuristicindex.HeuristicIndexerManager;
import io.prestosql.metadata.InternalNode;
import io.prestosql.operator.TaskLocation;
import io.prestosql.resourcemanager.QueryResourceManager;
import io.prestosql.server.ResourceGroupInfo;
import io.prestosql.snapshot.QueryRecoveryManager;
import io.prestosql.snapshot.QuerySnapshotManager;
Expand Down Expand Up @@ -162,6 +164,8 @@ public class SqlQueryScheduler
private final QueryRecoveryManager queryRecoveryManager;
private final Map<PlanNodeId, FixedNodeScheduleData> feederScheduledNodes = new ConcurrentHashMap<>();

private final QueryResourceManager queryResourceManager;

public static SqlQueryScheduler createSqlQueryScheduler(
QueryStateMachine queryStateMachine,
LocationFactory locationFactory,
Expand All @@ -184,7 +188,8 @@ public static SqlQueryScheduler createSqlQueryScheduler(
QuerySnapshotManager snapshotManager,
QueryRecoveryManager queryRecoveryManager,
Map<StageId, Integer> stageTaskCounts,
boolean isResume)
boolean isResume,
QueryResourceManager queryResourceManager)
{
SqlQueryScheduler sqlQueryScheduler = new SqlQueryScheduler(
queryStateMachine,
Expand All @@ -208,7 +213,8 @@ public static SqlQueryScheduler createSqlQueryScheduler(
snapshotManager,
queryRecoveryManager,
stageTaskCounts,
isResume);
isResume,
queryResourceManager);
sqlQueryScheduler.initialize();
return sqlQueryScheduler;
}
Expand All @@ -235,7 +241,8 @@ private SqlQueryScheduler(
QuerySnapshotManager snapshotManager,
QueryRecoveryManager queryRecoveryManager,
Map<StageId, Integer> stageTaskCounts,
boolean isResumeScheduler)
boolean isResumeScheduler,
QueryResourceManager queryResourceManager)
{
this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
this.executionPolicy = requireNonNull(executionPolicy, "schedulerPolicyFactory is null");
Expand All @@ -246,6 +253,7 @@ private SqlQueryScheduler(

this.snapshotManager = snapshotManager;
this.queryRecoveryManager = queryRecoveryManager;
this.queryResourceManager = queryResourceManager;
if (SystemSessionProperties.isRecoveryEnabled(session)) {
queryRecoveryManager.setCancelToResumeCb(this::cancelToResume);
}
Expand Down Expand Up @@ -733,6 +741,8 @@ private boolean canScheduleMoreSplits()
{
long cachedMemoryUsage = queryStateMachine.getResourceGroupManager().getCachedMemoryUsage(queryStateMachine.getResourceGroup());
long softReservedMemory = queryStateMachine.getResourceGroupManager().getSoftReservedMemory(queryStateMachine.getResourceGroup());

/* Todo(Nitin K) replace with smart scheduler and pull the resource limit from QueryResourceManager/Service */
if (cachedMemoryUsage < softReservedMemory) {
return true;
}
Expand All @@ -754,6 +764,15 @@ private int getOptimalSmallSplitGroupSize()
return SPLIT_GROUP_GRADATION[result];
}

/**
 * Reports this scheduler's current totals (CPU time, memory reservation and internal network
 * input size) to the query resource manager.
 */
private void updateQueryResourceStats()
{
    final Duration cpuUsed = getTotalCpuTime();
    final DataSize memoryReserved = DataSize.succinctBytes(getTotalMemoryReservation());
    final DataSize networkInput = getBasicStageStats().getInternalNetworkInputDataSize();
    queryResourceManager.updateStats(cpuUsed, memoryReserved, networkInput);
}

private void schedule()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
Expand Down Expand Up @@ -787,6 +806,8 @@ private void schedule()
currentTimerLevel = 0;
}

updateQueryResourceStats();

// perform some scheduling work
/* Get groupSize specification from the ResourceGroupManager */
int maxSplitGroupSize = getOptimalSmallSplitGroupSize();
Expand Down Expand Up @@ -832,6 +853,8 @@ else if (!result.getBlocked().isDone()) {
}
}

updateQueryResourceStats();

// wait for a state change and then schedule again
if (!blockedStages.isEmpty()) {
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.prestosql.failuredetector.FailureDetector;
import io.prestosql.heuristicindex.HeuristicIndexerManager;
import io.prestosql.metadata.Metadata;
import io.prestosql.resourcemanager.QueryResourceManagerService;
import io.prestosql.security.AccessControl;
import io.prestosql.snapshot.RecoveryUtils;
import io.prestosql.spi.PrestoException;
Expand Down Expand Up @@ -111,20 +112,21 @@ public class CachedSqlQueryExecution
private final BeginTableWrite beginTableWrite;

/**
 * Creates a query execution that can reuse cached execution plans.
 *
 * <p>Every argument except {@code cache} is forwarded unchanged to the super constructor;
 * {@code metadata} is additionally used to build the {@code BeginTableWrite} helper.
 *
 * @param cache optional cache of execution plans, stored as passed in
 * @param queryResourceManager service forwarded to the super constructor
 */
public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, QueryStateMachine stateMachine,
        String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager,
        NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler,
        List<PlanOptimizer> planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory,
        LocationFactory locationFactory, int scheduleSplitBatchSize, ExecutorService queryExecutor,
        ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap,
        QueryExplainer queryExplainer, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats,
        StatsCalculator statsCalculator, CostCalculator costCalculator, WarningCollector warningCollector,
        DynamicFilterService dynamicFilterService, Optional<Cache<Integer, CachedSqlQueryExecutionPlan>> cache,
        HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils,
        QueryResourceManagerService queryResourceManager)
{
    super(preparedQuery, stateMachine, slug, metadata, cubeManager, accessControl, sqlParser, splitManager,
            nodePartitioningManager, nodeScheduler, planOptimizers, planFragmenter, remoteTaskFactory, locationFactory,
            scheduleSplitBatchSize, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, queryExplainer,
            executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService,
            heuristicIndexerManager, stateStoreProvider, recoveryUtils, queryResourceManager);
    this.cache = cache;
    this.beginTableWrite = new BeginTableWrite(metadata);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.resourcemanager;

import io.airlift.units.DataSize;
import io.airlift.units.Duration;

/**
 * Plain holder for a snapshot of resource usage figures.
 *
 * <p>All fields are public and mutable, and are left {@code null} until a caller assigns them.
 */
public class BasicResourceStats
{
// CPU time figure. NOTE(review): presumably the accumulated CPU time of a query - confirm against callers.
public Duration cpuTime;
// Memory figure. NOTE(review): presumably the current memory reservation - confirm against callers.
public DataSize memCurrent;
// I/O figure. NOTE(review): presumably the current network/IO data size - confirm against callers.
public DataSize ioCurrent;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.resourcemanager;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Qualifier annotation for marking injection points (fields, parameters and methods).
 * It is retained at runtime so it remains visible through reflection.
 *
 * <p>NOTE(review): judging by the name, it is meant to tag dependencies used by the
 * resource monitor - confirm against the binding module.
 */
@Qualifier
@Target({FIELD, PARAMETER, METHOD})
@Retention(RUNTIME)
public @interface ForResourceMonitor
{
}
Loading

0 comments on commit 74624f1

Please sign in to comment.