Merge pull request #115 from jster1357/release/1.3
add queryAll SOQL operation
rmstar authored Jan 31, 2022
2 parents d760c5a + 32c741b commit 6937f7c
Showing 13 changed files with 115 additions and 16 deletions.
6 changes: 5 additions & 1 deletion docs/Salesforce-batchsource.md
@@ -256,4 +256,8 @@ PK chunking only works with the following objects:

Support also includes custom objects, and any Sharing and History tables that support standard objects.

**Chunk Size:** Specify the size of each chunk. Maximum size is 250,000. Default size is 100,000.

**Query Operation:**
Specify the query operation to run on the table. If `query` is selected, only current records will be returned.
If `queryAll` is selected, all current and deleted records will be returned. The default operation is `query`.
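To make the distinction concrete, the sketch below submits the same SOQL under either operation through the Salesforce Bulk API (`com.sforce.async`), which is what this plugin drives internally. It is a minimal illustration only: the connection is assumed to be already authenticated, the sObject name and query text are placeholders, and the plugin's actual job setup lives in `SalesforceBulkUtil` and `SalesforceSplitUtil`.

```java
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class QueryOperationSketch {

  /**
   * Submits the given SOQL as a bulk query batch. With OperationEnum.query only
   * current records are returned; with OperationEnum.queryAll soft-deleted records
   * (still in the Recycle Bin, IsDeleted = true) are returned as well.
   */
  static BatchInfo submit(BulkConnection connection, String soql, OperationEnum operation)
      throws AsyncApiException, IOException {
    JobInfo job = new JobInfo();
    job.setObject("Account");             // placeholder sObject
    job.setOperation(operation);          // OperationEnum.query or OperationEnum.queryAll
    job.setContentType(ContentType.CSV);
    job = connection.createJob(job);
    try (ByteArrayInputStream in =
             new ByteArrayInputStream(soql.getBytes(StandardCharsets.UTF_8))) {
      return connection.createBatchFromStream(job, in);
    }
  }
}
```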
4 changes: 4 additions & 0 deletions docs/SalesforceMultiObjects-batchsource.md
@@ -75,6 +75,10 @@ from 3am (inclusive) to 9am (exclusive). The duration is specified using numbers
Several units can be specified, but each unit can only be used once. For example, `2 days, 1 hours, 30 minutes`.
The duration is ignored if a value is already specified for `Last Modified After` or `Last Modified Before`.

**Query Operation:**
Specify the query operation to run on the table. If `query` is selected, only current records will be returned.
If `queryAll` is selected, all current and deleted records will be returned. The default operation is `query`.

**Offset:** Filter data to only read records where the system field `LastModifiedDate` is less than the logical start time
of the pipeline minus the given offset. For example, if duration is '6 hours' and the offset is '1 hours', and the pipeline
runs at 9am, data last modified between 2am (inclusive) and 8am (exclusive) will be read.
2 changes: 1 addition & 1 deletion pom.xml
@@ -22,7 +22,7 @@
<name>Salesforce plugins</name>
<groupId>io.cdap.plugin</groupId>
<artifactId>salesforce-plugins</artifactId>
<version>1.3.11</version>
<version>1.3.12</version>
<packaging>jar</packaging>


@@ -15,6 +15,7 @@
*/
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.sforce.async.OperationEnum;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
@@ -75,6 +76,14 @@ public abstract class SalesforceBaseSourceConfig extends BaseSalesforceConfig {
@Macro
private String offset;

@Name(SalesforceSourceConstants.PROPERTY_OPERATION)
@Description("If set to query, the query result will only return current rows. If set to queryAll, " +
"all records, including deletes will be sourced")
@Nullable
private String operation;

private static final String DEFAULT_OPERATION = "query";

protected SalesforceBaseSourceConfig(String referenceName,
String consumerKey,
String consumerSecret,
@@ -85,12 +94,14 @@ protected SalesforceBaseSourceConfig(String referenceName,
@Nullable String datetimeBefore,
@Nullable String duration,
@Nullable String offset,
@Nullable String securityToken) {
@Nullable String securityToken,
@Nullable String operation) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken);
this.datetimeAfter = datetimeAfter;
this.datetimeBefore = datetimeBefore;
this.duration = duration;
this.offset = offset;
this.operation = operation;
}

public Map<ChronoUnit, Integer> getDuration() {
@@ -132,6 +143,11 @@ protected void validateFilters(FailureCollector collector) {
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
}
try {
validateOperationProperty(SalesforceSourceConstants.PROPERTY_OPERATION, getOperation());
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
}
}

/**
@@ -189,7 +205,6 @@ private SObjectFilterDescriptor getSObjectFilterDescriptor(long logicalStartTime
return filterDescriptor;
}

@Nullable
private void validateIntervalFilterProperty(String propertyName, String datetime) {
if (containsMacro(propertyName)) {
return;
@@ -203,6 +218,16 @@ private void validateIntervalFilterProperty(String propertyName, String datetime
}
}

private void validateOperationProperty(String propertyName, String operation) {
try {
OperationEnum.valueOf(operation);
} catch (IllegalArgumentException e) {
throw new InvalidConfigException(
String.format("Invalid Query Operation: '%s' value: '%s'. Valid values are query and queryAll.",
propertyName, operation), propertyName);
}
}

private void validateRangeFilterProperty(String propertyName, Map<ChronoUnit, Integer> rangeValue) {
if (containsMacro(propertyName) || rangeValue.isEmpty()) {
return;
@@ -269,4 +294,8 @@ private int parseUnitValue(String propertyName, String value) {
private ZonedDateTime parseDatetime(String datetime) throws DateTimeParseException {
return StringUtils.isBlank(datetime) ? null : ZonedDateTime.parse(datetime, DateTimeFormatter.ISO_DATE_TIME);
}

public String getOperation() {
return operation == null ? DEFAULT_OPERATION : operation;
}
}
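Because the property is nullable and macro-enabled, the default and the validation above hinge on `OperationEnum.valueOf`. The standalone sketch below (not part of this commit, assuming only the bundled `com.sforce.async` library) shows how the fallback and the case-sensitive enum lookup behave.

```java
import com.sforce.async.OperationEnum;

public class OperationDefaultSketch {

  private static final String DEFAULT_OPERATION = "query";

  // Mirrors getOperation(): an unset property falls back to "query".
  static String resolve(String configured) {
    return configured == null ? DEFAULT_OPERATION : configured;
  }

  public static void main(String[] args) {
    System.out.println(OperationEnum.valueOf(resolve(null)));        // query
    System.out.println(OperationEnum.valueOf(resolve("queryAll")));  // queryAll

    try {
      OperationEnum.valueOf(resolve("queryall"));                    // lookup is case-sensitive
    } catch (IllegalArgumentException e) {
      // valueOf signals unknown constants with IllegalArgumentException, which
      // validateOperationProperty converts into an InvalidConfigException for the collector.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```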
@@ -97,7 +97,7 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
config.getLoginUrl());
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false))
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation()))
.flatMap(Collection::stream).collect(Collectors.toList());
// store the jobIds so be used in onRunFinish() to close the connections
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
@@ -133,7 +133,8 @@ public void prepareRun(BatchSourceContext context) {
}
bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK, String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection, enablePKChunk);
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
enablePKChunk, config.getOperation());
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
context.setInput(Input.of(config.referenceName, new SalesforceInputFormatProvider(
config, ImmutableMap.of(sObjectName, schema.toString()), querySplits, null)));
@@ -81,9 +81,10 @@ public SalesforceMultiSourceConfig(String referenceName,
@Nullable String whiteList,
@Nullable String blackList,
@Nullable String sObjectNameField,
@Nullable String securityToken) {
@Nullable String securityToken,
@Nullable String operation) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
datetimeAfter, datetimeBefore, duration, offset, securityToken);
datetimeAfter, datetimeBefore, duration, offset, securityToken, operation);
this.whiteList = whiteList;
this.blackList = blackList;
this.sObjectNameField = sObjectNameField;
@@ -102,11 +102,12 @@ public class SalesforceSourceConfig extends SalesforceBaseSourceConfig {
@Nullable String offset,
@Nullable String schema,
@Nullable String securityToken,
@Nullable String operation,
@Nullable Boolean enablePKChunk,
@Nullable Integer chunkSize,
@Nullable String parent) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
datetimeAfter, datetimeBefore, duration, offset, securityToken);
datetimeAfter, datetimeBefore, duration, offset, securityToken, operation);
this.query = query;
this.sObjectName = sObjectName;
this.schema = schema;
@@ -47,6 +47,8 @@ public class SalesforceSourceConstants {
public static final String HEADER_VALUE_PK_CHUNK = "chunkSize=%d";
public static final String HEADER_PK_CHUNK_PARENT = "parent=%s";

public static final String PROPERTY_OPERATION = "operation";

public static final String CONFIG_SOBJECT_NAME_FIELD = "mapred.salesforce.input.sObjectNameField";

public static final int WIDE_QUERY_MAX_BATCH_COUNT = 2000;
@@ -23,6 +23,7 @@
import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
@@ -56,8 +57,8 @@ public final class SalesforceSplitUtil {
* @return list of salesforce splits
*/
public static List<SalesforceSplit> getQuerySplits(String query, BulkConnection bulkConnection,
boolean enablePKChunk) {
return Stream.of(getBatches(query, bulkConnection, enablePKChunk))
boolean enablePKChunk, String operation) {
return Stream.of(getBatches(query, bulkConnection, enablePKChunk, operation))
.map(batch -> new SalesforceSplit(batch.getJobId(), batch.getId(), query))
.collect(Collectors.toList());
}
@@ -72,13 +73,14 @@ public static List<SalesforceSplit> getQuerySplits(String query, BulkConnection
* @param enablePKChunk enable PK Chunking
* @return array of batch info
*/
private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, boolean enablePKChunk) {
private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection,
boolean enablePKChunk, String operation) {
try {
if (!SalesforceQueryUtil.isQueryUnderLengthLimit(query)) {
LOG.debug("Wide object query detected. Query length '{}'", query.length());
query = SalesforceQueryUtil.createSObjectIdQuery(query);
}
BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk);
BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk, operation);
LOG.debug("Number of batches received from Salesforce: '{}'", batches.length);
return batches;
} catch (AsyncApiException | IOException e) {
@@ -96,11 +98,13 @@ private static BatchInfo[] getBatches(String query, BulkConnection bulkConnectio
* @throws AsyncApiException if there is an issue creating the job
* @throws IOException failed to close the query
*/
private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, boolean enablePKChunk)
private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query,
boolean enablePKChunk, String operation)
throws AsyncApiException, IOException {

SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), OperationEnum.query, null);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(),
getOperationEnum(operation), null);
BatchInfo batchInfo;
try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) {
batchInfo = bulkConnection.createBatchFromStream(job, bout);
@@ -212,4 +216,13 @@ public static void closeJobs(Set<String> jobIds, AuthenticatorCredentials authen
throw runtimeException;
}
}

private static OperationEnum getOperationEnum(String operation) {
try {
return OperationEnum.valueOf(operation);
} catch (IllegalArgumentException ex) {
throw new InvalidConfigException("Unsupported value for operation: " + operation,
SalesforceSourceConstants.PROPERTY_OPERATION);
}
}
}
@@ -37,6 +37,7 @@ public class SalesforceSourceConfigBuilder {
private Boolean enablePKChunk;
private Integer chunkSize;
private String parent;
private String operation;

public SalesforceSourceConfigBuilder setReferenceName(String referenceName) {
this.referenceName = referenceName;
@@ -123,9 +124,14 @@ public SalesforceSourceConfigBuilder setParent(String parent) {
return this;
}

public SalesforceSourceConfigBuilder setOperation(String operation) {
this.operation = operation;
return this;
}

public SalesforceSourceConfig build() {
return new SalesforceSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
query, sObjectName, datetimeAfter, datetimeBefore, duration, offset, schema,
securityToken, enablePKChunk, chunkSize, parent);
securityToken, operation, enablePKChunk, chunkSize, parent);
}
}
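As a usage note, a hypothetical test snippet with the new setter might look like the following; it assumes it sits alongside `SalesforceSourceConfigBuilder` (so no imports are shown) and leaves the remaining connection properties unset for brevity.

```java
public class OperationBuilderSketch {

  // Hypothetical helper, not part of this commit: builds a config that sources
  // deleted records as well. When setOperation() is never called, getOperation()
  // falls back to "query".
  static SalesforceSourceConfig queryAllConfig() {
    return new SalesforceSourceConfigBuilder()
        .setReferenceName("SalesforceSource")
        .setOperation("queryAll")
        .build();
  }
}
```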
19 changes: 19 additions & 0 deletions widgets/Salesforce-batchsource.json
@@ -140,6 +140,25 @@
{
"label": "Advanced",
"properties": [
{
"widget-type": "radio-group",
"label": "SOQL Operation Type",
"name": "operation",
"widget-attributes": {
"layout": "inline",
"default": "query",
"options": [
{
"id": "query",
"label": "query"
},
{
"id": "queryAll",
"label": "queryAll"
}
]
}
},
{
"widget-type": "toggle",
"name": "enablePKChunk",
19 changes: 19 additions & 0 deletions widgets/SalesforceMultiObjects-batchsource.json
@@ -136,6 +136,25 @@
{
"label": "Advanced",
"properties": [
{
"widget-type": "radio-group",
"label": "SOQL Operation Type",
"name": "operation",
"widget-attributes": {
"layout": "inline",
"default": "query",
"options": [
{
"id": "query",
"label": "query"
},
{
"id": "queryAll",
"label": "queryAll"
}
]
}
},
{
"widget-type": "textbox",
"label": "SObject Name Field",
