Skip to content

Commit

Permalink
Added logs, exit on error (#143) (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingela authored Oct 15, 2019
1 parent 197263c commit 2787ed5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.util.concurrent.ConcurrentMap;
import jp.co.soramitsu.iroha.java.IrohaAPI;
import jp.co.soramitsu.iroha.java.QueryAPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrvsIrohaChainListener implements Closeable {

private static final String BRVS_QUEUE_RMQ_NAME = "brvs";
private static final Logger logger = LoggerFactory.getLogger(BrvsIrohaChainListener.class);

private final IrohaAPI irohaAPI;
// BRVS keypair to query Iroha
Expand Down Expand Up @@ -64,6 +67,7 @@ public Set<TransactionBatch> getAllPendingTransactions(Iterable<String> accounts
accountsToMonitor.forEach(account ->
pendingTransactions.addAll(getPendingTransactions(account, userKeyPair))
);
logger.info("Got " + pendingTransactions.size() + " pending batches from Iroha");
return pendingTransactions;
}

Expand All @@ -75,14 +79,27 @@ public Set<TransactionBatch> getAllPendingTransactions(Iterable<String> accounts
* @return list of user transactions that are in pending state
*/
private List<TransactionBatch> getPendingTransactions(String accountId, KeyPair keyPair) {
queryAPIMap.putIfAbsent(accountId, new QueryAPI(irohaAPI, accountId, keyPair));
return constructBatches(
queryAPIMap.get(accountId)
getQueryApiFor(accountId, keyPair)
.getPendingTransactions()
.getTransactionsList()
);
}

/**
* Returns a relevant query api instance
*
* @param accountId user that transactions should be queried for
* @param keyPair user keypair
* @return user {@link QueryAPI} instance
*/
private synchronized QueryAPI getQueryApiFor(String accountId, KeyPair keyPair) {
if (!queryAPIMap.containsKey(accountId)) {
queryAPIMap.put(accountId, new QueryAPI(irohaAPI, accountId, keyPair));
}
return queryAPIMap.get(accountId);
}

/**
* Converts Iroha transactions to brvs batches
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package iroha.validation.transactions.provider.impl;

import static com.d3.commons.util.ThreadUtilKt.createPrettyScheduledThreadPool;
import static com.d3.commons.util.ThreadUtilKt.createPrettySingleThreadPool;

import com.google.common.base.Strings;
Expand All @@ -31,7 +32,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -48,7 +48,9 @@ public class BasicTransactionProvider implements TransactionProvider {
private final RegistrationProvider registrationProvider;
private final BlockStorage blockStorage;
private final BrvsIrohaChainListener irohaReliableChainListener;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService executor = createPrettyScheduledThreadPool(
"brvs", "pending-processor"
);
private final Scheduler blockScheduler = Schedulers.from(createPrettySingleThreadPool(
"brvs", "block-processor"
));
Expand Down Expand Up @@ -102,17 +104,20 @@ public synchronized Observable<TransactionBatch> getPendingTransactionsStreaming
}

private void monitorIrohaPending() {
irohaReliableChainListener
.getAllPendingTransactions(registrationProvider.getRegisteredAccounts())
.forEach(transactionBatch -> {
// if only BRVS signatory remains
if (isBatchSignedByUsers(transactionBatch)) {
if (savedMissingInStorage(transactionBatch)) {
try {
irohaReliableChainListener
.getAllPendingTransactions(registrationProvider.getRegisteredAccounts())
.forEach(transactionBatch -> {
// if only BRVS signatory remains
if (isBatchSignedByUsers(transactionBatch) && savedMissingInStorage(transactionBatch)) {
cacheProvider.put(transactionBatch);
}
}
}
);
);
} catch (Throwable t) {
logger.error("Pending transactions monitor encountered an error", t);
System.exit(1);
}
}

private boolean isBatchSignedByUsers(TransactionBatch transactionBatch) {
Expand Down

0 comments on commit 2787ed5

Please sign in to comment.