From 6c9d4cf9e464f4defa2ea164398b5ba4839d4be4 Mon Sep 17 00:00:00 2001 From: Stan York Date: Tue, 3 Feb 2015 08:24:16 -0500 Subject: [PATCH] V0.47 Connector Enhancement - auto throttle The connector now detects "rate exceeded" responses from the provider and automatically decreases max request/second. Currently only tested with BOX --- .../CloudElementsConnector.cs | 77 ++++++++++++++++--- 1 file changed, 66 insertions(+), 11 deletions(-) diff --git a/Cloud Elements Connector/CloudElementsConnector.cs b/Cloud Elements Connector/CloudElementsConnector.cs index 77db030..c6073be 100644 --- a/Cloud Elements Connector/CloudElementsConnector.cs +++ b/Cloud Elements Connector/CloudElementsConnector.cs @@ -109,7 +109,7 @@ public CloudElementsConnector() } /// - /// Returns another APIConnector with the same authorization and trace handler + /// Returns another APIConnector with the same authorization, endpoint optons, and trace handler /// /// public CloudElementsConnector Clone() @@ -129,6 +129,10 @@ public CloudElementsConnector Clone() } #endregion + /// + /// Test the endpoint and sets the endpoint type, enablig endpoint-specific options such as rate throttling + /// + /// public async Task Ping() { HttpResponseMessage response = await APIExecuteGet("hubs/documents/ping"); @@ -146,7 +150,7 @@ private void AssureEnpointControlData(string endpointName) { EndpointOptions options = new EndpointOptions(); options.LogThrottleDelays = false; - options.RequestsPerSecondWindow = 4; + options.RequestsPerSecondWindow = 3; switch (endpointName) { case "box": @@ -710,6 +714,9 @@ async Task APIExecuteVerb(HttpVerb verb, string URI) return await APIExecuteVerb(verb, URI, null); } + /// + /// Manages the requests per second, adding delays if/when needed + /// private void ThrottleRequestsPerSecond() { EndpointOptions options; @@ -740,7 +747,7 @@ private void ThrottleRequestsPerSecond() BaselineRequestAt = RecentRqQ.Peek(); RequestCountSinceBaseline = RecentRqQ.Count + 1; double TotalSecsSinceBaseline = DateTime.Now.Subtract(BaselineRequestAt).TotalSeconds; double RecentReqPerSecond = (RequestCountSinceBaseline / TotalSecsSinceBaseline); - if ((RecentReqPerSecond > options.HighwaterGeneratedRequestsPerSecond) && (TotalSecsSinceBaseline > 0.4) ) + if ((RecentReqPerSecond > options.HighwaterGeneratedRequestsPerSecond) && (TotalSecsSinceBaseline > 0.5)) { options.HighwaterGeneratedRequestsPerSecond = RecentReqPerSecond; if (options.LogHighwaterThroughput) OnDiagTrace(string.Format("ce(throughput) [{0}] reached {1:F2}r/s", Endpoint, RecentReqPerSecond)); @@ -771,15 +778,22 @@ private void ThrottleRequestsPerSecond() { if (options.LogThrottleDelays) OnDiagTrace(string.Format("ce(throttled) [{0}] delayed {1}ms", Endpoint, delayMS)); InstanceThrottleDelayCnt++; - InstanceThrottleDelayMS+= delayMS; + InstanceThrottleDelayMS += delayMS; System.Threading.Thread.Sleep(delayMS); } lock (StaticLockObject) { - RecentRqQ.Enqueue(DateTime.Now); + RecentRqQ.Enqueue(DateTime.Now); } } + /// + /// Executes a cloud elements request. Every request comes through here + /// + /// + /// + /// + /// async Task APIExecuteVerb(HttpVerb verb, string URI, HttpContent content) { ThrottleRequestsPerSecond(); @@ -834,8 +848,24 @@ async Task APIExecuteVerb(HttpVerb verb, string URI, HttpCo LastFailureInformation = msgtoken.ToString(); if (providerMsgtoken != null) { - traceInfo = string.Concat(traceInfo, " - ", providerMsgtoken.ToString()); - LastFailureInformation = string.Concat(traceInfo, " - ", providerMsgtoken.ToString()); + string providerMessage = providerMsgtoken.ToString(); + traceInfo = string.Concat(traceInfo, " - ", providerMessage); + LastFailureInformation = string.Concat(traceInfo, " - ", providerMessage); + if (providerMessage.IndexOf("rate limit exceeded", StringComparison.CurrentCultureIgnoreCase) > 0) + { + EndpointOptions options; + if (EndpointSettings.ContainsKey(Endpoint)) + { + options = EndpointSettings[Endpoint]; + if ((options.MaxRqPerSecond <= 0) || (options.MaxRqPerSecond > options.HighwaterGeneratedRequestsPerSecond)) options.MaxRqPerSecond = (int)options.HighwaterGeneratedRequestsPerSecond; + if ((options.MaxRqPerSecond > 2) && (DateTime.Now.Subtract(options.LastAutoLimit).TotalSeconds > 1)) + { + options.MaxRqPerSecond--; + options.LastAutoLimit = DateTime.Now; + OnDiagTrace(string.Format("ce(throughput) [{0}] rate limit exceeded: inferred new target of {1}r/s", Endpoint, options.MaxRqPerSecond)); + } + } + } } if (rqIDtoken != null) LastFailureInformation = string.Format("{0}; (Request #{1}, ID {2})", LastFailureInformation, RequestCounter, rqIDtoken.ToString()); } @@ -868,19 +898,44 @@ HttpClient NewHttpClient() #endregion - - - } + /// + /// Connector Options that apply to a specific Endpoint Provider (BOX, Google Drive, OneDrive, etc) + /// public class EndpointOptions { + /// + /// The maximum requests per second. The engine will introduce delays if request rate exceeds this. + /// public int MaxRqPerSecond; + /// + /// The number of seconds over which the average is calculated. Too high a number will allow bursts that exceed the providers limit + /// public int RequestsPerSecondWindow; - public double HighwaterGeneratedRequestsPerSecond; + /// + /// Set by the engine when a new high for requests per second is reached. The value may be higher than expected due to sub-second bursts. + /// + public double HighwaterGeneratedRequestsPerSecond + { + get { return _HighwaterGeneratedRequestsPerSecond; } + internal set { _HighwaterGeneratedRequestsPerSecond = value; } + } public DateTime BackoffUntil; // future: + /// + /// Set to the time at which the connector last detected a "rate exceeded" condition and adjust MaxRqPerSecond + /// + public DateTime LastAutoLimit + { + get { return _LastAutoLimit; } + internal set { _LastAutoLimit = value; } + } public bool LogThrottleDelays; public bool LogHighwaterThroughput; + + private double _HighwaterGeneratedRequestsPerSecond; + private DateTime _LastAutoLimit; + } public class Pong