/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "postgres.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" #include "nodes/extensible.h" #include "nodes/nodes.h" #include "nodes/plannodes.h" #include "rewrite/rewriteHandler.h" #include "utils/rel.h" #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" #include "nodes/cypher_nodes.h" #include "utils/agtype.h" #include "utils/graphid.h" static void begin_cypher_create(CustomScanState *node, EState *estate, int eflags); static TupleTableSlot *exec_cypher_create(CustomScanState *node); static void end_cypher_create(CustomScanState *node); static void rescan_cypher_create(CustomScanState *node); static void create_edge(cypher_create_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, ListCell *next, List *list); static Datum create_vertex(cypher_create_custom_scan_state *css, cypher_target_node *node, ListCell *next, List *list); static void process_pattern(cypher_create_custom_scan_state *css); const CustomExecMethods cypher_create_exec_methods = {CREATE_SCAN_STATE_NAME, begin_cypher_create, exec_cypher_create, end_cypher_create, rescan_cypher_create, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; static void begin_cypher_create(CustomScanState *node, EState *estate, int eflags) { cypher_create_custom_scan_state *css = (cypher_create_custom_scan_state *)node; ListCell *lc; Plan *subplan; Assert(list_length(css->cs->custom_plans) == 1); subplan = linitial(css->cs->custom_plans); node->ss.ps.lefttree = ExecInitNode(subplan, estate, eflags); ExecAssignExprContext(estate, &node->ss.ps); ExecInitScanTupleSlot(estate, &node->ss, ExecGetResultType(node->ss.ps.lefttree), &TTSOpsHeapTuple); if (!CYPHER_CLAUSE_IS_TERMINAL(css->flags)) { TupleDesc tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; ExecAssignProjectionInfo(&node->ss.ps, tupdesc); } foreach (lc, css->pattern) { cypher_create_path *path = lfirst(lc); ListCell *lc2; foreach (lc2, path->target_nodes) { cypher_target_node *cypher_node = (cypher_target_node *)lfirst(lc2); Relation rel; if (!CYPHER_TARGET_NODE_INSERT_ENTITY(cypher_node->flags)) continue; // Open relation and acquire a row exclusive lock. rel = table_open(cypher_node->relid, RowExclusiveLock); // Initialize resultRelInfo for the vertex cypher_node->resultRelInfo = makeNode(ResultRelInfo); InitResultRelInfo(cypher_node->resultRelInfo, rel, list_length(estate->es_range_table), NULL, estate->es_instrument); // Open all indexes for the relation ExecOpenIndices(cypher_node->resultRelInfo, false); // Setup the relation's tuple slot cypher_node->elemTupleSlot = table_slot_create( rel, &estate->es_tupleTable); if (cypher_node->id_expr != NULL) { cypher_node->id_expr_state = ExecInitExpr(cypher_node->id_expr, (PlanState *)node); } if (cypher_node->prop_expr != NULL) { cypher_node->prop_expr_state = ExecInitExpr(cypher_node->prop_expr, (PlanState *)node); } } } /* * Postgres does not assign the es_output_cid in queries that do * not write to disk, ie: SELECT commands. We need the command id * for our clauses, and we may need to initialize it. We cannot use * GetCurrentCommandId because there may be other cypher clauses * that have modified the command id. */ if (estate->es_output_cid == 0) { estate->es_output_cid = estate->es_snapshot->curcid; } Increment_Estate_CommandId(estate); } /* * CREATE the vertices and edges for a CREATE clause pattern. */ static void process_pattern(cypher_create_custom_scan_state *css) { ListCell *lc2; foreach (lc2, css->pattern) { cypher_create_path *path = lfirst(lc2); List *list = path->target_nodes; ListCell *lc = list_head(list); /* * Create the first vertex. The create_vertex function will * create the rest of the path, if necessary. */ create_vertex(css, lfirst(lc), lnext(list, lc), list); /* * If this path is a variable, take the list that was accumulated * in the vertex/edge creation, create a path datum, and add to the * scantuple slot. */ if (path->path_attr_num != InvalidAttrNumber) { TupleTableSlot *scantuple; PlanState *ps; Datum result; ps = css->css.ss.ps.lefttree; scantuple = ps->ps_ExprContext->ecxt_scantuple; result = make_path(css->path_values); scantuple->tts_values[path->path_attr_num - 1] = result; scantuple->tts_isnull[path->path_attr_num - 1] = false; } css->path_values = NIL; } } static TupleTableSlot *exec_cypher_create(CustomScanState *node) { cypher_create_custom_scan_state *css = (cypher_create_custom_scan_state *)node; EState *estate = css->css.ss.ps.state; ExprContext *econtext = css->css.ss.ps.ps_ExprContext; TupleTableSlot *slot; bool terminal = CYPHER_CLAUSE_IS_TERMINAL(css->flags); bool used = false; /* * If the CREATE clause was the final cypher clause written then we aren't * returning anything from this result node. So the exec_cypher_create * function will only be called once. Therefore we will process all tuples * from the subtree at once. */ do { /*Process the subtree first */ Decrement_Estate_CommandId(estate) slot = ExecProcNode(node->ss.ps.lefttree); Increment_Estate_CommandId(estate) /* break when there are no tuples */ if (TupIsNull(slot)) { break; } /* setup the scantuple that the process_pattern needs */ econtext->ecxt_scantuple = node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; process_pattern(css); /* * This may not be necessary. If we have an empty pattern, nothing was * inserted and the current command Id was not used. So, only flag it * if there is a non empty pattern. */ if (list_length(css->pattern) > 0) { /* the current command Id has been used */ used = true; } } while (terminal); /* * If the current command Id wasn't used, nothing was inserted and we're * done. */ if (!used) { return NULL; } /* update the current command Id */ CommandCounterIncrement(); /* if this was a terminal CREATE just return NULL */ if (terminal) { return NULL; } econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); return ExecProject(node->ss.ps.ps_ProjInfo); } static void end_cypher_create(CustomScanState *node) { cypher_create_custom_scan_state *css = (cypher_create_custom_scan_state *)node; ListCell *lc; // increment the command counter CommandCounterIncrement(); ExecEndNode(node->ss.ps.lefttree); foreach (lc, css->pattern) { cypher_create_path *path = lfirst(lc); ListCell *lc2; foreach (lc2, path->target_nodes) { cypher_target_node *cypher_node = (cypher_target_node *)lfirst(lc2); if (!CYPHER_TARGET_NODE_INSERT_ENTITY(cypher_node->flags)) { continue; } // close all indices for the node ExecCloseIndices(cypher_node->resultRelInfo); // close the relation itself table_close(cypher_node->resultRelInfo->ri_RelationDesc, RowExclusiveLock); } } } static void rescan_cypher_create(CustomScanState *node) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cypher create clause cannot be rescanned"), errhint("its unsafe to use joins in a query with a Cypher CREATE clause"))); } Node *create_cypher_create_plan_state(CustomScan *cscan) { cypher_create_custom_scan_state *cypher_css = palloc0(sizeof(cypher_create_custom_scan_state)); cypher_create_target_nodes *target_nodes; char *serialized_data; Const *c; cypher_css->cs = cscan; // get the serialized data structure from the Const and deserialize it. c = linitial(cscan->custom_private); serialized_data = (char *)c->constvalue; target_nodes = stringToNode(serialized_data); Assert(is_ag_node(target_nodes, cypher_create_target_nodes)); cypher_css->path_values = NIL; cypher_css->pattern = target_nodes->paths; cypher_css->flags = target_nodes->flags; cypher_css->graph_oid = target_nodes->graph_oid; cypher_css->css.ss.ps.type = T_CustomScanState; cypher_css->css.methods = &cypher_create_exec_methods; return (Node *)cypher_css; } /* * Create the edge entity. */ static void create_edge(cypher_create_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, ListCell *next, List *list) { bool isNull; EState *estate = css->css.ss.ps.state; ExprContext *econtext = css->css.ss.ps.ps_ExprContext; ResultRelInfo *resultRelInfo = node->resultRelInfo; ResultRelInfo **old_estate_es_result_relations = NULL; TupleTableSlot *elemTupleSlot = node->elemTupleSlot; TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; Datum id; Datum start_id, end_id, next_vertex_id; List *prev_path = css->path_values; Assert(node->type == LABEL_KIND_EDGE); Assert(lfirst(next) != NULL); /* * Create the next vertex before creating the edge. We need the * next vertex's id. */ css->path_values = NIL; next_vertex_id = create_vertex(css, lfirst(next), lnext(list, next), list); /* * Set the start and end vertex ids */ if (node->dir == CYPHER_REL_DIR_RIGHT) { // create pattern (prev_vertex)-[edge]->(next_vertex) start_id = prev_vertex_id; end_id = next_vertex_id; } else if (node->dir == CYPHER_REL_DIR_LEFT) { // create pattern (prev_vertex)<-[edge]-(next_vertex) start_id = next_vertex_id; end_id = prev_vertex_id; } else { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("edge direction must be specified in a CREATE clause"))); } /* * Set estate's result relation to the vertex's result * relation. * * Note: This obliterates what was their previously */ /* save the old result relation info */ old_estate_es_result_relations = estate->es_result_relations; estate->es_result_relations = &resultRelInfo; ExecClearTuple(elemTupleSlot); // Graph Id for the edge id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); elemTupleSlot->tts_values[edge_tuple_id] = id; elemTupleSlot->tts_isnull[edge_tuple_id] = isNull; // Graph id for the starting vertex elemTupleSlot->tts_values[edge_tuple_start_id] = start_id; elemTupleSlot->tts_isnull[edge_tuple_start_id] = false; // Graph id for the ending vertex elemTupleSlot->tts_values[edge_tuple_end_id] = end_id; elemTupleSlot->tts_isnull[edge_tuple_end_id] = false; // Edge's properties map elemTupleSlot->tts_values[edge_tuple_properties] = scanTupleSlot->tts_values[node->prop_attr_num]; elemTupleSlot->tts_isnull[edge_tuple_properties] = scanTupleSlot->tts_isnull[node->prop_attr_num]; // Insert the new edge insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); /* restore the old result relation info */ estate->es_result_relations = old_estate_es_result_relations; /* * When the edge is used by clauses higher in the execution tree * we need to create an edge datum. When the edge is a variable, * add to the scantuple slot. When the edge is part of a path * variable, add to the list. */ if (CYPHER_TARGET_NODE_OUTPUT(node->flags)) { PlanState *ps = css->css.ss.ps.lefttree; TupleTableSlot *scantuple = ps->ps_ExprContext->ecxt_scantuple; Datum result; result = make_edge( id, start_id, end_id, CStringGetDatum(node->label_name), PointerGetDatum(scanTupleSlot->tts_values[node->prop_attr_num])); if (CYPHER_TARGET_NODE_IN_PATH(node->flags)) { prev_path = lappend(prev_path, DatumGetPointer(result)); css->path_values = list_concat(prev_path, css->path_values); } if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags)) { scantuple->tts_values[node->tuple_position - 1] = result; scantuple->tts_isnull[node->tuple_position - 1] = false; } } } /* * Creates the vertex entity, returns the vertex's id in case the caller is * the create_edge function. */ static Datum create_vertex(cypher_create_custom_scan_state *css, cypher_target_node *node, ListCell *next, List *list) { bool isNull; Datum id; EState *estate = css->css.ss.ps.state; ExprContext *econtext = css->css.ss.ps.ps_ExprContext; ResultRelInfo *resultRelInfo = node->resultRelInfo; TupleTableSlot *elemTupleSlot = node->elemTupleSlot; TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; Assert(node->type == LABEL_KIND_VERTEX); /* * Vertices in a path might already exists. If they do get the id * to pass to the edges before and after it. Otherwise, insert the * new vertex into it's table and then pass the id along. */ if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags)) { ResultRelInfo **old_estate_es_result_relations = NULL; /* * Set estate's result relation to the vertex's result * relation. * * Note: This obliterates what was their previously */ /* save the old result relation info */ old_estate_es_result_relations = estate->es_result_relations; estate->es_result_relations = &resultRelInfo; ExecClearTuple(elemTupleSlot); // get the next graphid for this vertex. id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); elemTupleSlot->tts_values[vertex_tuple_id] = id; elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull; // get the properties for this vertex elemTupleSlot->tts_values[vertex_tuple_properties] = scanTupleSlot->tts_values[node->prop_attr_num]; elemTupleSlot->tts_isnull[vertex_tuple_properties] = scanTupleSlot->tts_isnull[node->prop_attr_num]; // Insert the new vertex insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); /* restore the old result relation info */ estate->es_result_relations = old_estate_es_result_relations; /* * When the vertex is used by clauses higher in the execution tree * we need to create a vertex datum. When the vertex is a variable, * add to the scantuple slot. When the vertex is part of a path * variable, add to the list. */ if (CYPHER_TARGET_NODE_OUTPUT(node->flags)) { TupleTableSlot *scantuple; PlanState *ps; Datum result; ps = css->css.ss.ps.lefttree; scantuple = ps->ps_ExprContext->ecxt_scantuple; // make the vertex agtype result = make_vertex(id, CStringGetDatum(node->label_name), PointerGetDatum(scanTupleSlot->tts_values[node->prop_attr_num])); // append to the path list if (CYPHER_TARGET_NODE_IN_PATH(node->flags)) { css->path_values = lappend(css->path_values, DatumGetPointer(result)); } /* * Put the vertex in the correct spot in the scantuple, so parent * execution nodes can reference the newly created variable. */ if (CYPHER_TARGET_NODE_IS_VARIABLE(node->flags)) { scantuple->tts_values[node->tuple_position - 1] = result; scantuple->tts_isnull[node->tuple_position - 1] = false; } } } else { agtype *a; agtype_value *v; agtype_value *id_value; TupleTableSlot *scantuple; PlanState *ps; ps = css->css.ss.ps.lefttree; scantuple = ps->ps_ExprContext->ecxt_scantuple; // get the vertex agtype in the scanTupleSlot a = DATUM_GET_AGTYPE_P(scantuple->tts_values[node->tuple_position - 1]); // Convert to an agtype value v = get_ith_agtype_value_from_container(&a->root, 0); if (v->type != AGTV_VERTEX) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("agtype must resolve to a vertex"))); } // extract the id agtype field id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id"); // extract the graphid and cast to a Datum id = GRAPHID_GET_DATUM(id_value->val.int_value); /* * Its possible the variable has already been deleted. There are two * ways this can happen. One is the query explicitly deleted the * variable, the is_deleted flag will catch that. However, it is * possible the user deleted the vertex using another variable name. We * need to scan the table to find the vertex's current status relative * to this CREATE clause. If the variable was initially created in this * clause, we can skip this check, because the transaction system * guarantees that nothing can happen to that tuple, as far as we are * concerned with at this time. */ if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags)) { if (!entity_exists(estate, css->graph_oid, DATUM_GET_GRAPHID(id))) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("vertex assigned to variable %s was deleted", node->variable_name))); } } if (CYPHER_TARGET_NODE_IN_PATH(node->flags)) { Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1]; css->path_values = lappend(css->path_values, DatumGetPointer(vertex)); } } // If the path continues, create the next edge, passing the vertex's id. if (next != NULL) { create_edge(css, lfirst(next), id, lnext(list, next), list); } return id; }