Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: OSRD goes async #7103

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d7efc9b
core_controller: init project and devenv
ElysaSrc Apr 4, 2024
6dc4b05
editoast: add core rabbitmq client
ElysaSrc Apr 5, 2024
99dc00a
fixup! core_controller: init project and devenv
ElysaSrc Apr 10, 2024
a4383dd
wip: integrating victor's osrdyne
ElysaSrc Jun 17, 2024
d460984
start core
Khoyo Apr 13, 2024
c11e6fb
wip
Khoyo Jun 19, 2024
21bf5ba
wip
Khoyo Jun 23, 2024
e8f99ef
wip on osrdyne
ElysaSrc Jul 3, 2024
3d79074
wip on editoast
ElysaSrc Jul 3, 2024
5c88114
wip on editoast
ElysaSrc Jul 3, 2024
a74f024
wip
ElysaSrc Jul 5, 2024
af2157f
wip core
Khoyo Jul 5, 2024
bd35074
core: more wip
Khoyo Jul 5, 2024
0202165
core: stop computing track section names for blocks ids
Khoyo Jul 5, 2024
b89cb06
wip editoast: attach otel tracing info
Khoyo Jul 5, 2024
b66ff8d
moar wip
Khoyo Jul 6, 2024
02137d8
add dockerignore to osrdyne
Khoyo Jul 7, 2024
3b9146d
add back core to docker compose - in order to build it
Khoyo Jul 7, 2024
a41769a
it works, kinda
Khoyo Jul 7, 2024
862a437
wip
Khoyo Jul 8, 2024
797babb
update codeowners
Khoyo Jul 8, 2024
9657c0f
osrdyne useless import
Khoyo Jul 8, 2024
5f953ee
docker: add back build cache mounts
Khoyo Jul 9, 2024
b1862a6
docker: add back build cache mounts - now osrdyne
Khoyo Jul 9, 2024
22d0f35
osrdyne: start adding the monitoring api, it doesn't work
Khoyo Jul 9, 2024
98807c3
building osrdyne in the ci, maybe, maybe not
ElysaSrc Jul 10, 2024
cb42583
fix api listening address
Khoyo Jul 10, 2024
c8845d2
fixup! fix api listening address
Khoyo Jul 10, 2024
b419aa7
add host mode support
Khoyo Jul 10, 2024
f7bfd9b
unfatfinger editoast port
Khoyo Jul 11, 2024
082552b
remove editoast debug logging
Khoyo Jul 11, 2024
f0c5879
handle core errors properly
Khoyo Jul 11, 2024
f76231a
fixup! wip
Khoyo Jul 11, 2024
e806d1d
fix final newline
Khoyo Jul 11, 2024
7c8e3ff
osrdyne: push amqp uri in kube workers env
Khoyo Jul 11, 2024
ff7208d
orsdyne for k8s goes brrrrrr, try 1
ElysaSrc Jul 12, 2024
117cbf3
build osrdyne
ElysaSrc Jul 12, 2024
31b8cae
fix management config parsing
Khoyo Jul 12, 2024
e429868
docker: add back build cache mounts
Khoyo Jul 9, 2024
78d8dc4
move osrdyne.toml into osrdyne.yml
Khoyo Jul 12, 2024
f23ab87
osrdyne: enable no-ack mode for activity messages
Khoyo Jul 12, 2024
7f2835d
editoast: stop reconstruction db url
Khoyo Jul 12, 2024
75f573e
bah alors, ça compile pas ?
ElysaSrc Jul 12, 2024
a19b8a5
fix issue with osrdyne dockerfile
ElysaSrc Jul 12, 2024
5e2b0b9
removing useless psql values
ElysaSrc Jul 12, 2024
73265d2
fix selector
ElysaSrc Jul 13, 2024
6ac159b
fix labels
ElysaSrc Jul 14, 2024
70c0eeb
Adding container start command k8s driver
ElysaSrc Jul 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# --- CORE ---
/core @openrailassociation/OSRD-Core

# --- OSRDYNE ---
/osrdyne @openrailassociation/OSRD-DevOps

