Skip to content

Commit

Permalink
Automatically delete customers
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Jan 18, 2024
1 parent a629db9 commit 00cb92e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,49 @@

import io.apicurio.registry.probe.persistence.CustomerEntity;
import io.apicurio.registry.probe.smoke.ProbeMonitoring;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestSseElementType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@ApplicationScoped
@Path("/consumed-customers")
public class ConsumedCustomersResource {

private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class);

@Channel("customers-from-kafka")
Multi<CustomerEntity> customers;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return customers.map(customer -> {
public void startMonitoring(@Observes StartupEvent startupEvent) {
int concurrentTasks = 2;

try {
concurrentTasks = Integer.parseInt(System.getenv("CONCURRENT_TASKS"));
} catch (Exception e) {
log.warn("Cannot load concurrent tasks environment variable", e);
}

ExecutorService e = Executors.newFixedThreadPool(concurrentTasks);

for (int i = 0; i < concurrentTasks; i++) {
e.submit(() -> {
log.info("Removing customers...");
removeCustomers();
});
}
}

public void removeCustomers() {
customers.map(customer -> {
final String stringToReturn = String.format("'%s' from %s", customer.getFirstName(), customer.getEmail());
try {
log.info("Deleting customer with email: {}", customer.getEmail());
customer.delete();
} catch (Exception e) {
log.error("Exception detected in the Probe application: {}", e.getCause(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.apicurio.registry.probe.persistence.CustomerEntity;
import io.apicurio.registry.probe.smoke.ProbeMonitoring;
import io.apicurio.registry.rest.client.RegistryClient;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.event.Observes;
import jakarta.transaction.Transactional;
Expand All @@ -18,8 +17,6 @@ public class LoadGenerator {
private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class);

public void startMonitoring(@Observes StartupEvent startupEvent) {


int concurrentTasks = 2;

try {
Expand Down

0 comments on commit 00cb92e

Please sign in to comment.