1
秦芳睿
9 天以前 e70a362606b78a822e93d5117a9013e8f9086faf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
 
#include "postgres.h"
 
#include "utils/agtype_raw.h"
 
/*
 * Used for building an agtype container.
 */
struct agtype_build_state
{
    int a_offset;        // next location to write agtentry
    int i;               // index of current agtentry being processed
    int d_start;         // start of variable-length portion
    StringInfo buffer;
};
 
/*
 * Define the type and size of the agt_header.
 * Copied from agtype_ext.c.
 */
#define AGT_HEADER_TYPE uint32
#define AGT_HEADER_SIZE sizeof(AGT_HEADER_TYPE)
 
/*
 * Following macros are usable in the context where
 * agtype_build_state is available
 */
#define BUFFER_RESERVE(size) reserve_from_buffer(bstate->buffer, (size))
#define BUFFER_WRITE_PAD() pad_buffer_to_int(bstate->buffer)
#define BUFFER_WRITE_CONST(offset, type, val) *((type *)(bstate->buffer->data + (offset))) = (val)
#define BUFFER_WRITE_PTR(offset, ptr, len) memcpy(bstate->buffer->data + offset, ptr, len)
 
static int write_pointer(agtype_build_state *bstate, char *ptr, int len);
static void write_agtentry(agtype_build_state *bstate, agtentry agte);
 
/*
 * Same as `write_ptr` except the content comes from
 * constant value instead of pointer.
 */
#define write_const(val, type) \
    do \
    { \
        int len = sizeof(type); \
        int offset = BUFFER_RESERVE(len); \
        BUFFER_WRITE_CONST(offset, type, val); \
    } \
    while (0)
#define write_ptr(ptr, len) write_pointer(bstate, ptr, len)
#define write_agt(agte) write_agtentry(bstate, agte)
 
/*
 * Copies the content of `ptr` to the tail of the buffer (variable-length
 * portion).
 */
static int write_pointer(agtype_build_state *bstate, char *ptr, int len)
{
    int offset = BUFFER_RESERVE(len);
    BUFFER_WRITE_PTR(offset, ptr, len);
    return len;
}
 
/*
 * Copies the content of `agte` to the next available location in the agtentry
 * portion of the buffer. That location is pointed by `bstate->a_offset`.
 *
 * This function must be called after data is written to the variable-length
 * portion.
 */
static void write_agtentry(agtype_build_state *bstate, agtentry agte)
{
    int totallen = bstate->buffer->len - bstate->d_start;
 
    /*
     * Bail out if total variable-length data exceeds what will fit in a
     * agtentry length field.  We check this in each iteration, not just
     * once at the end, to forestall possible integer overflow.
     */
    if (totallen > AGTENTRY_OFFLENMASK)
    {
        ereport(
            ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg(
                "total size of agtype array elements exceeds the maximum of %u bytes",
                AGTENTRY_OFFLENMASK)));
    }
 
    if (((bstate->i) % AGT_OFFSET_STRIDE) == 0)
    {
        agte = (agte & AGTENTRY_TYPEMASK) | totallen | AGTENTRY_HAS_OFF;
    }
 
    BUFFER_WRITE_CONST(bstate->a_offset, agtentry, agte);
    bstate->a_offset += sizeof(agtentry);
}
 
/*
 * `header_flag` = a valid agtype_container header field
 * `size` = size of container (number of pairs or elements)
 */
agtype_build_state *init_agtype_build_state(uint32 size, uint32 header_flag)
{
    int agtentry_count;
    int agtentry_len;
    agtype_build_state *bstate;
 
    bstate = palloc0(sizeof(agtype_build_state));
    bstate->buffer = makeStringInfo();
    bstate->a_offset = 0;
    bstate->i = 0;
 
    // reserve for varlen header
    BUFFER_RESERVE(VARHDRSZ);
    bstate->a_offset += VARHDRSZ;
 
    // write container header
    BUFFER_RESERVE(sizeof(uint32));
    BUFFER_WRITE_CONST(bstate->a_offset, uint32, header_flag | size);
    bstate->a_offset += sizeof(uint32);
 
    // reserve for agtentry headers
    if ((header_flag & AGT_FOBJECT) != 0)
    {
        agtentry_count = size * 2;
    }
    else if ((header_flag & AGT_FARRAY) != 0)
    {
        agtentry_count = size;
    }
    else
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("Invalid container type.")));
    }
 
    agtentry_len = sizeof(agtentry) * agtentry_count;
    BUFFER_RESERVE(agtentry_len);
    bstate->d_start = bstate->a_offset + agtentry_len;
 
    return bstate;
}
 
agtype *build_agtype(agtype_build_state *bstate)
{
    agtype *result = (agtype *) bstate->buffer->data;
    SET_VARSIZE(result, bstate->buffer->len);
    return result;
}
 
void pfree_agtype_build_state(agtype_build_state *bstate)
{
    /*
     * bstate->buffer->data is not pfree'd because this pointer
     * is returned by the `build_agtype` function.
     */
    pfree(bstate->buffer);
    pfree(bstate);
}
 
void write_string(agtype_build_state *bstate, char *str)
{
    int length = strlen(str);
 
    write_ptr(str, length);
    write_agt(AGTENTRY_IS_STRING | length);
 
    bstate->i++;
}
 
void write_graphid(agtype_build_state *bstate, graphid graphid)
{
    int length = 0;
 
    // padding
    length += BUFFER_WRITE_PAD();
 
    // graphid header
    write_const(AGT_HEADER_INTEGER, AGT_HEADER_TYPE);
    length += AGT_HEADER_SIZE;
 
    // graphid value
    write_const(graphid, int64);
    length += sizeof(int64);
 
    // agtentry
    write_agt(AGTENTRY_IS_AGTYPE | length);
 
    bstate->i++;
}
 
void write_container(agtype_build_state *bstate, agtype *agtype)
{
    int length = 0;
 
    // padding
    length += BUFFER_WRITE_PAD();
 
    // varlen data
    length += write_ptr((char *) &agtype->root, VARSIZE(agtype));
 
    // agtentry
    write_agt(AGTENTRY_IS_CONTAINER | length);
 
    bstate->i++;
}
 
/*
 * `val` = container of the extended type
 * `header` = AGT_HEADER_VERTEX, AGT_HEADER_EDGE or AGT_HEADER_PATH
 */
void write_extended(agtype_build_state *bstate, agtype *val, uint32 header)
{
    int length = 0;
 
    // padding
    length += BUFFER_WRITE_PAD();
 
    // vertex header
    write_const(header, AGT_HEADER_TYPE);
    length += AGT_HEADER_SIZE;
 
    // vertex data
    length += write_ptr((char *) &val->root, VARSIZE(val));
 
    // agtentry
    write_agt(AGTENTRY_IS_AGTYPE | length);
 
    bstate->i++;
}