Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
V0.47 Connector Enhancement - auto throttle
Browse files Browse the repository at this point in the history
The connector now detects "rate exceeded" responses from the provider
and  automatically decreases max request/second.  Currently only tested
with BOX
  • Loading branch information
stanyork committed Feb 3, 2015
1 parent 8f464ce commit 6c9d4cf
Showing 1 changed file with 66 additions and 11 deletions.
77 changes: 66 additions & 11 deletions Cloud Elements Connector/CloudElementsConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public CloudElementsConnector()
}

/// <summary>
/// Returns another APIConnector with the same authorization and trace handler
/// Returns another APIConnector with the same authorization, endpoint optons, and trace handler
/// </summary>
/// <returns></returns>
public CloudElementsConnector Clone()
Expand All @@ -129,6 +129,10 @@ public CloudElementsConnector Clone()
}
#endregion

/// <summary>
/// Test the endpoint and sets the endpoint type, enablig endpoint-specific options such as rate throttling
/// </summary>
/// <returns></returns>
public async Task<Pong> Ping()
{
HttpResponseMessage response = await APIExecuteGet("hubs/documents/ping");
Expand All @@ -146,7 +150,7 @@ private void AssureEnpointControlData(string endpointName)
{
EndpointOptions options = new EndpointOptions();
options.LogThrottleDelays = false;
options.RequestsPerSecondWindow = 4;
options.RequestsPerSecondWindow = 3;
switch (endpointName)
{
case "box":
Expand Down Expand Up @@ -710,6 +714,9 @@ async Task<HttpResponseMessage> APIExecuteVerb(HttpVerb verb, string URI)
return await APIExecuteVerb(verb, URI, null);
}

/// <summary>
/// Manages the requests per second, adding delays if/when needed
/// </summary>
private void ThrottleRequestsPerSecond()
{
EndpointOptions options;
Expand Down Expand Up @@ -740,7 +747,7 @@ private void ThrottleRequestsPerSecond()
BaselineRequestAt = RecentRqQ.Peek(); RequestCountSinceBaseline = RecentRqQ.Count + 1;
double TotalSecsSinceBaseline = DateTime.Now.Subtract(BaselineRequestAt).TotalSeconds;
double RecentReqPerSecond = (RequestCountSinceBaseline / TotalSecsSinceBaseline);
if ((RecentReqPerSecond > options.HighwaterGeneratedRequestsPerSecond) && (TotalSecsSinceBaseline > 0.4) )
if ((RecentReqPerSecond > options.HighwaterGeneratedRequestsPerSecond) && (TotalSecsSinceBaseline > 0.5))
{
options.HighwaterGeneratedRequestsPerSecond = RecentReqPerSecond;
if (options.LogHighwaterThroughput) OnDiagTrace(string.Format("ce(throughput) [{0}] reached {1:F2}r/s", Endpoint, RecentReqPerSecond));
Expand Down Expand Up @@ -771,15 +778,22 @@ private void ThrottleRequestsPerSecond()
{
if (options.LogThrottleDelays) OnDiagTrace(string.Format("ce(throttled) [{0}] delayed {1}ms", Endpoint, delayMS));
InstanceThrottleDelayCnt++;
InstanceThrottleDelayMS+= delayMS;
InstanceThrottleDelayMS += delayMS;
System.Threading.Thread.Sleep(delayMS);
}
lock (StaticLockObject)
{
RecentRqQ.Enqueue(DateTime.Now);
RecentRqQ.Enqueue(DateTime.Now);
}
}

/// <summary>
/// Executes a cloud elements request. Every request comes through here
/// </summary>
/// <param name="verb"></param>
/// <param name="URI"></param>
/// <param name="content"></param>
/// <returns></returns>
async Task<HttpResponseMessage> APIExecuteVerb(HttpVerb verb, string URI, HttpContent content)
{
ThrottleRequestsPerSecond();
Expand Down Expand Up @@ -834,8 +848,24 @@ async Task<HttpResponseMessage> APIExecuteVerb(HttpVerb verb, string URI, HttpCo
LastFailureInformation = msgtoken.ToString();
if (providerMsgtoken != null)
{
traceInfo = string.Concat(traceInfo, " - ", providerMsgtoken.ToString());
LastFailureInformation = string.Concat(traceInfo, " - ", providerMsgtoken.ToString());
string providerMessage = providerMsgtoken.ToString();
traceInfo = string.Concat(traceInfo, " - ", providerMessage);
LastFailureInformation = string.Concat(traceInfo, " - ", providerMessage);
if (providerMessage.IndexOf("rate limit exceeded", StringComparison.CurrentCultureIgnoreCase) > 0)
{
EndpointOptions options;
if (EndpointSettings.ContainsKey(Endpoint))
{
options = EndpointSettings[Endpoint];
if ((options.MaxRqPerSecond <= 0) || (options.MaxRqPerSecond > options.HighwaterGeneratedRequestsPerSecond)) options.MaxRqPerSecond = (int)options.HighwaterGeneratedRequestsPerSecond;
if ((options.MaxRqPerSecond > 2) && (DateTime.Now.Subtract(options.LastAutoLimit).TotalSeconds > 1))
{
options.MaxRqPerSecond--;
options.LastAutoLimit = DateTime.Now;
OnDiagTrace(string.Format("ce(throughput) [{0}] rate limit exceeded: inferred new target of {1}r/s", Endpoint, options.MaxRqPerSecond));
}
}
}
}
if (rqIDtoken != null) LastFailureInformation = string.Format("{0}; (Request #{1}, ID {2})", LastFailureInformation, RequestCounter, rqIDtoken.ToString());
}
Expand Down Expand Up @@ -868,19 +898,44 @@ HttpClient NewHttpClient()
#endregion





}

/// <summary>
/// Connector Options that apply to a specific Endpoint Provider (BOX, Google Drive, OneDrive, etc)
/// </summary>
public class EndpointOptions
{
/// <summary>
/// The maximum requests per second. The engine will introduce delays if request rate exceeds this.
/// </summary>
public int MaxRqPerSecond;
/// <summary>
/// The number of seconds over which the average is calculated. Too high a number will allow bursts that exceed the providers limit
/// </summary>
public int RequestsPerSecondWindow;
public double HighwaterGeneratedRequestsPerSecond;
/// <summary>
/// Set by the engine when a new high for requests per second is reached. The value may be higher than expected due to sub-second bursts.
/// </summary>
public double HighwaterGeneratedRequestsPerSecond
{
get { return _HighwaterGeneratedRequestsPerSecond; }
internal set { _HighwaterGeneratedRequestsPerSecond = value; }
}
public DateTime BackoffUntil; // future:
/// <summary>
/// Set to the time at which the connector last detected a "rate exceeded" condition and adjust MaxRqPerSecond
/// </summary>
public DateTime LastAutoLimit
{
get { return _LastAutoLimit; }
internal set { _LastAutoLimit = value; }
}
public bool LogThrottleDelays;
public bool LogHighwaterThroughput;

private double _HighwaterGeneratedRequestsPerSecond;
private DateTime _LastAutoLimit;

}

public class Pong
Expand Down

0 comments on commit 6c9d4cf

Please sign in to comment.