
Problem

When working with massive amounts of data, an MPP (massively parallel processing) architecture allows many compute nodes to process segments of the data individually before aggregating the results together.  However, interactive applications have lower latency tolerances than most MPP designs provide.  As a result, organizations typically run a separate operational database, whose data is exported into archives that are then imported back into the data warehouse.

An ideal solution applies MPP processing to the larger data set, while a subset of the data is dedicated to operational work, reducing or eliminating the need to export and import data.  Such a solution is possible by combining Postgres and Greenplum, with access to both managed by the Heimdall Database Proxy.

Solution

To achieve OLTP performance for the data warehouse, Heimdall Data is deployed as the broker through which applications and users access the data.  Heimdall analyzes each query and, through a simple rule-based design, determines which system should service it: Postgres, which maintains the most frequently used subset of the data, or Greenplum, the full warehouse.  Heimdall also detects conditions that require the two databases to be re-synchronized, such as a COPY operation into Greenplum.  DML operations are executed against the Postgres database, which then sends the changes back to Greenplum as bulk updates that Greenplum can ingest more efficiently, an advantage over using a pure analytics store alone.

Note: While not shown, any interaction between the Postgres and Greenplum databases is routed through Heimdall, in order to provide compatibility with the Postgres foreign data wrapper, which currently doesn’t support Greenplum directly.

Benefits

There are numerous benefits to this solution:

  1. Improved performance for OLTP-type traffic, due both to intelligent routing between Postgres and Greenplum and to SQL result caching, all orchestrated by Heimdall Data;
  2. Control: particular queries can be directed to one database or the other individually, without code changes;
  3. Improved DML performance, as DML statements initially execute against Postgres and are then synced to Greenplum asynchronously, behind the scenes;
  4. Visibility of query performance through Heimdall’s analytics engine;
  5. Sub-query delivery to Greenplum:  Postgres can parse a large, complex query containing subqueries it can fulfill, while remaining query components can be pushed down to Greenplum;
  6. Improved compatibility between Postgres and Greenplum, through query rewriting to avoid SQL support limitations on Greenplum, allowing the use of foreign data wrappers.

Database Configuration

On Greenplum, no extra configuration is necessary.  For this blog, the data set was built with the following commands, using the TICKIT sample data that Amazon provides for its Redshift examples:

cd /home/gpadmin
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/allusers_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/venue_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/category_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/date2008_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/allevents_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/listings_pipe.txt
wget http://s3.us-west-2.amazonaws.com/awssampledbuswest2/tickit/sales_tab.txt

/*** Create the tables
 note: each table's ID column is named pkey and is its primary key, which minimizes the effort needed for synchronization ***/
create table users( pkey integer not null PRIMARY KEY, username char(8), firstname varchar(30), lastname 
  varchar(30), city varchar(30), state char(2), email varchar(100), phone char(14), likesports boolean, 
  liketheatre boolean, likeconcerts boolean, likejazz boolean, likeclassical boolean, likeopera boolean, 
  likerock boolean, likevegas boolean, likebroadway boolean, likemusicals boolean) DISTRIBUTED BY (pkey);
create table venue( pkey smallint not null primary key, venuename varchar(100), venuecity varchar(30), 
  venuestate char(2), venueseats integer) DISTRIBUTED BY (pkey);
create table category( pkey smallint not null primary key, catgroup varchar(10), catname varchar(10), 
  catdesc varchar(50)) DISTRIBUTED BY (pkey); 
create table date( pkey smallint not null primary key, caldate date not null, day character(3) not null, 
  week smallint not null, month character(5) not null, qtr character(5) not null, year smallint not null, 
  holiday boolean default false) DISTRIBUTED BY (pkey); 
create table event( pkey integer not null primary key, venueid smallint not null, catid smallint not null, 
  dateid smallint not null, eventname varchar(200), starttime timestamp) DISTRIBUTED BY (pkey); 
create table listing( pkey integer not null primary key, sellerid integer not null, eventid integer not 
  null, dateid smallint not null, numtickets smallint not null, priceperticket decimal(8,2), totalprice 
  decimal(8,2), listtime timestamp) DISTRIBUTED BY (pkey); 
create table sales( pkey integer not null primary key, listid integer not null, sellerid integer not null, 
  buyerid integer not null, eventid integer not null, dateid smallint not null, qtysold smallint not null, 
  pricepaid decimal(8,2), commission decimal(8,2), saletime timestamp) DISTRIBUTED BY (pkey);

/*** copy the data into the tables from the master node ***/
COPY users FROM '/home/gpadmin/allusers_pipe.txt' delimiter '|' null '';
COPY venue FROM '/home/gpadmin/venue_pipe.txt' delimiter '|' null '';
COPY category FROM '/home/gpadmin/category_pipe.txt' delimiter '|' null '';
COPY date FROM '/home/gpadmin/date2008_pipe.txt' delimiter '|' null '';
COPY event FROM '/home/gpadmin/allevents_pipe.txt' delimiter '|' null '';
COPY listing FROM '/home/gpadmin/listings_pipe.txt' delimiter '|' null '';
COPY sales FROM '/home/gpadmin/sales_tab.txt' null '';

Next, on our front-end Postgres server, we set up the foreign data wrapper and the foreign tables that will be used to synchronize the data:

/*** enable the foreign data wrapper ***/
CREATE EXTENSION postgres_fdw;

CREATE SERVER warehouse
 FOREIGN DATA WRAPPER postgres_fdw
 OPTIONS (host '172.31.1.2', port '5432', dbname 'postgres');
 
/*** map the user(s) for accessing the FDW ***/
CREATE USER MAPPING FOR sa
SERVER warehouse 
OPTIONS (user 'sa', password 'password');

/*** this schema will be used to house the foreign tables ***/
CREATE SCHEMA warehouse;

/*** this maps all the tables in the specified schema into foreign tables locally ***/
IMPORT FOREIGN SCHEMA public
FROM SERVER warehouse INTO warehouse;

Note:  The IP address used for the foreign data wrapper actually points back to the Heimdall instance.  A direct connection from Postgres to Greenplum is not supported, because Postgres ends up issuing commands that Greenplum does not support.  Heimdall intercepts these calls and translates them into equivalent calls that Greenplum accepts.

At this point, we will define a few generic, table-independent functions.  We have used the primary key name “pkey” in every table to ensure that the synchronization logic is identical across tables, which simplifies the logic.

First, the function refresh_table(‘tablename’) populates a local table with the warehouse data:

/*** function to refresh an actual table based on the table name from the warehouse ***/
CREATE OR REPLACE FUNCTION refresh_table(tables variadic text[]) RETURNS void AS $$
DECLARE
 str text;
 query text;
BEGIN
 FOREACH str IN ARRAY tables LOOP
 BEGIN
    -- check if we have a refresh query for this table; if so, use it to reload the table
    EXECUTE 'SELECT query FROM refresh_queries WHERE tablename = $1' INTO query USING str;
    IF length(query) > 0 THEN
       -- suspend the DML-tracking trigger so reloaded rows aren't queued as local changes
       EXECUTE format('ALTER TABLE %I DISABLE TRIGGER USER', str);
       EXECUTE format('TRUNCATE TABLE %I', str);
       EXECUTE format('INSERT INTO %I (%s)', str, query);
       EXECUTE format('ALTER TABLE %I ENABLE TRIGGER USER', str);
    END IF;
 EXCEPTION WHEN OTHERS THEN
    -- on error, everything in this block (including the truncate) is rolled back,
    -- so the original data remains; keep looping to refresh the other tables
    NULL;
 END;
 END LOOP;
END; $$
LANGUAGE PLPGSQL;

For this function to work, an additional table provides the queries used to copy data from the warehouse into the local datastore.  In this example every query pulls all rows from the back end, but in a real-world scenario the queries would likely filter out data the OLTP workload doesn't need, e.g. inactive inventory records or old sales data.  When that data is needed, it can be queried directly from the warehouse.

/*** we want one query per warehouse mirrored table that allows proper updates ***/
CREATE TABLE refresh_queries ( tablename varchar(100) not null primary key, query varchar(1000));
INSERT INTO refresh_queries VALUES ('users', 'SELECT * FROM warehouse.users');
INSERT INTO refresh_queries VALUES ('venue', 'SELECT * FROM warehouse.venue');
INSERT INTO refresh_queries VALUES ('category', 'SELECT * FROM warehouse.category');
INSERT INTO refresh_queries VALUES ('date', 'SELECT * FROM warehouse.date');
INSERT INTO refresh_queries VALUES ('event', 'SELECT * FROM warehouse.event');
INSERT INTO refresh_queries VALUES ('listing', 'SELECT * FROM warehouse.listing');
INSERT INTO refresh_queries VALUES ('sales', 'SELECT * FROM warehouse.sales');
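The reload-with-rollback behavior of refresh_table can be tried without a Greenplum cluster.  The sketch below is a Python approximation using the standard sqlite3 module, under loud assumptions: SQLite stands in for both databases, the "warehouse" is just a warehouse_users table in the same in-memory database, and refresh_tables is a hypothetical helper.  It looks up each table's query in refresh_queries, reloads the table inside a savepoint, and rolls back only that table if the reload fails.  The users query also shows a filtered refresh that keeps only active rows, while the category query deliberately points at a missing table.

```python
import sqlite3
from typing import List


def refresh_tables(conn: sqlite3.Connection, *tables: str) -> List[str]:
    """Reload each local table from its refresh query; return the tables that failed."""
    failed = []
    for name in tables:
        row = conn.execute(
            "SELECT query FROM refresh_queries WHERE tablename = ?", (name,)
        ).fetchone()
        if row is None or not row[0]:
            continue  # no refresh query defined for this table
        conn.execute("SAVEPOINT refresh")
        try:
            # SQLite has no TRUNCATE; an unqualified DELETE plays that role here
            conn.execute(f'DELETE FROM "{name}"')
            conn.execute(f'INSERT INTO "{name}" {row[0]}')
            conn.execute("RELEASE SAVEPOINT refresh")
        except sqlite3.Error:
            # undo the delete too, so the table keeps its previous contents
            conn.execute("ROLLBACK TO SAVEPOINT refresh")
            conn.execute("RELEASE SAVEPOINT refresh")
            failed.append(name)
    return failed


# autocommit mode, so the savepoints above control the transactions explicitly
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript("""
    CREATE TABLE warehouse_users (pkey INTEGER PRIMARY KEY, username TEXT, active INTEGER);
    CREATE TABLE users (pkey INTEGER PRIMARY KEY, username TEXT, active INTEGER);
    CREATE TABLE category (pkey INTEGER PRIMARY KEY, catname TEXT);
    CREATE TABLE refresh_queries (tablename TEXT PRIMARY KEY, query TEXT);
    INSERT INTO warehouse_users VALUES (1, 'alice', 1), (2, 'bob', 0), (3, 'carol', 1);
    INSERT INTO users VALUES (9, 'stale', 1);
    INSERT INTO category VALUES (1, 'Sports');
    INSERT INTO refresh_queries VALUES
        ('users', 'SELECT * FROM warehouse_users WHERE active = 1'),
        ('category', 'SELECT * FROM warehouse_category');
""")
print(refresh_tables(conn, "users", "category"))  # ['category']
print(conn.execute("SELECT pkey FROM users ORDER BY pkey").fetchall())  # [(1,), (3,)]
print(conn.execute("SELECT * FROM category").fetchall())  # [(1, 'Sports')]
```

The failed category refresh leaves its old row in place, which is the same guarantee the exception block gives refresh_table in Postgres.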

Next, flush_table writes locally changed data back to the warehouse.  This logic could be changed to export the changes in bulk to a file and then have the warehouse pull the data in with a COPY operation.  For small amounts of data this should be unnecessary, but it may be important in a highly distributed environment where individual insert operations are extremely expensive.

/*** function to flush locally modified data into the warehouse ***/
CREATE OR REPLACE FUNCTION flush_table(tables variadic text[]) RETURNS void AS $$
DECLARE
  str text;
  rec RECORD;
BEGIN
  FOREACH str IN ARRAY tables LOOP
    -- delete first, as a modify is a delete+insert, and we don't want dups
    FOR rec IN EXECUTE format('SELECT pkey FROM %I', 'deleted_' || str) LOOP
      EXECUTE format('DELETE FROM warehouse.%I WHERE pkey = %s', str, rec.pkey);
    END LOOP;
    EXECUTE format('INSERT INTO warehouse.%I SELECT * FROM %I', str, 'new_' || str);

    -- reload the local copy of the table from the warehouse
    PERFORM refresh_table(str);

    -- now clear the records from the new and deleted tables
    EXECUTE format('DELETE FROM %I', 'new_' || str);
    EXECUTE format('DELETE FROM %I', 'deleted_' || str);
  END LOOP;
END; $$
LANGUAGE PLPGSQL;
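For the batch alternative mentioned before flush_table, the changed rows would first have to be written in a form the warehouse's COPY can load.  Here is a minimal Python sketch, assuming the same text-format options used to load the sample data above (delimiter '|' and NULL written as an empty string); copy_field and write_copy_batch are hypothetical helpers, not part of the original setup.  It escapes backslashes, the delimiter, and line breaks, as the COPY text format requires.

```python
import io
from datetime import datetime
from decimal import Decimal
from typing import Any, Iterable, Sequence, TextIO


def copy_field(value: Any) -> str:
    """Render one value for COPY text format with delimiter '|' and null ''."""
    if value is None:
        return ""  # the null '' option: an empty field loads as NULL
    if isinstance(value, bool):  # Postgres boolean text form; str(True) would be 'True'
        return "t" if value else "f"
    text = str(value)
    # escape the backslash first, then the delimiter and line breaks
    return (text.replace("\\", "\\\\")
                .replace("|", "\\|")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))


def write_copy_batch(rows: Iterable[Sequence[Any]], out: TextIO) -> int:
    """Write each row as one COPY text line; return the number of rows written."""
    count = 0
    for row in rows:
        out.write("|".join(copy_field(v) for v in row) + "\n")
        count += 1
    return count


buf = io.StringIO()
write_copy_batch([
    (50000, "aaaaaaaa", "Erik", "Brandsberg", None, True),
    (50001, "a|b", "x\\y", None, Decimal("12.50"), datetime(2008, 1, 5, 14, 30)),
], buf)
print(buf.getvalue(), end="")
# 50000|aaaaaaaa|Erik|Brandsberg||t
# 50001|a\|b|x\\y||12.50|2008-01-05 14:30:00
```

One consequence of null '' is that an empty string and a NULL are indistinguishable in the file (both load as NULL), so a different NULL marker would be needed if empty strings matter.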

Note:  The insert/update/delete queries could also be stored in a table, as refresh_table does with its refresh queries, to allow a more generic structure that doesn’t depend on the primary key being named “pkey”.

Now we get into the table-specific configuration.  The configuration for each table is the same apart from the table name.  As such, while this example has 7 tables (category, date, event, listing, sales, users and venue), only one of them, the users table, is included in this document.

First, we create the local instance of the table.  We use the LIKE option to mirror the data types from the warehouse, which reduces the complexity of the configuration:

/* create the local copy of the users table from the warehouse's table definition */
CREATE TABLE users (LIKE warehouse.users INCLUDING ALL);
ALTER TABLE users ADD CONSTRAINT user_pk PRIMARY KEY (pkey);

Next, we create two additional tables: one to track inserted and updated rows, and another to track deletes:

/* create table for new and updated user data from warehouse's table definition */
CREATE TABLE new_users (LIKE warehouse.users INCLUDING ALL);
ALTER TABLE new_users ADD CONSTRAINT new_user_unique PRIMARY KEY (pkey);

/* this table is to track deleted users */
CREATE TABLE deleted_users (
 pkey INTEGER NOT NULL PRIMARY KEY
 );

Next, we create a function to track the DML against the table:

/* create a DML handler function to track writes to the local users table */
CREATE OR REPLACE FUNCTION users_dml()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $function$
 BEGIN
  IF TG_OP = 'INSERT' THEN 
    INSERT INTO new_users SELECT (NEW).*; -- track to insert into warehouse
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN 
    DELETE FROM new_users WHERE pkey = OLD.pkey; -- in case it is added then modified before sync
    INSERT INTO new_users SELECT (NEW).*; -- to track what needs to be inserted back 
    -- the old version must be deleted from the warehouse before inserting;
    -- ON CONFLICT avoids a key violation when a row changes twice before a sync
    INSERT INTO deleted_users VALUES (OLD.pkey) ON CONFLICT DO NOTHING;
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM new_users WHERE pkey = OLD.pkey; -- in case it is added/modified then deleted before sync
    INSERT INTO deleted_users VALUES (OLD.pkey) ON CONFLICT DO NOTHING;
    RETURN NULL;
  END IF;
 RETURN NEW;
 END;
$function$;

And attach the trigger to the table:

/* attach the trigger to the front-end table */
CREATE TRIGGER users_dml_trig
 AFTER INSERT OR UPDATE OR DELETE ON
 users FOR EACH ROW EXECUTE PROCEDURE users_dml();
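To see how the trigger and flush_table work together, the bookkeeping can be simulated in Python with the standard sqlite3 module.  This is a sketch under stated assumptions, not the deployed code: SQLite stands in for both databases, the warehouse is a local warehouse_users table, the schema is cut down to two columns, and flush is a hypothetical stand-in for flush_table that skips the refresh_table step.  The SQLite triggers follow users_dml, recording OLD.pkey for the warehouse delete and using INSERT OR IGNORE so that a row changed twice before a flush is only queued for deletion once.

```python
import sqlite3

# autocommit mode; flush() below manages its own transaction
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript("""
    CREATE TABLE warehouse_users (pkey INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE users (pkey INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE new_users (pkey INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE deleted_users (pkey INTEGER PRIMARY KEY);

    -- the same bookkeeping as users_dml(), written as SQLite triggers
    CREATE TRIGGER users_ins AFTER INSERT ON users BEGIN
        INSERT INTO new_users VALUES (NEW.pkey, NEW.username);
    END;
    CREATE TRIGGER users_upd AFTER UPDATE ON users BEGIN
        DELETE FROM new_users WHERE pkey = OLD.pkey;
        INSERT INTO new_users VALUES (NEW.pkey, NEW.username);
        INSERT OR IGNORE INTO deleted_users VALUES (OLD.pkey);
    END;
    CREATE TRIGGER users_del AFTER DELETE ON users BEGIN
        DELETE FROM new_users WHERE pkey = OLD.pkey;
        INSERT OR IGNORE INTO deleted_users VALUES (OLD.pkey);
    END;

    -- the initial load from the warehouse is not a local change, so reset the queue
    INSERT INTO warehouse_users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO users SELECT * FROM warehouse_users;
    DELETE FROM new_users;
""")


def flush(conn: sqlite3.Connection) -> None:
    """Hypothetical stand-in for flush_table('users'): apply deletes, then inserts."""
    conn.execute("BEGIN")
    for (pkey,) in conn.execute("SELECT pkey FROM deleted_users").fetchall():
        conn.execute("DELETE FROM warehouse_users WHERE pkey = ?", (pkey,))
    conn.execute("INSERT INTO warehouse_users SELECT * FROM new_users")
    conn.execute("DELETE FROM new_users")
    conn.execute("DELETE FROM deleted_users")
    conn.execute("COMMIT")


conn.execute("INSERT INTO users VALUES (3, 'erik')")
conn.execute("UPDATE users SET username = 'bobby' WHERE pkey = 2")
conn.execute("UPDATE users SET username = 'robert' WHERE pkey = 2")  # changed twice before a flush
conn.execute("DELETE FROM users WHERE pkey = 1")
print(conn.execute("SELECT * FROM new_users ORDER BY pkey").fetchall())  # [(2, 'robert'), (3, 'erik')]
print(conn.execute("SELECT * FROM deleted_users ORDER BY pkey").fetchall())  # [(1,), (2,)]
flush(conn)
print(conn.execute("SELECT * FROM warehouse_users ORDER BY pkey").fetchall())  # [(2, 'robert'), (3, 'erik')]
```

Deleting before inserting is what lets an update travel to the warehouse as a delete plus an insert without creating duplicate keys.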

Once the structure is in place, we can trigger the initial population of the table and verify that it is populated:

sa=> select refresh_table('users');
 refresh_table
---------------
(1 row)
Time: 1748.478 ms
sa=> select count(*) from users;
 count
-------
 49990
(1 row)
Time: 6.241 ms

These definitions are replicated for each table, giving the following set of local tables:

sa=> \dt public.*
 List of relations
 Schema | Name | Type | Owner
--------+------------------+-------+-------
 public | category | table | sa
 public | date | table | sa
 public | deleted_category | table | sa
 public | deleted_date | table | sa
 public | deleted_event | table | sa
 public | deleted_listing | table | sa
 public | deleted_sales | table | sa
 public | deleted_users | table | sa
 public | deleted_venue | table | sa
 public | listing | table | sa
 public | new_category | table | sa
 public | new_date | table | sa
 public | new_event | table | sa
 public | new_listing | table | sa
 public | new_sales | table | sa
 public | new_users | table | sa
 public | new_venue | table | sa
 public | refresh_queries | table | sa
 public | sales | table | sa
 public | users | table | sa
 public | venue | table | sa
(21 rows)

Heimdall Configuration

The first step is to create a data source definition for each database, i.e. the Postgres and the Greenplum servers.  The configuration for each is effectively identical, so only the Greenplum one is shown here:

The next step is to create the VDB, and configure it so that it has both the Postgres (first) and Greenplum (second) servers attached to it.  The order is important, as we want Postgres to be the default data source to reduce the work on Greenplum:

With the VDB configured, we can now connect using psql, and verify that we are connecting to the front-end Postgres server:

# psql --user=sa --port=5439 --host=127.0.0.1 -P pager=off
Password for user sa:
psql (9.5.12, server 9.6.6)
WARNING: psql major version 9.5, server major version 9.6.
 Some psql features might not work.
Type "help" for help.

sa=> select version();
 version
------------------------------------------------------------------------------
 PostgreSQL 9.6.6 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.9.3, 64-bit
(1 row)

Once this is working, we can start building the rules that control the flow of requests and determine which database is used for which tasks.  On the rules tab, we first create three rule lists: one that directs traffic to Postgres, one that directs traffic to Greenplum, and one that controls the behavior of DML operations.  First, the direct-to-Postgres rule list:

The first rule allows us to prepend “POSTGRES” or “PG” to the start of any query to explicitly control where it goes; Heimdall removes the prefix as part of the forwarding logic, using regular-expression capture groups to do so.

The second rule matches any query that reaches it and sends it to the data source named “Postgres-test”.
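The rule screenshots are not reproduced here, but the prefix handling can be sketched as a regular expression with capture groups.  In this Python sketch the pattern, the route helper, the GREENPLUM/GP prefixes, and the data source name "Greenplum-test" are assumptions for illustration; only "Postgres-test" comes from the rule above, and none of this is Heimdall's actual rule syntax.

```python
import re
from typing import Tuple

# An optional routing prefix, then the real query captured in the "query" group
PREFIX = re.compile(
    r"^\s*(?P<target>POSTGRES|PG|GREENPLUM|GP)\s+(?P<query>.*)$",
    re.IGNORECASE | re.DOTALL,
)

# Hypothetical mapping from prefix to Heimdall data source name
TARGETS = {
    "postgres": "Postgres-test",
    "pg": "Postgres-test",
    "greenplum": "Greenplum-test",
    "gp": "Greenplum-test",
}


def route(sql: str, default: str = "Postgres-test") -> Tuple[str, str]:
    """Return (data source, query with any routing prefix removed)."""
    m = PREFIX.match(sql)
    if m is None:
        return default, sql  # no prefix: fall through to the default source
    return TARGETS[m.group("target").lower()], m.group("query")


print(route("GP select version();"))         # ('Greenplum-test', 'select version();')
print(route("select count(*) from users;"))  # ('Postgres-test', 'select count(*) from users;')
```

Stripping the prefix before forwarding matters because neither database would accept "GP" or "PG" as valid SQL.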

Next, we will create a similar rule list for Greenplum:

The first two rules have the same meaning as in the Postgres rule list, but there is also a third rule here to handle COPY operations.  A COPY executed on Greenplum implies that the table is receiving data, so we want to trigger an immediate refresh on Postgres once the COPY has completed.  Here is an expanded view of the third rule:

This rule has the trigger action type, which means that when the condition is met, Heimdall triggers some additional behavior.  Here the trigger type is SQL, and the command “SELECT refresh_table(${tables})” will be executed.  Recall from the database configuration that refresh_table() repopulates a table in Postgres with data from the warehouse.  The timing of “after” means that the action is performed after the COPY returns without an exception.  The table parameter extracts the table name from the query and inserts it into the Heimdall metadata for the query; this is necessary because Heimdall doesn’t parse the COPY command natively, so a regular expression has to do the work for us.  Finally, the source specifies where the trigger will be executed.

In summary, when a COPY command is executed against Greenplum, this rule triggers the Postgres instance to repopulate its copy of the data from Greenplum, providing immediate access to the new data.
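Since Heimdall doesn't parse COPY natively, the rule uses a regular expression to pull out the table name.  A Python sketch of that extraction follows; the pattern and the refresh_command helper are assumptions (the rule's actual expression appears only in the screenshot), and the sketch deliberately matches only COPY ... FROM, since COPY ... TO exports data rather than loading it.

```python
import re
from typing import Optional

# COPY [schema.]table [(col, ...)] FROM ...  -> capture the table name
COPY_FROM = re.compile(
    r"^\s*COPY\s+(?:\w+\.)?(?P<table>\w+)\s*(?:\([^)]*\))?\s+FROM\b",
    re.IGNORECASE,
)


def refresh_command(sql: str) -> Optional[str]:
    """Return the SQL to run on Postgres after a COPY into Greenplum, or None."""
    m = COPY_FROM.match(sql)
    if m is None:
        return None  # not a COPY ... FROM, so nothing new was loaded
    # unquoted identifiers are folded to lower case, as Postgres does
    return "SELECT refresh_table('{}')".format(m.group("table").lower())


print(refresh_command(
    "COPY users FROM '/home/gpadmin/allusers_pipe.txt' delimiter '|' null '';"))
# SELECT refresh_table('users')
print(refresh_command("COPY users TO '/tmp/users.txt'"))  # None
```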

The next rule we set up controls DML operations:

An expanded view of this rule shows the complete parameter list:

Like the COPY rule in the Greenplum rule list, this is also a trigger, but instead of an “after” trigger it is “parallel” and has delay parameters.  The goal is to buffer potentially many DML operations before calling the flush_table function on the Postgres server: we wait at least one minute, and up to two minutes if more DML operations occur during that first minute.  The queueUnique option means that only one such trigger will be pending at a time; if it were set to false, a trigger would be queued for every DML, which isn’t what we want here.  Finally, the debug option “printmatch” writes information to the log whenever the rule matches.  The source option is not needed here, as the default data source is Postgres anyway.
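One reading of the delay, maxDelay, and unique parameters (60000, 120000, and true in the Heimdall log shown later) is a capped debounce: each matching DML pushes the pending flush back by delay, never past maxDelay after the first DML that queued it, and only one flush is pending at a time.  The Python sketch below simulates that interpretation with explicit timestamps in seconds; it is an assumption about the semantics, not Heimdall's implementation, and DebouncedTrigger is a hypothetical class.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DebouncedTrigger:
    delay: float       # seconds to wait after a matching statement
    max_delay: float   # upper bound, measured from the first queued statement
    first_seen: Optional[float] = None
    due: Optional[float] = None
    fired: List[float] = field(default_factory=list)

    def on_match(self, now: float) -> None:
        """Record a matching DML at time `now`; at most one flush is pending (unique)."""
        self.poll(now)  # fire a flush that is already due before queueing more work
        if self.first_seen is None:
            self.first_seen = now
        # push the deadline back, but never past first_seen + max_delay
        self.due = min(now + self.delay, self.first_seen + self.max_delay)

    def poll(self, now: float) -> None:
        """Fire the pending flush if its deadline has passed."""
        if self.due is not None and now >= self.due:
            self.fired.append(self.due)  # where SELECT flush_table('users') would run
            self.first_seen = None
            self.due = None


t = DebouncedTrigger(delay=60, max_delay=120)
for ts in (0, 30, 50, 90):  # a burst of DML: the flush is capped at 0 + 120
    t.on_match(ts)
t.poll(200)
t.on_match(300)  # a single DML: flushed 60 seconds later
t.poll(400)
print(t.fired)  # [120, 360]
```

The cap is what keeps a steady stream of writes from postponing the flush to Greenplum indefinitely.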

Finally, once we have these three rule lists, we bring them together with one more rule list, which will be attached to the VDB:

Here is the meaning of each rule:

  1. This rule logs all queries, which allows the analytics to work;
  2. This routes any COPY operation to the Hybrid-to-GP rule list, which forwards the query to Greenplum;
  3. This allows unconditional access to the Greenplum server, and no other rules in the current list will be processed due to the stop flag;
  4. This allows unconditional access to the Postgres server, and no other rules in the current list will be processed due to the stop flag;
  5. This enables Heimdall caching on any query that matches.  This also means that a query directed using the ‘GP’ or ‘PG’ notation will not be cached, since rule processing stops at that rule;
  6. This routes any insert/update/delete to the DML rule for processing.
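The effect of rule order and the stop flag can be modelled as a top-to-bottom scan that halts at the first matching rule marked stop.  The patterns, action names, and the "Hybrid-to-PG" list name below are simplified, hypothetical stand-ins for the six rules described above (caching is assumed to apply only to SELECT statements); the real rules are configured in Heimdall, not in code.

```python
import re
from typing import List, NamedTuple


class Rule(NamedTuple):
    pattern: str
    action: str
    stop: bool = False


# Simplified, hypothetical stand-ins for the six rules of the master list, in order
HYBRID_RULES = [
    Rule(r".*", "log"),
    Rule(r"^\s*COPY\b", "call Hybrid-to-GP"),
    Rule(r"^\s*(GREENPLUM|GP)\s", "call Hybrid-to-GP", stop=True),
    Rule(r"^\s*(POSTGRES|PG)\s", "call Hybrid-to-PG", stop=True),
    Rule(r"^\s*SELECT\b", "cache"),  # assumed: caching applies to reads only
    Rule(r"^\s*(INSERT|UPDATE|DELETE)\b", "call DML"),
]


def evaluate(sql: str, rules: List[Rule] = HYBRID_RULES) -> List[str]:
    """Return the actions applied to sql, stopping after a matching stop rule."""
    actions = []
    for rule in rules:
        if re.search(rule.pattern, sql, re.IGNORECASE):
            actions.append(rule.action)
            if rule.stop:
                break
    return actions


print(evaluate("GP select version()"))           # ['log', 'call Hybrid-to-GP']
print(evaluate("select count(*) from users"))    # ['log', 'cache']
print(evaluate("insert into users values (1)"))  # ['log', 'call DML']
```

The first line shows why explicitly routed queries bypass the cache: the stop flag ends processing before the caching rule is reached.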

Finally, we bind this master rule list to the VDB, along with all sub-rule lists, and enable caching:

The first rule list to attach should be hybrid-rules, as the first rule list determines where processing starts.  The remaining rule lists are attached automatically if they are left off.

For caching, we specify a local cache: our setup uses only one Heimdall instance, so no distributed caching is necessary.

Once everything is in place, we can start testing with psql.  To verify proper routing is working, we can use “SELECT version()”:

sa=> PG select version();
 version
------------------------------------------------------------------------------
 PostgreSQL 9.6.6 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.9.3, 64-bit
(1 row)

sa=> GP select version();
 version 
-------------------------------------------------------------------------------------------------------
 PostgreSQL 8.2.15 (Greenplum Database 4.3.10.0 build commit: f413ff3b006655f14b6b9aa217495ec94da5c96c) on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2 compiled on Oct 21 2016 19:36:26
(1 row)

Next, we can run a multi-table SELECT against each database, using direct routing, to compare their performance:

sa=> GP SELECT sum(qtysold)
FROM sales, date
WHERE sales.dateid = date.pkey
AND caldate = '2008-01-05';
 sum
-----
 210
(1 row)

Time: 32.076 ms
sa=> PG SELECT sum(qtysold)
FROM sales, date
WHERE sales.dateid = date.pkey
AND caldate = '2008-01-05';
 sum
-----
 210
(1 row)

Time: 22.524 ms

In this simple case (without Greenplum partitioning active), directing the query to Postgres takes about 30% less time than running the same query on Greenplum (the test was run multiple times, and these results are representative).  In a real-world use case, where the OLTP front end needs only a small subset of the data stored in the warehouse, the improvement is likely to be larger.

The next performance test is to illustrate the benefits of caching in Heimdall for repeated queries even over the already fast performance of Postgres:

sa=> SELECT sum(qtysold)
FROM sales, date
WHERE sales.dateid = date.pkey
AND caldate = '2008-01-05';
 sum
-----
 210
(1 row)

Time: 0.911 ms

Here, the query time dropped from 22.5 ms via Postgres to under 1 ms when served from the Heimdall cache.
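The relative improvements quoted above follow directly from the psql timings; this small Python helper (speedup is a hypothetical name) shows the arithmetic so it can be rechecked against other runs.

```python
from typing import Tuple


def speedup(before_ms: float, after_ms: float) -> Tuple[float, float]:
    """Return (fraction of time saved, speed-up factor) between two timings."""
    return 1 - after_ms / before_ms, before_ms / after_ms


saved, factor = speedup(32.076, 22.524)  # Greenplum -> Postgres
print(f"{saved:.1%} less time, {factor:.2f}x faster")  # 29.8% less time, 1.42x faster
saved, factor = speedup(22.524, 0.911)   # Postgres -> Heimdall cache
print(f"{saved:.1%} less time, {factor:.1f}x faster")  # 96.0% less time, 24.7x faster
```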

Next, we can demonstrate the tracking of the DML operation and the trigger behavior.  To simplify this, caching will be disabled.

sa=> insert into users values (50000, 'aaaaaaaa', 'Erik', 'Brandsberg', 'Somewhere', 'CA', 
  'email@host.com', '(408) 555-1212');
INSERT 0 1
Time: 65.699 ms
sa=> select * from new_users;
 pkey | username | firstname | lastname | city | state | email | phone | likesports | liketheatre | likeconcerts | likejazz | likeclassical | likeopera | likerock | likevegas | likebroadway | likemusicals
-------+----------+-----------+------------+-----------+-------+----------------+----------------+------------+-------------+--------------+----------+---------------+-----------+----------+-----------+--------------+--------------
 50000 | aaaaaaaa | Erik | Brandsberg | Somewhere | CA | email@host.com | (408) 555-1212 | | | | | | | | | |
(1 row)

Time: 2.280 ms

From the Heimdall logs:

[2018-05-02 20:49:11,819] Execute trigger: { "type": "sql", "pattern": "insert into users values (?, ?, ?, ?, ?, ?, ?, ? )", "command": "SELECT flush_table(\u0027users\u0027);", "dataSource": "Postgres-test", "delay": 60000, "maxDelay": 120000, "unique": true }

After one minute, the new_users table on the Postgres server has been flushed to Greenplum:

sa=> select * from new_users;
 pkey | username | firstname | lastname | city | state | email | phone | likesports | liketheatre | likeconcerts | likejazz | likeclassical | likeopera | likerock | likevegas | likebroadway | likemusicals
------+----------+-----------+----------+------+-------+-------+-------+------------+-------------+--------------+----------+---------------+-----------+----------+-----------+--------------+--------------
(0 rows)

And on the Greenplum server, the data has been received:

postgres=# select * from users where pkey = 50000;
 pkey | username | firstname | lastname | city | state | email | phone | likesports | liketheatre | likeconcerts | likejazz | likeclassical | likeopera | likerock | likevegas | likebroadway | likemusicals
-------+----------+-----------+------------+-----------+-------+----------------+----------------+------------+-------------+--------------+----------+---------------+-----------+----------+-----------+--------------+--------------
 50000 | aaaaaaaa | Erik | Brandsberg | Somewhere | CA | email@host.com | (408) 555-1212 | | | | | | | | | |
(1 row)

And in the Postgres server again:

sa=> select * from users where pkey=50000;
 pkey | username | firstname | lastname | city | state | email | phone | likesports | liketheatre | likeconcerts | likejazz | likeclassical | likeopera | likerock | likevegas | likebroadway | likemusicals
-------+----------+-----------+------------+-----------+-------+----------------+----------------+------------+-------------+--------------+----------+---------------+-----------+----------+-----------+--------------+--------------
 50000 | aaaaaaaa | Erik | Brandsberg | Somewhere | CA | email@host.com | (408) 555-1212 | | | | | | | | | 
(1 row)

Time: 2.339 ms

Postgres to Greenplum Configuration

As mentioned before, Greenplum currently isn’t directly accessible from Postgres via the built-in foreign data wrapper, because the wrapper issues a variety of statements that Greenplum doesn’t support.  These include:

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ

SELECT … ORDER BY … NULLS FIRST|LAST

Other unsupported behaviors may also exist.  For the cases isolated so far, the following rules can be used on a separate VDB to rewrite the queries into a reasonable replacement:

Detailed views of the rules involved:

To handle the unsupported Repeatable Read clause:

To handle NULLS FIRST:

And NULLS LAST:
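The rewrite rules themselves appear only as screenshots.  One portable way to emulate NULLS FIRST/LAST on a database without that syntax is to sort on an IS NULL test ahead of the column itself, since false sorts before true.  The Python sketch below applies that substitution with a regular expression; it is a hypothetical illustration of the technique, not the rule set used here, and it assumes each sort key is a plain or table-qualified column name rather than an expression or a SELECT-list alias.

```python
import re

# <column> [ASC|DESC] NULLS FIRST|LAST, where <column> is a plain or qualified name
NULLS_ORDER = re.compile(
    r"\b(?P<col>[\w.]+)(?:\s+(?P<dir>ASC|DESC))?\s+NULLS\s+(?P<pos>FIRST|LAST)\b",
    re.IGNORECASE,
)


def rewrite_nulls_ordering(sql: str) -> str:
    """Replace 'col [ASC|DESC] NULLS FIRST|LAST' with an equivalent two-key sort."""

    def repl(m: "re.Match[str]") -> str:
        col = m.group("col")
        # false sorts before true, so "IS NULL DESC" puts NULLs first
        null_key = "DESC" if m.group("pos").upper() == "FIRST" else "ASC"
        direction = " " + m.group("dir").upper() if m.group("dir") else ""
        return f"{col} IS NULL {null_key}, {col}{direction}"

    return NULLS_ORDER.sub(repl, sql)


print(rewrite_nulls_ordering(
    "SELECT pkey, city FROM users ORDER BY city ASC NULLS LAST, state DESC NULLS FIRST"))
# SELECT pkey, city FROM users ORDER BY city IS NULL ASC, city ASC, state IS NULL DESC, state DESC
```

The leading IS NULL key decides where the NULLs go, and the original key then orders the non-NULL values exactly as before.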

Summary

The net result of this configuration is:

  1. Effectively adding “offload” materialized views to Greenplum for use in report generation;
  2. Improved SQL dialect compatibility for tools designed for Postgres, as once populated, the views will be processed on Postgres;
  3. Distributed processing across two different database clusters;
  4. Ability to keep data in sync between clusters without manual intervention;
  5. Using the right tool for the job: Greenplum for the larger data set, and Postgres for summarized data sets that are processed and used repeatedly as part of report generation.

This example also illustrates the power of using Heimdall as a Database Proxy, and the type of flexibility it gives an administrator to manage the flow of queries and augment an existing database with additional resources.  As David Wheeler is often quoted as having said, “All problems in computer science can be solved by another level of indirection”.  Heimdall provides a layer of indirection between the application and the database, and through this layer, many challenging issues can be resolved without code modification.