Skip to content

Commit

Permalink
Improve KeyManager API import operation (#968)
Browse files Browse the repository at this point in the history
Improve KeyManager API import operation. Process imported validators in parallel.
---------

Co-authored-by: Simon Dudley <[email protected]>
  • Loading branch information
usmansaleem and siladu authored Feb 16, 2024
1 parent bec53cc commit 371a5dd
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 74 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Next version

### Features Added
- Improve Key Manager API import operation to use parallel processing instead of serial processing.

### Bugs fixed
- Ensure that Web3Signer stops the http server when a sigterm is received

Expand Down
18 changes: 8 additions & 10 deletions core/src/main/java/tech/pegasys/web3signer/core/Eth2Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,14 @@ private void registerEth2Routes(

router
.route(HttpMethod.POST, KEYSTORES_PATH)
.handler(
new BlockingHandlerDecorator(
new ImportKeystoresHandler(
objectMapper,
baseConfig.getKeyConfigPath(),
slashingProtectionContext.map(
SlashingProtectionContext::getSlashingProtection),
blsSignerProvider,
validatorManager),
false))
.blockingHandler(
new ImportKeystoresHandler(
objectMapper,
baseConfig.getKeyConfigPath(),
slashingProtectionContext.map(SlashingProtectionContext::getSlashingProtection),
blsSignerProvider,
validatorManager),
false)
.failureHandler(errorHandler);

router
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports;

public record ImportKeystoreData(
int index,
String pubKey,
String keystoreJson,
String password,
ImportKeystoreResult importKeystoreResult)
implements Comparable<ImportKeystoreData> {

@Override
public int compareTo(ImportKeystoreData other) {
return Integer.compare(this.index, other.index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;

public class ImportKeystoreResult {
private final ImportKeystoreStatus status;
private final String message;
private ImportKeystoreStatus status;
private String message;

@JsonCreator
public ImportKeystoreResult(
Expand All @@ -36,4 +36,12 @@ public ImportKeystoreStatus getStatus() {
public String getMessage() {
return message;
}

public void setStatus(final ImportKeystoreStatus status) {
this.status = status;
}

public void setMessage(final String message) {
this.message = message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
import static tech.pegasys.web3signer.core.service.http.handlers.ContentTypes.JSON_UTF_8;
import static tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports.ImportKeystoreStatus.DUPLICATE;
import static tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports.ImportKeystoreStatus.IMPORTED;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.KEYSTORE_JSON_EXTENSION;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.KEYSTORE_PASSWORD_EXTENSION;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.METADATA_YAML_EXTENSION;
Expand All @@ -29,13 +31,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -78,110 +79,161 @@ public ImportKeystoresHandler(
public void handle(final RoutingContext context) {
// API spec - https://github.com/ethereum/keymanager-APIs/tree/master/flows#import
final ImportKeystoresRequestBody parsedBody;
// step 0: Parse and verify the request body
try {
parsedBody = parseRequestBody(context.body());
} catch (final IllegalArgumentException | JsonProcessingException e) {
handleInvalidRequest(context, e);
return;
}

// check that keystores have matching passwords
// step 1: verify if keystores/passwords list length is same
if (parsedBody.getKeystores().size() != parsedBody.getPasswords().size()) {
context.fail(BAD_REQUEST);
return;
}

// no keystores passed in, nothing to do, return 200 and empty response.
// step 2: no keystores passed in, nothing to do, return 200 and empty response.
if (parsedBody.getKeystores().isEmpty()) {
try {
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end(
objectMapper.writeValueAsString(
new ImportKeystoresResponse(Collections.emptyList())));
} catch (JsonProcessingException e) {
context.fail(SERVER_ERROR, e);
}
return;
}

// extract pubkeys to import first
final List<String> pubkeysToImport;
try {
pubkeysToImport =
parsedBody.getKeystores().stream()
.map(json -> new JsonObject(json).getString("pubkey"))
.map(IdentifierUtils::normaliseIdentifier)
.collect(Collectors.toList());
} catch (Exception e) {
context.fail(BAD_REQUEST, e);
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end("{\"data\": []}");
return;
}

// load existing keys
final Set<String> existingPubkeys =
// "active" keys which are already loaded by Web3Signer before this import call.
final Set<String> existingPubKeys =
artifactSignerProvider.availableIdentifiers().stream()
.map(IdentifierUtils::normaliseIdentifier)
.collect(Collectors.toSet());

// filter out already loaded keys for slashing data import
final List<String> nonLoadedPubkeys =
pubkeysToImport.stream()
.filter(key -> !existingPubkeys.contains(key))
.collect(Collectors.toList());
// map incoming keystores either as duplicate or to be imported
final List<ImportKeystoreData> importKeystoreDataList =
getKeystoreDataToProcess(parsedBody, existingPubKeys);

// Step 3: import slashing protection data for all to-be-IMPORTED keys
final List<String> pubKeysToBeImported = getPubKeysToBeImported(importKeystoreDataList);

// read slashing protection data if present and import data matching non-loaded keys to import
// only
if (slashingProtection.isPresent()
&& !StringUtils.isEmpty(parsedBody.getSlashingProtection())) {
try {
final InputStream slashingProtectionData =
new ByteArrayInputStream(
parsedBody.getSlashingProtection().getBytes(StandardCharsets.UTF_8));
slashingProtection.get().importDataWithFilter(slashingProtectionData, nonLoadedPubkeys);
} catch (Exception e) {
slashingProtection.get().importDataWithFilter(slashingProtectionData, pubKeysToBeImported);
} catch (final Exception e) {
// since we haven't written any keys to the file system, we don't need to clean up
context.fail(BAD_REQUEST, e);
return;
}
}

final List<ImportKeystoreResult> results = new ArrayList<>();
for (int i = 0; i < parsedBody.getKeystores().size(); i++) {
final String pubkey = pubkeysToImport.get(i);
try {
final String jsonKeystoreData = parsedBody.getKeystores().get(i);
final String password = parsedBody.getPasswords().get(i);

if (existingPubkeys.contains(pubkey)) {
// keystore already loaded
results.add(new ImportKeystoreResult(ImportKeystoreStatus.DUPLICATE, null));
} else {
validatorManager.addValidator(Bytes.fromHexString(pubkey), jsonKeystoreData, password);
results.add(new ImportKeystoreResult(ImportKeystoreStatus.IMPORTED, null));
}
} catch (final Exception e) {
// cleanup the current key being processed and continue
removeSignersAndCleanupImportedKeystoreFiles(List.of(pubkey));
results.add(
new ImportKeystoreResult(
ImportKeystoreStatus.ERROR, "Error importing keystore: " + e.getMessage()));
}
}
// must return status 200 from here onward ...

// step 4: add validators to be imported
importValidators(importKeystoreDataList);

// final step, send sorted results ...
try {
final List<ImportKeystoreResult> results = getImportKeystoreResults(importKeystoreDataList);
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end(objectMapper.writeValueAsString(new ImportKeystoresResponse(results)));
} catch (final Exception e) {
removeSignersAndCleanupImportedKeystoreFiles(nonLoadedPubkeys);
// critical bug, clean out imported keystores files ...
removeSignersAndCleanupImportedKeystoreFiles(pubKeysToBeImported);
context.fail(SERVER_ERROR, e);
}
}

private void importValidators(final List<ImportKeystoreData> importKeystoreDataList) {
importKeystoreDataList.stream()
.filter(ImportKeystoresHandler::imported)
.parallel()
.forEach(
data -> {
try {
final Bytes pubKeyBytes = Bytes.fromHexString(data.pubKey());
validatorManager.addValidator(pubKeyBytes, data.keystoreJson(), data.password());
} catch (final Exception e) {
// modify the result to error status
data.importKeystoreResult().setStatus(ImportKeystoreStatus.ERROR);
data.importKeystoreResult()
.setMessage("Error importing keystore: " + e.getMessage());
}
});

// clean out failed validators
removeSignersAndCleanupImportedKeystoreFiles(getFailedValidators(importKeystoreDataList));
}

private static List<ImportKeystoreResult> getImportKeystoreResults(
final List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.sorted()
.map(ImportKeystoreData::importKeystoreResult)
.toList();
}

private List<ImportKeystoreData> getKeystoreDataToProcess(
final ImportKeystoresRequestBody requestBody, final Set<String> activePubKeys) {
return IntStream.range(0, requestBody.getKeystores().size())
.mapToObj(
i -> {
final String jsonKeystoreData = requestBody.getKeystores().get(i);
final String password = requestBody.getPasswords().get(i);
final String pubkey;
try {
pubkey = parseAndNormalizePubKey(jsonKeystoreData);
} catch (final Exception e) {
final ImportKeystoreResult errorResult =
new ImportKeystoreResult(
ImportKeystoreStatus.ERROR, "Error parsing pubkey: " + e.getMessage());
return new ImportKeystoreData(i, null, null, null, errorResult);
}
if (activePubKeys.contains(pubkey)) {
return new ImportKeystoreData(
i, pubkey, null, null, new ImportKeystoreResult(DUPLICATE, null));
}

return new ImportKeystoreData(
i, pubkey, jsonKeystoreData, password, new ImportKeystoreResult(IMPORTED, null));
})
.toList();
}

private static List<String> getPubKeysToBeImported(
final List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.filter(ImportKeystoresHandler::imported)
.map(ImportKeystoreData::pubKey)
.toList();
}

private static List<String> getFailedValidators(List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.filter(ImportKeystoresHandler::failed)
.map(ImportKeystoreData::pubKey)
.toList();
}

private static boolean imported(ImportKeystoreData data) {
return data.importKeystoreResult().getStatus() == IMPORTED;
}

private static boolean failed(ImportKeystoreData data) {
return data.importKeystoreResult().getStatus() == ImportKeystoreStatus.ERROR
&& data.pubKey() != null;
}

private static String parseAndNormalizePubKey(final String json) {
return IdentifierUtils.normaliseIdentifier(new JsonObject(json).getString("pubkey"));
}

private ImportKeystoresRequestBody parseRequestBody(final RequestBody requestBody)
throws JsonProcessingException {
final String body = requestBody.asString();
Expand Down

0 comments on commit 371a5dd

Please sign in to comment.