Skip to content

Commit

Permalink
Features/entities/auto convert sub to entities (#870)
Browse files Browse the repository at this point in the history
* enhance: PATCH datasets/:name can now auto-convert pending submissions into entities

* use loop instead of recursion in runSequentially

* correct SQL query

* fix: event.details.data can be null dataset update worker

* use entity source table

* correct event id and save parent event id in source table

* added assertion for audit of auto-created entity

* lint it
  • Loading branch information
sadiqkhoja authored May 19, 2023
1 parent b7fc414 commit e9dba19
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 23 deletions.
42 changes: 37 additions & 5 deletions lib/model/query/datasets.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

const { sql } = require('slonik');
const { extender, QueryOptions, equals, updater } = require('../../util/db');
const { Dataset, Form } = require('../frames');
const { Dataset, Form, Audit } = require('../frames');
const { validatePropertyName } = require('../../data/dataset');
const { isEmpty, isNil, either, reduceBy, groupBy, uniqWith, equals: rEquals } = require('ramda');
const { isEmpty, isNil, either, reduceBy, groupBy, uniqWith, equals: rEquals, map } = require('ramda');

const Problem = require('../../util/problem');
const { construct } = require('../../util/util');
Expand Down Expand Up @@ -241,6 +241,11 @@ const getById = (id, extended = false) => ({ maybeOne }) => {
return _get(maybeOne, options.withCondition({ id }), true);
};

// Looks up a single dataset by its actee id. Only a published dataset is
// returned (the trailing `true` passed to _get restricts to published).
const getByActeeId = (acteeId, extended = false) => ({ maybeOne }) =>
  _get(maybeOne, (extended ? QueryOptions.extended : QueryOptions.none).withCondition({ acteeId }), true);


////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -391,12 +396,39 @@ const getDiff = (projectId, xmlFormId, forDraft) => ({ all }) => all(sql`
.then(r => Object.keys(r).map(k => ({ name: k.split(',')[0], isNew: forDraft ? k.split(',')[1] === 'true' : undefined, properties: r[k] })));

// Applies `data` to the dataset row and returns the updated Dataset frame.
const update = (datasets, data) => ({ one }) => one(updater(datasets, data)).then(construct(Dataset));
// The `autoConvert` flag is recorded in the audit detail so the
// `dataset.update` worker knows whether to auto-create entities from
// pending submissions after the update.
update.audit = (datasets, data, autoConvert) => (log) => log('dataset.update', datasets, { data, autoConvert });

// Builds the query selecting submissions for dataset `datasetId` that have
// not yet been turned into entities:
//  - the submission's current def belongs to a form def linked to the dataset
//    (dataset_form_defs);
//  - the latest submission.create / submission.update.version audit event for
//    each submission is joined in (DISTINCT ON ... ORDER BY id DESC), so
//    callers can re-dispatch that event;
//  - submissions whose current def already produced an entity in this dataset
//    are excluded (LEFT JOIN on entity_def_sources; `ed.id IS NULL`);
//  - only submissions pending review (reviewState NULL or 'edited') qualify.
// `fields` is a slonik sql fragment choosing the projection, e.g. COUNT(1)
// or audits.* — see the two callers below.
const _unprocessedSubmissions = (datasetId, fields) => sql`
SELECT ${fields} FROM dataset_form_defs dfd
JOIN submission_defs sd ON sd."formDefId" = dfd."formDefId"
JOIN submissions s ON sd."submissionId" = s.id
JOIN forms f ON s."formId" = f.id
JOIN (
SELECT DISTINCT ON ((details->'submissionId')::INTEGER) * FROM audits
WHERE action IN ('submission.create', 'submission.update.version')
ORDER BY (details->'submissionId')::INTEGER, id DESC
) audits ON (audits.details->'submissionId')::INTEGER = s.id
LEFT JOIN (
entity_defs ed
JOIN entity_def_sources es ON es.id = ed."sourceId"
JOIN entities e ON e.id = ed."entityId" AND e."datasetId" = ${datasetId}
) ON es."submissionDefId" = sd.id
WHERE sd.current AND dfd."datasetId" = ${datasetId}
AND (s."reviewState" IS NULL OR s."reviewState" = 'edited')
AND ed.id IS NULL
`;

// Number of submissions in this dataset still awaiting entity conversion.
const countUnprocessedSubmissions = (datasetId) => ({ oneFirst }) => {
  const query = _unprocessedSubmissions(datasetId, sql`COUNT(1)`);
  return oneFirst(query);
};

// Latest audit event (as an Audit frame) for each submission in this dataset
// still awaiting entity conversion.
const getUnprocessedSubmissions = (datasetId) => ({ all }) =>
  all(_unprocessedSubmissions(datasetId, sql`audits.*`))
    .then((rows) => rows.map((row) => construct(Audit)(row)));

module.exports = {
createOrMerge, publishIfExists,
getList, get, getById,
getList, get, getById, getByActeeId,
getMetadata,
getProperties, getFieldsByFormDefId,
getDiff, update
getDiff, update, countUnprocessedSubmissions,
getUnprocessedSubmissions
};
25 changes: 18 additions & 7 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const { odataFilter } = require('../../data/odata-filter');
const { odataToColumnMap, parseSubmissionXml } = require('../../data/entity');
const { isTrue } = require('../../util/http');
const Problem = require('../../util/problem');
const { runSequentially } = require('../../util/promise');

////////////////////////////////////////////////////////////////////////////////
// ENTITY CREATE
Expand Down Expand Up @@ -90,8 +91,9 @@ const createSource = (details = null, subDefId = null, eventId = null) => ({ one


// Entrypoint to where submissions (a specific version) become entities
const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submissions, Forms, Audits }) => {
const { submissionDefId, submissionId } = event.details;
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, Audits }) => {
const { submissionId, submissionDefId } = event.details;

const existingEntity = await Entities.getDefBySubmissionId(submissionId);
// If the submission has already been used to make an entity, don't try again
// and don't log it as an error.
Expand Down Expand Up @@ -130,7 +132,7 @@ const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submission

const partial = await Entity.fromParseEntityData(entityData);

const sourceDetails = { submission: { instanceId: submissionDef.instanceId } };
const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined };
const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id);
const entity = await Entities.createNew(dataset, partial, submissionDef, sourceId);

