From 61a3e08edd5c3ce3cbcc750a2c074922afd6676d Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler@timbira.com.br>
Date: Wed, 14 Mar 2018 00:53:17 +0000
Subject: [PATCH v2 8/9] Debug for row filtering

---
 src/backend/commands/publicationcmds.c      | 11 ++++
 src/backend/replication/logical/tablesync.c |  1 +
 src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index bc7f9210e9..8e107bddfb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -341,6 +341,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	List	   *rels = NIL;
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 	Oid			pubid = pubform->oid;
+	ListCell   *xpto;
 
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (pubform->puballtables)
@@ -352,6 +353,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	foreach(xpto, stmt->tables)
+	{
+		PublicationTable *t = lfirst(xpto);
+
+		if (t->whereClause == NULL)
+			elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
+		else
+			elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
+	}
+
 	/*
 	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
 	 * publication_table_list node (that accepts a WHERE clause) but forbid the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index fc37f74e89..c86affad03 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -871,6 +871,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
+	elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data);
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e9646ac483..5012cfdde7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -34,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -323,6 +324,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	Form_pg_class	class_form;
+	char			*schemaname;
+	char			*tablename;
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -347,6 +352,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	class_form = RelationGetForm(relation);
+	schemaname = get_namespace_name(class_form->relnamespace);
+	tablename = NameStr(class_form->relname);
+
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+		elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
+		elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
+		elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+
 	/* ... then check row filter */
 	if (list_length(relentry->row_filter) > 0)
 	{
@@ -363,6 +379,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		tupdesc = RelationGetDescr(relation);
 		estate = create_estate_for_relation(relation);
 
+#ifdef	_NOT_USED
+		if (old_tuple)
+		{
+			int i;
+
+			for (i = 0; i < tupdesc->natts; i++)
+			{
+				Form_pg_attribute	attr;
+				HeapTuple			type_tuple;
+				Oid					typoutput;
+				bool				typisvarlena;
+				bool				isnull;
+				Datum				val;
+				char				*outputstr = NULL;
+
+				attr = TupleDescAttr(tupdesc, i);
+
+				/* Figure out type name */
+				type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
+				if (HeapTupleIsValid(type_tuple))
+				{
+					/* Get information needed for printing values of a type */
+					getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
+
+					val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull);
+					if (!isnull)
+					{
+						outputstr = OidOutputFunctionCall(typoutput, val);
+						elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr);
+						pfree(outputstr);
+					}
+				}
+			}
+		}
+#endif
+
 		/* prepare context per tuple */
 		ecxt = GetPerTupleExprContext(estate);
 		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -378,6 +430,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Oid			expr_type;
 			Datum		res;
 			bool		isnull;
+			char		*s = NULL;
 
 			row_filter = (Node *) lfirst(lc);
 
@@ -388,14 +441,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			expr_state = ExecInitExpr(expr, NULL);
 			res = ExecEvalExpr(expr_state, ecxt, &isnull);
 
+			elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? "true" : "false");
+
 			/* if tuple does not match row filter, bail out */
 			if (!DatumGetBool(res) || isnull)
 			{
+				s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+				elog(DEBUG2, "row filter \"%s\" was not matched", s);
+				pfree(s);
+
 				MemoryContextSwitchTo(oldcxt);
 				ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
 				FreeExecutorState(estate);
 				return;
 			}
+
+			s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(row_filter)), ObjectIdGetDatum(relentry->relid)));
+			elog(DEBUG2, "row filter \"%s\" was matched", s);
+			pfree(s);
 		}
 
 		MemoryContextSwitchTo(oldcxt);
@@ -666,9 +729,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				{
 					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					char	*s = TextDatumGetCString(rf_datum);
+					char	*t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid)));
 					Node	*rf_node = stringToNode(s);
 					entry->row_filter = lappend(entry->row_filter, rf_node);
 					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid));
 				}
 
 				ReleaseSysCache(rf_tuple);
-- 
2.19.1

