-
Notifications
You must be signed in to change notification settings - Fork 162
/
mysql_pushability.c
325 lines (269 loc) · 8.05 KB
/
mysql_pushability.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
/*-------------------------------------------------------------------------
*
* mysql_pushability.c
* routines for FDW pushability
*
* Portions Copyright (c) 2022-2024, EnterpriseDB Corporation.
*
* IDENTIFICATION
* mysql_pushability.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "common/string.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "mysql_pushability.h"
#include "storage/fd.h"
#include "utils/fmgrprotos.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
static char *get_config_filename(void);
static void populate_pushability_hash(void);
static void config_invalid_error_callback(void *arg);
static bool get_line_buf(FILE *stream, StringInfo buf);
/* Hash table for caching the configured pushdown objects */
static HTAB *pushabilityHash = NULL;
/*
* Memory context to hold the hash table, need to free incase of any error
* while parsing the configuration file.
*/
static MemoryContext htab_ctx;
/*
* get_config_filename
* Returns the path for the pushdown object configuration file for the
* foreign-data wrapper.
*/
static char *
get_config_filename(void)
{
char sharepath[MAXPGPATH];
char *result;
get_share_path(my_exec_path, sharepath);
result = (char *) palloc(MAXPGPATH);
snprintf(result, MAXPGPATH, "%s/extension/%s_pushdown.config", sharepath,
FDW_MODULE_NAME);
return result;
}
/*
* mysql_check_remote_pushability
* Lookups into hash table by forming the hash key from provided object
* oid.
*/
bool
mysql_check_remote_pushability(Oid object_oid)
{
bool found = false;
/* Populate pushability hash if not already. */
if (unlikely(!pushabilityHash))
populate_pushability_hash();
hash_search(pushabilityHash, &object_oid, HASH_FIND, &found);
return found;
}
/*
* populate_pushability_hash
* Creates the hash table and populates the hash entries by reading the
* pushdown object configuration file.
*/
static void
populate_pushability_hash(void)
{
FILE *file = NULL;
char *config_filename;
HASHCTL ctl;
ErrorContextCallback errcallback;
unsigned int line_no = 0;
StringInfoData linebuf;
HTAB *hash;
Assert(pushabilityHash == NULL);
/*
* Create a memory context to hold hash table. This makes it easy to
* clean up in the case of error, we don't make the context long-lived
* until we parse the complete config file without an error.
*/
htab_ctx = AllocSetContextCreate(CurrentMemoryContext,
"mysql pushability_hash",
ALLOCSET_DEFAULT_SIZES);
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(FDWPushdownObject);
ctl.hcxt = htab_ctx;
/* Create the hash table */
hash = hash_create("mysql_fdw push elements hash", 256,
&ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Get the config file name */
config_filename = get_config_filename();
file = AllocateFile(config_filename, PG_BINARY_R);
if (file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open \"%s\": %m", config_filename)));
/* Set up callback to provide the error context */
errcallback.callback = config_invalid_error_callback;
errcallback.arg = (void *) config_filename;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
initStringInfo(&linebuf);
/*
* Read the pushdown object configuration file and push object information
* to the in-memory hash table for a faster lookup.
*/
while (get_line_buf(file, &linebuf))
{
FDWPushdownObject *entry;
Oid objectId;
ObjectType objectType;
bool found;
char *str;
line_no++;
/* If record starts with #, then consider as comment. */
if (linebuf.data[0] == '#')
continue;
/* Ignore if all blank */
if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
continue;
/* Strip trailing newline, including \r in case we're on Windows */
while (linebuf.len > 0 && (linebuf.data[linebuf.len - 1] == '\n' ||
linebuf.data[linebuf.len - 1] == '\r'))
linebuf.data[--linebuf.len] = '\0';
/* Strip leading whitespaces. */
str = linebuf.data;
while (isspace(*str))
str++;
if (pg_strncasecmp(str, "ROUTINE", 7) == 0)
{
/* Move over ROUTINE */
str = str + 7;
/* Move over any whitespace */
while (isspace(*str))
str++;
objectType = OBJECT_FUNCTION;
objectId =
DatumGetObjectId(DirectFunctionCall1(regprocedurein,
CStringGetDatum(str)));
}
else if (pg_strncasecmp(str, "OPERATOR", 8) == 0)
{
/* Move over OPERATOR */
str = str + 8;
/* Move over any whitespace */
while (isspace(*str))
str++;
objectType = OBJECT_OPERATOR;
objectId =
DatumGetObjectId(DirectFunctionCall1(regoperatorin,
CStringGetDatum(str)));
}
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid object type in configuration file at line number: %d",
line_no),
errhint("Valid values are: \"ROUTINE\", \"OPERATOR\".")));
/* Insert the new element to the hash table */
entry = hash_search(hash, &objectId, HASH_ENTER, &found);
/* Two different objects cannot have the same system object id */
if (found && entry->objectType != objectType)
elog(ERROR, "different pushdown objects have the same oid \"%d\"",
objectId);
entry->objectType = objectType;
}
if (ferror(file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", config_filename)));
error_context_stack = errcallback.previous;
pfree(linebuf.data);
FreeFile(file);
/*
* We have fully parsed the config file. Reparent hash table context so
* that it has the right lifespan.
*/
MemoryContextSetParent(htab_ctx, CacheMemoryContext);
pushabilityHash = hash;
}
/*
* config_invalid_error_callback
* Error callback to define the context.
*/
static void
config_invalid_error_callback(void *arg)
{
char *filename = (char *) arg;
/* Destroy the hash in case of error */
hash_destroy(pushabilityHash);
pushabilityHash = NULL;
errcontext("while processing \"%s\" file", filename);
}
/*
* get_line_buf
* Returns true if a line was successfully collected (including
* the case of a non-newline-terminated line at EOF).
*
* Returns false if there was an I/O error or no data was available
* before EOF. In the false-result case, buf is reset to empty.
* (Borrowed the code from pg_get_line_buf().)
*/
bool
get_line_buf(FILE *stream, StringInfo buf)
{
int orig_len;
/* We just need to drop any data from the previous call */
resetStringInfo(buf);
orig_len = buf->len;
/* Read some data, appending it to whatever we already have */
while (fgets(buf->data + buf->len, buf->maxlen - buf->len, stream) != NULL)
{
buf->len += strlen(buf->data + buf->len);
/* Done if we have collected a newline */
if (buf->len > orig_len && buf->data[buf->len - 1] == '\n')
return true;
/* Make some more room in the buffer, and loop to read more data */
enlargeStringInfo(buf, 128);
}
/* Check for I/O errors and EOF */
if (ferror(stream) || buf->len == orig_len)
{
/* Discard any data we collected before detecting error */
buf->len = orig_len;
buf->data[orig_len] = '\0';
return false;
}
/* No newline at EOF, but we did collect some data */
return true;
}
/*
* mysql_get_configured_pushdown_objects
* Returns the hash table objects by sequentially scanning the hash table.
*/
List *
mysql_get_configured_pushdown_objects(bool reload)
{
List *result = NIL;
HASH_SEQ_STATUS scan;
FDWPushdownObject *entry;
FDWPushdownObject *object;
Size size = sizeof(FDWPushdownObject);
/*
* To avoid the memory leak, destroy the existing hash in case of
* reloading.
*/
if (reload)
{
hash_destroy(pushabilityHash);
pushabilityHash = NULL;
MemoryContextDelete(htab_ctx);
}
/* Reload configuration if that not loaded at all */
if (!pushabilityHash)
populate_pushability_hash();
hash_seq_init(&scan, pushabilityHash);
while ((entry = (FDWPushdownObject *) hash_seq_search(&scan)) != NULL)
{
object = (FDWPushdownObject *) palloc(size);
memcpy(object, entry, size);
result = lappend(result, object);
}
return result;
}