Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WASM loading #320

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/benches/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const MUNICH_COORDS: TileCoords = TileCoords {
pub struct DummyContext;

impl Context for DummyContext {
fn send<T: IntoMessage>(&self, _message: T) -> Result<(), SendError> {
fn send_back<T: IntoMessage>(&self, _message: T) -> Result<(), SendError> {
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion maplibre/src/headless/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct HeadlessContext {
}

impl Context for HeadlessContext {
fn send<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
self.messages.deref().borrow_mut().push(message.into());
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions maplibre/src/io/apc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub enum SendError {
/// Allows sending messages from workers to back to the caller.
pub trait Context: 'static {
/// Send a message back to the caller.
fn send<T: IntoMessage>(&self, message: T) -> Result<(), SendError>;
fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError>;
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -190,7 +190,7 @@ pub struct SchedulerContext {
}

impl Context for SchedulerContext {
fn send<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
self.sender
.send(message.into())
.map_err(|_e| SendError::Transmission)
Expand Down Expand Up @@ -282,7 +282,7 @@ pub mod tests {
pub struct DummyContext;

impl Context for DummyContext {
fn send<T: IntoMessage>(&self, _message: T) -> Result<(), SendError> {
fn send_back<T: IntoMessage>(&self, _message: T) -> Result<(), SendError> {
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion maplibre/src/raster/process_raster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<T: RasterTransferables, C: Context> ProcessRasterContext<T, C> {
image_data: RgbaImage,
) -> Result<(), ProcessRasterError> {
self.context
.send(T::LayerRaster::build_from(*coords, layer_name, image_data))
.send_back(T::LayerRaster::build_from(*coords, layer_name, image_data))
.map_err(|e| ProcessRasterError::Processing(Box::new(e)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion maplibre/src/raster/request_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn fetch_raster_apc<K: OffscreenKernel, T: RasterTransferables, C: Context +
log::error!("{e:?}");

context
.send(<T as RasterTransferables>::LayerRasterMissing::build_from(
.send_back(<T as RasterTransferables>::LayerRasterMissing::build_from(
coords,
))
.map_err(ProcedureError::Send)?;
Expand Down
8 changes: 4 additions & 4 deletions maplibre/src/vector/process_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl<T: VectorTransferables, C: Context> ProcessVectorContext<T, C> {

fn tile_finished(&mut self, coords: &WorldTileCoords) -> Result<(), ProcessVectorError> {
self.context
.send(T::TileTessellated::build_from(*coords))
.send_back(T::TileTessellated::build_from(*coords))
.map_err(|e| ProcessVectorError::SendError(e))
}

Expand All @@ -135,7 +135,7 @@ impl<T: VectorTransferables, C: Context> ProcessVectorContext<T, C> {
layer_name: &str,
) -> Result<(), ProcessVectorError> {
self.context
.send(T::LayerMissing::build_from(*coords, layer_name.to_owned()))
.send_back(T::LayerMissing::build_from(*coords, layer_name.to_owned()))
.map_err(|e| ProcessVectorError::SendError(e))
}

Expand All @@ -147,7 +147,7 @@ impl<T: VectorTransferables, C: Context> ProcessVectorContext<T, C> {
layer_data: tile::Layer,
) -> Result<(), ProcessVectorError> {
self.context
.send(T::LayerTessellated::build_from(
.send_back(T::LayerTessellated::build_from(
*coords,
buffer,
feature_indices,
Expand All @@ -162,7 +162,7 @@ impl<T: VectorTransferables, C: Context> ProcessVectorContext<T, C> {
geometries: Vec<IndexedGeometry<f64>>,
) -> Result<(), ProcessVectorError> {
self.context
.send(T::LayerIndexed::build_from(
.send_back(T::LayerIndexed::build_from(
*coords,
TileIndex::Linear { list: geometries },
))
Expand Down
2 changes: 1 addition & 1 deletion maplibre/src/vector/request_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub fn fetch_vector_apc<K: OffscreenKernel, T: VectorTransferables, C: Context +
log::error!("{e:?}");
for to_load in &fill_layers {
context
.send(<T as VectorTransferables>::LayerMissing::build_from(
.send_back(<T as VectorTransferables>::LayerMissing::build_from(
coords,
to_load.to_string(),
))
Expand Down
1 change: 1 addition & 0 deletions web/lib/build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ if (multithreaded) {
let baseConfig = {
platform: "browser",
bundle: true,
minify: release,
assetNames: "assets/[name]",
define: {
WEBGL: `${webgl}`,
Expand Down
2 changes: 1 addition & 1 deletion web/lib/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const startMapLibre = async (wasmPath: string | undefined, workerPath: st
preventDefaultTouchActions();

if (MULTITHREADED) {
const MEMORY = 209715200; // 200MB
const MEMORY = 900 * 1024 * 1024; // 900 MB
const PAGES = 64 * 1024;

const memory = new WebAssembly.Memory({initial: 1024, maximum: MEMORY / PAGES, shared: true})
Expand Down
8 changes: 3 additions & 5 deletions web/lib/src/multithreaded/multithreaded-pool.worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as maplibre from "../wasm/maplibre"

type MessageData = {type: 'wasm_init', module: WebAssembly.Module, memory: WebAssembly.Memory}
| {type: 'call', work_ptr: number}
| {type: 'pool_call', work_ptr: number}

let initialised: Promise<maplibre.InitOutput> = null

Expand All @@ -17,18 +17,16 @@ onmessage = async (message: MessageEvent<MessageData>) => {
const data = message.data;
const module = data.module;
const memory = data.memory;
const initialised = maplibre.default(module, memory).catch(err => {
initialised = maplibre.default(module, memory).catch(err => {
// Propagate to main `onerror`:
setTimeout(() => {
throw err;
});
// Rethrow to keep promise rejected and prevent execution of further commands:
throw err;
});
} else if (type === 'call') {
} else if (type === 'pool_call') {
const work_ptr = message.data.work_ptr; // because memory is shared, this pointer is valid in the memory of the main thread and this worker thread
// This will queue further commands up until the module is fully initialised:
await initialised;

const process_data: (msg: any) => Promise<void> = maplibre["multithreaded_process_data"]

Expand Down
14 changes: 9 additions & 5 deletions web/src/platform/multithreaded/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use std::{cell::RefCell, rc::Rc};

use js_sys::Promise;
use rand::prelude::*;
use wasm_bindgen::prelude::*;
use web_sys::Worker;
Expand All @@ -17,8 +16,10 @@ extern "C" {
fn new_worker() -> JsValue;
}

pub type PinnedFuture = std::pin::Pin<Box<(dyn std::future::Future<Output = ()> + 'static)>>;

type NewWorker = Box<dyn Fn() -> Result<Worker, WebError>>;
type Execute = Box<dyn (FnOnce() -> Promise) + Send>;
type Execute = Box<dyn (FnOnce() -> PinnedFuture) + Send>;

pub struct WorkerPool {
new_worker: NewWorker,
Expand All @@ -44,7 +45,7 @@ pub struct Work {
}

impl Work {
pub fn execute(self) -> Promise {
pub fn execute(self) -> PinnedFuture {
(self.func)()
}
}
Expand Down Expand Up @@ -139,13 +140,16 @@ impl WorkerPool {
///
/// Returns any error that may happen while a JS web worker is created and a
/// message is sent to it.
pub fn execute(&self, f: impl (FnOnce() -> Promise) + Send + 'static) -> Result<(), WebError> {
pub fn execute(
&self,
f: impl (FnOnce() -> PinnedFuture) + Send + 'static,
) -> Result<(), WebError> {
let worker = self.worker()?;
let work = Work { func: Box::new(f) };
let work_ptr = Box::into_raw(Box::new(work));
match worker.post_message(
&js_sys::Object::from_entries(&js_sys::Array::of2(
&js_sys::Array::of2(&JsValue::from("type"), &js_sys::JsString::from("call")),
&js_sys::Array::of2(&JsValue::from("type"), &js_sys::JsString::from("pool_call")),
&js_sys::Array::of2(&JsValue::from("work_ptr"), &JsValue::from(work_ptr as u32)),
))
.expect("can not fail"),
Expand Down
7 changes: 1 addition & 6 deletions web/src/platform/multithreaded/pool_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ impl Scheduler for WebWorkerPoolScheduler {
T: Future<Output = ()> + 'static,
{
self.pool
.execute(move || {
wasm_bindgen_futures::future_to_promise(async move {
future_factory().await;
Ok(JsValue::undefined())
})
})
.execute(move || Box::pin(future_factory()))
.map_err(|e| ScheduleError::Scheduling(Box::new(e)))
}
}
8 changes: 2 additions & 6 deletions web/src/platform/multithreaded/wasm_entries.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use maplibre::io::apc::CallError;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;

use crate::{platform::multithreaded::pool::Work, JSError};

/// Entry point invoked by the worker.
#[wasm_bindgen]
pub async fn multithreaded_process_data(work_ptr: *mut Work) -> Result<(), JSError> {
let work = unsafe { Box::from_raw(work_ptr) };
JsFuture::from(work.execute())
.await
.map_err(|_e| CallError::Schedule)?;
let work: Box<Work> = unsafe { Box::from_raw(work_ptr) };
work.execute().await;
Ok(())
}
2 changes: 1 addition & 1 deletion web/src/platform/singlethreaded/apc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct PassingContext {
}

impl Context for PassingContext {
fn send<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
let message = message.into();
let tag = if WebMessageTag::LayerRaster.dyn_clone().as_ref() == message.tag() {
&WebMessageTag::LayerRaster
Expand Down
Loading