forked from Azure/azure-kusto-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example.js
137 lines (116 loc) · 4.36 KB
/
example.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const IngestClient = require("azure-kusto-ingest").IngestClient;
const IngestStatusQueues = require("azure-kusto-ingest").KustoIngestStatusQueues;
const IngestionProps = require("azure-kusto-ingest").IngestionProperties;
const { ReportLevel, ReportMethod } = require("azure-kusto-ingest").IngestionPropertiesEnums;
const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
const { DataFormat, JsonColumnMapping, IngestionMappingType, CompressionType } = require("azure-kusto-ingest").IngestionPropertiesEnums;
const { BlobDescriptor, StreamDescriptor } = require("azure-kusto-ingest").IngestionDescriptors;
const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const fs = require('fs');
const clusterName = null;
const appId = null;
const appKey = null;
const authorityId = null;
const props = new IngestionProps({
database: "Database",
table: "Table",
format: DataFormat.JSON,
ingestionMapping: [
new JsonColumnMapping("TargetColumn1", "$.sourceProp1"),
new JsonColumnMapping("TargetColumn2", "$.sourceProp2"),
new JsonColumnMapping("TargetColumn3", "$.sourceProp3")
],
ingestionMappingType: IngestionMappingType.JSON,
reportLevel: ReportLevel.FailuresAndSuccesses,
reportMethod: ReportMethod.Queue
});
const ingestClient = new IngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
`https://ingest-${clusterName}.kusto.windows.net`, appId, appKey, authorityId
),
props
);
const statusQueues = new IngestStatusQueues(ingestClient);
startIngestion();
// Streaming ingest client
const props2 = new IngestionProps({
database: "Database",
table: "Table",
format: DataFormat.JSON,
ingestionMappingReference: "Pre-defiend mapping name" // For json format mapping is required
});
// Init with engine endpoint
const streamingIngestClient = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
`https://${clusterName}.kusto.windows.net`, appId, appKey, authorityId
),
props2
);
startStreamingIngestion();
async function startIngestion() {
console.log("Ingest from file");
try {
await ingestClient.ingestFromFile("file.json");
console.log("Ingestion done?");
await waitForStatus();
}
catch (err) {
console.log(err);
}
try {
await ingestClient.ingestFromBlob(new BlobDescriptor("https://<account>.blob.core.windows.net/<container>/file.json.gz", 1024 * 50 /* 50MB file */));
console.log("Ingestion done?");
await waitForStatus();
}
catch (err) {
console.log(err);
}
}
async function waitForStatus(numberOFIngestions = 1) {
while (await statusQueues.failure.isEmpty() && await statusQueues.success.isEmpty()) {
console.log("Waiting for status...");
await sleep(1000);
}
const failures = await statusQueues.failure.pop(numberOFIngestions);
for (let failure of failures) {
console.log(`Failed: ${JSON.stringify(failure)}`);
}
const successes = await statusQueues.success.pop(numberOFIngestions);
for (let success of successes) {
console.log(`Succeeded: ${JSON.stringify(success)}`);
}
}
function sleep(ms) {
return new Promise((resolve) => { setTimeout(resolve, ms); });
}
async function startStreamingIngestion() {
// Ingest from file with either file path or FileDescriptor
try {
await streamingIngestClient.ingestFromFile("file.json", props2);
console.log("Ingestion done");
}
catch (err) {
console.log(err);
}
// Ingest from stream with either ReadStream or StreamDescriptor
const stream = fs.createReadStream("file.json");
try {
await streamingIngestClient.ingestFromStream("file.json", props2);
console.log("Ingestion done");
}
catch (err) {
console.log(err);
}
// For gzip data set StreamDescriptor.compressionType to CompressionType.GZIP
const stream = fs.createReadStream("file.json.gz");
const streamDescriptor = new StreamDescriptor(stream, "id", CompressionType.GZIP);
try {
await streamingIngestClient.ingestFromStream(streamDescriptor, props2);
console.log("Ingestion done");
}
catch (err) {
console.log(err);
}
}