diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 000000000..d17d6180f
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1 @@
+/kubernetes @Digital-Engineering/infrastructure
diff --git a/.gitignore b/.gitignore
index 70b7c9329..0527c237d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@ profile-logs
 /RedisLoader/RedisLoader.iml
 /AdminWebApp/src/**/*.js
 /AdminWebApp/src/**/*.js.map
+/NodeLibraries/dl-fast-load/build-output.txt
diff --git a/AdminWebApp/package-lock.json b/AdminWebApp/package-lock.json
index 26481744b..707fba025 100644
--- a/AdminWebApp/package-lock.json
+++ b/AdminWebApp/package-lock.json
@@ -31,6 +31,7 @@
         "element-ui": "^2.14.1",
         "eslint": "^7.5.0",
         "eslint-plugin-vue": "^9.2.0",
+        "flatpickr": "^4.6.13",
         "force-graph": "^1.42.9",
         "highlight.js": "^10.4.1",
         "is-svg": "^4.3.1",
@@ -49,7 +50,6 @@
         "vue": "^2.6.12",
         "vue-class-component": "^7.2.6",
         "vue-cli-plugin-vuetify": "~2.5.8",
-        "vue-flatpickr-component": "^8.1.7",
         "vue-i18n": "^8.25.0",
         "vue-json-viewer": "^2.2.22",
         "vue-observe-visibility": "^1.0.0",
@@ -11478,21 +11478,6 @@
       "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
       "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
     },
-    "node_modules/vue-flatpickr-component": {
-      "version": "8.1.7",
-      "resolved": "https://registry.npmjs.org/vue-flatpickr-component/-/vue-flatpickr-component-8.1.7.tgz",
-      "integrity": "sha512-T9aapLERf5XrisKUHw8QVByFpN9UB583Bhu6+HtzvhCcfXqIYBtRc3rQC0ZhFSRk3CNMo7533U+B5Qs4WAbhyA==",
-      "dependencies": {
-        "flatpickr": "^4.6.6"
-      },
-      "engines": {
-        "node": ">= 4.2.0",
-        "npm": ">= 3.0.0"
-      },
-      "peerDependencies": {
-        "vue": "^2.0.0"
-      }
-    },
     "node_modules/vue-hot-reload-api": {
       "version": "2.3.4",
       "resolved": "https://registry.npmjs.org/vue-hot-reload-api/-/vue-hot-reload-api-2.3.4.tgz",
@@ -20886,14 +20871,6 @@
        }
      }
    },
-    "vue-flatpickr-component": {
-      "version": "8.1.7",
-      "resolved": "https://registry.npmjs.org/vue-flatpickr-component/-/vue-flatpickr-component-8.1.7.tgz",
-      "integrity": "sha512-T9aapLERf5XrisKUHw8QVByFpN9UB583Bhu6+HtzvhCcfXqIYBtRc3rQC0ZhFSRk3CNMo7533U+B5Qs4WAbhyA==",
-      "requires": {
-        "flatpickr": "^4.6.6"
-      }
-    },
     "vue-hot-reload-api": {
       "version": "2.3.4",
       "resolved": "https://registry.npmjs.org/vue-hot-reload-api/-/vue-hot-reload-api-2.3.4.tgz",
diff --git a/AdminWebApp/package.json b/AdminWebApp/package.json
index 31e3d109b..8a437a31a 100644
--- a/AdminWebApp/package.json
+++ b/AdminWebApp/package.json
@@ -32,6 +32,7 @@
     "element-ui": "^2.14.1",
     "eslint": "^7.5.0",
     "eslint-plugin-vue": "^9.2.0",
+    "flatpickr": "^4.6.13",
    "force-graph": "^1.42.9",
     "highlight.js": "^10.4.1",
     "is-svg": "^4.3.1",
@@ -50,7 +51,6 @@
     "vue": "^2.6.12",
     "vue-class-component": "^7.2.6",
     "vue-cli-plugin-vuetify": "~2.5.8",
-    "vue-flatpickr-component": "^8.1.7",
     "vue-i18n": "^8.25.0",
     "vue-json-viewer": "^2.2.22",
     "vue-observe-visibility": "^1.0.0",
diff --git a/AdminWebApp/src/api/client.ts b/AdminWebApp/src/api/client.ts
index 94c2d4dfe..64683df2f 100644
--- a/AdminWebApp/src/api/client.ts
+++ b/AdminWebApp/src/api/client.ts
@@ -86,6 +86,14 @@ export class Client {
         return this.postRawReturn(`/containers/${containerID}/graphs/nodes/${nodeID}/timeseries`, query);
     }
 
+    submitDataSourceGraphQLQuery(containerID: string, dataSourceID: string, query: any): Promise<any> {
+        if (query.query) {
+            query.query = query.query.replace(/\n/g, '');
+        }
+
+        return this.postRawReturn(`/containers/${containerID}/import/datasources/${dataSourceID}/data`, query);
+    }
+
     listContainers(): Promise<any> {
         return this.get('/containers');
     }
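A minimal usage sketch for the new client method; the container/data source IDs, the query body, and the `Client` construction are illustrative, not taken from the diff. One hedged note: `replace(/\n/g, '')` joins lines with nothing between them, which is only safe when every line break is preceded by other whitespace; joining with a single space would be the more conservative choice.

```typescript
import {Client} from '@/api/client'; // import path assumed

const client = new Client(/* construction details assumed */);

// The field name comes from the data source's name in the generated schema;
// "Timeseries" here is illustrative.
const results = await client.submitDataSourceGraphQLQuery('container-id', 'datasource-id', {
    query: `{
        Timeseries(_record: {limit: 100}) {
            primary_timestamp
            temperature
        }
    }`,
});
```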

diff --git a/AdminWebApp/src/components/data/nodeTimeseriesDialog.vue b/AdminWebApp/src/components/data/nodeTimeseriesDialog.vue
index c175efeca..b5ab498bb 100644
--- a/AdminWebApp/src/components/data/nodeTimeseriesDialog.vue
+++ b/AdminWebApp/src/components/data/nodeTimeseriesDialog.vue
[The template hunks for this file were garbled in extraction (the HTML tags were stripped), so the markup cannot be faithfully reproduced. The surviving fragments show the following changes: the fullscreen v-dialog gains persistent, no-click-animation, and ref="dialog" attributes; the toolbar gains v-observe-visibility="setDatePickers"; the timeseries.searchTimeRange display ({{this.startDate}} {{this.startTime}} - {{this.endDate}} {{this.endTime}}) and the timeseries.indexRange display ({{this.startIndex}} - {{this.endIndex}}) are reworked; the vue-flatpickr-component pickers for the timeseries.start and timeseries.end fields are replaced with inputs driven directly by flatpickr; the timeseries.runSearch button is rebuilt and joined by a new "Stop Stream" button; table/chart view toggles using the mdi-table and mdi-arrow-up-drop-circle icons are added; and hunk @@ -120,6 +246,12 @@ adds a "No Results" alert shown when a query returns nothing.]
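With vue-flatpickr-component dropped in favor of a direct flatpickr dependency (see the package.json hunks above), picker initialization presumably moves into the dialog's own code. A minimal sketch of wiring flatpickr to a plain input; the selector and handler are assumptions, not lifted from the lost template:

```typescript
import flatpickr from 'flatpickr';
import 'flatpickr/dist/flatpickr.css';

// Attach a date+time picker to an input; '#startDate' is illustrative.
flatpickr('#startDate', {
    enableTime: true,
    enableSeconds: true,
    dateFormat: 'Y-m-d H:i:S',
    onChange: (selectedDates: Date[]) => {
        // push the selection into component state here
        console.log('new start time', selectedDates[0]?.toISOString());
    },
});
```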
@@ -130,10 +262,13 @@
[Script-section hunk garbled in extraction; only the "\ No newline at end of file" markers survive, indicating the file still ends without a trailing newline.]
diff --git a/AdminWebApp/src/components/data/timeseriesAnnotationDialog.vue b/AdminWebApp/src/components/data/timeseriesAnnotationDialog.vue
new file mode 100644
index 000000000..e30395efb
--- /dev/null
+++ b/AdminWebApp/src/components/data/timeseriesAnnotationDialog.vue
@@ -0,0 +1,89 @@
[New 89-line component; its template and script were stripped in extraction and are not recoverable.]
diff --git a/AdminWebApp/src/translations.ts b/AdminWebApp/src/translations.ts
index f1231b912..6495edf02 100644
--- a/AdminWebApp/src/translations.ts
+++ b/AdminWebApp/src/translations.ts
@@ -167,6 +167,10 @@ export default {
         startIndex: 'Start Index',
         endIndex: 'End Index',
         indexRange: 'Index Range',
+        columnName: 'Column Name',
+        dataSource: 'Data Source',
+        type: 'Type',
+        resultLimit: 'Result Limit',
     },
     dataQuery: {
         deleteNode: 'Delete Node',
diff --git a/src/data_access_layer/mappers/data_warehouse/import/data_source_mapper.ts b/src/data_access_layer/mappers/data_warehouse/import/data_source_mapper.ts
index 96798c826..8b7d9f037 100644
--- a/src/data_access_layer/mappers/data_warehouse/import/data_source_mapper.ts
+++ b/src/data_access_layer/mappers/data_warehouse/import/data_source_mapper.ts
@@ -128,14 +128,14 @@ export default class DataSourceMapper extends Mapper {
         return super.runStatement(this.insertIntoHypertableStatement(source, records));
     }
 
-    public async CopyFromHypertable(source: DataSourceRecord, startTime?: string, endTime?: string): Promise<Result<Readable>> {
+    public async CopyFromHypertable(source: DataSourceRecord, options?: copyTableOptions): Promise<Result<Readable>> {
         return new Promise((resolve) => {
             PostgresAdapter.Instance.Pool.connect((err: Error, client: PoolClient, done: any) => {
                 if (err) {
                     return resolve(Result.Failure('unable to secure postgres client'));
                 }
 
-                const stream = client.query(copyTo(this.hypertableCopyToStatement(source, startTime, endTime)));
+                const stream = client.query(copyTo(this.hypertableCopyToStatement(source, options)));
 
                 stream.on('error', done);
                 stream.on('end', done);
@@ -450,27 +450,57 @@ export default class DataSourceMapper extends Mapper {
         return format(text, values);
     }
 
-    private hypertableCopyToStatement(source: DataSourceRecord, startTime?: string, endTime?: string): string {
+    private hypertableCopyToStatement(source: DataSourceRecord, options?: copyTableOptions): string {
         const config = source.config as TimeseriesDataSourceConfig;
 
-        if (!startTime || !endTime) {
-            return `COPY (SELECT * FROM y_${source.id}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+        if (!options || !options.startTimeOrIndex || !options.endTime) {
+            if (options && options.secondaryIndexName && options.secondaryIndexStartValue) {
+                return `COPY (SELECT * FROM y_${source.id} WHERE ${format(
+                    '%I > %L',
+                    options.secondaryIndexName,
+                    options.secondaryIndexStartValue,
+                )}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+            } else {
+                return `COPY (SELECT * FROM y_${source.id}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+            }
         }
 
         const primaryTimestampColumn = config.columns.find((c) => c.is_primary_timestamp);
         const formatString = 'YYYY-MM-DD HH24:MI:SS.US';
 
         if (primaryTimestampColumn?.type === 'date') {
-            return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} BETWEEN ${format(
-                'to_timestamp("%s", %L)',
-                startTime,
-                formatString,
-            )} AND ${format('to_timestamp("%s", %L)', endTime, formatString)}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+            if (options && options.secondaryIndexName && options.secondaryIndexStartValue) {
+                return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} BETWEEN ${format(
+                    'to_timestamp("%s", %L)',
+                    options.startTimeOrIndex,
+                    formatString,
+                )} AND ${options.endTime ? format('to_timestamp("%s", %L)', options.endTime, formatString) : format('NOW()')} AND ${format(
+                    '%I > %L',
+                    options.secondaryIndexName,
+                    options.secondaryIndexStartValue,
+                )}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+            } else {
+                return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} BETWEEN ${format(
+                    'to_timestamp("%s", %L)',
+                    options.startTimeOrIndex,
+                    formatString,
+                )} AND ${
+                    options.endTime ? format('to_timestamp("%s", %L)', options.endTime, formatString) : format('NOW()')
+                }) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+            }
         }
 
-        return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} BETWEEN '${format(startTime)}' AND '${format(
-            endTime,
-        )}') TO STDOUT WITH (FORMAT CSV, HEADER)`;
+        if (options && options.secondaryIndexName && options.secondaryIndexStartValue) {
+            return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} > '${format(options.startTimeOrIndex)}' AND ${format(
+                '%I > %L',
+                options.secondaryIndexName,
+                options.secondaryIndexStartValue,
+            )}) TO STDOUT WITH (FORMAT CSV, HEADER)`;
+        } else {
+            return `COPY (SELECT * FROM y_${source.id} WHERE ${format(primaryTimestampColumn?.column_name)} > '${format(
+                options.startTimeOrIndex,
+            )}' AND '${format(options.endTime)}') TO STDOUT WITH (FORMAT CSV, HEADER)`;
+        }
     }
 
     // this will break an array of objects into a json string and insert it into the statement
@@ -587,3 +617,11 @@ export default class DataSourceMapper extends Mapper {
         return format(statements.join(' '), values);
     }
 }
+
+type copyTableOptions = {
+    startTimeOrIndex?: string;
+    endTime?: string; // only applicable when start is a time
+    // we make a lot of assumptions with the secondary index, such as it's a number field and that it increments
+    secondaryIndexName?: string;
+    secondaryIndexStartValue?: string;
+};
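A sketch of how `copyTableOptions` maps onto the generated `COPY` statements; the table name `y_1` and the values are illustrative. One hedged observation: in the final (non-date) branch, the pre-change code's `BETWEEN ... AND ...` appears to have been reduced to `> '...' AND '...'`, leaving a bare string literal as an `AND` operand, which Postgres rejects with "argument of AND must be type boolean".

```typescript
// Illustrative only -- assumes a hypertable y_1 with a numeric, monotonically
// increasing secondary index column "y" (per the comment on copyTableOptions).
const options: copyTableOptions = {
    secondaryIndexName: 'y',
    secondaryIndexStartValue: '2',
};
// Expected statement for the index-only case:
//   COPY (SELECT * FROM y_1 WHERE "y" > '2') TO STDOUT WITH (FORMAT CSV, HEADER)

// Hedged fix for the non-date start/end branch (a sketch, not the committed code):
//   ... WHERE ${column} BETWEEN '${options.startTimeOrIndex}' AND '${options.endTime}' ...
```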
diff --git a/src/graphql/timeseries_schema.ts b/src/graphql/timeseries_schema.ts
index aed6945e5..a60456b9a 100644
--- a/src/graphql/timeseries_schema.ts
+++ b/src/graphql/timeseries_schema.ts
@@ -16,13 +16,13 @@ import {
 } from 'graphql';
 import Result from '../common_classes/result';
 import GraphQLJSON from 'graphql-type-json';
-import { stringToValidPropertyName } from '../services/utilities';
+import {stringToValidPropertyName} from '../services/utilities';
 import Logger from '../services/logger';
 import Config from '../services/config';
 import DataSourceRepository from '../data_access_layer/repositories/data_warehouse/import/data_source_repository';
-import { DataSource } from '../interfaces_and_impl/data_warehouse/import/data_source';
-import { TimeseriesDataSourceConfig } from '../domain_objects/data_warehouse/import/data_source';
-import { ResolverOptions } from './node_graph_schema';
+import {DataSource} from '../interfaces_and_impl/data_warehouse/import/data_source';
+import {TimeseriesDataSourceConfig} from '../domain_objects/data_warehouse/import/data_source';
+import {ResolverOptions} from './node_graph_schema';
 
 // GraphQLSchemaGenerator takes a container and generates a valid GraphQL schema for all contained metatypes. This will
 // allow users to query and filter data found within their custom timeseries data source
@@ -56,7 +56,7 @@ export default class DataSourceGraphQLSchemaGenerator {
         file_size: {type: GraphQLFloat},
         md5hash: {type: GraphQLString},
         metadata: {type: GraphQLJSON},
-        url: {type: GraphQLString}
+        url: {type: GraphQLString},
     },
 });
@@ -70,7 +70,7 @@ export default class DataSourceGraphQLSchemaGenerator {
         },
     });
 
-    constructor(){
+    constructor() {
         this.#dataSourceRepo = new DataSourceRepository();
     }
 
@@ -92,7 +92,7 @@ export default class DataSourceGraphQLSchemaGenerator {
                 new GraphQLSchema({
                     query: new GraphQLObjectType({
                         name: 'Query',
-                        fields: dataSourceGrapQLObject
+                        fields: dataSourceGrapQLObject,
                     }),
                 }),
             ),
@@ -107,85 +107,87 @@ export default class DataSourceGraphQLSchemaGenerator {
         dataSourceGrapQLObjects[stringToValidPropertyName(name)] = {
             args: {
                 _record: {type: this.recordInputType},
-                ...this.inputFieldsForDataSource(source)
+                ...this.inputFieldsForDataSource(source),
             },
             description: `Timeseries data from the data source ${source.DataSourceRecord!.name}`,
             type: options.returnFile
                 ? this.fileInfo
-                : new GraphQLList(new GraphQLObjectType({
-                    name: stringToValidPropertyName(name),
-                    // needed because the return type accepts an object, but throws a fit about it
-                    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
-                    // @ts-ignore
-                    fields: () => {
-                        const output: {[key: string]: {[key: string]: GraphQLNamedType | GraphQLList<any>}} = {};
-                        output._record = {type: this.recordInfo};
-
-                        (source.DataSourceRecord!.config as TimeseriesDataSourceConfig).columns.forEach((column) => {
-                            // if it's not a column mapping, skip so we don't pollute the object
-                            if (!column.column_name || !column.type) {
-                                return;
-                            }
-
-                            const propertyName = stringToValidPropertyName(column.column_name);
-                            switch (column.type) {
-                                // because we have no specification on our internal number type,
-                                // we must set this as a float for now
-                                case 'number': {
-                                    output[propertyName] = {
-                                        type: GraphQLInt,
-                                    };
-                                    break;
-                                }
-
-                                case 'float': {
-                                    output[propertyName] = {
-                                        type: GraphQLFloat,
-                                    };
-                                    break;
-                                }
-
-                                case 'number64' || 'float64': {
-                                    output[propertyName] = {
-                                        type: GraphQLString,
-                                    };
-                                    break;
-                                }
-
-                                case 'boolean': {
-                                    output[propertyName] = {
-                                        type: GraphQLBoolean,
-                                    };
-                                    break;
-                                }
-
-                                case 'string' || 'date' || 'file': {
-                                    output[propertyName] = {
-                                        type: GraphQLString,
-                                    };
-                                    break;
-                                }
-
-                                default: {
-                                    output[propertyName] = {
-                                        type: GraphQLJSON,
-                                    };
-                                }
-                            }
-                        });
-
-                        return output;
-                    },
-                })),
-            resolve: this.resolverForDataSource(source, options)
-        }
+                : new GraphQLList(
+                      new GraphQLObjectType({
+                          name: stringToValidPropertyName(name),
+                          // needed because the return type accepts an object, but throws a fit about it
+                          // eslint-disable-next-line @typescript-eslint/ban-ts-comment
+                          // @ts-ignore
+                          fields: () => {
+                              const output: {[key: string]: {[key: string]: GraphQLNamedType | GraphQLList<any>}} = {};
+                              output._record = {type: this.recordInfo};
+
+                              (source.DataSourceRecord!.config as TimeseriesDataSourceConfig).columns.forEach((column) => {
+                                  // if it's not a column mapping, skip so we don't pollute the object
+                                  if (!column.column_name || !column.type) {
+                                      return;
+                                  }
+
+                                  const propertyName = stringToValidPropertyName(column.column_name);
+                                  switch (column.type) {
+                                      // because we have no specification on our internal number type,
+                                      // we must set this as a float for now
+                                      case 'number': {
+                                          output[propertyName] = {
+                                              type: GraphQLInt,
+                                          };
+                                          break;
+                                      }
+
+                                      case 'float': {
+                                          output[propertyName] = {
+                                              type: GraphQLFloat,
+                                          };
+                                          break;
+                                      }
+
+                                      case 'number64' || 'float64': {
+                                          output[propertyName] = {
+                                              type: GraphQLString,
+                                          };
+                                          break;
+                                      }
+
+                                      case 'boolean': {
+                                          output[propertyName] = {
+                                              type: GraphQLBoolean,
+                                          };
+                                          break;
+                                      }
+
+                                      case 'string' || 'date' || 'file': {
+                                          output[propertyName] = {
+                                              type: GraphQLString,
+                                          };
+                                          break;
+                                      }
+
+                                      default: {
+                                          output[propertyName] = {
+                                              type: GraphQLJSON,
+                                          };
+                                      }
+                                  }
+                              });
+
+                              return output;
+                          },
+                      }),
+                  ),
+            resolve: this.resolverForDataSource(source, options),
+        };
 
         return dataSourceGrapQLObjects;
     }
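One thing the formatting pass carries over unchanged: in TypeScript a `case` label is an expression that gets evaluated, so `case 'number64' || 'float64':` reduces to `case 'number64':` and `'float64'` never matches this branch (the same holds for `'string' || 'date' || 'file'`); the unmatched values fall through to the `GraphQLJSON` default. A sketch of the intended multi-value matching via case fall-through:

```typescript
switch (column.type) {
    // stacked case labels share one body
    case 'number64':
    case 'float64': {
        output[propertyName] = {type: GraphQLString};
        break;
    }
    case 'string':
    case 'date':
    case 'file': {
        output[propertyName] = {type: GraphQLString};
        break;
    }
    default: {
        output[propertyName] = {type: GraphQLJSON};
    }
}
```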
 
     inputFieldsForDataSource(source: DataSource): {[key: string]: any} {
         const fields: {[key: string]: any} = {};
-        const config = source.DataSourceRecord!.config as TimeseriesDataSourceConfig
+        const config = source.DataSourceRecord!.config as TimeseriesDataSourceConfig;
 
         config.columns.forEach((column) => {
             if (!column.column_name || !column.type) return;
@@ -234,7 +236,7 @@ export default class DataSourceGraphQLSchemaGenerator {
                     break;
                 }
 
-                case 'json' : {
+                case 'json': {
                     fields[propertyName] = {
                         type: new GraphQLInputObjectType({
                             name: stringToValidPropertyName(`y_${source.DataSourceRecord!.id}` + column.column_name),
@@ -267,7 +269,7 @@ export default class DataSourceGraphQLSchemaGenerator {
     resolverForDataSource(source: DataSource, options?: ResolverOptions): (_: any, {input}: {input: any}) => any {
         return async (_, input: {[key: string]: any}) => {
             let repo: DataSourceRepository;
-            const config = source.DataSourceRecord?.config as TimeseriesDataSourceConfig
+            const config = source.DataSourceRecord?.config as TimeseriesDataSourceConfig;
 
             if (
                 Object.keys(input).filter((key) => {
@@ -310,80 +312,67 @@ export default class DataSourceGraphQLSchemaGenerator {
                 // same statement but we must add "and" in front of each subsequent call to "query"
                 // otherwise we'll get sql statement errors
                 if (i === 0) {
-                    repo = repo.query(
-                        propertyMap[key].column_name,
-                        input[key].operator,
-                        input[key].value,
-                        {
-                            dataType: propertyMap[key].type,
-                            tableName: `y_${source.DataSourceRecord?.id}`
-                        }
-                    );
+                    repo = repo.query(propertyMap[key].column_name, input[key].operator, input[key].value, {
+                        dataType: propertyMap[key].type,
+                        tableName: `y_${source.DataSourceRecord?.id}`,
+                    });
                     i++;
                 } else {
-                    repo = repo.and().query(
-                        propertyMap[key].column_name,
-                        input[key].operator,
-                        input[key].value,
-                        {
-                            dataType: propertyMap[key].type,
-                            tableName: `y_${source.DataSourceRecord?.id}`
-                        }
-                    );
+                    repo = repo.and().query(propertyMap[key].column_name, input[key].operator, input[key].value, {
+                        dataType: propertyMap[key].type,
+                        tableName: `y_${source.DataSourceRecord?.id}`,
+                    });
                 }
             });
 
             if (options && options.returnFile) {
-                return new Promise(
-                    (resolve, reject) => {
-                        void repo
-                            .listTimeseriesToFile(source.DataSourceRecord!.id!, {
-                                file_type: options && options.returnFileType ? options.returnFileType: 'json',
-                                file_name: `${source.DataSourceRecord?.name}-${new Date().toDateString()}`,
-                                containerID: source.DataSourceRecord!.container_id!,
-                            })
-                            .then((result) => {
-                                if (result.isError) {
-                                    reject(`unable to list timeseries data to file ${result.error?.error}`);
-                                }
-
-                                resolve(result.value);
-                            })
-                            .catch((e) => {
-                                reject(e);
-                            })
-                    }
-                );
+                return new Promise((resolve, reject) => {
+                    void repo
+                        .listTimeseriesToFile(source.DataSourceRecord!.id!, {
+                            file_type: options && options.returnFileType ? options.returnFileType : 'json',
+                            file_name: `${source.DataSourceRecord?.name}-${new Date().toDateString()}`,
+                            containerID: source.DataSourceRecord!.container_id!,
+                        })
+                        .then((result) => {
+                            if (result.isError) {
+                                reject(`unable to list timeseries data to file ${result.error?.error}`);
+                            }
+
+                            resolve(result.value);
+                        })
+                        .catch((e) => {
+                            reject(e);
+                        });
+                });
             } else {
-                return new Promise(
-                    (resolve) => {
-                        void repo
-                            .listTimeseries(source.DataSourceRecord!.id!, {
-                                limit: input._record?.limit ? input._record.limit : 10000,
-                                offset: input._record?.page ? input._record.limit * input._record.page : undefined,
-                                sortBy: input._record?.sortBy ? input._record.sortBy : undefined,
-                                sortDesc: input._record?.sortDesc ? input._record.sortDesc : undefined,
-                            })
-                            .then((results) => {
-                                if (results.isError) {
-                                    Logger.error(`unable to list time series data ${results.error?.error}`);
-                                    resolve([]);
-                                }
-
-                                const output: {[key: string]: any}[] = [];
-
-                                results.value.forEach((entry) => {
-                                    output.push({
-                                        ...entry,
-                                    });
+                return new Promise((resolve) => {
+                    void repo
+                        .listTimeseries(source.DataSourceRecord!.id!, {
+                            limit: input._record?.limit ? input._record.limit : 10000,
+                            offset: input._record?.page ? input._record.limit * input._record.page : undefined,
+                            sortBy: input._record?.sortBy ? input._record.sortBy : undefined,
+                            sortDesc: input._record?.sortDesc ? input._record.sortDesc : undefined,
+                            tableName: `y_${source.DataSourceRecord?.id}`,
+                        })
+                        .then((results) => {
+                            if (results.isError) {
+                                Logger.error(`unable to list time series data ${results.error?.error}`);
+                                resolve([]);
+                            }
+
+                            const output: {[key: string]: any}[] = [];
+
+                            results.value.forEach((entry) => {
+                                output.push({
+                                    ...entry,
                                 });
+                            });
 
-                                resolve(output);
-                            })
-                            .catch((e) => resolve(e))
-                    }
-                );
+                            resolve(output);
+                        })
+                        .catch((e) => resolve(e));
+                });
             }
         };
     }
-}
\ No newline at end of file
+}
diff --git a/src/http_server/routes/data_warehouse/data/graph_functions/file_functions.ts b/src/http_server/routes/data_warehouse/data/graph_functions/file_functions.ts
index 2e5039b3d..aabc35622 100644
--- a/src/http_server/routes/data_warehouse/data/graph_functions/file_functions.ts
+++ b/src/http_server/routes/data_warehouse/data/graph_functions/file_functions.ts
@@ -219,6 +219,14 @@ export default class FileFunctions {
                     return;
                 }
 
+                if (req.query.deleteAfter && String(req.query.deleteAfter).toLowerCase() === 'true') {
+                    res.on('finish', () => {
+                        fileRepo.delete(file.value).catch((e) => {
+                            Logger.error(`unable to delete file after downloading ${e.message}`);
+                        });
+                    });
+                }
+
                 stream.pipe(res);
             })
             .catch((err) => {
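A hedged sketch of exercising the new `deleteAfter` flag from a client; the exact download route shape and token handling are assumptions, not taken from the diff:

```typescript
// Assumed route shape; adjust to the actual file-download endpoint.
const res = await fetch(`/containers/${containerID}/files/${fileID}/download?deleteAfter=true`, {
    headers: {Authorization: `Bearer ${token}`},
});
const body = await res.text();
// Server side, res.on('finish', ...) fires once the response has fully
// streamed, and the file record is then deleted -- a retry of the same
// URL would therefore fail.
```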
diff --git a/src/http_server/routes/data_warehouse/import/data_source_routes.ts b/src/http_server/routes/data_warehouse/import/data_source_routes.ts
index 7ab03dc57..a600abee5 100644
--- a/src/http_server/routes/data_warehouse/import/data_source_routes.ts
+++ b/src/http_server/routes/data_warehouse/import/data_source_routes.ts
@@ -8,6 +8,7 @@ import {QueryOptions} from '../../../../data_access_layer/repositories/repository';
 import DataStagingRepository from '../../../../data_access_layer/repositories/data_warehouse/import/data_staging_repository';
 import Authorization from '../../../../domain_objects/access_management/authorization/authorization';
 import DataSourceMapper from '../../../../data_access_layer/mappers/data_warehouse/import/data_source_mapper';
+import FileRepository from '../../../../data_access_layer/repositories/data_warehouse/data/file_repository';
 
 const dataSourceRepo = new DataSourceRepository();
 const dataSourceFactory = new DataSourceFactory();
@@ -125,11 +126,12 @@ export default class DataSourceRoutes {
         const mapper = DataSourceMapper.Instance;
 
         mapper
-            .CopyFromHypertable(
-                req.dataSource?.DataSourceRecord!,
-                req.query.startTime ? (req.query.startTime as string) : undefined,
-                req.query.endTime ? (req.query.endTime as string) : undefined,
-            )
+            .CopyFromHypertable(req.dataSource?.DataSourceRecord!, {
+                startTimeOrIndex: req.query.startTime ? (req.query.startTime as string) : undefined,
+                endTime: req.query.endTime ? (req.query.endTime as string) : undefined,
+                secondaryIndexName: req.query.secondaryIndexName ? (req.query.secondaryIndexName as string) : undefined,
+                secondaryIndexStartValue: req.query.secondaryIndexStartValue ? (req.query.secondaryIndexStartValue as string) : undefined,
+            })
             .then((stream) => {
                 if (stream.isError) {
                     stream.asResponse(res);
@@ -137,8 +139,16 @@ export default class DataSourceRoutes {
                     return;
                 }
 
-                res.attachment(`${req.dataSource?.DataSourceRecord?.name} output.csv`);
-                stream.value.pipe(res);
+                const fileRepo = new FileRepository();
+                fileRepo
+                    .uploadFile(req.container?.id!, req.currentUser!, `${req.dataSource?.DataSourceRecord?.name}-${new Date().toISOString()}`, stream.value)
+                    .then((file) => {
+                        file.asResponse(res);
+                        return;
+                    })
+                    .catch((err) => {
+                        Result.Error(err).asResponse(res);
+                    });
             })
             .catch((err) => {
                 Result.Error(err).asResponse(res);
diff --git a/src/services/blob_storage/pg_large_file_impl.ts b/src/services/blob_storage/pg_large_file_impl.ts
index c11fc8830..17cb99e7b 100644
--- a/src/services/blob_storage/pg_large_file_impl.ts
+++ b/src/services/blob_storage/pg_large_file_impl.ts
@@ -24,7 +24,10 @@ export default class LargeObject implements BlobStorage {
             .then(() => {
                 client
                     .query('COMMIT')
-                    .then(() => resolve(Result.Success(true)))
+                    .then(() => {
+                        client.release();
+                        resolve(Result.Success(true));
+                    })
                     .catch((e) => reject(e));
             })
             .catch((e) => reject(e));
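The `client.release()` addition plugs a pool leak on the success path. A hedged note: the surrounding `.catch((e) => reject(e))` handlers still return without releasing, so the error paths appear to leak the client. A sketch of releasing on failure too, using pg's documented `release(err)` form:

```typescript
client
    .query('COMMIT')
    .then(() => {
        client.release();
        resolve(Result.Success(true));
    })
    .catch((e) => {
        // Passing an Error tells node-postgres to destroy the connection
        // instead of returning a possibly-poisoned client to the pool.
        client.release(e as Error);
        reject(e);
    });
```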
diff --git a/src/tests/data_warehouse/import/timeseries_data_source_impl.spec.ts b/src/tests/data_warehouse/import/timeseries_data_source_impl.spec.ts
index 9d0b8dd84..8e88d498e 100644
--- a/src/tests/data_warehouse/import/timeseries_data_source_impl.spec.ts
+++ b/src/tests/data_warehouse/import/timeseries_data_source_impl.spec.ts
@@ -495,6 +495,107 @@ describe('A Standard DataSource Implementation can', async () => {
         return sourceRepo.delete(source!);
     }).timeout(10000);
 
+    it('can copy from hypertable to a stream with secondary index', async () => {
+        // build the data source first
+        const sourceRepo = new DataSourceRepository();
+
+        let source = new DataSourceFactory().fromDataSourceRecord(
+            new DataSourceRecord({
+                container_id: containerID,
+                name: 'Test Data Source',
+                active: false,
+                adapter_type: 'timeseries',
+                config: new TimeseriesDataSourceConfig({
+                    columns: [
+                        {
+                            column_name: 'primary_timestamp',
+                            property_name: 'Timestamp',
+                            is_primary_timestamp: true,
+                            type: 'date',
+                            date_conversion_format_string: 'YYYY-MM-DD HH:MI:SS',
+                        },
+                        {
+                            column_name: 'temperature',
+                            property_name: 'Temperature (K)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                        {
+                            column_name: 'velocity_i',
+                            property_name: 'Velocity[i] (m/s)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                        {
+                            column_name: 'velocity_j',
+                            property_name: 'Velocity[j] (m/s)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                        {
+                            column_name: 'x',
+                            property_name: 'X (m)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                        {
+                            column_name: 'y',
+                            property_name: 'Y (m)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                        {
+                            column_name: 'z',
+                            property_name: 'Z (m)',
+                            is_primary_timestamp: false,
+                            type: 'number',
+                        },
+                    ] as TimeseriesColumn[],
+                }),
+            }),
+        );
+
+        let results = await sourceRepo.save(source!, user);
+        expect(results.isError, results.error?.error).false;
+        expect(source!.DataSourceRecord?.id).not.undefined;
+
+        // write the json test data out to a temporary file
+        fs.writeFileSync('./test-timeseries-data.json', sampleJSON);
+
+        // now we create an import through the datasource
+        let received = await source!.ReceiveData(fs.createReadStream('./test-timeseries-data.json'), user);
+        expect(received.isError, received.error?.error).false;
+
+        // now let's try csv files
+        fs.writeFileSync('./test-timeseries-data.csv', sampleCSV);
+
+        received = await source!.ReceiveData(fs.createReadStream('./test-timeseries-data.csv'), user, {
+            transformStream: csv({
+                downstreamFormat: 'array', // needed because downstream expects an array of json, not single objects
+            }),
+            bufferSize: 1,
+        });
+        expect(received.isError, received.error?.error).false;
+
+        let stream = await DataSourceMapper.Instance.CopyFromHypertable(source?.DataSourceRecord!, {
+            secondaryIndexName: 'Y (m)',
+            secondaryIndexStartValue: '2',
+        });
+
+        const out = fs.createWriteStream('./out_index.csv', {flags: 'w'});
+
+        stream.value.on('end', () => {
+            expect(fs.statSync('./out_index.csv').size > 0);
+            fs.unlinkSync('./out_index.csv');
+        });
+        await stream.value.pipe(out);
+
+        fs.unlinkSync('./test-timeseries-data.csv');
+        fs.unlinkSync('./test-timeseries-data.json');
+        return sourceRepo.delete(source!);
+    }).timeout(10000);
+
     it('can ingest data to a hypertable using dl-fast-load', async (done) => {
         if (process.env.TEST_FAST_LOAD !== 'true') {
             done();
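Two hedged notes on the new test. First, `stream.value.pipe(out)` returns the destination stream rather than a promise, so the `await` does not actually wait for the COPY to finish, and a bare `expect(x > 0)` never asserts anything; the check inside the `'end'` handler can also fire after cleanup has run. Second, the test passes the property name `'Y (m)'` as `secondaryIndexName` where the hypertable column appears to be `y`; worth double-checking against the generated `%I` identifier quoting. A deterministic sketch using Node's promise-based pipeline:

```typescript
import fs from 'fs';
import {pipeline} from 'node:stream/promises';

// pipeline resolves only after the COPY stream has fully drained into the
// file, so the size assertion runs at a well-defined point.
await pipeline(stream.value, fs.createWriteStream('./out_index.csv', {flags: 'w'}));
expect(fs.statSync('./out_index.csv').size > 0).true;
fs.unlinkSync('./out_index.csv');
```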