Expand All @@ -144,9 +146,9 @@ const _processSubmissionDef = (event) => async ({ Datasets, Entities, Submission
});
};

const processSubmissionEvent = (event) => (container) =>
const processSubmissionEvent = (event, parentEvent) => (container) =>
container.db.transaction((trxn) =>
container.with({ db: trxn }).Entities._processSubmissionDef(event))
container.with({ db: trxn }).Entities._processSubmissionEvent(event, parentEvent))
.catch((err) =>
// err could be any kind of problem, from an entity violation error, to a
// database constraint error, to some other kind of error within the processing code.
Expand All @@ -163,6 +165,14 @@ const processSubmissionEvent = (event) => (container) =>
errorMessage: err.message,
problem: (err.isProblem === true) ? err : null }));

// Converts each pending submission event into an entity, one at a time.
// Each conversion opens its own DB transaction (see processSubmissionEvent),
// so the conversions must not overlap: run them strictly in sequence.
const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) => (container) => {
  const tasks = submissionEvents.map((event) =>
    () => processSubmissionEvent(event, parentEvent)(container));
  return runSequentially(tasks);
};





////////////////////////////////////////////////////////////////////////////////
// UPDATING ENTITIES
Expand Down Expand Up @@ -311,11 +321,12 @@ const del = (entity) => ({ run }) =>
del.audit = (entity, dataset) => (log) => log('entity.delete', entity.with({ acteeId: dataset.acteeId }), { uuid: entity.uuid });

module.exports = {
createNew, _processSubmissionDef,
createNew, _processSubmissionEvent,
createSource,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
countByDatasetId, getById,
getAll, getAllDefs, del
getAll, getAllDefs, del,
createEntitiesFromPendingSubmissions
};
22 changes: 19 additions & 3 deletions lib/resources/datasets.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { streamEntityCsv } = require('../data/entity');
const { contentDisposition, withEtag } = require('../util/http');
const { md5sum } = require('../util/crypto');
const { Dataset } = require('../model/frames');
const Problem = require('../util/problem');

