diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java index 542c213de..738505799 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java @@ -24,6 +24,7 @@ import io.prestosql.Session; import io.prestosql.SystemSessionProperties; import io.prestosql.cost.CostCalculator; +import io.prestosql.cost.PlanCostEstimate; import io.prestosql.cost.StatsCalculator; import io.prestosql.cube.CubeManager; import io.prestosql.dynamicfilter.DynamicFilterService; @@ -43,6 +44,8 @@ import io.prestosql.operator.ForScheduler; import io.prestosql.query.CachedSqlQueryExecution; import io.prestosql.query.CachedSqlQueryExecutionPlan; +import io.prestosql.resourcemanager.QueryResourceManager; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.security.AccessControl; import io.prestosql.server.BasicQueryInfo; import io.prestosql.snapshot.MarkerAnnouncer; @@ -57,6 +60,7 @@ import io.prestosql.spi.connector.StandardWarningCode; import io.prestosql.spi.metadata.TableHandle; import io.prestosql.spi.plan.PlanNode; +import io.prestosql.spi.plan.PlanNodeId; import io.prestosql.spi.plan.PlanNodeIdAllocator; import io.prestosql.spi.plan.ProjectNode; import io.prestosql.spi.plan.Symbol; @@ -111,6 +115,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -173,6 +178,7 @@ public class SqlQueryExecution private final QueryRecoveryManager queryRecoveryManager; private final WarningCollector warningCollector; private final AtomicBoolean suspendedWithRecoveryManager = new AtomicBoolean(); + private final QueryResourceManager queryResourceManager; public SqlQueryExecution( PreparedQuery preparedQuery, @@ -203,7 +209,8 @@ public SqlQueryExecution( DynamicFilterService dynamicFilterService, HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, - RecoveryUtils recoveryUtils) + RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManager) { try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) { this.slug = requireNonNull(slug, "slug is null"); @@ -235,6 +242,7 @@ public SqlQueryExecution( this.stateMachine = requireNonNull(stateMachine, "stateMachine is null"); this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null"); + this.queryResourceManager = requireNonNull(queryResourceManager, "queryResourceManager is null").createQueryResourceManager(stateMachine.getQueryId(), stateMachine.getSession()); // clear dynamic filter tasks and data created for this query stateMachine.addStateChangeListener(state -> { @@ -473,6 +481,28 @@ private void handleCrossRegionDynamicFilter(PlanRoot plan) log.debug("queryId=%s, add columnToSymbolMapping into hazelcast success.", queryId + QUERY_COLUMN_NAME_TO_SYMBOL_MAPPING); } + private void setResourceLimitsFromEstimates(PlanNodeId rootId) + { + PlanCostEstimate estimate = queryPlan.get().getStatsAndCosts().getCosts().get(rootId); + if (estimate != null && estimate != PlanCostEstimate.zero() && estimate != PlanCostEstimate.unknown()) { + queryResourceManager.setResourceLimit(new DataSize(estimate.getMaxMemory(), BYTE), + new Duration(estimate.getCpuCost(), TimeUnit.MILLISECONDS), + new DataSize(estimate.getNetworkCost(), BYTE)); + } + } + + private void updateQueryResourceStats() + { + SqlQueryScheduler scheduler = queryScheduler.get(); + if (scheduler != null) { + Duration totalCpu = scheduler.getTotalCpuTime(); + DataSize totalMem = DataSize.succinctBytes(scheduler.getTotalMemoryReservation()); + DataSize totalIo = scheduler.getBasicStageStats().getInternalNetworkInputDataSize(); + + queryResourceManager.updateStats(totalCpu, totalMem, totalIo); + } + } + @Override public void start() { @@ -486,6 +516,7 @@ public void start() // analyze query PlanRoot plan = analyzeQuery(); + setResourceLimitsFromEstimates(plan.getRoot().getFragment().getRoot().getId()); try { handleCrossRegionDynamicFilter(plan); @@ -583,7 +614,7 @@ private SqlQueryScheduler createResumeScheduler(OptionalLong snapshotId, PlanRoo queryRecoveryManager, // Require same number of tasks to be scheduled, but do not require it if starting from beginning snapshotId.isPresent() ? queryScheduler.get().getStageTaskCounts() : null, - true); + true, queryResourceManager); if (snapshotId.isPresent() && snapshotId.getAsLong() != 0) { // Restore going to happen first, mark the restore state for all stages scheduler.setResuming(snapshotId.getAsLong()); @@ -850,7 +881,8 @@ private void planDistribution(PlanRoot plan) snapshotManager, queryRecoveryManager, null, - false); + false, + queryResourceManager); queryScheduler.set(scheduler); @@ -1085,6 +1117,8 @@ public static class SqlQueryExecutionFactory private final StateStoreProvider stateStoreProvider; private final RecoveryUtils recoveryUtils; + private final QueryResourceManagerService queryResourceManagerService; + @Inject SqlQueryExecutionFactory(QueryManagerConfig config, HetuConfig hetuConfig, @@ -1111,7 +1145,8 @@ public static class SqlQueryExecutionFactory DynamicFilterService dynamicFilterService, HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, - RecoveryUtils recoveryUtils) + RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManagerService) { requireNonNull(config, "config is null"); this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null"); @@ -1139,6 +1174,7 @@ public static class SqlQueryExecutionFactory this.heuristicIndexerManager = requireNonNull(heuristicIndexerManager, "heuristicIndexerManager is null"); this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null"); this.recoveryUtils = requireNonNull(recoveryUtils, "recoveryUtils is null"); + this.queryResourceManagerService = requireNonNull(queryResourceManagerService, "queryResourceManagerService is null"); this.loadConfigToService(hetuConfig); if (hetuConfig.isExecutionPlanCacheEnabled()) { this.cache = Optional.of(CacheBuilder.newBuilder() @@ -1199,7 +1235,8 @@ public QueryExecution createQueryExecution( this.cache, heuristicIndexerManager, stateStoreProvider, - recoveryUtils); + recoveryUtils, + queryResourceManagerService); } } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java index cc84ac6c1..5a8e43c41 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java @@ -23,6 +23,7 @@ import io.airlift.concurrent.SetThreadName; import io.airlift.log.Logger; import io.airlift.stats.TimeStat; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.prestosql.Session; import io.prestosql.SystemSessionProperties; @@ -46,6 +47,7 @@ import io.prestosql.heuristicindex.HeuristicIndexerManager; import io.prestosql.metadata.InternalNode; import io.prestosql.operator.TaskLocation; +import io.prestosql.resourcemanager.QueryResourceManager; import io.prestosql.server.ResourceGroupInfo; import io.prestosql.snapshot.QueryRecoveryManager; import io.prestosql.snapshot.QuerySnapshotManager; @@ -162,6 +164,8 @@ public class SqlQueryScheduler private final QueryRecoveryManager queryRecoveryManager; private final Map feederScheduledNodes = new ConcurrentHashMap<>(); + private final QueryResourceManager queryResourceManager; + public static SqlQueryScheduler createSqlQueryScheduler( QueryStateMachine queryStateMachine, LocationFactory locationFactory, @@ -184,7 +188,8 @@ public static SqlQueryScheduler createSqlQueryScheduler( QuerySnapshotManager snapshotManager, QueryRecoveryManager queryRecoveryManager, Map stageTaskCounts, - boolean isResume) + boolean isResume, + QueryResourceManager queryResourceManager) { SqlQueryScheduler sqlQueryScheduler = new SqlQueryScheduler( queryStateMachine, @@ -208,7 +213,8 @@ public static SqlQueryScheduler createSqlQueryScheduler( snapshotManager, queryRecoveryManager, stageTaskCounts, - isResume); + isResume, + queryResourceManager); sqlQueryScheduler.initialize(); return sqlQueryScheduler; } @@ -235,7 +241,8 @@ private SqlQueryScheduler( QuerySnapshotManager snapshotManager, QueryRecoveryManager queryRecoveryManager, Map stageTaskCounts, - boolean isResumeScheduler) + boolean isResumeScheduler, + QueryResourceManager queryResourceManager) { this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null"); this.executionPolicy = requireNonNull(executionPolicy, "schedulerPolicyFactory is null"); @@ -246,6 +253,7 @@ private SqlQueryScheduler( this.snapshotManager = snapshotManager; this.queryRecoveryManager = queryRecoveryManager; + this.queryResourceManager = queryResourceManager; if (SystemSessionProperties.isRecoveryEnabled(session)) { queryRecoveryManager.setCancelToResumeCb(this::cancelToResume); } @@ -733,6 +741,8 @@ private boolean canScheduleMoreSplits() { long cachedMemoryUsage = queryStateMachine.getResourceGroupManager().getCachedMemoryUsage(queryStateMachine.getResourceGroup()); long softReservedMemory = queryStateMachine.getResourceGroupManager().getSoftReservedMemory(queryStateMachine.getResourceGroup()); + + /* Todo(Nitin K) replace with smart scheduler and pull the resource limit from QueryResourceManager/Service */ if (cachedMemoryUsage < softReservedMemory) { return true; } @@ -754,6 +764,15 @@ private int getOptimalSmallSplitGroupSize() return SPLIT_GROUP_GRADATION[result]; } + private void updateQueryResourceStats() + { + Duration totalCpu = getTotalCpuTime(); + DataSize totalMem = DataSize.succinctBytes(getTotalMemoryReservation()); + DataSize totalIo = getBasicStageStats().getInternalNetworkInputDataSize(); + + queryResourceManager.updateStats(totalCpu, totalMem, totalIo); + } + private void schedule() { try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) { @@ -787,6 +806,8 @@ private void schedule() currentTimerLevel = 0; } + updateQueryResourceStats(); + // perform some scheduling work /* Get groupSize specification from the ResourceGroupManager */ int maxSplitGroupSize = getOptimalSmallSplitGroupSize(); @@ -832,6 +853,8 @@ else if (!result.getBlocked().isDone()) { } } + updateQueryResourceStats(); + // wait for a state change and then schedule again if (!blockedStages.isEmpty()) { try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) { diff --git a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java index 63b716cf7..461158064 100644 --- a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java @@ -37,6 +37,7 @@ import io.prestosql.failuredetector.FailureDetector; import io.prestosql.heuristicindex.HeuristicIndexerManager; import io.prestosql.metadata.Metadata; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.security.AccessControl; import io.prestosql.snapshot.RecoveryUtils; import io.prestosql.spi.PrestoException; @@ -111,20 +112,21 @@ public class CachedSqlQueryExecution private final BeginTableWrite beginTableWrite; public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, QueryStateMachine stateMachine, - String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager, - NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, - List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, - LocationFactory locationFactory, int scheduleSplitBatchSize, ExecutorService queryExecutor, - ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, - QueryExplainer queryExplainer, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats, - StatsCalculator statsCalculator, CostCalculator costCalculator, WarningCollector warningCollector, - DynamicFilterService dynamicFilterService, Optional> cache, - HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils) + String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager, + NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, + List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, + LocationFactory locationFactory, int scheduleSplitBatchSize, ExecutorService queryExecutor, + ScheduledExecutorService schedulerExecutor, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, + QueryExplainer queryExplainer, ExecutionPolicy executionPolicy, SplitSchedulerStats schedulerStats, + StatsCalculator statsCalculator, CostCalculator costCalculator, WarningCollector warningCollector, + DynamicFilterService dynamicFilterService, Optional> cache, + HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils, + QueryResourceManagerService queryResourceManager) { super(preparedQuery, stateMachine, slug, metadata, cubeManager, accessControl, sqlParser, splitManager, nodePartitioningManager, nodeScheduler, planOptimizers, planFragmenter, remoteTaskFactory, locationFactory, scheduleSplitBatchSize, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, queryExplainer, - executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService, heuristicIndexerManager, stateStoreProvider, recoveryUtils); + executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService, heuristicIndexerManager, stateStoreProvider, recoveryUtils, queryResourceManager); this.cache = cache; this.beginTableWrite = new BeginTableWrite(metadata); } diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java b/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java new file mode 100644 index 000000000..38a7ef573 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/BasicResourceStats.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import io.airlift.units.DataSize; +import io.airlift.units.Duration; + +public class BasicResourceStats +{ + public Duration cpuTime; + public DataSize memCurrent; + public DataSize ioCurrent; +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java b/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java new file mode 100644 index 000000000..95d4f708a --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/ForResourceMonitor.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForResourceMonitor +{ +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java new file mode 100644 index 000000000..34e930475 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManager.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import com.google.common.util.concurrent.AtomicDouble; +import io.airlift.log.Logger; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.Session; +import io.prestosql.execution.StageStats; +import io.prestosql.metadata.InternalNode; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.resourcegroups.KillPolicy; +import io.prestosql.spi.resourcegroups.ResourceGroup; +import io.prestosql.spi.resourcegroups.SchedulingPolicy; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryResourceManager + implements ResourceManager +{ + private static final Logger logger = Logger.get(QueryResourceManager.class); + private static final double HARD_LIMIT_FACTOR = 1.5; + + private final QueryId queryId; + private Optional softMemoryLimit; + private final Optional softMemoryLimitFraction; + private final Optional softReservedMemory; + private final Optional softReservedFraction; + private final Optional softConcurrencyLimit; + private final Optional hardReservedConcurrency; + private final int hardConcurrencyLimit; + private final Optional schedulingPolicy; + private final Optional schedulingWeight; + private final ResourceGroup resourceGroup; + private Optional softCpuLimit; + private Optional hardCpuLimit; + private Optional diskUseLimit = Optional.empty(); + + private final Optional killPolicy; + + private final QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener; + private final BasicResourceStats queryResources = new BasicResourceStats(); + private Map nodeWiseQueryStats = new HashMap<>(); + + public QueryResourceManager(QueryId queryId, Session session, QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener) + { + this.queryId = queryId; + this.softMemoryLimit = Optional.empty(); + this.softMemoryLimitFraction = Optional.empty(); + this.softReservedMemory = Optional.empty(); + this.softReservedFraction = Optional.empty(); + this.softConcurrencyLimit = Optional.empty(); + this.hardReservedConcurrency = Optional.empty(); + this.hardConcurrencyLimit = 10; + this.schedulingPolicy = Optional.empty(); + this.schedulingWeight = Optional.empty(); + this.resourceGroup = null; + this.softCpuLimit = Optional.empty(); + this.hardCpuLimit = Optional.empty(); + this.killPolicy = Optional.empty(); + this.resourceUpdateListener = resourceUpdateListener; + } + + public QueryResourceManager(QueryId queryId, Optional softMemoryLimit, Optional softMemoryLimitFraction, + Optional softReservedMemory, Optional softReservedFraction, + int maxQueued, Optional softConcurrencyLimit, Optional hardReservedConcurrency, + int hardConcurrencyLimit, Optional schedulingPolicy, + Optional schedulingWeight, ResourceGroup resourceGroup, Optional softCpuLimit, + Optional hardCpuLimit, Optional killPolicy, + QueryResourceManagerService.ResourceUpdateListener resourceUpdateListener) + { + this.queryId = queryId; + this.softMemoryLimit = softMemoryLimit; + this.softMemoryLimitFraction = softMemoryLimitFraction; + this.softReservedMemory = softReservedMemory; + this.softReservedFraction = softReservedFraction; + this.softConcurrencyLimit = softConcurrencyLimit; + this.hardReservedConcurrency = hardReservedConcurrency; + this.hardConcurrencyLimit = hardConcurrencyLimit; + this.schedulingPolicy = schedulingPolicy; + this.schedulingWeight = schedulingWeight; + this.resourceGroup = resourceGroup; + this.softCpuLimit = softCpuLimit; + this.hardCpuLimit = hardCpuLimit; + this.killPolicy = killPolicy; + this.resourceUpdateListener = resourceUpdateListener; + } + + @Override + public void setResourceLimit(DataSize memoryLimit, Duration cpuLimit, DataSize ioLimit) + { + /* Todo(Nitin K) check estimates correctness before updating limits! */ + softCpuLimit = Optional.of(cpuLimit); + hardCpuLimit = Optional.of(new Duration(cpuLimit.getValue(TimeUnit.SECONDS) * HARD_LIMIT_FACTOR, TimeUnit.SECONDS)); + + softMemoryLimit = Optional.of(memoryLimit); + diskUseLimit = Optional.of(ioLimit); + } + + @Override + public DataSize getMemoryLimit() + { + return softMemoryLimit.orElse(DataSize.succinctBytes(0)); + } + + @Override + public void updateStats(List stats) + { + /*Todo(Nitin K): record worker node level resource as well! */ + AtomicDouble cpuTime = new AtomicDouble(); + AtomicDouble userMem = new AtomicDouble(); + AtomicLong io = new AtomicLong(); + stats.forEach(stageStats -> { + cpuTime.addAndGet(stageStats.getTotalCpuTime().getValue()); + userMem.addAndGet(stageStats.getCumulativeUserMemory()); + io.addAndGet(stageStats.getInternalNetworkInputDataSize().toBytes()); + }); + + queryResources.cpuTime = Duration.succinctDuration(cpuTime.longValue(), TimeUnit.NANOSECONDS); + queryResources.memCurrent = DataSize.succinctBytes(userMem.longValue()); + queryResources.ioCurrent = DataSize.succinctBytes(io.get()); + + checkResourceLimits(); + } + + @Override + public void updateStats(Duration totalCpu, DataSize totalMem, DataSize totalIo) + { + queryResources.cpuTime = totalCpu; + queryResources.memCurrent = totalMem; + queryResources.ioCurrent = totalIo; + + checkResourceLimits(); + } + + private void checkResourceLimits() + { + /* Inform the resource usage update to the ResourceManagerService + * for broader resource planning. */ + resourceUpdateListener.resourceUpdate(queryId, queryResources); + + if (softCpuLimit.isPresent() && queryResources.cpuTime.compareTo(softCpuLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned CPU-Time of %s, current Cpu: %s", + queryId.toString(), softCpuLimit.get().toString(), queryResources.cpuTime.toString()); + } + + if (hardCpuLimit.isPresent() && queryResources.cpuTime.compareTo(hardCpuLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned Hard CPU-Time of %s, current Cpu: %s", + queryId.toString(), hardCpuLimit.get().toString(), queryResources.cpuTime.toString()); + } + + if (softMemoryLimit.isPresent() && queryResources.memCurrent.compareTo(softMemoryLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned Memory limit of %s, current memory used: %s", + queryId.toString(), softMemoryLimit.get().toString(), queryResources.memCurrent.toString()); + } + + if (diskUseLimit.isPresent() && queryResources.ioCurrent.compareTo(diskUseLimit.get()) > 0) { + logger.warn("Query [%s] exceeds assigned disk use limit of %s, current disk used: %s", + queryId.toString(), diskUseLimit.get().toString(), queryResources.ioCurrent.toString()); + } + } +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java new file mode 100644 index 000000000..4745cd39b --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/QueryResourceManagerService.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import com.google.inject.Inject; +import io.prestosql.Session; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.memory.ClusterMemoryPoolManager; + +import java.util.concurrent.ExecutorService; + +import static java.util.Objects.requireNonNull; + +public class QueryResourceManagerService +{ + private final ResourceUpdateListener resourceUpdateListener = new ResourceUpdateListener(); + private final ExecutorService executor; + + @Inject + QueryResourceManagerService(ClusterMemoryPoolManager memoryPoolManager, @ForResourceMonitor ExecutorService executor) + { + /* Todo(Nitin K) add a resourceMonitor and resourcePlanner based on observed + * resource usage pattern over a period of time. */ + + this.executor = requireNonNull(executor, "Executor cannot be null"); + } + + private void start() + { + executor.submit(this::resourceMonitor); + executor.submit(this::resourcePlanner); + } + + private void resourcePlanner() + { + /* Do Nothing */ + + /* Todo(Nitin K) add action handler for queries out of bound on resources, like: + * - Suspend - Resume + * - Apply Grace (add resource so that query can finish and make more resource available) + * - Future usage based resource planning + * - Resource allocation reduction in case more queries arrive in burst */ + } + + private void resourceMonitor() + { + /* Do Nothing */ + + /* Todo(Nitin K) add action handler for queries out of bound on resources, like: + * - Apply Penalty + * - Trigger Memory Spill + * - Suspend - Resume + * - Apply Grace (add resource so that query can finish and make more resource available) */ + } + + public QueryResourceManager createQueryResourceManager(QueryId queryId, Session session) + { + return new QueryResourceManager(queryId, session, resourceUpdateListener); + } + + public class ResourceUpdateListener + { + public void resourceUpdate(QueryId queryId, BasicResourceStats stats) + { + /* Todo(Nitin K) Implement resource estimation & reservation for future + * also, decision for given query level resource change */ + } + } +} diff --git a/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java b/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java new file mode 100644 index 000000000..eaf466234 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/resourcemanager/ResourceManager.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.resourcemanager; + +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.execution.StageStats; + +import java.util.List; + +public interface ResourceManager +{ + void setResourceLimit(DataSize memoryLimit, Duration cpuLimit, DataSize ioLimit); + + DataSize getMemoryLimit(); + + void updateStats(List stats); + + void updateStats(Duration totalCpu, DataSize totalMem, DataSize totalIo); +} diff --git a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java index a2885cce8..a52106c4d 100644 --- a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java +++ b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java @@ -115,6 +115,8 @@ import io.prestosql.operator.ForScheduler; import io.prestosql.queryeditorui.QueryEditorUIModule; import io.prestosql.queryhistory.QueryHistoryModule; +import io.prestosql.resourcemanager.ForResourceMonitor; +import io.prestosql.resourcemanager.QueryResourceManagerService; import io.prestosql.server.remotetask.RemoteTaskStats; import io.prestosql.spi.memory.ClusterMemoryPoolManager; import io.prestosql.spi.resourcegroups.QueryType; @@ -192,6 +194,7 @@ import static io.prestosql.util.StatementUtils.getAllQueryTypes; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -355,6 +358,10 @@ protected void setup(Binder binder) config.setMaxConnectionsPerServer(250); }); + binder.bind(ExecutorService.class).annotatedWith(ForResourceMonitor.class) + .toInstance(newSingleThreadExecutor(threadsNamed("resource-monitor-%s"))); + binder.bind(QueryResourceManagerService.class).in(Scopes.SINGLETON); + binder.bind(ScheduledExecutorService.class).annotatedWith(ForScheduler.class) .toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler")));