-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
240 lines (210 loc) · 7.77 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import fs from 'fs'
import cors from 'cors'
import express from 'express'
import Promise from 'bluebird'
import bodyParser from 'body-parser'
import http from 'http'
import ApolloServerExpress from 'apollo-server-express'
import ApolloFederation from '@apollo/federation'
import GraphQLTools from 'graphql-tools'
import { dirname } from 'path'
import { fileURLToPath } from 'url'
import { Schema, elasticsearch } from 'backend-shared'
import { setup, childSetup } from './services/setup.js'
import { setNtee } from './services/irs_990_importer/set_ntee.js'
import { loadAllForYear } from './services/irs_990_importer/load_all_for_year.js'
import {
processUnprocessedNonprofits, processEin, fixBadFundImports, processUnprocessedFunds
} from './services/irs_990_importer/index.js'
import { parseWebsitesByNtee } from './services/irs_990_importer/parse_websites.js'
import IrsNonprofit990 from './graphql/irs_nonprofit_990/model.js'
import * as directives from './graphql/directives.js'
import config from './config.js'
const { ApolloServer } = ApolloServerExpress
const { buildFederatedSchema } = ApolloFederation
const { SchemaDirectiveVisitor } = GraphQLTools

// Directory containing this module (ES modules have no built-in __dirname).
const __dirname = dirname(fileURLToPath(import.meta.url))

// Reassigned once schemaPromise resolves (see serverPromise further down).
let resolvers, schemaDirectives
// Resolve the SDL file against this module's directory instead of the process
// working directory, so startup does not depend on where node was launched from.
let typeDefs = fs.readFileSync(`${__dirname}/graphql/type.graphql`, 'utf8')
const schemaPromise = Schema.getSchema({ directives, typeDefs, dirName: __dirname })

// `Promise` is bluebird here (see imports); turn its warnings off.
Promise.config({ warnings: false })

const app = express()
app.set('x-powered-by', false)
app.use(cors())
app.use(bodyParser.json({ limit: '1mb' }))
// Also parse text/plain bodies as JSON: avoids a CORS preflight
app.use(bodyParser.json({ type: 'text/plain', limit: '1mb' }))
app.use(bodyParser.urlencoded({ extended: true })) // Kiip uses

// GET / always answers 200 'ok' (presumably used as a health check).
app.get('/', (req, res) => res.status(200).send('ok'))
// Elasticsearch indices that the admin endpoints in this file may operate on.
const validTables = [
  'irs_orgs', 'irs_org_990s', 'irs_funds', 'irs_fund_990s',
  'irs_persons', 'irs_contributions'
]

