New priority queue & exponential backoff client #4745

Closed
wants to merge 9 commits

Conversation


@Murderlon Murderlon commented Oct 18, 2023

Problems

  • Users regularly report issues with big upload batches failing because a single request fails and is never retried.
  • It's hard to add retries everywhere because our rate-limited priority queue doesn't handle retrying and rate limiting itself. Instead it exposes methods to do so, but the API is hard to understand and requires a lot of thinking and duplication every time you want to retry.
  • A "rate-limited priority queue" is not an established concept. We are mixing retrying, priority queues, and rate limiting into one abstraction, even though those don't belong together. This poses further problems, such as rate limiting (slowing down) the whole queue because one request started to receive 429 (Too Many Requests) responses, while other requests (such as those to Companion) may not have this problem.
  • The queue is re-implemented in every uploader plugin, leading to more duplication, while such a central piece of uploading logic could/should live in core.

Consider this example, which is already duplicated between tus and S3 multipart, and shows the problem of needing to manually orchestrate setTimeout calls and invoke rateLimit with the right values. It also keeps track of a retryDelayIterator and a previousRetryDelay to compute the retry delay.

Because it's tightly coupled, it's hard to reuse, and regardless it exposes complexities which should be hidden away.

    async #shouldRetry (err, retryDelayIterator) {
      const requests = this.#requests
      const status = err?.source?.status
      // TODO: this retry logic is taken out of Tus. We should have a centralized place
      // for retrying, perhaps the rate limited queue, and dedupe all plugins with that.
      if (status == null) {
        return false
      }
      if (status === 403 && err.message === 'Request has expired') {
        if (!requests.isPaused) {
          // We don't want to exhaust the retryDelayIterator as long as there is
          // more than one request in parallel, to give slower connections a chance
          // to catch up with the expiry set in Companion.
          if (requests.limit === 1 || this.#previousRetryDelay == null) {
            const next = retryDelayIterator.next()
            if (next == null || next.done) {
              return false
            }
            // If there is more than one request in parallel, the RLQ limit is
            // decreased and the failed request is requeued after waiting for a bit.
            // If there is only one request in parallel, the limit can't be
            // decreased, so we iterate over `retryDelayIterator` as we do for
            // other failures.
            // `#previousRetryDelay` caches the value so we can re-use it next time.
            this.#previousRetryDelay = next.value
          }
          // No need to stop the other requests, we just want to lower the limit.
          requests.rateLimit(0)
          await new Promise(resolve => setTimeout(resolve, this.#previousRetryDelay))
        }
      } else if (status === 429) {
        // HTTP 429 Too Many Requests => to avoid the whole upload failing, rate-limit all requests.
        if (!requests.isPaused) {
          const next = retryDelayIterator.next()
          if (next == null || next.done) {
            return false
          }
          requests.rateLimit(next.value)
        }
      } else if (status > 400 && status < 500 && status !== 409) {
        // Other HTTP 4xx: the server rejected the request, it doesn't make sense to retry.
        return false
      } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
        // The navigator is offline, let's wait for it to come back online.
        if (!requests.isPaused) {
          requests.pause()
          window.addEventListener('online', () => {
            requests.resume()
          }, { once: true })
        }
      } else {
        // Any other error code means the request can be retried later.
        const next = retryDelayIterator.next()
        if (next == null || next.done) {
          return false
        }
        await new Promise(resolve => setTimeout(resolve, next.value))
      }
      return true
    }
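For context, the `retryDelayIterator` consumed above is simply an iterator over a backoff sequence. A minimal sketch of one, as a generator yielding exponentially increasing delays (the concrete values here are illustrative, not necessarily what Uppy uses):

```javascript
// Hypothetical sketch of a `retryDelayIterator`: a generator yielding a
// bounded exponential backoff sequence in milliseconds. `base`, `factor`,
// and `maxAttempts` are illustrative names, not Uppy's actual API.
function* createRetryDelayIterator (base = 1000, factor = 2, maxAttempts = 5) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    yield base * factor ** attempt
  }
}

const delays = [...createRetryDelayIterator()]
console.log(delays) // [1000, 2000, 4000, 8000, 16000]
```

Once the iterator is exhausted (`next().done` is true), `#shouldRetry` gives up and returns `false`.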

Solution

Separation of concerns: a priority queue, which does that and nothing else, and a fetcher util with exponential backoff built-in. Conceptually, the queue shouldn't care if a retry is needed or not, it just adds and removes promises when they resolve/reject. It is the promise inside the queue that should not resolve/reject until it has retried.

  • Add p-queue to @uppy/core as a centralised priority queue.
  • Create fetcher in @uppy/utils, a wrapper around XMLHttpRequest with exponential backoff.
  • Refactor @uppy/xhr-upload to leverage both.
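The core idea, that the promise handed to the queue only settles after retries are exhausted, can be sketched with a small wrapper. This is not the actual `fetcher` implementation, just a minimal illustration of the concept; the names (`withBackoff`, its options) are invented for this sketch:

```javascript
// Minimal sketch of the idea behind the proposed `fetcher` util: the retry
// loop lives inside the promise, so the queue consuming it never needs to
// know about retries. Names and options here are illustrative, not Uppy's API.
async function withBackoff (task, { retries = 3, baseDelay = 100, shouldRetry = () => true } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await task(attempt)
    } catch (err) {
      if (attempt >= retries || !shouldRetry(err)) throw err
      // Exponential backoff: baseDelay, 2x, 4x, ...
      await new Promise(resolve => setTimeout(resolve, baseDelay * 2 ** attempt))
    }
  }
}

// Usage: a flaky task that fails twice, then succeeds on the third attempt.
let calls = 0
withBackoff(async () => {
  calls++
  if (calls < 3) throw new Error('transient failure')
  return 'ok'
}, { baseDelay: 1 }).then((result) => {
  console.log(result, calls) // 'ok' 3
})
```

The queue then simply does `queue.add(() => withBackoff(...))`: from its perspective there is one promise per upload, which is exactly the separation of concerns described above.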

Notes

  • This PR also starts to phase out EventManager, which is an unnecessary abstraction as far as I'm concerned.
  • The removal of RateLimitedQueue is not in this diff yet as the other packages still depend on it.

Breaking changes

  • Because both core and the uploader package in use need to be upgraded at the same time, it's conceptually already a breaking change: if you upgrade one but not the other, it won't work.
  • Pausing individual files is no longer possible.

Pitfalls

  • Why not let fetcher use fetch? I started this refactor using ky, but I found out that fetch can't consistently measure upload progress (it can measure download progress). There is theoretically a way, by turning your file into a ReadableStream or pushing it through a TransformStream and using the duplex: 'half' option on fetch. However, that won't work for FormData (the default of xhr-upload) and it will only work over HTTP/2.0 or 3.0 (which S3 doesn't use).

Adoption plan

It's important that more than one person feels comfortable with the queue and retrying. Therefore I propose the following:

  • @arturi refactors @uppy/tus (since tus handles uploading internally, only the queue needs to be swapped)
  • @Murderlon refactors @uppy/aws-s3-multipart (not easy)
  • @mifi refactors @uppy/companion-client. It doesn't have a queue but would need to use the new fetcher util for retrying. Note that in Companion+client stability fixes, error handling and retry #4734, custom logic with p-retry is added, but this should only be temporary and we should end up with one shared util instead of different implementations.

@Murderlon Murderlon self-assigned this Oct 18, 2023
* main:
  meta: fix js2ts check
  meta: add support for TypeScript plugins (#4640)
@Murderlon Murderlon added the 4.0 For the 4.0 major version label Oct 24, 2023
@arturi arturi mentioned this pull request Oct 25, 2023
Contributor

@mifi mifi left a comment


nice simplifications! makes it so much easier to follow code when we have a simple promise based queue like p-queue

this.requests = this.opts[internalRateLimitedQueue]
} else {
this.requests = new RateLimitedQueue(this.opts.limit)
this.uppy.queue.concurrency = this.opts.limit
Contributor


it seems odd for a plugin to mutate a deep property on the central uppy instance like this. what if more plugins were to do this, won't it lead to non-deterministic behaviour?

Member Author


It's the only way to keep backwards compatibility. If we want to make breaking changes to this API as well, that should be discussed.

Contributor


ah ok, so before this PR we had one queue per plugin, but now the goal is to have a single queue. I think this has the side effect that when uploading to multiple destinations, performance will be lower, because a maximum of 6 requests will be enforced in total (as opposed to 6 per destination). But I guess it's an acceptable trade-off in order to get maintainable code. In the future we might want to implement a multi-queue which dispatches requests to the corresponding queue based on the domain name the request connects to, as I believe that's what the browsers' 6-connection limit is based on (per domain)

Member Author


We don't support multiple uploaders, so multiple destinations can only mean to Companion or your own backend for signing, which is only a thing on multipart. I wouldn't overcomplicate the queue in core for one upload plugin. Instead I think it's best we wait till S3 supports HTTP/2.0 and we can have high limits everywhere.
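The per-domain "multi-queue" idea floated in this thread could be sketched as a map of per-host concurrency limiters. This is hypothetical, not part of this PR, and the class name and API are invented for illustration:

```javascript
// Hypothetical sketch of the per-domain multi-queue idea: dispatch each
// request to a small concurrency limiter keyed by host, since browsers apply
// their connection limit per origin. Not part of this PR; names are invented.
class HostQueues {
  constructor (concurrencyPerHost = 6) {
    this.concurrency = concurrencyPerHost
    this.queues = new Map() // host -> { active, pending }
  }

  add (url, task) {
    const host = new URL(url).host
    if (!this.queues.has(host)) {
      this.queues.set(host, { active: 0, pending: [] })
    }
    const q = this.queues.get(host)
    return new Promise((resolve, reject) => {
      q.pending.push({ task, resolve, reject })
      this.#drain(q)
    })
  }

  #drain (q) {
    // Start pending tasks until this host's concurrency limit is reached.
    while (q.active < this.concurrency && q.pending.length > 0) {
      const { task, resolve, reject } = q.pending.shift()
      q.active++
      Promise.resolve()
        .then(task)
        .then(resolve, reject)
        .finally(() => {
          q.active--
          this.#drain(q)
        })
    }
  }
}

// Usage: requests to different hosts proceed independently, e.g.
// const queues = new HostQueues(6)
// queues.add('https://companion.example/drive', () => doRequest())
```

With this shape, a 429 from one host would only slow down that host's queue, which addresses the cross-contamination problem described in the PR description.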

...options,
onTimeout: () => {
const seconds = Math.ceil(options.timeout / 1000)
const error = new Error(this.i18n('uploadStalled', { seconds }))
Contributor


Not sure if we should put i18n strings in error messages. Error objects are not user-facing but developer-facing. Having error messages in different languages could make debugging production issues harder.

Member Author


This was already the case, I just moved the code. It is user facing though, people view this message.

Contributor


Ok, I see. I just don't think that Error objects (and their message) should be user-facing, but yeah, we can fix it in a different PR

* @returns {AbortController}
* The abort signal.
*/
export function getUppyAbortController(uppy, id, additionalSignal) {
Contributor


Suggested change
export function getUppyAbortController(uppy, id, additionalSignal) {
export function createUppyAbortController(uppy, id, additionalSignal) {


xhr.send(formData)
await this.uppy.queue.add(async () => {
return uppyFetch(endpoint, { ...this.opts, body, signal })
Contributor


couldn't we just call uppyFetch here? Why do we need it to return a function?

Suggested change
return uppyFetch(endpoint, { ...this.opts, body, signal })
return this.#uppyFetch(files)(endpoint, { ...this.opts, body, signal })

})

if (id) {
uppy.on('file-removed', (file) => {
Contributor


won't this leak event listeners? we never uppy.off all of these events as far as i can see?

const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const { capabilities } = this.uppy.getState()
const deferred = defer()
Contributor


why do we need defer here? can't we just use the new Promise constructor? I thought defer was considered an anti-pattern

Member Author

@Murderlon Murderlon Oct 31, 2023


Mostly because we need to return the promise itself in the queue add:

    this.uppy.queue.add(() => {
      if (file.isPaused) {
        socket.send('pause', {})
      } else {
        socket.open()
      }
      return deferred.promise
    }, { signal: controller.signal })

But could probably be rewritten
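One possible rewrite without `defer`, using the Promise constructor so settlement is wired up where the promise is created. This is a sketch under assumed names (`waitForUploadResult`, a socket with `on`/`open`/`send`), not the actual Uppy code:

```javascript
// Sketch of replacing the deferred with the Promise constructor: the socket's
// events drive resolution/rejection directly. `socket` is a stand-in object
// with an `on(event, handler)` API; names here are illustrative.
function waitForUploadResult (socket, file) {
  return new Promise((resolve, reject) => {
    socket.on('success', (payload) => resolve(payload))
    socket.on('error', (err) => reject(err))
    if (file.isPaused) {
      socket.send('pause', {})
    } else {
      socket.open()
    }
  })
}

// Usage with a minimal stand-in socket:
const fakeSocket = {
  handlers: {},
  on (event, fn) { this.handlers[event] = fn },
  open () { queueMicrotask(() => this.handlers.success('done')) },
  send () {},
}
waitForUploadResult(fakeSocket, { isPaused: false }).then((result) => {
  console.log(result) // 'done'
})
```

The queue would then receive `() => waitForUploadResult(socket, file)` directly, with no externally-resolved promise.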

'content-type',
'uppy-auth-token',
]
const fallbackAllowedHeaders = ['accept', 'content-type', 'uppy-auth-token']
Contributor


a lot of unrelated changes - makes it hard to review. we should get our eslint config and prettier fixed

* @returns {Promise<XMLHttpRequest>}
* A Promise that resolves to the response text if the request succeeds, and rejects with an error if it fails.
*/
export function fetcher(url, options = {}) {
Contributor


I see that you tried to use ky first but we need upload progress which is not supported. I think if we are implementing our own request abstraction, ideally it should have unit tests because it becomes such an important core component of Uppy. Could we alternatively look into using axios instead of implementing our own wrapper? axios is extremely popular and seems to support upload events too.

Member Author


I prefer our own abstraction over a library, especially since it's mostly just a promise wrapper. But you're right about tests!

@Murderlon
Member Author

Closing this. Breaking up the relevant parts into smaller PRs. Not doing the queue for now.

@Murderlon Murderlon closed this Apr 16, 2024
@Murderlon Murderlon deleted the new-priority-queue branch April 16, 2024 10:54