Skip to content

Commit

Permalink
Running oplog processor plugins (#1095)
Browse files Browse the repository at this point in the history
* Running oplog processor plugins, WIP

* Running oplog processor plugins, WIP

* Running oplog processor plugins

* Format

* Rebuilt rust test components and added test oplog processor

* Function name parsing fix

* Fixes

* Fix

* Fix

---------

Co-authored-by: Afsal Thaj <[email protected]>
  • Loading branch information
vigoo and afsalthaj authored Dec 3, 2024
1 parent e86093d commit a9a0e4f
Show file tree
Hide file tree
Showing 125 changed files with 22,887 additions and 332 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ exclude = [
"test-components/key-value-service",
"test-components/logging",
"test-components/networking",
"test-components/oplog-processor",
"test-components/option-service",
"test-components/promise",
"test-components/read-stdin",
Expand Down Expand Up @@ -104,7 +105,7 @@ futures-core = "0.3.29"
futures-util = "0.3.29"
git-version = "0.3.9"
golem-wasm-ast = "1.0.1"
golem-wasm-rpc = { version = "1.0.7", default-features = false, features = [
golem-wasm-rpc = { version = "1.0.12", default-features = false, features = [
"host",
] }
hex = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion golem-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ glob = "0.3.1"
golem-examples = "1.0.7"
golem-wasm-ast = { workspace = true }
golem-wasm-rpc = { workspace = true }
golem-wasm-rpc-stubgen = { version = "1.0.7", optional = true }
golem-wasm-rpc-stubgen = { version = "1.0.12", optional = true }
h2 = "0.3.24"
http = { workspace = true }
humansize = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion golem-cli/tests/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ fn text_component_list(
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
| URN | Name | Version | Size | Exports count |
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
| {} | {} | 0 | 71228 | 2 |
| {} | {} | 0 | 70000 | 2 |
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
",
component.component_urn,
Expand Down
80 changes: 79 additions & 1 deletion golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use golem_api_grpc::proto::golem::shardmanager::{
Pod as GrpcPod, RoutingTable as GrpcRoutingTable, RoutingTableEntry as GrpcRoutingTableEntry,
};
use golem_api_grpc::proto::golem::worker::Cursor;
use golem_wasm_ast::analysis::analysed_type::{field, record, s64};
use golem_wasm_ast::analysis::analysed_type::{
field, list, r#enum, record, s64, str, tuple, u32, u64,
};
use golem_wasm_ast::analysis::{analysed_type, AnalysedType};
use golem_wasm_rpc::IntoValue;
use poem::http::Uri;
Expand Down Expand Up @@ -242,6 +244,22 @@ impl From<u64> for Timestamp {
}
}

impl IntoValue for Timestamp {
fn into_value(self) -> golem_wasm_rpc::Value {
let d = self
.0
.duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
golem_wasm_rpc::Value::Record(vec![
golem_wasm_rpc::Value::U64(d.whole_seconds() as u64),
golem_wasm_rpc::Value::U32(d.subsec_nanoseconds() as u32),
])
}

fn get_type() -> AnalysedType {
record(vec![field("seconds", u64()), field("nanoseconds", u32())])
}
}

pub type ComponentVersion = u64;

#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode, Serialize, Deserialize, Object)]
Expand Down Expand Up @@ -1005,6 +1023,30 @@ impl WorkerMetadata {
}
}

impl IntoValue for WorkerMetadata {
fn into_value(self) -> golem_wasm_rpc::Value {
golem_wasm_rpc::Value::Record(vec![
self.worker_id.into_value(),
self.args.into_value(),
self.env.into_value(),
self.last_known_status.status.into_value(),
self.last_known_status.component_version.into_value(),
0u64.into_value(), // retry count could be computed from the worker status record here but we don't support it yet
])
}

fn get_type() -> AnalysedType {
record(vec![
field("worker-id", WorkerId::get_type()),
field("args", list(str())),
field("env", list(tuple(vec![str(), str()]))),
field("status", WorkerStatus::get_type()),
field("component-version", u64()),
field("retry-count", u64()),
])
}
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct WorkerResourceDescription {
pub created_at: Timestamp,
Expand Down Expand Up @@ -1268,6 +1310,32 @@ impl From<WorkerStatus> for i32 {
}
}

impl IntoValue for WorkerStatus {
fn into_value(self) -> golem_wasm_rpc::Value {
match self {
WorkerStatus::Running => golem_wasm_rpc::Value::Enum(0),
WorkerStatus::Idle => golem_wasm_rpc::Value::Enum(1),
WorkerStatus::Suspended => golem_wasm_rpc::Value::Enum(2),
WorkerStatus::Interrupted => golem_wasm_rpc::Value::Enum(3),
WorkerStatus::Retrying => golem_wasm_rpc::Value::Enum(4),
WorkerStatus::Failed => golem_wasm_rpc::Value::Enum(5),
WorkerStatus::Exited => golem_wasm_rpc::Value::Enum(6),
}
}

fn get_type() -> AnalysedType {
r#enum(&[
"running",
"idle",
"suspended",
"interrupted",
"retrying",
"failed",
"exited",
])
}
}

#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub enum WorkerInvocation {
ExportedFunction {
Expand Down Expand Up @@ -1414,6 +1482,16 @@ impl Display for AccountId {
}
}

impl IntoValue for AccountId {
fn into_value(self) -> golem_wasm_rpc::Value {
golem_wasm_rpc::Value::Record(vec![golem_wasm_rpc::Value::String(self.value)])
}

fn get_type() -> AnalysedType {
record(vec![field("value", str())])
}
}

pub trait HasAccountId {
fn account_id(&self) -> AccountId;
}
Expand Down
33 changes: 32 additions & 1 deletion golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use bincode::enc::write::Writer;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{BorrowDecode, Decode, Encode};
use golem_wasm_ast::analysis::analysed_type::u64;
use golem_wasm_ast::analysis::analysed_type::{r#enum, u64};
use golem_wasm_ast::analysis::AnalysedType;
use golem_wasm_rpc::{IntoValue, Value};
use poem_openapi::{Enum, NewType};
Expand Down Expand Up @@ -230,6 +230,16 @@ impl Display for WorkerResourceId {
}
}

impl IntoValue for WorkerResourceId {
fn into_value(self) -> Value {
Value::U64(self.0)
}

fn get_type() -> AnalysedType {
u64()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct IndexedResourceKey {
pub resource_name: String,
Expand Down Expand Up @@ -268,6 +278,27 @@ pub enum LogLevel {
Critical,
}

impl IntoValue for LogLevel {
fn into_value(self) -> Value {
match self {
LogLevel::Stdout => Value::Enum(0),
LogLevel::Stderr => Value::Enum(1),
LogLevel::Trace => Value::Enum(2),
LogLevel::Debug => Value::Enum(3),
LogLevel::Info => Value::Enum(4),
LogLevel::Warn => Value::Enum(5),
LogLevel::Error => Value::Enum(6),
LogLevel::Critical => Value::Enum(7),
}
}

fn get_type() -> AnalysedType {
r#enum(&[
"stdout", "stderr", "trace", "debug", "info", "warn", "error", "critical",
])
}
}

#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub enum OplogEntry {
CreateV1 {
Expand Down
Loading

0 comments on commit a9a0e4f

Please sign in to comment.