Skip to content

Commit

Permalink
query optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
browjm4 committed Oct 24, 2024
1 parent b179d19 commit 0935e52
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Result from '../../../../common_classes/result';
import Mapper from '../../mapper';
import { PoolClient, QueryConfig } from 'pg';
import ReportQuery from '../../../../domain_objects/data_warehouse/data/report_query';
import ReportQuery, { CompletedQueryMatch } from '../../../../domain_objects/data_warehouse/data/report_query';

const format = require('pg-format');

Expand Down Expand Up @@ -88,6 +88,12 @@ export default class ReportQueryMapper extends Mapper {
return super.runStatement(this.deleteStatement(id), { transaction });
}

/**
 * Looks for a previously run query whose text matches `query`.
 *
 * The comparison itself is done in SQL by the statement built in
 * `checkQueryExistsStatement`; the row that comes back is mapped
 * onto a `CompletedQueryMatch` instance.
 *
 * @param query raw query text supplied by the caller
 * @returns the mapper result wrapping the matching `CompletedQueryMatch`
 */
CheckQueryExists(query: string): Promise<Result<CompletedQueryMatch>> {
    const statement = this.checkQueryExistsStatement(query);
    const options = {resultClass: CompletedQueryMatch};

    return super.retrieve<CompletedQueryMatch>(statement, options);
}

// Below are a set of query building functions. So far they're very simple
// and the return value is something that the postgres driver can understand.
// The hope is that this method will allow us to be more flexible and create
Expand Down Expand Up @@ -187,4 +193,18 @@ export default class ReportQueryMapper extends Mapper {
values: [queryID, fileID],
};
}

/**
 * Builds the statement used to find the most recent query, belonging to a
 * report whose status is 'completed', with the same text as `query`.
 *
 * Both the stored query and the supplied one are trimmed, have runs of
 * whitespace collapsed to a single space and are lower-cased before being
 * compared, so formatting and casing differences do not prevent a match.
 *
 * The query id is aliased to `query_id` so the returned row lines up with
 * the fields declared on `CompletedQueryMatch` (`query_id`, `status_message`).
 *
 * @param query raw query text to look up; passed as a bind parameter ($1)
 * @returns a parameterized statement the postgres driver can execute
 */
