Skip to content

Commit

Permalink
Reordered user account registration with quorum modifications (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingela committed Oct 29, 2019
1 parent 4bdeabb commit 9d42d3c
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import iroha.validation.validators.Validator;
import java.util.Objects;

/**
* Simple data structure used to aggregate all significant modules of the brvs in a one place
*/
public class ValidationServiceContext {

private final Validator validator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;

/**
* Useful utility class to provide CORS-friendly interaction
*/
public class CrossDomainFilter implements ContainerResponseFilter {

/**
* {@inheritDoc}
*/
@Override
public void filter(ContainerRequestContext creq, ContainerResponseContext cres) {
cres.getHeaders().add("Access-Control-Allow-Origin", "*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper class that reuses {@link ReliableIrohaChainListener4J} and provides additional
* functionality to make friendly processable abstraction on top of Iroha batches
*/
public class BrvsIrohaChainListener implements Closeable {

private static final String BRVS_QUEUE_RMQ_NAME = "brvs";
Expand All @@ -47,7 +51,7 @@ public BrvsIrohaChainListener(
Objects.requireNonNull(queryAPI, "Query API must not be null");
Objects.requireNonNull(userKeyPair, "User Keypair must not be null");

irohaChainListener = new ReliableIrohaChainListener4J(rmqConfig, BRVS_QUEUE_RMQ_NAME,false);
irohaChainListener = new ReliableIrohaChainListener4J(rmqConfig, BRVS_QUEUE_RMQ_NAME, false);
this.irohaAPI = queryAPI.getApi();
this.brvsAccountId = queryAPI.getAccountId();
this.brvsKeyPair = queryAPI.getKeyPair();
Expand Down Expand Up @@ -126,6 +130,11 @@ private List<TransactionBatch> constructBatches(List<Transaction> transactions)
return transactionBatches;
}

/**
* Iroha blocks observable entrypoint
*
* @return {@link Observable} of {@link BlockSubscription}
*/
public Observable<BlockSubscription> getBlockStreaming() {
return irohaChainListener.getBlockObservable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ public BinaryTransaction(String hexString) {
Objects.requireNonNull(hexString);
this.hexString = hexString;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Core BRVS service abstraction impl
*/
public class ValidationServiceImpl implements ValidationService {

private static Logger logger = LoggerFactory.getLogger(ValidationServiceImpl.class);
Expand Down Expand Up @@ -73,6 +76,12 @@ public void verifyTransactions() {
);
}

/**
* Calls relevant validators and rules for each passed user transaction
*
* @param transactionBatch user related {@link TransactionBatch}
* @return the same {@link TransactionBatch} that was passed as an argument
*/
private TransactionBatch processTransactionBatch(TransactionBatch transactionBatch) {
final List<String> hex = ValidationUtils.hexHash(transactionBatch);
try {
Expand All @@ -93,6 +102,9 @@ private TransactionBatch processTransactionBatch(TransactionBatch transactionBat
return transactionBatch;
}

/**
* Reads Iroha details containing a list of accounts that should be checked by BRVS
*/
private void registerExistentAccounts() {
logger.info("Going to register existent user accounts in BRVS: " + brvsData.getHostname());
final Iterable<String> userAccounts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ void setUserQuorumDetail(String targetAccount,
Iterable<String> publicKeys,
long creationTimeMillis);

/**
* Method for getting relevant user Iroha account quorum
*
* @param targetAccount account id in Iroha
*/
int getUserAccountQuorum(String targetAccount);

/**
* Method for setting relevant user Iroha account quorum
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
* Class responsible for user related Iroha interaction
*/
public class AccountManager implements UserQuorumProvider, RegistrationProvider {

private static final Logger logger = LoggerFactory.getLogger(AccountManager.class);
Expand Down Expand Up @@ -169,6 +172,14 @@ public void setUserQuorumDetail(String targetAccount,
logger.info("Successfully set signatories detail: " + targetAccount + " - " + jsonedKeys);
}

/**
* {@inheritDoc}
*/
@Override
public int getUserAccountQuorum(String targetAccount) {
return queryAPI.getAccount(targetAccount).getAccount().getQuorum();
}

/**
* {@inheritDoc}
*/
Expand All @@ -177,7 +188,7 @@ public void setUserAccountQuorum(String targetAccount, int quorum, long createdT
if (quorum < 1) {
throw new IllegalArgumentException("Quorum must be positive, got: " + quorum);
}
final int currentQuorum = getAccountQuorum(targetAccount);
final int currentQuorum = getUserAccountQuorum(targetAccount);
final Set<String> userSignatoriesDetail = getUserSignatoriesDetail(targetAccount);
final int userDetailQuorum =
userSignatoriesDetail.isEmpty() ? INITIAL_KEYS_AMOUNT : userSignatoriesDetail.size();
Expand Down Expand Up @@ -317,7 +328,7 @@ private void setBrvsSignatoriesToUser(String userAccountId, int count) {

private void modifyQuorumOnRegistration(String userAccountId) {
final int quorum = getValidQuorumForUserAccount(userAccountId, true);
if (getAccountQuorum(userAccountId) == quorum) {
if (getUserAccountQuorum(userAccountId) == quorum) {
logger.warn("Account " + userAccountId + " already has valid quorum: " + quorum);
return;
}
Expand All @@ -333,11 +344,7 @@ private int getValidQuorumForUserAccount(String accountId, boolean onRegistratio
if (userQuorum == 0 && onRegistration) {
userQuorum = INITIAL_USER_QUORUM_VALUE;
}
return (PROPORTION * userQuorum * getAccountQuorum(brvsAccountId));
}

private int getAccountQuorum(String targetAccountId) {
return queryAPI.getAccount(targetAccountId).getAccount().getQuorum();
return (PROPORTION * userQuorum * getUserAccountQuorum(brvsAccountId));
}

private List<String> getAccountSignatories(String targetAccountId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Service interacting with cached user transactions queues and block listener
*/
public class BasicTransactionProvider implements TransactionProvider {

private static final Logger logger = LoggerFactory.getLogger(BasicTransactionProvider.class);
Expand Down Expand Up @@ -122,13 +124,22 @@ private void monitorIrohaPending() {
}

private boolean isBatchSignedByUsers(TransactionBatch transactionBatch) {
for (Transaction transaction : transactionBatch) {
if (transaction.getSignaturesCount() < userQuorumProvider
.getUserSignatoriesDetail(ValidationUtils.getTxAccountId(transaction)).size()) {
return false;
}
return transactionBatch
.stream()
.allMatch(transaction ->
transaction.getSignaturesCount() >= getSignatoriesToPresentNum(transaction)
);
}

private int getSignatoriesToPresentNum(Transaction transaction) {
final String creatorAccountId = ValidationUtils.getTxAccountId(transaction);
int signatoriesToPresent = userQuorumProvider
.getUserSignatoriesDetail(creatorAccountId).size();
if (signatoriesToPresent == 0) {
signatoriesToPresent =
userQuorumProvider.getUserAccountQuorum(creatorAccountId) / ValidationUtils.PROPORTION;
}
return true;
return signatoriesToPresent;
}

private boolean savedMissingInStorage(TransactionBatch transactionBatch) {
Expand Down Expand Up @@ -176,8 +187,8 @@ private void processCommitted(List<Transaction> blockTransactions) {
blockTransactions.forEach(transaction -> {
tryToRemoveLock(transaction);
try {
modifyUserQuorumIfNeeded(transaction);
registerCreatedAccountByTransactionScanning(transaction);
modifyUserQuorumIfNeeded(transaction);
} catch (Exception e) {
logger.warn("Couldn't process account changes from the committed block", e);
}
Expand All @@ -192,10 +203,7 @@ private void modifyUserQuorumIfNeeded(Transaction blockTransaction) {
return;
}

if (StreamSupport.stream(registrationProvider.getRegisteredAccounts().spliterator(), false)
.noneMatch(registeredAccount -> registeredAccount.equals(creatorAccountId))) {
return;
}
// TODO add multithreading compatible registered accounts check (not just contains)

final List<Command> commands = blockTransaction
.getPayload()
Expand Down Expand Up @@ -224,7 +232,8 @@ private void modifyUserQuorumIfNeeded(Transaction blockTransaction) {
}

final Set<String> userSignatories = new HashSet<>(
userQuorumProvider.getUserSignatoriesDetail(creatorAccountId));
userQuorumProvider.getUserSignatoriesDetail(creatorAccountId)
);
userSignatories.removeAll(removedSignatories);
userSignatories.addAll(addedSignatories);
if (userSignatories.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package iroha.validation.transactions.provider.impl.util;

/**
* Simple pojo to be used as an service instance identity
*/
public class BrvsData {

private String hexPubKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.stream.StreamSupport;
import org.springframework.util.CollectionUtils;

/**
* Service in-memory queue of transactions. Implements isolated queue processing for each user.
*/
public class CacheProvider {

// Local BRVS cache
Expand All @@ -32,6 +35,8 @@ public class CacheProvider {
// Observable
private final PublishSubject<TransactionBatch> subject = PublishSubject.create();

// Puts a transaction in the corresponding user queue if needed
// Or immediately consumes it if possible
public synchronized void put(TransactionBatch transactionBatch) {
if (isBatchUnlocked(transactionBatch)) {
// do not even put in cache if possible
Expand All @@ -45,6 +50,7 @@ public synchronized void put(TransactionBatch transactionBatch) {
cache.get(accountId).add(transactionBatch);
}

// Initiates consuming of a user queue
private synchronized void consumeUnlockedTransactionBatches(String accountId) {
final Set<TransactionBatch> accountTransactions = cache.get(accountId);
if (!CollectionUtils.isEmpty(accountTransactions)) {
Expand All @@ -64,6 +70,7 @@ private synchronized void consumeUnlockedTransactionBatches(String accountId) {
}
}

// Consumes a single transaction of the queue and locks a user queue from next consuming if needed
private synchronized void consumeAndLockAccountByTransactionIfNeeded(
TransactionBatch transactionBatch) {
if (transactionBatch != null) {
Expand All @@ -82,6 +89,7 @@ private synchronized void consumeAndLockAccountByTransactionIfNeeded(
}
}

// Returns accounts locked by a transaction hash provided
public synchronized Set<String> getAccountsBlockedBy(String txHash) {
return pendingAccounts.entrySet()
.stream()
Expand All @@ -94,6 +102,7 @@ public synchronized void unlockPendingAccount(String account) {
unlockPendingAccounts(Collections.singleton(account));
}

// Unlocks accounts and continues consuming
public synchronized void unlockPendingAccounts(Iterable<String> accounts) {
accounts.forEach(pendingAccounts::remove);
accounts.forEach(this::consumeUnlockedTransactionBatches);
Expand All @@ -103,12 +112,14 @@ public synchronized Observable<TransactionBatch> getObservable() {
return subject;
}

// Returns all transactions from all user queues
public synchronized Iterable<Transaction> getTransactions() {
return Iterables.concat(StreamSupport
.stream(Iterables.concat(cache.values()).spliterator(), false)
.map(TransactionBatch::getTransactionList).distinct().collect(Collectors.toList()));
}

// Checks if the batch lead to locking of the queue
private boolean isBatchUnlocked(TransactionBatch transactionBatch) {
return transactionBatch.stream().noneMatch(transaction ->
transaction.getPayload().getReducedPayload()
Expand Down

0 comments on commit 9d42d3c

Please sign in to comment.