From b820f843dd2045be0fd7d5eb29d818b146739421 Mon Sep 17 00:00:00 2001 From: Nitin Kashyap Date: Tue, 26 Jul 2022 00:14:35 +0530 Subject: [PATCH] Added resource manager for tracking and planning resource usage within and across queries --- .../execution/SqlQueryExecution.java | 56 +++++- .../scheduler/SqlQueryScheduler.java | 29 ++- .../query/CachedSqlQueryExecution.java | 22 ++- .../resourcemanager/BasicResourceStats.java | 76 ++++++++ .../resourcemanager/ForResourceMonitor.java | 32 ++++ .../resourcemanager/QueryResourceManager.java | 180 ++++++++++++++++++ .../QueryResourceManagerService.java | 83 ++++++++ .../resourcemanager/ResourceManager.java | 32 ++++ .../prestosql/server/CoordinatorModule.java | 7 + .../TestQueryResourceManager.java | 166 ++++++++++++++++ 10 files changed, 665 insertions(+), 18 deletions(-) create mode 100644 presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java create mode 100644 presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java create mode 100644 presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java create mode 100644 presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java create mode 100644 presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java create mode 100644 presto-main/src/test/java/io/prestosql/resourcemanager/TestQueryResourceManager.java diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java index 542c213de..ceafaf0d4 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java @@ -24,6 +24,7 @@ import io.prestosql.Session; import io.prestosql.SystemSessionProperties; import io.prestosql.cost.CostCalculator; +import io.prestosql.cost.PlanCostEstimate; import io.prestosql.cost.StatsCalculator; import io.prestosql.cube.CubeManager; import io.prestosql.dynamicfilter.DynamicFilterService; @@ -43,6 +44,8 @@ import io.prestosql.operator.ForScheduler; import io.prestosql.query.CachedSqlQueryExecution; import io.prestosql.query.CachedSqlQueryExecutionPlan; +import io.prestosql.resourcemanager.QueryResourceManager; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.security.AccessControl; import io.prestosql.server.BasicQueryInfo; import io.prestosql.snapshot.MarkerAnnouncer; @@ -57,6 +60,7 @@ import io.prestosql.spi.connector.StandardWarningCode; import io.prestosql.spi.metadata.TableHandle; import io.prestosql.spi.plan.PlanNode; +import io.prestosql.spi.plan.PlanNodeId; import io.prestosql.spi.plan.PlanNodeIdAllocator; import io.prestosql.spi.plan.ProjectNode; import io.prestosql.spi.plan.Symbol; @@ -111,6 +115,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -173,6 +178,7 @@ public class SqlQueryExecution private final QueryRecoveryManager queryRecoveryManager; private final WarningCollector warningCollector; private final AtomicBoolean suspendedWithRecoveryManager = new AtomicBoolean(); + private final QueryResourceManager queryResourceManager; public SqlQueryExecution( PreparedQuery preparedQuery, @@ -203,7 +209,8 @@ public SqlQueryExecution( DynamicFilterService dynamicFilterService, HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, - RecoveryUtils recoveryUtils) + RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManager) { try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) { this.slug = requireNonNull(slug, "slug is null"); @@ -235,6 +242,7 @@ public SqlQueryExecution( this.stateMachine = requireNonNull(stateMachine, "stateMachine is null"); this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null"); + this.queryResourceManager = requireNonNull(queryResourceManager, "queryResourceManager is null").createQueryResourceManager(stateMachine.getQueryId(), stateMachine.getSession()); // clear dynamic filter tasks and data created for this query stateMachine.addStateChangeListener(state -> { @@ -473,6 +481,37 @@ private void handleCrossRegionDynamicFilter(PlanRoot plan) log.debug("queryId=%s, add columnToSymbolMapping into hazelcast success.", queryId + QUERY_COLUMN_NAME_TO_SYMBOL_MAPPING); } + private void setResourceLimitsFromEstimates(PlanNodeId rootId) + { + PlanCostEstimate estimate = queryPlan.get().getStatsAndCosts().getCosts().get(rootId); + if (estimate != null && estimate != PlanCostEstimate.zero() && estimate != PlanCostEstimate.unknown()) { + double cpuCost = estimate.getCpuCost(); + cpuCost = Double.isInfinite(cpuCost) || Double.isNaN(cpuCost) ? 0.0 : cpuCost; + + double memCost = estimate.getMaxMemory(); + memCost = Double.isInfinite(memCost) || Double.isNaN(memCost) ? 0.0 : memCost; + + double ioCost = estimate.getNetworkCost(); + ioCost = Double.isInfinite(ioCost) || Double.isNaN(ioCost) ? 0.0 : ioCost; + + queryResourceManager.setResourceLimit(new DataSize(memCost, BYTE), + new Duration(cpuCost, TimeUnit.MILLISECONDS), + new DataSize(ioCost, BYTE)); + } + } + + private void updateQueryResourceStats() + { + SqlQueryScheduler scheduler = queryScheduler.get(); + if (scheduler != null) { + Duration totalCpu = scheduler.getTotalCpuTime(); + DataSize totalMem = DataSize.succinctBytes(scheduler.getTotalMemoryReservation()); + DataSize totalIo = scheduler.getBasicStageStats().getInternalNetworkInputDataSize(); + + queryResourceManager.updateStats(totalCpu, totalMem, totalIo); + } + } + @Override public void start() { @@ -486,6 +525,7 @@ public void start() // analyze query PlanRoot plan = analyzeQuery(); + setResourceLimitsFromEstimates(plan.getRoot().getFragment().getRoot().getId()); try { handleCrossRegionDynamicFilter(plan); @@ -583,7 +623,7 @@ private SqlQueryScheduler createResumeScheduler(OptionalLong snapshotId, PlanRoo queryRecoveryManager, // Require same number of tasks to be scheduled, but do not require it if starting from beginning snapshotId.isPresent() ? queryScheduler.get().getStageTaskCounts() : null, - true); + true, queryResourceManager); if (snapshotId.isPresent() && snapshotId.getAsLong() != 0) { // Restore going to happen first, mark the restore state for all stages scheduler.setResuming(snapshotId.getAsLong()); @@ -850,7 +890,8 @@ private void planDistribution(PlanRoot plan) snapshotManager, queryRecoveryManager, null, - false); + false, + queryResourceManager); queryScheduler.set(scheduler); @@ -1085,6 +1126,8 @@ public static class SqlQueryExecutionFactory private final StateStoreProvider stateStoreProvider; private final RecoveryUtils recoveryUtils; + private final QueryResourceManagerService queryResourceManagerService; + @Inject SqlQueryExecutionFactory(QueryManagerConfig config, HetuConfig hetuConfig, @@ -1111,7 +1154,8 @@ public static class SqlQueryExecutionFactory DynamicFilterService dynamicFilterService, HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, - RecoveryUtils recoveryUtils) + RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManagerService) { requireNonNull(config, "config is null"); this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null"); @@ -1139,6 +1183,7 @@ public static class SqlQueryExecutionFactory this.heuristicIndexerManager = requireNonNull(heuristicIndexerManager, "heuristicIndexerManager is null"); this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null"); this.recoveryUtils = requireNonNull(recoveryUtils, "recoveryUtils is null"); + this.queryResourceManagerService = requireNonNull(queryResourceManagerService, "queryResourceManagerService is null"); this.loadConfigToService(hetuConfig); if (hetuConfig.isExecutionPlanCacheEnabled()) { this.cache = Optional.of(CacheBuilder.newBuilder() @@ -1199,7 +1244,8 @@ public QueryExecution createQueryExecution( this.cache, heuristicIndexerManager, stateStoreProvider, - recoveryUtils); + recoveryUtils, + queryResourceManagerService); } } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java index cc84ac6c1..5a8e43c41 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java @@ -23,6 +23,7 @@ import io.airlift.concurrent.SetThreadName; import io.airlift.log.Logger; import io.airlift.stats.TimeStat; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.prestosql.Session; import io.prestosql.SystemSessionProperties; @@ -46,6 +47,7 @@ import io.prestosql.heuristicindex.HeuristicIndexerManager; import io.prestosql.metadata.InternalNode; import io.prestosql.operator.TaskLocation; +import io.prestosql.resourcemanager.QueryResourceManager; import io.prestosql.server.ResourceGroupInfo; import io.prestosql.snapshot.QueryRecoveryManager; import io.prestosql.snapshot.QuerySnapshotManager; @@ -162,6 +164,8 @@ public class SqlQueryScheduler private final QueryRecoveryManager queryRecoveryManager; private final Map feederScheduledNodes = new ConcurrentHashMap<>(); + private final QueryResourceManager queryResourceManager; + public static SqlQueryScheduler createSqlQueryScheduler( QueryStateMachine queryStateMachine, LocationFactory locationFactory, @@ -184,7 +188,8 @@ public static SqlQueryScheduler createSqlQueryScheduler( QuerySnapshotManager snapshotManager, QueryRecoveryManager queryRecoveryManager, Map stageTaskCounts, - boolean isResume) + boolean isResume, + QueryResourceManager queryResourceManager) { SqlQueryScheduler sqlQueryScheduler = new SqlQueryScheduler( queryStateMachine, @@ -208,7 +213,8 @@ public static SqlQueryScheduler createSqlQueryScheduler( snapshotManager, queryRecoveryManager, stageTaskCounts, - isResume); + isResume, + queryResourceManager); sqlQueryScheduler.initialize(); return sqlQueryScheduler; } @@ -235,7 +241,8 @@ private SqlQueryScheduler( QuerySnapshotManager snapshotManager, QueryRecoveryManager queryRecoveryManager, Map stageTaskCounts, - boolean isResumeScheduler) + boolean isResumeScheduler, + QueryResourceManager queryResourceManager) { this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null"); this.executionPolicy = requireNonNull(executionPolicy, "schedulerPolicyFactory is null"); @@ -246,6 +253,7 @@ private SqlQueryScheduler( this.snapshotManager = snapshotManager; this.queryRecoveryManager = queryRecoveryManager; + this.queryResourceManager = queryResourceManager; if (SystemSessionProperties.isRecoveryEnabled(session)) { queryRecoveryManager.setCancelToResumeCb(this::cancelToResume); } @@ -733,6 +741,8 @@ private boolean canScheduleMoreSplits() { long cachedMemoryUsage = queryStateMachine.getResourceGroupManager().getCachedMemoryUsage(queryStateMachine.getResourceGroup()); long softReservedMemory = queryStateMachine.getResourceGroupManager().getSoftReservedMemory(queryStateMachine.getResourceGroup()); + + /* Todo(Nitin K) replace with smart scheduler and pull the resource limit from QueryResourceManager/Service */ if (cachedMemoryUsage < softReservedMemory) { return true; } @@ -754,6 +764,15 @@ private int getOptimalSmallSplitGroupSize() return SPLIT_GROUP_GRADATION[result]; } + private void updateQueryResourceStats() + { + Duration totalCpu = getTotalCpuTime(); + DataSize totalMem = DataSize.succinctBytes(getTotalMemoryReservation()); + DataSize totalIo = getBasicStageStats().getInternalNetworkInputDataSize(); + + queryResourceManager.updateStats(totalCpu, totalMem, totalIo); + } + private void schedule() { try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) { @@ -787,6 +806,8 @@ private void schedule() currentTimerLevel = 0; } + updateQueryResourceStats(); + // perform some scheduling work /* Get groupSize specification from the ResourceGroupManager */ int maxSplitGroupSize = getOptimalSmallSplitGroupSize(); @@ -832,6 +853,8 @@ else if (!result.getBlocked().isDone()) { } } + updateQueryResourceStats(); + // wait for a state change and then schedule again if (!blockedStages.isEmpty()) { try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) { diff --git a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java index 63b716cf7..461158064 100644 --- a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java @@ -37,6 +37,7 @@ import io.prestosql.failuredetector.FailureDetector; import io.prestosql.heuristicindex.HeuristicIndexerManager; import io.prestosql.metadata.Metadata; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.security.AccessControl; import io.prestosql.snapshot.RecoveryUtils; import io.prestosql.spi.PrestoException; @@ -111,20 +112,21 @@ public class CachedSqlQueryExecution private final BeginTableWrite beginTableWrite; public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, QueryStateMachine stateMachine, - String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager, - NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, - List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, - LocationFactory locationFactory, int scheduleSplitBatchSize, ExecutorService queryExecutor, - ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, - QueryExplainer queryExplainer, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats, - StatsCalculator statsCalculator, CostCalculator costCalculator, WarningCollector warningCollector, - DynamicFilterService dynamicFilterService, Optional> cache, - HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils) + String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager, + NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, + List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, + LocationFactory locationFactory, int scheduleSplitBatchSize, ExecutorService queryExecutor, + ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, + QueryExplainer queryExplainer, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats, + StatsCalculator statsCalculator, CostCalculator costCalculator, WarningCollector warningCollector, + DynamicFilterService dynamicFilterService, Optional> cache, + HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManager) { super(preparedQuery, stateMachine, slug, metadata, cubeManager, accessControl, sqlParser, splitManager, nodePartitioningManager, nodeScheduler, planOptimizers, planFragmenter, remoteTaskFactory, locationFactory, scheduleSplitBatchSize, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, queryExplainer, - executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService, heuristicIndexerManager, stateStoreProvider, recoveryUtils); + executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService, heuristicIndexerManager, stateStoreProvider, recoveryUtils, queryResourceManager); this.cache = cache; this.beginTableWrite = new BeginTableWrite(metadata); } diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java b/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java new file mode 100644 index 000000000..45556646c --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import io.airlift.units.DataSize; +import io.airlift.units.Duration; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +public class BasicResourceStats +{ + public Duration cpuTime; + public DataSize memCurrent; + public DataSize ioCurrent; + + public BasicResourceStats() + { + this(DataSize.succinctBytes(0), + Duration.succinctDuration(0, TimeUnit.NANOSECONDS), + DataSize.succinctBytes(0)); + } + + public BasicResourceStats(DataSize memCurrent, Duration cpuTime, DataSize ioCurrent) + { + this.memCurrent = memCurrent; + this.cpuTime = cpuTime; + this.ioCurrent = ioCurrent; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BasicResourceStats stats = (BasicResourceStats) o; + + return this.cpuTime.equals(stats.cpuTime) + && this.ioCurrent.equals(stats.ioCurrent) + && this.memCurrent.equals(stats.memCurrent); + } + + @Override + public int hashCode() + { + return Objects.hash(cpuTime, ioCurrent, memCurrent); + } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(); + builder.append("{") + .append("CpuTime: ").append(this.cpuTime).append(", ") + .append("Memory: ").append(this.memCurrent).append(", ") + .append("Network: ").append(this.ioCurrent).append("}"); + return builder.toString(); + } +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java b/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java new file mode 100644 index 000000000..95d4f708a --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForResourceMonitor +{ +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java new file mode 100644 index 000000000..7459502ff --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import com.google.common.util.concurrent.AtomicDouble; +import io.airlift.log.Logger; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.Session; +import io.prestosql.execution.StageStats; +import io.prestosql.metadata.InternalNode; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.resourcegroups.KillPolicy; +import io.prestosql.spi.resourcegroups.ResourceGroup; +import io.prestosql.spi.resourcegroups.SchedulingPolicy; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryResourceManager + implements ResourceManager +{ + private static final Logger logger = Logger.get(QueryResourceManager.class); + private static final double HARD_LIMIT_FACTOR = 1.5; + + private final QueryId queryId; + private Optional softMemoryLimit; + private final Optional softMemoryLimitFraction; + private final Optional softReservedMemory; + private final Optional softReservedFraction; + private final Optional softConcurrencyLimit; + private final Optional hardReservedConcurrency; + private final int hardConcurrencyLimit; + private final Optional schedulingPolicy; + private final Optional schedulingWeight; + private final ResourceGroup resourceGroup; + private Optional softCpuLimit; + private Optional hardCpuLimit; + private Optional diskUseLimit = Optional.empty(); + + private final Optional killPolicy; + + private final QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener; + private final BasicResourceStats queryResources = new BasicResourceStats(); + private Map nodeWiseQueryStats = new HashMap<>(); + + public QueryResourceManager(QueryId queryId, Session session, QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener) + { + this.queryId = queryId; + this.softMemoryLimit = Optional.empty(); + this.softMemoryLimitFraction = Optional.empty(); + this.softReservedMemory = Optional.empty(); + this.softReservedFraction = Optional.empty(); + this.softConcurrencyLimit = Optional.empty(); + this.hardReservedConcurrency = Optional.empty(); + this.hardConcurrencyLimit = 10; + this.schedulingPolicy = Optional.empty(); + this.schedulingWeight = Optional.empty(); + this.resourceGroup = null; + this.softCpuLimit = Optional.empty(); + this.hardCpuLimit = Optional.empty(); + this.killPolicy = Optional.empty(); + this.resourceUpdateListener = resourceUpdateListener; + } + + public QueryResourceManager(QueryId queryId, Optional softMemoryLimit, Optional softMemoryLimitFraction, + Optional softReservedMemory, Optional softReservedFraction, + int maxQueued, Optional softConcurrencyLimit, Optional hardReservedConcurrency, + int hardConcurrencyLimit, Optional schedulingPolicy, + Optional schedulingWeight, ResourceGroup resourceGroup, Optional softCpuLimit, + Optional hardCpuLimit, Optional killPolicy, + QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener) + { + this.queryId = queryId; + this.softMemoryLimit = softMemoryLimit; + this.softMemoryLimitFraction = softMemoryLimitFraction; + this.softReservedMemory = softReservedMemory; + this.softReservedFraction = softReservedFraction; + this.softConcurrencyLimit = softConcurrencyLimit; + this.hardReservedConcurrency = hardReservedConcurrency; + this.hardConcurrencyLimit = hardConcurrencyLimit; + this.schedulingPolicy = schedulingPolicy; + this.schedulingWeight = schedulingWeight; + this.resourceGroup = resourceGroup; + this.softCpuLimit = softCpuLimit; + this.hardCpuLimit = hardCpuLimit; + this.killPolicy = killPolicy; + this.resourceUpdateListener = resourceUpdateListener; + } + + @Override + public void setResourceLimit(DataSize memoryLimit, Duration cpuLimit, DataSize ioLimit) + { + /* Todo(Nitin K) check estimates correctness before updating limits! */ + softCpuLimit = Optional.of(cpuLimit); + hardCpuLimit = Optional.of(new Duration(cpuLimit.getValue(TimeUnit.SECONDS) * HARD_LIMIT_FACTOR, TimeUnit.SECONDS)); + + softMemoryLimit = Optional.of(memoryLimit); + diskUseLimit = Optional.of(ioLimit); + } + + @Override + public DataSize getMemoryLimit() + { + return softMemoryLimit.orElse(DataSize.succinctBytes(0)); + } + + @Override + public void updateStats(List stats) + { + /*Todo(Nitin K): record worker node level resource as well! */ + AtomicDouble cpuTime = new AtomicDouble(); + AtomicDouble userMem = new AtomicDouble(); + AtomicLong io = new AtomicLong(); + stats.forEach(stageStats -> { + cpuTime.addAndGet(stageStats.getTotalCpuTime().getValue()); + userMem.addAndGet(stageStats.getCumulativeUserMemory()); + io.addAndGet(stageStats.getInternalNetworkInputDataSize().toBytes()); + }); + + queryResources.cpuTime = Duration.succinctDuration(cpuTime.longValue(), TimeUnit.NANOSECONDS); + queryResources.memCurrent = DataSize.succinctBytes(userMem.longValue()); + queryResources.ioCurrent = DataSize.succinctBytes(io.get()); + + checkResourceLimits(); + } + + @Override + public void updateStats(Duration totalCpu, DataSize totalMem, DataSize totalIo) + { + queryResources.cpuTime = totalCpu; + queryResources.memCurrent = totalMem; + queryResources.ioCurrent = totalIo; + + checkResourceLimits(); + } + + private void checkResourceLimits() + { + /* Inform the resource usage update to the ResourceManagerService + * for broader resource planning. */ + resourceUpdateListener.resourceUpdate(queryId, queryResources); + + if (hardCpuLimit.isPresent() && queryResources.cpuTime.compareTo(hardCpuLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned Hard CPU-Time of %s, current Cpu: %s", + queryId.toString(), hardCpuLimit.get().toString(), queryResources.cpuTime.toString()); + } + + if (softCpuLimit.isPresent() && queryResources.cpuTime.compareTo(softCpuLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned CPU-Time of %s, current Cpu: %s", + queryId.toString(), softCpuLimit.get().toString(), queryResources.cpuTime.toString()); + } + + if (softMemoryLimit.isPresent() && queryResources.memCurrent.compareTo(softMemoryLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned Memory limit of %s, current memory used: %s", + queryId.toString(), softMemoryLimit.get().toString(), queryResources.memCurrent.toString()); + } + + if (diskUseLimit.isPresent() && queryResources.ioCurrent.compareTo(diskUseLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned disk use limit of %s, current disk used: %s", + queryId.toString(), diskUseLimit.get().toString(), queryResources.ioCurrent.toString()); + } + } +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java new file mode 100644 index 000000000..df49f3155 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import com.google.inject.Inject; +import io.prestosql.Session; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.memory.ClusterMemoryPoolManager; + +import java.util.concurrent.ExecutorService; + +import static java.util.Objects.requireNonNull; + +public class QueryResourceManagerService +{ + private final ResourceUpdateListener resourceUpdateListener = new ResourceUpdateListener() { + public void resourceUpdate(QueryId queryId, BasicResourceStats stats) + { + /* Todo(Nitin K) Implement resource estimation & reservation for future + * also, decision for given query level resource change */ + } + }; + private final ExecutorService executor; + + @Inject + QueryResourceManagerService(ClusterMemoryPoolManager memoryPoolManager, @ForResourceMonitor ExecutorService executor) + { + /* Todo(Nitin K) add a resourceMonitor and resourcePlanner based on observed + * resource usage pattern over a period of time. */ + + this.executor = requireNonNull(executor, "Executor cannot be null"); + } + + private void start() + { + executor.submit(this::resourceMonitor); + executor.submit(this::resourcePlanner); + } + + private void resourcePlanner() + { + /* Do Nothing */ + + /* Todo(Nitin K) add action handler for queries out of bound on resources, like: + * - Suspend - Resume + * - Apply Grace (add resource so that query can finish and make more resource available) + * - Future usage based resource planning + * - Resource allocation reduction in case more queries arrive in burst */ + } + + private void resourceMonitor() + { + /* Do Nothing */ + + /* Todo(Nitin K) add action handler for queries out of bound on resources, like: + * - Apply Penalty + * - Trigger Memory Spill + * - Suspend - Resume + * - Apply Grace (add resource so that query can finish and make more resource available) */ + } + + public QueryResourceManager createQueryResourceManager(QueryId queryId, Session session) + { + return new QueryResourceManager(queryId, session, resourceUpdateListener); + } + + public interface ResourceUpdateListener + { + void resourceUpdate(QueryId queryId, BasicResourceStats stats); + } +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java b/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java new file mode 100644 index 000000000..eaf466234 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.execution.StageStats; + +import java.util.List; + +public interface ResourceManager +{ + void setResourceLimit(DataSize memoryLimit, Duration cpuLimit, DataSize ioLimit); + + DataSize getMemoryLimit(); + + void updateStats(List stats); + + void updateStats(Duration totalCpu, DataSize totalMem, DataSize totalIo); +} diff --git a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java index a2885cce8..a52106c4d 100644 --- a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java +++ b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java @@ -115,6 +115,8 @@ import io.prestosql.operator.ForScheduler; import io.prestosql.queryeditorui.QueryEditorUIModule; import io.prestosql.queryhistory.QueryHistoryModule; +import io.prestosql.resourcemanager.ForResourceMonitor; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.server.remotetask.RemoteTaskStats; import io.prestosql.spi.memory.ClusterMemoryPoolManager; import io.prestosql.spi.resourcegroups.QueryType; @@ -192,6 +194,7 @@ import static io.prestosql.util.StatementUtils.getAllQueryTypes; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -355,6 +358,10 @@ protected void setup(Binder binder) config.setMaxConnectionsPerServer(250); }); + binder.bind(ExecutorService.class).annotatedWith(ForResourceMonitor.class) + .toInstance(newSingleThreadExecutor(threadsNamed("resource-monitor-%s"))); + binder.bind(QueryResourceManagerService.class).in(Scopes.SINGLETON); + binder.bind(ScheduledExecutorService.class).annotatedWith(ForScheduler.class) .toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler"))); diff --git a/presto-main/src/test/java/io/prestosql/resourcemanager/TestQueryResourceManager.java b/presto-main/src/test/java/io/prestosql/resourcemanager/TestQueryResourceManager.java new file mode 100644 index 000000000..d29b7a6bb --- /dev/null +++ b/presto-main/src/test/java/io/prestosql/resourcemanager/TestQueryResourceManager.java @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.stats.Distribution; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.Session; +import io.prestosql.execution.StageStats; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.eventlistener.StageGcStatistics; +import org.joda.time.DateTime; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static io.airlift.units.DataSize.Unit.BYTE; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.testng.Assert.assertEquals; + +public class TestQueryResourceManager +{ + private final QueryId queryId = new QueryId("test_query_001"); + private BasicResourceStats basicResourceStats = new BasicResourceStats(); + private QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener; + private QueryResourceManager queryResourceManager; + private Session session; + + private final StageStats sampleStats = new StageStats( + new DateTime(0), + getTestDistribution(1), + + 4, + 5, + 6, + + 7, + 8, + 10, + 26, + 11, + + 12.0, + new DataSize(13, BYTE), + new DataSize(14, BYTE), + new DataSize(15, BYTE), + new DataSize(16, BYTE), + new DataSize(17, BYTE), + + new Duration(15, NANOSECONDS), + new Duration(16, NANOSECONDS), + new Duration(18, NANOSECONDS), + false, + ImmutableSet.of(), + + new DataSize(191, BYTE), + 201, + + new DataSize(192, BYTE), + 202, + + new DataSize(19, BYTE), + 20, + + new DataSize(21, BYTE), + 22, + + new DataSize(23, BYTE), + new DataSize(24, BYTE), + 25, + + new DataSize(26, BYTE), + + new StageGcStatistics( + 101, + 102, + 103, + 104, + 105, + 106, + 107), + + ImmutableList.of()); + + public TestQueryResourceManager() + { + resourceUpdateListener = new QueryResourceManagerService.ResourceUpdateListener() { + public void resourceUpdate(QueryId queryId, BasicResourceStats stats) + { + basicResourceStats.cpuTime = Duration.succinctDuration(basicResourceStats.cpuTime.getValue() + stats.cpuTime.getValue(), stats.cpuTime.getUnit()); + basicResourceStats.ioCurrent = DataSize.succinctBytes(basicResourceStats.ioCurrent.toBytes() + stats.ioCurrent.toBytes()); + basicResourceStats.memCurrent = DataSize.succinctBytes(basicResourceStats.memCurrent.toBytes() + stats.memCurrent.toBytes()); + } + }; + queryResourceManager = new QueryResourceManager(queryId, null, resourceUpdateListener); + } + + @Test + public void testSetResourceLimit() + { + queryResourceManager.setResourceLimit(DataSize.succinctBytes(100), + Duration.succinctDuration(200, TimeUnit.MINUTES), + DataSize.succinctBytes(300)); + assertEquals(DataSize.succinctBytes(100), queryResourceManager.getMemoryLimit()); + } + + @Test + public void testUpdateStats() + { + /* reset stats */ + basicResourceStats = new BasicResourceStats(); + + BasicResourceStats stats = new BasicResourceStats(DataSize.succinctBytes(12), + Duration.succinctDuration(16.0, NANOSECONDS), + DataSize.succinctBytes(192)); + queryResourceManager.updateStats(ImmutableList.of(sampleStats)); // 12.0, 16, 192 + assertEquals(stats, basicResourceStats); + + queryResourceManager.updateStats(ImmutableList.of(sampleStats, sampleStats, sampleStats)); // 12.0, 16, 192 + + stats = new BasicResourceStats(DataSize.succinctBytes(12 * 4), + Duration.succinctDuration(16.0 * 4, NANOSECONDS), + DataSize.succinctBytes(192 * 4)); + assertEquals(stats, basicResourceStats); + } + + @Test + public void testExceedLimit() + { + /* reset stats */ + basicResourceStats = new BasicResourceStats(); + + queryResourceManager.setResourceLimit(DataSize.succinctBytes(10), + Duration.succinctDuration(20, NANOSECONDS), + DataSize.succinctBytes(30)); + queryResourceManager.updateStats(ImmutableList.of(sampleStats, sampleStats, sampleStats)); // 12.0, 16, 192 + + BasicResourceStats stats = new BasicResourceStats(DataSize.succinctBytes(12 * 3), + Duration.succinctDuration(16.0 * 3, NANOSECONDS), + DataSize.succinctBytes(192 * 3)); + assertEquals(stats, basicResourceStats); + } + + private static Distribution.DistributionSnapshot getTestDistribution(int count) + { + Distribution distribution = new Distribution(); + for (int i = 0; i < count; i++) { + distribution.add(i); + } + return distribution.snapshot(); + } +}