private checkQueryExistsStatement(query: string): QueryConfig {
    // note: '\\s+' is written with a double backslash so the SQL text receives '\s+'
    const text = `SELECT rq.id AS query_id, r.status_message
FROM report_queries rq JOIN reports r ON rq.report_id = r.id
WHERE r.status = 'completed'
AND LOWER(REGEXP_REPLACE(TRIM(rq.query), '\\s+', ' ', 'g')) = LOWER(REGEXP_REPLACE(TRIM($1), '\\s+', ' ', 'g'))
ORDER BY rq.created_at DESC LIMIT 1`;

    return {text, values: [query]};
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import RepositoryInterface, {QueryOptions, Repository} from '../../repository';
import ReportQuery, { TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query';
import ReportQuery, { ReportQueryMetadata, TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query';
import Result from '../../../../common_classes/result';
import ReportQueryMapper from '../../../mappers/data_warehouse/data/report_query_mapper';
import {PoolClient} from 'pg';
Expand Down Expand Up @@ -72,17 +72,13 @@ export default class ReportQueryRepository extends Repository implements Reposit
return Promise.resolve(Result.Success(true));
}

// perform all necessary checks before kicking off the query including verifying
// all files are timeseries and checking for previously executed/select * queries
async initiateQuery(containerID: string, dataSourceID: string, request: TimeseriesInitialRequest, user: User, describe: boolean): Promise<Result<string>> {
// check that all files exist and are timeseries, return an error if not
const isTimeseries = await this.#fileRepo.checkTimeseries(request.file_ids!);
if (isTimeseries.isError) {return Promise.resolve(Result.Pass(isTimeseries))}

// create a new report object
const report = new Report({container_id: containerID});
const reportSaved = await this.#reportMapper.Create(user.id!, report);
if (reportSaved.isError) {return Promise.resolve(Result.Pass(reportSaved))}
const reportID = reportSaved.value.id!

// formulate query if describe, check for presence of table name if regular query
if (describe) {
const describeQueries: string[] = [];
Expand All @@ -98,18 +94,70 @@ export default class ReportQueryRepository extends Repository implements Reposit
}
}

// create a report query based on the timeseries rust module query request
// create a new report object to return the ID if a SELECT * or repeated query is found
const reportSaved = await this.#reportMapper.Create(user.id!, new Report({container_id: containerID}));
if (reportSaved.isError) {return Promise.resolve(Result.Pass(reportSaved))}
const reportID = reportSaved.value.id!

// check if the query text was already successfully used in a previous query
// if so return the result file from that original query
const previousQueryResults = await this.#mapper.CheckQueryExists(request.query!);
// if an error is found, simply log and move on
if (previousQueryResults.isError) {
Logger.error(previousQueryResults.error.error);
}

if (previousQueryResults.value) {
// grab and use the previous status message for this report
void this.#reportRepo.setStatus(reportID, 'completed', previousQueryResults.value.status_message);

return Promise.resolve(Result.Success(reportID));
}

// create a query object if a previous query was not found
const reportQuery = new ReportQuery({query: request.query!, report_id: reportID});
const querySaved = await this.#mapper.Create(user.id!, reportQuery);
if (querySaved.isError) { return Promise.resolve(Result.Pass(querySaved))}
const queryID = querySaved.value.id!

// fetch file metadata
const fileInfo = await this.#fileRepo.listPathMetadata(...request.file_ids!);
if (fileInfo.isError) {return Promise.resolve(Result.Failure('unable to find file information'))}
const files = fileInfo.value;
// check if the query is a SELECT * query; if so return original file instead of querying
// verify there's only one file being queried
if (request.file_ids!.length === 1) {
    const fileID = request.file_ids![0];
    // trim and case-desensitize the query to eliminate any syntax variance
    const normalizedQuery = request.query?.trim().replace(/\s+/g, ' ').replace(';', '').toLowerCase();
    if (normalizedQuery === `select * from table_${fileID}`) {
        // set the original file as the report file and return report ID
        const resultSet = await this.setResultFile(reportID, queryID, fileID);
        if (resultSet.isError) {
            const errorMessage = `error attaching record to report ${reportID}: ${resultSet.error.error}`;
            void this.#reportRepo.setStatus(reportID, 'error', errorMessage);
            Logger.error(errorMessage);
            // stop here so the 'error' status set above is not overwritten by the
            // 'completed' status below and the caller is not told the query succeeded
            return Promise.resolve(Result.Failure(errorMessage));
        }

        // everything was successful, set the report status to completed
        const successMessage = `results now available. Download them at "/containers/${containerID}/files/${fileID}/download"`;
        void this.#reportRepo.setStatus(reportID, 'completed', successMessage);
        return Promise.resolve(Result.Success(reportID));
    }
}

const queryMetadata: ReportQueryMetadata = {
container_id: containerID,
data_source_id: dataSourceID,
request: request,
user: user,
report_id: reportID,
query: reportQuery,
query_id: queryID
}

// kickoff the query itself if there are no early return scenarios
return this.kickoffQuery(queryMetadata, describe);
}

// create a connection string based on the type of storage being used
// create a connection string based on the type of storage being used
async createConnectionString(containerID: string, dataSourceID: string): Promise<Result<string>> {
const uploadPath = `containers/${containerID}/datasources/${dataSourceID}`;
let storageConnection: string;
if (Config.file_storage_method === 'filesystem') {
Expand Down Expand Up @@ -141,29 +189,46 @@ export default class ReportQueryRepository extends Repository implements Reposit
return Promise.resolve(Result.Failure(`error: unsupported or unimplemented file storage method being used`));
}

return Promise.resolve(Result.Success(storageConnection));
}

/**
 * Starts execution of a describe or regular query once all early-return
 * checks have been handled by the caller.
 *
 * Steps: fetch metadata for the requested files, build the storage
 * connection string, mark the report as 'processing', then start the
 * describe/query work without awaiting it.
 *
 * @param queryMetadata container, data source, request, user, report and query info for this run
 * @param describe when true the describe process is started instead of the query process
 * @returns the report ID on success so the caller can poll for results, or the
 *          failure from whichever preparatory step went wrong
 */
async kickoffQuery(queryMetadata: ReportQueryMetadata, describe: boolean): Promise<Result<string>> {
    const {request, user, query, report_id: reportID, query_id: queryID} = queryMetadata;

    // fetch file metadata
    const fileInfo = await this.#fileRepo.listPathMetadata(...request.file_ids!);
    if (fileInfo.isError) {return Promise.resolve(Result.Pass(fileInfo))}
    const files = fileInfo.value as FileMetadata[];

    const getConnString = await this.createConnectionString(queryMetadata.container_id, queryMetadata.data_source_id);
    if (getConnString.isError) {return Promise.resolve(Result.Pass(getConnString))}
    const storageConnection = getConnString.value;

    // set report status to "processing"
    const statusSet = await this.#reportRepo.setStatus(
        reportID, 'processing',
        `executing query ${queryID}: "${query.query}" as part of report ${reportID}`
    );
    if (statusSet.isError) {return Promise.resolve(Result.Failure(`unable to set report status`))}

    // kick off the describe or query process; intentionally not awaited so the
    // report ID can be handed back right away
    if (describe) {
        void this.processDescribe(reportID, request.query!, storageConnection, files);
    } else {
        void this.processQuery(
            reportID,
            request.query!,
            storageConnection,
            files,
            queryID,
            user
        );
    }

    // return report ID to the user so they can poll for results
    return Promise.resolve(Result.Success(reportID));
}

async processQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,12 @@ export default class TypeTransformationRepository extends Repository implements
// If the data_source value for any given param is an old ID for an existing data source,
// replace it with the actual data source ID. Params that carry no value at all are
// pointed at the current data source (dataSourceID). Params are updated in place.
const backfillDataSources = (params: EdgeConnectionParameter[]) => {
    params.forEach((param) => {
        if (!param.value) {
            // nothing supplied: default to the current data source
            param.value = dataSourceID;
            return;
        }

        // backfill old ID with new ID if present; values that match no old ID are left alone
        const matchedSource = dataSources.find((src) => src?.DataSourceRecord?.old_id === param.value);
        const currentID = matchedSource?.DataSourceRecord?.id;
        if (currentID) param.value = currentID;
    });
};
Expand Down
21 changes: 21 additions & 0 deletions server/src/domain_objects/data_warehouse/data/report_query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BaseDomainClass } from "../../../common_classes/base_domain_class";
import {IsArray, IsOptional, IsString} from 'class-validator';
import Report from './report';
import { User } from "../../access_management/user";

/*
ReportQuery represents a query and its execution status.
Expand Down Expand Up @@ -39,6 +40,26 @@ export default class ReportQuery extends BaseDomainClass{
}
}

// input type for passing info between query methods
// (bundles everything needed to start a query so it can be handed over as one argument)
export type ReportQueryMetadata = {
// ID of the container the report belongs to
container_id: string;
// ID of the data source the queried files belong to
data_source_id: string;
// the initial timeseries request, including the query text and file IDs
request: TimeseriesInitialRequest;
// the user on whose behalf the query is run
user: User;
// ID of the report the query is attached to
report_id: string;
// the report query object holding the query text
query: ReportQuery;
// ID of the saved report query
query_id: string;
}

// domain object for the results of checkQueryExists query
// (describes a previously run query that matched the text of a new request)
export class CompletedQueryMatch {
// ID of the matching report query
// NOTE(review): only populated if the lookup query returns a column named
// `query_id` — confirm against the SELECT list of the statement
@IsString()
query_id?: string;

// status message recorded on the report the matching query belongs to
@IsString()
status_message?: string;
}

// initial object used to create request for the timeseries rust module
export class TimeseriesInitialRequest {
@IsOptional()
Expand Down

0 comments on commit 0935e52

Please sign in to comment.