// GET /tableCount?tableName=<index>
// Responds with the JSON-serialized result of an Elasticsearch count on the
// given index, or { error } when the name is not one of validTables.
app.get('/tableCount', async function (req, res) {
  const { tableName } = req.query
  if (!validTables.includes(tableName)) {
    // Return here: without it the count below still runs and a second
    // response is attempted on the same request.
    return res.send({ error: 'invalid table name' })
  }
  try {
    const count = await elasticsearch.client.count({ index: tableName })
    return res.send(JSON.stringify(count))
  } catch (err) {
    // Without this the request would never be answered when the count fails.
    console.error('tableCount failed', err)
    return res.status(500).send({ error: 'count failed' })
  }
})
// GET /unprocessedCount
// Searches IrsNonprofit990 for records whose importVersion is lower than
// config.CURRENT_IMPORT_VERSION and responds with the JSON-serialized result.
app.get('/unprocessedCount', (req, res) => {
  const olderThanCurrentImport = {
    range: {
      importVersion: { lt: config.CURRENT_IMPORT_VERSION }
    }
  }
  const searchOptions = {
    trackTotalHits: true,
    // presumably only the total matters, hence a single hit
    // NOTE(review): the previous comment here read "16 cpus, 16 chunks"; confirm whether it still applies
    limit: 1,
    query: { bool: { must: olderThanCurrentImport } }
  }
  return IrsNonprofit990.search(searchOptions)
    .then((result) => res.send(JSON.stringify(result)))
})
// GET /setES?mode=bulk
// Toggles index settings that are supposed to speed up ES bulk inserts
// (0 replicas, refresh interval -1); in practice it did not seem to help.
// Only number_of_replicas is changed here: 0 when mode=bulk, otherwise it is
// reset to 2. The refresh interval is left alone.
// Responds with an array of JSON strings, one { previous, diff } per index.
app.get('/setES', async (req, res) => {
  const replicas = req.query.mode === 'bulk' ? 0 : 2

  // Reads the current index settings, applies the replica count, and reports both.
  const updateIndex = async (index) => {
    const current = await elasticsearch.client.indices.getSettings({ index })
    const previous = current[index].settings.index
    const diff = { number_of_replicas: replicas }
    await elasticsearch.client.indices.putSettings({ index, body: diff })
    return JSON.stringify({ previous, diff })
  }

  // bluebird's Promise.map, limited to one index at a time
  const reports = await Promise.map(validTables, updateIndex, { concurrency: 1 })
  return res.send(reports)
})
// GET /setMaxWindow?tableName=<index>&maxResultWindow=<n>
// Sets max_result_window on the given index and responds with the
// Elasticsearch putSettings result. Responds with { error } when the index is
// not one of validTables or when n is not an integer in [10000, 100000].
app.get('/setMaxWindow', async function (req, res) {
  const { tableName } = req.query
  if (!validTables.includes(tableName)) {
    // Return here so an invalid name never reaches Elasticsearch and the
    // response is only sent once.
    return res.send({ error: 'invalid table name' })
  }

  const maxResultWindow = Number.parseInt(req.query.maxResultWindow, 10)
  // NaN compares false against both bounds, so a missing or non-numeric value
  // has to be rejected explicitly.
  const outOfRange = Number.isNaN(maxResultWindow) ||
    maxResultWindow < 10000 ||
    maxResultWindow > 100000
  if (outOfRange) {
    return res.send({ error: 'must be number between 10,000 and 100,000' })
  }

  try {
    const result = await elasticsearch.client.indices.putSettings({
      index: tableName,
      body: { max_result_window: maxResultWindow }
    })
    return res.send(result)
  } catch (err) {
    // Without this the request would never be answered when the update fails.
    console.error('setMaxWindow failed', err)
    return res.status(500).send({ error: 'failed to update max_result_window' })
  }
})
// GET /setNtee: kicks off setNtee() without waiting for it and replies 'syncing'.
// Throughput notes: ~2500/s across 4 pods with 4vcpu each (1.7mm total) = ~11 min.
// The limiting factor is how many in-flight queries scylla & es tolerate
// (throttled by # of cpus / concurrencyPerCpu in jobs settings / queue rate limiter);
// realistically the queue rate limiter (x per second) is probably the blocker.
// Raise it as far as possible without scylla complaining;
// 25/s looks like the sweet spot for the current scylla/es setup (1 each).
app.get('/setNtee', (req, res) => {
  setNtee()
  return res.send('syncing')
})
// GET /loadAllForYear?year=<yyyy>
// Loads every ein / xml url that filed for the given year. The load is started
// without being awaited; the reply names the year, or 'sample_index' when none is given.
// Meant to be run for 2014, 2015, 2016, 2017, 2018, 2019, 2020.
// FIXME: 2015, 2016 done (rm this line)
// Each year takes ~3 min (1 cpu). Elasticsearch writes are the bottleneck
// (bulk goes through, but some requests error if the server is overwhelmed).
app.get('/loadAllForYear', ({ query }, res) => {
  const { year } = query
  loadAllForYear(year)
  const label = year || 'sample_index'
  return res.send(`syncing ${label}`)
})
// GET /processUnprocessedNonprofits
// Walks every 990 that has not been processed yet and pulls its data from the
// xml file/irsx. The query string is handed to processUnprocessedNonprofits
// as-is and the work is not awaited.
// Performance notes:
// - ES seems to be the main bottleneck; requests are bulked but still slow.
// - about 1/2 of the time goes to irsx, 1/2 to es upserts.
// - too many bulk reqs at once make es start sending back errors; bulk upserts
//   in ES seem to be slow in general.
// - a faster ES node helps a little, but not much.
// - cheapest / best combo seems to be 4vcpu/8gb for ES, 8x 2vcpu/2gb for api.
//   w/ 2 job concurrencyPerCpu that's 32; 32 * 300 (chunk) = 9600 (limit),
//   which seems to be the sweet spot w/ ~150-250 nonprofits/s (2-3 hours total).
// - more cpus could probably go faster (bottleneck at that point is irsx).
// - thread_pool.write.queue_size might need to be increased to 1000.
app.get('/processUnprocessedNonprofits', ({ query }, res) => {
  processUnprocessedNonprofits(query)
  return res.send('processing nonprofits')
})
// GET /processEin?ein=<ein>&type=<type>
// Starts processEin for a single ein (not awaited) and replies immediately.
app.get('/processEin', ({ query }, res) => {
  const { ein, type } = query
  processEin(ein, { type })
  return res.send('processing nonprofit')
})
// GET /fixBadFundImports?limit=<n>
// Starts fixBadFundImports with the given limit (not awaited) and replies immediately.
app.get('/fixBadFundImports', ({ query }, res) => {
  const { limit } = query
  fixBadFundImports({ limit })
  return res.send('fixing bad fund imports')
})
// GET /processUnprocessedFunds
// Hands the query string to processUnprocessedFunds as-is (not awaited).
// Notes on tuning:
// - chunkConcurrency=10
// - chunkConcurrency = how many nonprofits of a chunk are processed simultaneously;
//   irrelevant for nonprofits, but it matters for funds (since there's an es fetch).
// - sweet spot is 1600&chunkSize=50&chunkConcurrency=3 (slow).
// - even then scylla might fail upserts for large funds, so maybe run at
//   chunk 1 concurrency 1 for assets > 100m.
app.get('/processUnprocessedFunds', ({ query }, res) => {
  processUnprocessedFunds(query)
  return res.send('processing funds')
})
// GET /parseWebsitesByNtee?ntee=<code>
// Starts parseWebsitesByNtee for the given ntee (not awaited) and replies
// 'syncing'; replies 'specify ntee' when the parameter is missing or empty.
app.get('/parseWebsitesByNtee', (req, res) => {
  const { ntee } = req.query
  if (!ntee) {
    return res.send('specify ntee')
  }
  parseWebsitesByNtee(ntee)
  return res.send('syncing')
})
// Resolves to an http.Server wrapping the express app, once the schema has
// loaded and the Apollo GraphQL middleware is mounted at /graphql.
const serverPromise = schemaPromise.then((loadedSchema) => {
  ({ typeDefs, resolvers, schemaDirectives } = loadedSchema) // stored in the module-level bindings

  const federatedSchema = buildFederatedSchema({ typeDefs, resolvers })
  // Schema directives are applied to the built schema in a separate step; see
  // https://github.com/apollographql/apollo-feature-requests/issues/145
  SchemaDirectiveVisitor.visitSchemaDirectives(federatedSchema, schemaDirectives)

  // Sample query and variables pre-filled in the playground tab.
  const defaultQuery = `
query($query: ESQuery!) {
irsNonprofits(query: $query) {
nodes {
name
employeeCount
volunteerCount
}
}
}
`
  const defaultQueryVariables = `
{
"query": {"range": {"volunteerCount": {"gte": 10000}}}
}
`

  // The playground targets the local server in dev and the public API otherwise.
  const isDev = config.ENV === config.ENVS.DEV
  const endpoint = isDev
    ? `http://localhost:${config.PORT}/graphql`
    : 'https://api.techby.org/990/v1/graphql'

  const playgroundTab = {
    endpoint,
    query: defaultQuery,
    variables: defaultQueryVariables
  }

  const graphqlServer = new ApolloServer({
    schema: federatedSchema,
    introspection: true,
    playground: { tabs: [playgroundTab] }
  })
  graphqlServer.applyMiddleware({ app, path: '/graphql' })

  return http.createServer(app)
})
export { serverPromise, setup, childSetup }