module.exports = (service, endpoint) => {
service.get('/projects/:id/datasets', endpoint(({ Projects, Datasets }, { auth, params, queryOptions }) =>
Expand All @@ -27,10 +28,25 @@ module.exports = (service, endpoint) => {
.then((dataset) => auth.canOrReject('dataset.read', dataset)
.then(() => Datasets.getMetadata(dataset)))));

service.patch('/projects/:projectId/datasets/:name', endpoint(async ({ Datasets }, { params, body, auth }) => {
const dataset = await Datasets.get(params.projectId, params.name).then(getOrNotFound);
service.patch('/projects/:projectId/datasets/:name', endpoint(async ({ Datasets }, { params, body, auth, query }) => {
const dataset = await Datasets.get(params.projectId, params.name, true).then(getOrNotFound);
await auth.canOrReject('dataset.update', dataset);
const updatedDataset = await Datasets.update(dataset, Dataset.fromApi(body));
const newDataset = Dataset.fromApi(body);

// validate value of convert query parameter
if (query.convert !== undefined && query.convert !== 'true' && query.convert !== 'false')
return Problem.user.unexpectedValue({ field: 'convert', value: query.convert });

// return warning if approvalRequired is false and there are pending submissions
if (!newDataset.approvalRequired) {
if (query.convert === undefined) {
const unprocessedSubmissions = await Datasets.countUnprocessedSubmissions(dataset.id);
if (unprocessedSubmissions > 0) {
return Problem.user.pendingSubmissions({ count: unprocessedSubmissions });
}
}
}
const updatedDataset = await Datasets.update(dataset, newDataset, query.convert === 'true');
return Datasets.getMetadata(updatedDataset);
}));

Expand Down
3 changes: 3 additions & 0 deletions lib/util/problem.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ const problems = {
// problem parsing the entity data (probably JSON) itself
invalidEntity: problem(400.28, ({ reason }) => `The entity is invalid. ${reason}`),

// warning: there are pending submissions
pendingSubmissions: problem(400.29, ({ count }) => `There are ${count} pending submissions`),

// no detail information for security reasons.
authenticationFailed: problem(401.2, () => 'Could not authenticate with the provided credentials.'),

Expand Down
14 changes: 10 additions & 4 deletions lib/util/promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ const block = () => {

// inspired from https://stackoverflow.com/questions/54867318/sequential-execution-of-promise-all
// Runs an array of promise-returning thunks one after another and returns
// their results in order. (The diff view interleaved the old recursive
// implementation with the new loop; this is the post-change loop version,
// which avoids unbounded recursion on long arrays.)
const runSequentially = async (functions) => {
  const results = [];

  for (const fn of functions) {
    // reason: we want to run functions sequentially
    // Current use: each function is dependent on DB transaction
    // we can't do parallel processing with transaction
    // eslint-disable-next-line no-await-in-loop
    results.push(await fn());
  }

  return results;
};

module.exports = {
Expand Down
21 changes: 21 additions & 0 deletions lib/worker/dataset.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { getOrNotFound } = require('../util/promise');

// Worker for `dataset.update` audit events: when the update turned
// approvalRequired off AND the client requested conversion (autoConvert),
// create entities from every submission still pending for the dataset.
const createEntitiesFromPendingSubmissions = async ({ Entities, Datasets }, event) => {
  const { data, autoConvert } = event.details;

  // Nothing to do unless data is present (it can be null for some update
  // events), approval is no longer required, and conversion was requested.
  if (!data || data.approvalRequired || !autoConvert) return;

  const dataset = await Datasets.getByActeeId(event.acteeId).then(getOrNotFound);
  const pendingSubmissions = await Datasets.getUnprocessedSubmissions(dataset.id);

  await Entities.createEntitiesFromPendingSubmissions(pendingSubmissions, event);
};

module.exports = { createEntitiesFromPendingSubmissions };
4 changes: 3 additions & 1 deletion lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ const jobs = {
'form.update.publish': [ require('./form').updatePublish ],

'upgrade.process.form.draft': [ require('./form').updateDraftSet ],
'upgrade.process.form': [ require('./form').updatePublish ]
'upgrade.process.form': [ require('./form').updatePublish ],

'dataset.update': [ require('./dataset').createEntitiesFromPendingSubmissions ]
};

module.exports = { jobs };
Expand Down
Loading

0 comments on commit e9dba19

Please sign in to comment.