Skip to content

Commit

Permalink
Merge pull request #14 from HSLdevcom/feat/passenger_count
Browse files Browse the repository at this point in the history
feat/passenger count
  • Loading branch information
mjaakko authored Jul 27, 2020
2 parents 8df02ae + 9179457 commit 91251d0
Show file tree
Hide file tree
Showing 14 changed files with 529 additions and 56 deletions.
54 changes: 48 additions & 6 deletions src/main/java/fi/hsl/suomenlinna_hfp/HfpProducer.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package fi.hsl.suomenlinna_hfp;

import fi.hsl.suomenlinna_hfp.common.PassengerCountProvider;
import fi.hsl.suomenlinna_hfp.common.VehiclePositionProvider;
import fi.hsl.suomenlinna_hfp.common.model.PassengerCount;
import fi.hsl.suomenlinna_hfp.common.model.VehicleMetadata;
import fi.hsl.suomenlinna_hfp.common.model.VehiclePosition;
import fi.hsl.suomenlinna_hfp.common.utils.MathUtils;
import fi.hsl.suomenlinna_hfp.gtfs.model.Stop;
import fi.hsl.suomenlinna_hfp.gtfs.model.StopTime;
import fi.hsl.suomenlinna_hfp.gtfs.provider.GtfsProvider;
Expand All @@ -21,10 +24,10 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class HfpProducer {
Expand All @@ -37,6 +40,7 @@ public class HfpProducer {

private final GtfsProvider gtfsProvider;
private final VehiclePositionProvider vehiclePositionProvider;
private final PassengerCountProvider passengerCountProvider;
private final HfpPublisher hfpPublisher;

private final BlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();
Expand All @@ -48,12 +52,13 @@ public class HfpProducer {

private Thread thread;

public HfpProducer(Topic.TransportMode transportMode, Map<String, VehicleId> vehicleIdMap, TripProcessor tripProcessor, GtfsProvider gtfsProvider, VehiclePositionProvider vehiclePositionProvider, HfpPublisher hfpPublisher) {
public HfpProducer(Topic.TransportMode transportMode, Map<String, VehicleId> vehicleIdMap, TripProcessor tripProcessor, GtfsProvider gtfsProvider, VehiclePositionProvider vehiclePositionProvider, PassengerCountProvider passengerCountProvider, HfpPublisher hfpPublisher) {
this.transportMode = transportMode;
this.vehicleIdMap = vehicleIdMap;
this.tripProcessor = tripProcessor;
this.gtfsProvider = gtfsProvider;
this.vehiclePositionProvider = vehiclePositionProvider;
this.passengerCountProvider = passengerCountProvider;
this.hfpPublisher = hfpPublisher;
}

Expand Down Expand Up @@ -105,10 +110,11 @@ public void run() throws Throwable {

tripProcessor.processVehiclePosition(vehicleId, vehiclePosition.getCoordinates(), vehiclePosition.getTimestamp());

TripDescriptor tripDescriptor = tripProcessor.getRegisteredTrip(vehicleId);
boolean registeredForTrip = tripDescriptor != null;
TripProcessor.TripAndRouteWithStopTimes trip = tripProcessor.getRegisteredTrip(vehicleId);
TripDescriptor tripDescriptor = trip != null ? trip.getTripDescriptor() : null;
boolean registeredForTrip = trip != null;

if (tripDescriptor == null) {
if (trip == null) {
tripDescriptor = getPetitionTripDescriptor(vehiclePosition);
//Not registered to an actual trip
registeredForTrip = false;
Expand All @@ -123,6 +129,9 @@ public void run() throws Throwable {
int hdg = (int)Math.round(vehiclePosition.getHeading());

if (tripDescriptor != null) {
//Get passenger count for the trip
OptionalInt occu = getPassengerCount(trip);

boolean isAtCurrentStop = false;
Map.Entry<StopTime, Stop> currentStop = null;
String nextStopId = "";
Expand Down Expand Up @@ -150,7 +159,7 @@ public void run() throws Throwable {
tst, tsi, spd, hdg,
vehiclePosition.getCoordinates().getLatitude(), vehiclePosition.getCoordinates().getLongitude(), null, null, null, null,
tripDescriptor.departureDate, null, null, tripDescriptor.startTime, "GPS", isAtCurrentStop ? currentStop.getValue().getId() : null,
tripDescriptor.routeId, 0, vehicleMetadata != null ? vehicleMetadata.getLabel() : null);
tripDescriptor.routeId, occu.orElse(0), vehicleMetadata != null ? vehicleMetadata.getLabel() : null);

hfpPublisher.publish(topic, payload);
} else {
Expand All @@ -172,6 +181,39 @@ public void run() throws Throwable {
}
}


/**
* Gets passenger count for the trip asynchronously. Returns empty value if operation is not yet completed
* @param trip
*/
private OptionalInt getPassengerCount(TripProcessor.TripAndRouteWithStopTimes trip) {
if (passengerCountProvider != null) {
final String stopCode = trip.stops.get(trip.getFirstStopTime().getStopId()).getCode();
final CompletableFuture<PassengerCount> passengerCountFuture = passengerCountProvider.getPassengerCountByStartTimeAndStopCode(trip.getStartTime(), stopCode);

if (passengerCountFuture.isDone()) {
try {
final PassengerCount passengerCount = passengerCountFuture.join();
if (passengerCount == null) {
return OptionalInt.empty();
}

final int passengerCountAsPercentage = MathUtils.percentageAsInteger(MathUtils.clamp(passengerCount.getPercentage(), 0, 1));

return OptionalInt.of(passengerCountAsPercentage);
} catch (Throwable throwable) {
LOG.warn("Failed to fetch passenger count", throwable);
return OptionalInt.empty();
}
} else {
//Passenger count not available yet
return OptionalInt.empty();
}
} else {
return OptionalInt.empty();
}
}

//"Hack" to generate trip descriptors for robot bus 29R when it is running only when requested
//TODO: remove this after robot bus 29R is no longer running or figure out a better way to do this
private static TripDescriptor getPetitionTripDescriptor(VehiclePosition vehiclePosition) {
Expand Down
55 changes: 41 additions & 14 deletions src/main/java/fi/hsl/suomenlinna_hfp/Main.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
package fi.hsl.suomenlinna_hfp;

import com.typesafe.config.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import fi.hsl.suomenlinna_hfp.common.PassengerCountProvider;
import fi.hsl.suomenlinna_hfp.common.VehiclePositionProvider;
import fi.hsl.suomenlinna_hfp.digitraffic.provider.*;
import fi.hsl.suomenlinna_hfp.gtfs.provider.*;
import fi.hsl.suomenlinna_hfp.health.*;
import fi.hsl.suomenlinna_hfp.hfp.model.*;
import fi.hsl.suomenlinna_hfp.hfp.publisher.*;
import fi.hsl.suomenlinna_hfp.digitraffic.provider.MqttVesselLocationProvider;
import fi.hsl.suomenlinna_hfp.gtfs.provider.GtfsProvider;
import fi.hsl.suomenlinna_hfp.gtfs.provider.HttpGtfsProvider;
import fi.hsl.suomenlinna_hfp.health.HealthNotificationService;
import fi.hsl.suomenlinna_hfp.health.HealthServer;
import fi.hsl.suomenlinna_hfp.hfp.model.Topic;
import fi.hsl.suomenlinna_hfp.hfp.model.VehicleId;
import fi.hsl.suomenlinna_hfp.hfp.publisher.MqttHfpPublisher;
import fi.hsl.suomenlinna_hfp.lati.provider.LatiPassengerCountProvider;
import fi.hsl.suomenlinna_hfp.sbdrive.provider.PollingVehicleStateProvider;

import java.io.*;
import java.net.http.*;
import java.time.*;
import java.time.temporal.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.io.IOException;
import java.net.http.HttpClient;
import java.time.Duration;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Main {
public static void main(String[] args) throws Throwable {
Expand Down Expand Up @@ -51,6 +60,7 @@ public static void main(String[] args) throws Throwable {
Map<String, VehicleId> vehicleIdMap = null;
Topic.TransportMode transportMode = null;
VehiclePositionProvider vehiclePositionProvider = null;
PassengerCountProvider passengerCountProvider = null;

switch (configType) {
case SUOMENLINNA:
Expand All @@ -64,6 +74,23 @@ public static void main(String[] args) throws Throwable {
transportMode = Topic.TransportMode.FERRY;
vehiclePositionProvider = new MqttVesselLocationProvider(meriDigitrafficBroker, meriDigitrafficUser, meriDigitrafficPassword, vehicleIdMap.keySet());

if (config.getBoolean("passengerCount.enabled")) {
String endpoint = config.getString("passengerCount.endpoint");

Map<String, String> vesselNameToMmsi = new HashMap<>();
Map<String, Integer> mmsiToMaxPassengerCount = new HashMap<>();
config.getConfigList("passengerCount.vessels").forEach(c -> {
String mmsi = c.getString("mmsi");

String vesselName = c.getString("name");
vesselNameToMmsi.put(vesselName, mmsi);
int maxPassengerCount = c.getInt("maxPassengers");
mmsiToMaxPassengerCount.put(mmsi, maxPassengerCount);
});

passengerCountProvider = new LatiPassengerCountProvider(httpClient, endpoint, vesselNameToMmsi, mmsiToMaxPassengerCount);
}

break;
case SBDRIVE:
vehicleIdMap = config.getConfigList("sbDriveVehicleIdToVehicleId").stream()
Expand All @@ -87,7 +114,7 @@ public static void main(String[] args) throws Throwable {
}
}

new HfpProducer(transportMode, vehicleIdMap, tripProcessor, gtfsProvider, vehiclePositionProvider, mqttHfpPublisher).run();
new HfpProducer(transportMode, vehicleIdMap, tripProcessor, gtfsProvider, vehiclePositionProvider, passengerCountProvider, mqttHfpPublisher).run();
}

private enum ConfigType {
Expand Down
51 changes: 26 additions & 25 deletions src/main/java/fi/hsl/suomenlinna_hfp/TripProcessor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fi.hsl.suomenlinna_hfp;

import fi.hsl.suomenlinna_hfp.common.model.LatLng;
import fi.hsl.suomenlinna_hfp.common.utils.TimeUtils;
import fi.hsl.suomenlinna_hfp.gtfs.model.Route;
import fi.hsl.suomenlinna_hfp.gtfs.model.Stop;
import fi.hsl.suomenlinna_hfp.gtfs.model.StopTime;
Expand All @@ -14,6 +15,7 @@
import org.slf4j.LoggerFactory;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
Expand All @@ -22,8 +24,6 @@
public class TripProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TripProcessor.class);

private static final int ONE_DAY_IN_SECONDS = 24 * 60 * 60;

private Map<VehicleId, TripAndRouteWithStopTimes> registeredTrips = new HashMap<>();
//Time when vehicle registered for the trip
private Map<VehicleId, ZonedDateTime> registrationTimes = new HashMap<>();
Expand Down Expand Up @@ -90,16 +90,8 @@ public void updateGtfsData(GtfsIndex gtfsIndex, ServiceDates serviceDates) {
value = new TreeMap<>();
}

ZonedDateTime startTime;

int departureTimeSeconds = stopTimes.first().getDepartureTime();
if (departureTimeSeconds >= ONE_DAY_IN_SECONDS) {
//If the trip starts at or after midnight add one day to date and minus one day in seconds from start time
//This is needed to get the start time for correct date
startTime = date.plusDays(1).atTime(LocalTime.ofSecondOfDay(departureTimeSeconds - ONE_DAY_IN_SECONDS)).atZone(timezone);
} else {
startTime = date.atTime(LocalTime.ofSecondOfDay(departureTimeSeconds)).atZone(timezone);
}
ZonedDateTime startTime = TimeUtils.gtfsTimeToLocalDateTime(date, departureTimeSeconds).atZone(timezone);

value.put(startTime, new TripAndRouteWithStopTimes(trip,
gtfsIndex.routesById.get(trip.getRouteId()),
Expand All @@ -119,9 +111,8 @@ public void updateGtfsData(GtfsIndex gtfsIndex, ServiceDates serviceDates) {
}
}

public TripDescriptor getRegisteredTrip(VehicleId vehicleId) {
TripAndRouteWithStopTimes trip = registeredTrips.get(vehicleId);
return trip != null ? trip.getTripDescriptor() : null;
public TripAndRouteWithStopTimes getRegisteredTrip(VehicleId vehicleId) {
return registeredTrips.get(vehicleId);
}

public boolean isAtCurrentStop(VehicleId vehicleId) {
Expand Down Expand Up @@ -297,28 +288,38 @@ private void updateStopStatus(VehicleId vehicleId, LatLng position) {
}
}

private static class TripAndRouteWithStopTimes {
private final Trip trip;
private final Route route;
private final LocalDate operatingDate;
private final NavigableMap<Integer, StopTime> stopTimes;
private final Map<String, Stop> stops;
public static class TripAndRouteWithStopTimes {
public final Trip trip;
public final Route route;
public final LocalDate operatingDate;
public final NavigableMap<Integer, StopTime> stopTimes;
public final Map<String, Stop> stops;

public TripAndRouteWithStopTimes(Trip trip, Route route, LocalDate operatingDate, Set<StopTime> stopTimes, Set<Stop> stops) {
private TripAndRouteWithStopTimes(Trip trip, Route route, LocalDate operatingDate, Set<StopTime> stopTimes, Set<Stop> stops) {
this.trip = trip;
this.route = route;
this.operatingDate = operatingDate;
this.stopTimes = stopTimes.stream().collect(Collectors.toMap(StopTime::getStopSequence, Function.identity(), (a, b) -> a, TreeMap::new));
this.stops = stops.stream().collect(Collectors.toMap(Stop::getId, Function.identity()));
}

public StopTime getFirstStopTime() {
return stopTimes.firstEntry().getValue();
}

public LocalDateTime getStartTime() {
return TimeUtils.gtfsTimeToLocalDateTime(operatingDate, getFirstStopTime().getDepartureTime());
}

public TripDescriptor getTripDescriptor() {
return new TripDescriptor(route.getId(),
return new TripDescriptor(
route.getId(),
route.getShortName(),
operatingDate.toString(),
HfpUtils.formatStartTime(stopTimes.firstEntry().getValue().getDepartureTime()),
operatingDate.format(DateTimeFormatter.ISO_LOCAL_DATE),
HfpUtils.formatStartTime(getFirstStopTime().getDepartureTime()),
String.valueOf(trip.getDirectionId() + 1),
trip.getHeadsign());
trip.getHeadsign()
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package fi.hsl.suomenlinna_hfp.common;

import fi.hsl.suomenlinna_hfp.common.model.PassengerCount;

import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;

public interface PassengerCountProvider {
CompletableFuture<PassengerCount> getPassengerCountByStartTimeAndStopCode(LocalDateTime startTime, String stopCode);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package fi.hsl.suomenlinna_hfp.common.model;

import java.util.Objects;

public class PassengerCount {
public final String vehicleId;
public final int currentPassengers;
public final int maxPassengers;

public PassengerCount(String vehicleId, int currentPassengers, int maxPassengers) {
this.vehicleId = vehicleId;
this.currentPassengers = currentPassengers;
this.maxPassengers = maxPassengers;
}

public double getPercentage() {
return currentPassengers / (double)maxPassengers;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PassengerCount that = (PassengerCount) o;
return currentPassengers == that.currentPassengers &&
maxPassengers == that.maxPassengers &&
Objects.equals(vehicleId, that.vehicleId);
}

@Override
public int hashCode() {
return Objects.hash(vehicleId, currentPassengers, maxPassengers);
}

@Override
public String toString() {
return "PassengerCount{" +
"vehicleId='" + vehicleId + '\'' +
", currentPassengers=" + currentPassengers +
", maxPassengers=" + maxPassengers +
'}';
}
}
25 changes: 25 additions & 0 deletions src/main/java/fi/hsl/suomenlinna_hfp/common/utils/MathUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package fi.hsl.suomenlinna_hfp.common.utils;

public class MathUtils {
private MathUtils() {}

/**
* Returns percentage as integer
* @param percentage Percentage 0.0 - 1.0
* @return Integer, 0-100
*/
public static int percentageAsInteger(double percentage) {
return (int)Math.round(percentage * 100);
}

/**
* Clamps value into range [min, max]
* @param value Value
* @param min Min value
* @param max Max value
* @return Value in range [min, max]
*/
public static double clamp(double value, double min, double max) {
return Math.max(min, Math.min(max, value));
}
}
Loading

0 comments on commit 91251d0

Please sign in to comment.