/*
|
* Licensed to the Apache Software Foundation (ASF) under one
|
* or more contributor license agreements. See the NOTICE file
|
* distributed with this work for additional information
|
* regarding copyright ownership. The ASF licenses this file
|
* to you under the Apache License, Version 2.0 (the
|
* "License"); you may not use this file except in compliance
|
* with the License. You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing,
|
* software distributed under the License is distributed on an
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
* KIND, either express or implied. See the License for the
|
* specific language governing permissions and limitations
|
* under the License.
|
*/
|
|
#include "postgres.h"
|
|
#include "access/heapam.h"
|
#include "access/htup_details.h"
|
#include "access/multixact.h"
|
#include "access/table.h"
|
#include "access/xact.h"
|
#include "executor/tuptable.h"
|
#include "nodes/execnodes.h"
|
#include "nodes/extensible.h"
|
#include "nodes/nodes.h"
|
#include "nodes/plannodes.h"
|
#include "parser/parsetree.h"
|
#include "storage/bufmgr.h"
|
#include "utils/rel.h"
|
#include "common/hashfn.h"
|
|
#include "catalog/ag_label.h"
|
#include "executor/cypher_executor.h"
|
#include "executor/cypher_utils.h"
|
#include "nodes/cypher_nodes.h"
|
#include "utils/agtype.h"
|
#include "utils/graphid.h"
|
|
static void begin_cypher_delete(CustomScanState *node, EState *estate,
|
int eflags);
|
static TupleTableSlot *exec_cypher_delete(CustomScanState *node);
|
static void end_cypher_delete(CustomScanState *node);
|
static void rescan_cypher_delete(CustomScanState *node);
|
|
static void process_delete_list(CustomScanState *node);
|
|
static void check_for_connected_edges(CustomScanState *node);
|
static agtype_value *extract_entity(CustomScanState *node,
|
TupleTableSlot *scanTupleSlot,
|
int entity_position);
|
static void delete_entity(EState *estate, ResultRelInfo *resultRelInfo,
|
HeapTuple tuple);
|
|
const CustomExecMethods cypher_delete_exec_methods = {DELETE_SCAN_STATE_NAME,
|
begin_cypher_delete,
|
exec_cypher_delete,
|
end_cypher_delete,
|
rescan_cypher_delete,
|
NULL,
|
NULL,
|
NULL,
|
NULL,
|
NULL,
|
NULL,
|
NULL,
|
NULL};
|
|
/*
|
* Initialization at the beginning of execution. Setup the child node,
|
* setup its scan tuple slot and projection info, expression context,
|
* collect metadata about visible edges, and alter the commandid for
|
* the transaction.
|
*/
|
static void begin_cypher_delete(CustomScanState *node, EState *estate,
|
int eflags)
|
{
|
cypher_delete_custom_scan_state *css =
|
(cypher_delete_custom_scan_state *)node;
|
Plan *subplan;
|
HASHCTL hashctl;
|
|
Assert(list_length(css->cs->custom_plans) == 1);
|
|
// setup child
|
subplan = linitial(css->cs->custom_plans);
|
node->ss.ps.lefttree = ExecInitNode(subplan, estate, eflags);
|
|
// setup expr context
|
ExecAssignExprContext(estate, &node->ss.ps);
|
|
// setup scan tuple slot and projection info
|
ExecInitScanTupleSlot(estate, &node->ss,
|
ExecGetResultType(node->ss.ps.lefttree),
|
&TTSOpsHeapTuple);
|
|
if (!CYPHER_CLAUSE_IS_TERMINAL(css->flags))
|
{
|
TupleDesc tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
|
|
ExecAssignProjectionInfo(&node->ss.ps, tupdesc);
|
}
|
|
/*
|
* Get all the labels that are visible to this delete clause at this point
|
* in the transaction. To be used later when the delete clause finds
|
* vertices.
|
*/
|
css->edge_labels = get_all_edge_labels_per_graph(estate, css->delete_data->graph_oid);
|
|
/* init vertex_id_htab */
|
MemSet(&hashctl, 0, sizeof(hashctl));
|
hashctl.keysize = sizeof(graphid);
|
hashctl.entrysize =
|
sizeof(graphid); // entries are not used, but entrysize must >= keysize
|
hashctl.hash = tag_hash;
|
css->vertex_id_htab = hash_create(DELETE_VERTEX_HTAB_NAME,
|
DELETE_VERTEX_HTAB_SIZE, &hashctl,
|
HASH_ELEM | HASH_FUNCTION);
|
|
/*
|
* Postgres does not assign the es_output_cid in queries that do
|
* not write to disk, ie: SELECT commands. We need the command id
|
* for our clauses, and we may need to initialize it. We cannot use
|
* GetCurrentCommandId because there may be other cypher clauses
|
* that have modified the command id.
|
*/
|
if (estate->es_output_cid == 0)
|
estate->es_output_cid = estate->es_snapshot->curcid;
|
|
Increment_Estate_CommandId(estate);
|
}
|
|
/*
|
* Called once per tuple. If this is a terminal DELETE clause
|
* process everyone of its child tuple, otherwise process the
|
* next tuple.
|
*/
|
static TupleTableSlot *exec_cypher_delete(CustomScanState *node)
|
{
|
cypher_delete_custom_scan_state *css =
|
(cypher_delete_custom_scan_state *)node;
|
EState *estate = css->css.ss.ps.state;
|
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
|
TupleTableSlot *slot;
|
|
if (CYPHER_CLAUSE_IS_TERMINAL(css->flags))
|
{
|
/*
|
* If the DELETE clause was the final cypher clause written
|
* then we aren't returning anything from this result node.
|
* So the exec_cypher_delete function will only be called once.
|
* Therefore we will process all tuples from the subtree at once.
|
*/
|
while(true)
|
{
|
//Process the subtree first
|
Decrement_Estate_CommandId(estate)
|
slot = ExecProcNode(node->ss.ps.lefttree);
|
Increment_Estate_CommandId(estate)
|
|
if (TupIsNull(slot))
|
break;
|
|
// setup the scantuple that the process_delete_list needs
|
econtext->ecxt_scantuple =
|
node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple;
|
|
process_delete_list(node);
|
}
|
|
return NULL;
|
}
|
else
|
{
|
//Process the subtree first
|
Decrement_Estate_CommandId(estate)
|
slot = ExecProcNode(node->ss.ps.lefttree);
|
Increment_Estate_CommandId(estate)
|
|
if (TupIsNull(slot))
|
return NULL;
|
|
// setup the scantuple that the process_delete_list needs
|
econtext->ecxt_scantuple =
|
node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple;
|
|
process_delete_list(node);
|
|
econtext->ecxt_scantuple =
|
ExecProject(node->ss.ps.lefttree->ps_ProjInfo);
|
|
return ExecProject(node->ss.ps.ps_ProjInfo);
|
}
|
}
|
|
/*
|
* Called at the end of execution. Tell its child to
|
* end its execution.
|
*/
|
static void end_cypher_delete(CustomScanState *node)
|
{
|
check_for_connected_edges(node);
|
|
hash_destroy(((cypher_delete_custom_scan_state *)node)->vertex_id_htab);
|
|
ExecEndNode(node->ss.ps.lefttree);
|
}
|
|
/*
|
* Used for rewinding the scan state and reprocessing the results.
|
*
|
* XXX: This is not currently supported. We need to find out
|
* when this function will be called and determine a process
|
* for allowing the Delete clause to run multiple times without
|
* redundant edits to the database.
|
*/
|
static void rescan_cypher_delete(CustomScanState *node)
|
{
|
ereport(ERROR,
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
errmsg("cypher DELETE clause cannot be rescanned"),
|
errhint("its unsafe to use joins in a query with a Cypher DELETE clause")));
|
}
|
|
/*
|
* Create the CustomScanState from the CustomScan and pass
|
* necessary metadata.
|
*/
|
Node *create_cypher_delete_plan_state(CustomScan *cscan)
|
{
|
cypher_delete_custom_scan_state *cypher_css =
|
palloc0(sizeof(cypher_delete_custom_scan_state));
|
cypher_delete_information *delete_data;
|
char *serialized_data;
|
Const *c;
|
|
cypher_css->cs = cscan;
|
|
// get the serialized data structure from the Const and deserialize it.
|
c = linitial(cscan->custom_private);
|
serialized_data = (char *)c->constvalue;
|
delete_data = stringToNode(serialized_data);
|
|
Assert(is_ag_node(delete_data, cypher_delete_information));
|
|
cypher_css->delete_data = delete_data;
|
cypher_css->flags = delete_data->flags;
|
|
cypher_css->css.ss.ps.type = T_CustomScanState;
|
cypher_css->css.methods = &cypher_delete_exec_methods;
|
|
return (Node *)cypher_css;
|
}
|
|
/*
|
* Extract the vertex or edge to be deleted, perform some type checking to
|
* validate datum is an agtype vertex or edge.
|
*/
|
static agtype_value *extract_entity(CustomScanState *node,
|
TupleTableSlot *scanTupleSlot,
|
int entity_position)
|
{
|
agtype_value *original_entity_value;
|
agtype *original_entity;
|
TupleDesc tupleDescriptor;
|
|
tupleDescriptor = scanTupleSlot->tts_tupleDescriptor;
|
|
// type checking, make sure the entity is an agtype vertex or edge
|
if (tupleDescriptor->attrs[entity_position -1].atttypid != AGTYPEOID)
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
errmsg("DELETE clause can only delete agtype")));
|
|
original_entity = DATUM_GET_AGTYPE_P(scanTupleSlot->tts_values[entity_position - 1]);
|
original_entity_value = get_ith_agtype_value_from_container(&original_entity->root, 0);
|
|
if (original_entity_value->type != AGTV_VERTEX && original_entity_value->type != AGTV_EDGE)
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
errmsg("DELETE clause can only delete vertices and edges")));
|
|
return original_entity_value;
|
}
|
|
/*
|
* Try and delete the entity that is describe by the HeapTuple in the table
|
* described by the resultRelInfo.
|
*/
|
static void delete_entity(EState *estate, ResultRelInfo *resultRelInfo,
|
HeapTuple tuple)
|
{
|
ResultRelInfo **saved_resultRels;
|
LockTupleMode lockmode;
|
TM_FailureData hufd;
|
TM_Result lock_result;
|
TM_Result delete_result;
|
Buffer buffer;
|
|
// Find the physical tuple, this variable is coming from
|
saved_resultRels = estate->es_result_relations;
|
estate->es_result_relations = &resultRelInfo;
|
|
lockmode = ExecUpdateLockMode(estate, resultRelInfo);
|
|
lock_result = heap_lock_tuple(resultRelInfo->ri_RelationDesc, tuple,
|
GetCurrentCommandId(false), lockmode,
|
LockWaitBlock, false, &buffer, &hufd);
|
|
/*
|
* It is possible the entity may have already been deleted. If the tuple
|
* can be deleted, the lock result will be HeapTupleMayBeUpdated. If the
|
* tuple was already deleted by this DELETE clause, the result would be
|
* TM_SelfModified, if the result was deleted by a previous delete
|
* clause, the result will TM_Invisible. Throw an error if any
|
* other result was returned.
|
*/
|
if (lock_result == TM_Ok)
|
{
|
delete_result = heap_delete(resultRelInfo->ri_RelationDesc,
|
&tuple->t_self, GetCurrentCommandId(true),
|
estate->es_crosscheck_snapshot, true, &hufd,
|
false);
|
|
/*
|
* Unlike locking, the heap_delete either succeeded
|
* HeapTupleMayBeUpdate, or it failed and returned any other result.
|
*/
|
switch (delete_result)
|
{
|
case TM_Ok:
|
break;
|
case TM_SelfModified:
|
ereport(
|
ERROR,
|
(errcode(ERRCODE_INTERNAL_ERROR),
|
errmsg(
|
"deleting the same entity more than once cannot happen")));
|
/* ereport never gets here */
|
break;
|
case TM_Updated:
|
ereport(
|
ERROR,
|
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
errmsg("could not serialize access due to concurrent update")));
|
/* ereport never gets here */
|
break;
|
default:
|
elog(ERROR, "Entity failed to be update");
|
/* elog never gets here */
|
break;
|
}
|
/* increment the command counter */
|
CommandCounterIncrement();
|
}
|
else if (lock_result != TM_Invisible && lock_result != TM_SelfModified)
|
{
|
ereport(ERROR,
|
(errcode(ERRCODE_INTERNAL_ERROR),
|
errmsg("Entity could not be locked for updating")));
|
|
}
|
|
ReleaseBuffer(buffer);
|
|
estate->es_result_relations = saved_resultRels;
|
}
|
|
/*
|
* After the delete's subtress has been processed, we then go through the list
|
* of variables to be deleted.
|
*/
|
static void process_delete_list(CustomScanState *node)
|
{
|
cypher_delete_custom_scan_state *css =
|
(cypher_delete_custom_scan_state *)node;
|
ListCell *lc;
|
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
|
TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple;
|
EState *estate = node->ss.ps.state;
|
|
foreach(lc, css->delete_data->delete_items)
|
{
|
cypher_delete_item *item;
|
agtype_value *original_entity_value, *id, *label;
|
ScanKeyData scan_keys[1];
|
TableScanDesc scan_desc;
|
ResultRelInfo *resultRelInfo;
|
HeapTuple heap_tuple;
|
char *label_name;
|
Value *pos;
|
int entity_position;
|
|
item = lfirst(lc);
|
|
pos = item->entity_position;
|
entity_position = pos->val.ival;
|
|
/* skip if the entity is null */
|
if (scanTupleSlot->tts_isnull[entity_position - 1])
|
continue;
|
|
original_entity_value = extract_entity(node, scanTupleSlot,
|
entity_position);
|
|
id = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "id");
|
label = GET_AGTYPE_VALUE_OBJECT_VALUE(original_entity_value, "label");
|
label_name = pnstrdup(label->val.string.val, label->val.string.len);
|
|
resultRelInfo = create_entity_result_rel_info(estate, css->delete_data->graph_name, label_name);
|
|
/*
|
* Setup the scan key to require the id field on-disc to match the
|
* entity's graphid.
|
*/
|
if (original_entity_value->type == AGTV_VERTEX)
|
{
|
ScanKeyInit(&scan_keys[0], Anum_ag_label_vertex_table_id,
|
BTEqualStrategyNumber, F_GRAPHIDEQ,
|
GRAPHID_GET_DATUM(id->val.int_value));
|
}
|
else if (original_entity_value->type == AGTV_EDGE)
|
{
|
ScanKeyInit(&scan_keys[0], Anum_ag_label_edge_table_id,
|
BTEqualStrategyNumber, F_GRAPHIDEQ,
|
GRAPHID_GET_DATUM(id->val.int_value));
|
}
|
else
|
{
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
errmsg("DELETE clause can only delete vertices and edges")));
|
}
|
|
/*
|
* Setup the scan description, with the correct snapshot and scan keys.
|
*/
|
estate->es_snapshot->curcid = GetCurrentCommandId(false);
|
estate->es_output_cid = GetCurrentCommandId(false);
|
scan_desc = table_beginscan(resultRelInfo->ri_RelationDesc,
|
estate->es_snapshot, 1, scan_keys);
|
|
/* Retrieve the tuple. */
|
heap_tuple = heap_getnext(scan_desc, ForwardScanDirection);
|
|
/*
|
* If the heap tuple still exists (It wasn't deleted after this variable
|
* was created) we can delete it. Otherwise, its safe to skip this
|
* delete.
|
*/
|
if (!HeapTupleIsValid(heap_tuple))
|
{
|
table_endscan(scan_desc);
|
destroy_entity_result_rel_info(resultRelInfo);
|
|
continue;
|
}
|
|
/*
|
* For vertices, we insert the vertex ID in the hashtable
|
* vertex_id_htab. This hashtable is used later to process
|
* connected edges.
|
*/
|
if (original_entity_value->type == AGTV_VERTEX)
|
{
|
bool found;
|
hash_search(css->vertex_id_htab, (void *)&(id->val.int_value),
|
HASH_ENTER, &found);
|
}
|
|
/* At this point, we are ready to delete the node/vertex. */
|
delete_entity(estate, resultRelInfo, heap_tuple);
|
|
/* Close the scan and the relation. */
|
table_endscan(scan_desc);
|
destroy_entity_result_rel_info(resultRelInfo);
|
}
|
}
|
|
/*
|
* Scans the edge tables and checks if the deleted vertices are connected to
|
* any edge(s). For DETACH DELETE, the connected edges are deleted. Otherwise,
|
* an error is thrown.
|
*/
|
static void check_for_connected_edges(CustomScanState *node)
|
{
|
ListCell *lc;
|
cypher_delete_custom_scan_state *css =
|
(cypher_delete_custom_scan_state *)node;
|
EState *estate = css->css.ss.ps.state;
|
char *graph_name = css->delete_data->graph_name;
|
|
/* scans each label from css->edge_labels */
|
foreach (lc, css->edge_labels)
|
{
|
char *label_name = lfirst(lc);
|
ResultRelInfo *resultRelInfo;
|
TableScanDesc scan_desc;
|
HeapTuple tuple;
|
TupleTableSlot *slot;
|
|
resultRelInfo = create_entity_result_rel_info(estate, graph_name,
|
label_name);
|
estate->es_snapshot->curcid = GetCurrentCommandId(false);
|
estate->es_output_cid = GetCurrentCommandId(false);
|
scan_desc = table_beginscan(resultRelInfo->ri_RelationDesc,
|
estate->es_snapshot, 0, NULL);
|
slot = ExecInitExtraTupleSlot(
|
estate, RelationGetDescr(resultRelInfo->ri_RelationDesc),
|
&TTSOpsHeapTuple);
|
|
/* for each row */
|
while (true)
|
{
|
graphid startid;
|
graphid endid;
|
bool isNull;
|
bool found_startid = false;
|
bool found_endid = false;
|
|
tuple = heap_getnext(scan_desc, ForwardScanDirection);
|
|
/* no more tuples to process, break and scan the next label. */
|
if (!HeapTupleIsValid(tuple))
|
{
|
break;
|
}
|
|
ExecStoreHeapTuple(tuple, slot, false);
|
|
startid = GRAPHID_GET_DATUM(slot_getattr(
|
slot, Anum_ag_label_edge_table_start_id, &isNull));
|
endid = GRAPHID_GET_DATUM(
|
slot_getattr(slot, Anum_ag_label_edge_table_end_id, &isNull));
|
|
hash_search(css->vertex_id_htab, (void *)&startid, HASH_FIND,
|
&found_startid);
|
|
if (!found_startid)
|
{
|
hash_search(css->vertex_id_htab, (void *)&endid, HASH_FIND,
|
&found_endid);
|
}
|
|
if (found_startid || found_endid)
|
{
|
if (css->delete_data->detach)
|
{
|
delete_entity(estate, resultRelInfo, tuple);
|
}
|
else
|
{
|
ereport(
|
ERROR,
|
(errcode(ERRCODE_INTERNAL_ERROR),
|
errmsg(
|
"Cannot delete a vertex that has edge(s). "
|
"Delete the edge(s) first, or try DETACH DELETE.")));
|
}
|
}
|
}
|
|
table_endscan(scan_desc);
|
destroy_entity_result_rel_info(resultRelInfo);
|
}
|
}
|