# --- CORE PHYSICS ---
/core/*Physics*.java @openrailassociation/OSRD-Physics
/core/*Integration*.java @openrailassociation/OSRD-Physics
Expand Down
6 changes: 6 additions & 0 deletions .github/scripts/bake-metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,21 @@ def suffix(self):
TARGETS = [
Target(name="core", image="core", release=True),
Target(name="core-build", image="core", variant="build"),

Target(name="editoast", image="editoast", release=True),
Target(name="editoast-test", image="editoast", variant="test"),

Target(name="front-devel", image="front", variant="devel"),
Target(name="front-nginx", image="front", variant="nginx"),
Target(name="front-build", image="front", variant="build"),
Target(name="front-tests", image="front", variant="tests"),

Target(name="gateway-standalone", image="gateway", variant="standalone"),
Target(name="gateway-test", image="gateway", variant="test"),
Target(name="gateway-front", image="gateway", variant="front", release=True),

Target(name="osrdyne", image="osrdyne", release=True),
Target(name="osrdyne-test", image="osrdyne", variant="test"),
]


Expand Down
16 changes: 16 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
- [editoast, editoast-test]
- [gateway-test, gateway-standalone]
- [front-build, front-tests, front-devel, front-nginx]
- [osrdyne, osrdyne-test]
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down Expand Up @@ -71,6 +72,9 @@ jobs:
run: |
set -eo pipefail

# disable cache mounts as github cache is slow
find -name Dockerfile -print0 | xargs -0 sed -Ei 's/--mount=type=cache,target=[^[:blank:]]+//g'

TRANSIENT_FAILURES=(
"failed to solve: failed to compute cache key"
"httpReadSeeker: failed open: failed to authorize: no active session"
Expand Down Expand Up @@ -160,6 +164,18 @@ jobs:
with:
name: front-nginx
path: osrd-front-nginx.tar
- name: Upload osrdyne artifact
uses: actions/upload-artifact@v4
if: steps.bake-metadata.outputs.output_method == 'artifact' && contains(matrix.targets, 'osrdyne')
with:
name: osrdyne
path: osrd-osrdyne.tar
- name: Upload osrdyne-test artifact
uses: actions/upload-artifact@v4
if: steps.bake-metadata.outputs.output_method == 'artifact' && contains(matrix.targets, 'osrdyne-test')
with:
name: osrdyne
path: osrd-osrdyne-test.tar

check_generated_railjson_sync:
runs-on: ubuntu-latest
Expand Down
9 changes: 6 additions & 3 deletions core/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ WORKDIR /home/gradle/src
COPY --from=test_data . /home/gradle/tests/data
COPY --chown=gradle:gradle . .
# Remove broken symlink (to prevent kaniko from crashing)
RUN gradle shadowJar --no-daemon && \
rm -rf /root/.kotlin/daemon/kotlin-daemon.* /tmp/hsperfdata_root/ /tmp/kotlin-daemon.*
RUN --mount=type=cache,target=/home/gradle/.gradle \
--mount=type=cache,target=/home/gradle/src/build \
gradle shadowJar --no-daemon && \
cp build/libs/osrd-all.jar / && \
rm -rf /root/.kotlin/daemon/kotlin-daemon.* /tmp/hsperfdata_root/ /tmp/kotlin-daemon.*

#### Running stage
FROM eclipse-temurin:21 AS running_env

COPY --from=build_env /home/gradle/src/build/libs/osrd-all.jar /app/osrd_core.jar
COPY --from=build_env /osrd-all.jar /app/osrd_core.jar
ADD 'https://dtdg.co/latest-java-tracer' /app/dd-java-agent.jar
ADD 'https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar' /app/opentelemetry-javaagent.jar

Expand Down
3 changes: 3 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ dependencies {
compileOnly libs.spotbugs.annotations
testCompileOnly libs.jcip.annotations
testCompileOnly libs.spotbugs.annotations

// rabbitmq
implementation libs.amqp.client
}
// endregion

Expand Down
2 changes: 2 additions & 0 deletions core/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ dd-trace-api = { module = 'com.datadoghq:dd-trace-api', version = '1.31.2' }

kaml = { module = 'com.charleskorn.kaml:kaml', version = '0.59.0' } # Apache 2.0

amqp-client = { module = 'com.rabbitmq:amqp-client', version = '5.21.0' }

[plugins]
# kotlin
ksp = { id = 'com.google.devtools.ksp', version.ref = 'ksp' }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,6 @@ private fun buildBlockName(
descriptor.signals.map {
"${rawInfra.getLogicalSignalName(it)}-${rawInfra.getSignalingSystemId(it)}"
}
val tracks = mutableListOf<String>()
val trackIds = mutableListOf<DirTrackSectionId>()
for (zonePath in descriptor.path) {
for (chunk in rawInfra.getZonePathChunks(zonePath)) {
val trackName = rawInfra.getTrackSectionName(rawInfra.getTrackFromChunk(chunk.value))
if (tracks.isNotEmpty() && tracks[tracks.size - 1] == trackName) continue
tracks.add(trackName)
trackIds.add(
DirTrackSectionId(rawInfra.getTrackFromChunk(chunk.value), chunk.direction)
)
}
}
val trackNodes = descriptor.path.flatMap { rawInfra.getZonePathMovableElements(it) }
val trackNodeConfig = descriptor.path.flatMap { rawInfra.getZonePathMovableElementsConfigs(it) }
val trackNodeConfigNames = mutableListOf<String>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package fr.sncf.osrd.reporting.exceptions;

import com.squareup.moshi.Json;

public enum ErrorCause {
@Json(name = "Internal")
INTERNAL,
@Json(name = "User")
USER
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class OSRDError extends RuntimeException {
public Map<String, Object> context = new HashMap<>();

public final transient ErrorType osrdErrorType;
public final transient ErrorCause cause;
public final ErrorCause cause;

/**
* Constructs a new OSRDError with the specified error type.
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/fr/sncf/osrd/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public static void main(String[] args) {
commands.put("api", new ApiServerCommand());
commands.put("standalone-simulation", new StandaloneSimulationCommand());
commands.put("load-infra", new ValidateInfra());
commands.put("worker", new WorkerCommand());

// prepare the command line parser
var argsParserBuilder = JCommander.newBuilder();
Expand Down
236 changes: 236 additions & 0 deletions core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package fr.sncf.osrd.cli

import com.beust.jcommander.Parameter
import com.beust.jcommander.Parameters
import com.rabbitmq.client.*
import fr.sncf.osrd.api.*
import fr.sncf.osrd.api.api_v2.conflicts.ConflictDetectionEndpointV2
import fr.sncf.osrd.api.api_v2.path_properties.PathPropEndpoint
import fr.sncf.osrd.api.api_v2.pathfinding.PathfindingBlocksEndpointV2
import fr.sncf.osrd.api.api_v2.project_signals.SignalProjectionEndpointV2
import fr.sncf.osrd.api.api_v2.standalone_sim.SimulationEndpoint
import fr.sncf.osrd.api.api_v2.stdcm.STDCMEndpointV2
import fr.sncf.osrd.api.pathfinding.PathfindingBlocksEndpoint
import fr.sncf.osrd.api.stdcm.STDCMEndpoint
import fr.sncf.osrd.reporting.warnings.DiagnosticRecorderImpl
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import okhttp3.OkHttpClient
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.takes.Request
import java.io.InputStream
import java.util.concurrent.TimeUnit


@Parameters(commandDescription = "RabbitMQ worker mode")
class WorkerCommand : CliCommand {


@Parameter(
names = ["--editoast-url"],
description = "The base URL of editoast (used to query infrastructures)"
)
private var editoastUrl: String? = null

@Parameter(
names = ["--editoast-authorization"],
description = "The HTTP Authorization header sent to editoast"
)
private var editoastAuthorization: String? = null

@Parameter(names = ["-j", "--threads"], description = "The number of threads to serve requests from")
private var threads: Int = 1


val WORKER_ID: String
val WORKER_KEY: String
val WORKER_AMQP_URI: String
val WORKER_POOL: String
val WORKER_REQUESTS_QUEUE: String
val WORKER_ACTIVITY_EXCHANGE: String

init {
// TODO: handle errors more gracefully, etc
WORKER_ID = System.getenv("WORKER_ID")!!
WORKER_KEY = System.getenv("WORKER_KEY")!!
WORKER_AMQP_URI = System.getenv("WORKER_AMQP_URI") ?: "amqp://osrd:[email protected]:5672/%2f"
WORKER_POOL = System.getenv("WORKER_POOL") ?: "core"
WORKER_REQUESTS_QUEUE = System.getenv("WORKER_REQUESTS_QUEUE") ?: "$WORKER_POOL-req-$WORKER_KEY"
WORKER_ACTIVITY_EXCHANGE = System.getenv("WORKER_ACTIVITY_EXCHANGE") ?: "$WORKER_POOL-activity-xchg"
}

override fun run(): Int {
val maxMemory =
String.format("%.2f", Runtime.getRuntime().maxMemory() / (1 shl 30).toDouble())
logger.info("starting the API server with max {}Gi of java heap", maxMemory)

val httpClient = OkHttpClient.Builder().readTimeout(120, TimeUnit.SECONDS).build()

val infraId = WORKER_KEY
val diagnosticRecorder = DiagnosticRecorderImpl(false)
val infraManager = InfraManager(editoastUrl, editoastAuthorization, httpClient, false)
val electricalProfileSetManager =
ElectricalProfileSetManager(editoastUrl, editoastAuthorization, httpClient)

infraManager.load(infraId, null, diagnosticRecorder)

val monitoringType = System.getenv("CORE_MONITOR_TYPE")
if (monitoringType != null) {
logger.info("monitoring type: {}", monitoringType)
// TODO: implement monitoring
}

val tracer = GlobalOpenTelemetry.getTracerProvider().get("WorkerCommand")

val endpoints =
mapOf(
"/pathfinding/routes" to PathfindingBlocksEndpoint(infraManager),
"/v2/pathfinding/blocks" to PathfindingBlocksEndpointV2(infraManager),
"/v2/path_properties" to PathPropEndpoint(infraManager),
"/standalone_simulation" to
StandaloneSimulationEndpoint(infraManager, electricalProfileSetManager),
"/v2/standalone_simulation" to
SimulationEndpoint(infraManager, electricalProfileSetManager),
"/project_signals" to SignalProjectionEndpoint(infraManager),
"/v2/signal_projection" to SignalProjectionEndpointV2(infraManager),
"/detect_conflicts" to ConflictDetectionEndpoint(),
"/v2/conflict_detection" to ConflictDetectionEndpointV2(),
"/cache_status" to InfraCacheStatusEndpoint(infraManager),
"/version" to VersionEndpoint(),
"/stdcm" to STDCMEndpoint(infraManager),
"/v2/stdcm" to STDCMEndpointV2(infraManager),
"/infra_load" to InfraLoadEndpoint(infraManager),
)

val factory = ConnectionFactory()
factory.setUri(WORKER_AMQP_URI)
val connection = factory.newConnection()
connection.createChannel().use { channel ->
reportActivity(channel, "started")
}

val activityChannel = connection.createChannel()
val channel = connection.createChannel()
channel.basicConsume(
WORKER_REQUESTS_QUEUE,
false, mapOf(),
DeliverCallback { consumerTag, message ->
reportActivity(activityChannel, "request-received")

val replyTo = message.properties.replyTo
val correlationId = message.properties.correlationId
val body = message.body
val path = (message.properties.headers["x-rpc-path"] as ByteArray?)?.decodeToString()
if (path == null) {
logger.error("missing x-rpc-path header")
channel.basicReject(message.envelope.deliveryTag, false)
if (replyTo != null) {
// TODO: response format to handle protocol error
channel.basicPublish(
"",
replyTo,
null,
"missing x-rpc-path header".toByteArray()
)
}

return@DeliverCallback
}
logger.info("received request for path {}", path)

val endpoint = endpoints[path]
if (endpoint == null) {
logger.error("unknown path {}", path)
channel.basicReject(message.envelope.deliveryTag, false)
if (replyTo != null) {
// TODO: response format to handle protocol error
channel.basicPublish("", replyTo, null, "unknown path $path".toByteArray())
}

return@DeliverCallback
}

class RabbitMQTextMapGetter : TextMapGetter<Map<String, Any>> {
override fun keys(carrier: Map<String, Any>): Iterable<String> {
return carrier.keys
}

override fun get(carrier: Map<String, Any>?, key: String): String? {
return (carrier?.get(key) as ByteArray?)?.decodeToString()
}
}

val context = GlobalOpenTelemetry.getPropagators().textMapPropagator.extract(
Context.current(),
message.properties.headers,
RabbitMQTextMapGetter()
)
val span = tracer.spanBuilder(path).setParent(context).startSpan()

var payload: ByteArray
var status: ByteArray
try {
span.makeCurrent().use { scope ->
val response = endpoint.act(MQRequest(path, body))
payload = response
.body()
.readAllBytes() // TODO: check the response code too to catch
val httpHeader = response.head().first()
val statusCode = httpHeader.split(" ")[1]
status = (if (statusCode[0] == '2') "ok" else "core_error").encodeToByteArray()
}
} catch (t: Throwable) {
span.recordException(t)
payload = "ERROR, exception received".toByteArray() // TODO: have a valid payload for uncaught exceptions
status = "core_error".encodeToByteArray()
} finally {
span.end()
}

if (replyTo != null) {
val properties = AMQP.BasicProperties().builder()
.correlationId(correlationId)
.headers(mapOf("x-status" to status))
.build()
channel.basicPublish("", replyTo, properties, payload)
}

channel.basicAck(message.envelope.deliveryTag, false)
logger.info("request for path {} processed", path)
}, { _ ->
logger.error("consumer cancelled")
}, { consumerTag, e ->
logger.info("consume shutdown: {}, {}", consumerTag, e.toString())
})

logger.info("consume ended")

while (true) {
Thread.sleep(100)
if (!channel.isOpen()) break
}

return 0
}

private fun reportActivity(activityChannel: Channel, event: String) {
val properties = AMQP.BasicProperties().builder().headers(mapOf("x-event" to event)).build()
activityChannel.basicPublish(WORKER_ACTIVITY_EXCHANGE, WORKER_KEY, properties, null)
}

class MQRequest(private val path: String, private val body: ByteArray) : Request {
override fun head(): MutableIterable<String> {
return mutableListOf("POST $path HTTP/1.1")
}

override fun body(): InputStream {
return body.inputStream()
}
}

companion object {
val logger: Logger = LoggerFactory.getLogger(WorkerCommand::class.java)
}
}
Loading
Loading