Using PostgreSQL, Hadoop, and Pentaho to Analyze and Visualize Data

This tutorial shows how to use Postgres, Hadoop, Hive, HBase, and Pig to load, refine, and store big data for visualization, all in fewer than five minutes.

You can watch the BigSQL, Postgres + Hadoop, Pentaho demo at http://www.youtube.com/watch?v=eSEe_33pImA or follow along with the written version below.

The following tutorial shows the scripts used to accomplish the demo. It also shows how to complete the additional step of loading the data into HBase, which allows you to insert, update, and delete data in HDFS from Postgres.

The data used for this example consists of three files:

  • Products, which lists the product categories and their corresponding URLs
  • Logs, semi-structured website logs that include a timestamp, user id, and geocoded IP address for each entry
  • Users, which contains each user’s id, birth date, and gender (sample rows are sketched below)
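
For orientation, a row in each file might look like this. The values here are hypothetical, but the layouts match the table definitions that follow:

   # users.tsv (tab-separated): SWID, BIRTH_DT, GENDER_CD
   0001BDD9-EABF-4D0D-81BD-D9EABFAD0D7F    8-Apr-84    F
   # products.tsv (tab-separated): url, category
   http://www.acme.com/SH55126545/VD55177927    clothing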

In BigSQL, we will create the Hive table definitions and move all of the data to the Hadoop file system:

   hive > CREATE TABLE users ( SWID STRING, BIRTH_DT STRING, GENDER_CD STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '/user/data/users';

   hive > CREATE TABLE products ( url STRING, category STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '/user/data/products';

   hive > CREATE TABLE Logs ( col_0 STRING, col_1 STRING, col_2 STRING, col_3 STRING, col_4 STRING, col_5 STRING, col_6 STRING, col_7 STRING, col_8 STRING, col_9 STRING, col_10 STRING, col_11 STRING, col_12 STRING, col_13 STRING, col_14 STRING, col_15 STRING, col_16 STRING, col_17 STRING, col_18 STRING, col_19 STRING, col_20 STRING, col_21 STRING, col_22 STRING, col_23 STRING, col_24 STRING, col_25 STRING, col_26 STRING, col_27 STRING, col_28 STRING, col_29 STRING, col_30 STRING, col_31 STRING, col_32 STRING, col_33 STRING, col_34 STRING, col_35 STRING, col_36 STRING, col_37 STRING, col_38 STRING, col_39 STRING, col_40 STRING, col_41 STRING,        col_42 STRING, col_43 STRING, col_44 STRING, col_45 STRING, col_46 STRING, col_47 STRING,        col_48 STRING, col_49 STRING, col_50 STRING, col_51 STRING, col_52 STRING, col_53 STRING, col_54 STRING, col_55 STRING, col_56 STRING, col_57 STRING, col_58 STRING, col_59 STRING, col_60 STRING, col_61 STRING, col_62 STRING, col_63 STRING, col_64 STRING, col_65 STRING, col_66 STRING, col_67 STRING, col_68 STRING, col_69 STRING, col_70 STRING, col_71 STRING, col_72 STRING, col_73 STRING, col_74 STRING, col_75 STRING, col_76 STRING, col_77 STRING, col_78 STRING, col_79 STRING, col_80 STRING, col_81 STRING, col_82 STRING, col_83 STRING, col_84 STRING, col_85 STRING, col_86 STRING, col_87 STRING, col_88 STRING, col_89 STRING, col_90 STRING, col_91 STRING, col_92 STRING, col_93 STRING, col_94 STRING, col_95 STRING, col_96 STRING, col_97 STRING, col_98 STRING, col_99 STRING, col_100 STRING, col_101 STRING, col_102 STRING, col_103 STRING, col_104 STRING, col_105 STRING, col_106 STRING, col_107 STRING, col_108 STRING, col_109 STRING, col_110 STRING, col_111 STRING, col_112 STRING, col_113 STRING, col_114 STRING, col_115 STRING, col_116 STRING, col_117 STRING, col_118 STRING, col_119 STRING, col_120 STRING, col_121 STRING, col_122 STRING, col_123 STRING, col_124 STRING, col_125 STRING, col_126 STRING, col_127 STRING, col_128 STRING, col_129 STRING, col_130 STRING, col_131 STRING, col_132 STRING, col_133 STRING, col_134 STRING, col_135 STRING, col_136 STRING, col_137 STRING, col_138 STRING, col_139 STRING, col_140 STRING, col_141 STRING, col_142 STRING, col_143 STRING, col_144 STRING, col_145 STRING, col_146 STRING, col_147 STRING, col_148 STRING, col_149 STRING, col_150 STRING, col_151 STRING, col_152 STRING, col_153 STRING, col_154 STRING, col_155 STRING, col_156 STRING, col_157 STRING, col_158 STRING, col_159 STRING, col_160 STRING, col_161 STRING, col_162 STRING, col_163 STRING, col_164 STRING, col_165 STRING, col_166 STRING, col_167 STRING, col_168 STRING, col_169 STRING, col_170 STRING, col_171 STRING, col_172 STRING, col_173 STRING, col_174 STRING, col_175 STRING, col_176 STRING, col_177 STRING, col_178 STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '/user/data/logs';

   hive > LOAD DATA LOCAL INPATH 'Downloads/webLogAnalytics/users.tsv' OVERWRITE INTO TABLE users;
   hive > LOAD DATA LOCAL INPATH 'Downloads/webLogAnalytics/products.tsv' OVERWRITE INTO TABLE products;
   hive > LOAD DATA LOCAL INPATH 'Downloads/webLogAnalytics/Omniture.tsv' OVERWRITE INTO TABLE logs;
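
Before moving on, you can optionally sanity-check the loads with any quick Hive query, for example:

   hive > DESCRIBE users;
   hive > SELECT COUNT(*) FROM logs;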

Now, use the HBase shell to define the webloganalytics table:

   $ hbase shell
   hbase > create 'webloganalytics', 'info','user','location'
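
To confirm the table and its three column families exist, describe it (a standard HBase shell command, shown here purely as a sanity check):

   hbase > describe 'webloganalytics'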

Next, we will use the following Pig script to refine the data. You may need to export the following as your HADOOP_CLASSPATH in order for Pig to work correctly with HBase:

   export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.94.11.jar:$HADOOP_HOME/hadoop-core-1.2.1.jar:$ZOOKEEPER_HOME/zookeeper-3.4.5.jar:$HBASE_HOME/lib/protobuf-java-2.4.0a.jar
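
The jar versions above match the BigSQL bundle used in this demo; adjust the paths if yours differ. An optional, quick way to check that the jars are where the classpath expects them:

   $ ls $HBASE_HOME/hbase-0.94.11.jar $HADOOP_HOME/hadoop-core-1.2.1.jar $ZOOKEEPER_HOME/zookeeper-3.4.5.jar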

First, register the Piggybank, Joda-Time, and protobuf jars, and then define the two time functions we will use.

   grunt >  cd /user/data
   grunt >  REGISTER bigsql-9.3.0-rc2.56/pig-0.11.1-src/contrib/piggybank/java/piggybank.jar 
   grunt >  REGISTER bigsql-9.3.0-rc2.56/pig-0.11.1-src/build/ivy/lib/Pig/joda-time-1.6.jar
   grunt >  REGISTER bigsql-9.3.0-rc2.56/hbase/lib/protobuf-java-2.4.0a.jar
   grunt >  DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
   grunt >  DEFINE ISOYearsBetween org.apache.pig.piggybank.evaluation.datetime.diff.ISOYearsBetween();
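
To make the two UDFs concrete: CustomFormatToISO parses a date string using the given Joda-Time pattern and returns an ISO 8601 timestamp, and ISOYearsBetween returns the number of whole years between two ISO timestamps. With a hypothetical birth date (the exact zone suffix depends on your JVM defaults):

   -- CustomFormatToISO('8-Apr-84', 'dd-MMM-yy')  => '1984-04-08T00:00:00.000Z'
   -- ISOYearsBetween('2013-01-01T02:00:00.000Z', '1984-04-08T00:00:00.000Z')  => 28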

Next, load the three tables from the Hadoop file system:

   grunt >  users = LOAD '/user/data/users/users.csv' USING PigStorage(',') AS (SWID:chararray,BIRTH_DT:chararray,GENDER_CD:chararray);
   grunt >  products = LOAD '/user/data/products/products.tsv' USING PigStorage('\t') AS (url:chararray,category:chararray);
   grunt >  logs = LOAD '/user/data/logs/Omniture.0.tsv' USING PigStorage('\t') AS (col_0:chararray,col_1:chararray,col_2:chararray,col_3:chararray,col_4:chararray,col_5:chararray,col_6:chararray,col_7:chararray,col_8:chararray,col_9:chararray,col_10:chararray,col_11:chararray,col_12:chararray,col_13:chararray,col_14:chararray,col_15:chararray,col_16:chararray,col_17:chararray,col_18:chararray,col_19:chararray,col_20:chararray,col_21:chararray,col_22:chararray,col_23:chararray,col_24:chararray,col_25:chararray,col_26:chararray,col_27:chararray,col_28:chararray,col_29:chararray,col_30:chararray,col_31:chararray,col_32:chararray,col_33:chararray,col_34:chararray,col_35:chararray,col_36:chararray,col_37:chararray,col_38:chararray,col_39:chararray,col_40:chararray,col_41:chararray,col_42:chararray,col_43:chararray,col_44:chararray,col_45:chararray,col_46:chararray,col_47:chararray,col_48:chararray,col_49:chararray,col_50:chararray,col_51:chararray,col_52:chararray,col_53:chararray,col_54:chararray,col_55:chararray,col_56:chararray,col_57:chararray,col_58:chararray,col_59:chararray,col_60:chararray,col_61:chararray,col_62:chararray,col_63:chararray,col_64:chararray,col_65:chararray,col_66:chararray,col_67:chararray,col_68:chararray,col_69:chararray,col_70:chararray,col_71:chararray,col_72:chararray,col_73:chararray,col_74:chararray,col_75:chararray,col_76:chararray,col_77:chararray,col_78:chararray,col_79:chararray,col_80:chararray,col_81:chararray,col_82:chararray,col_83:chararray,col_84:chararray,col_85:chararray,col_86:chararray,col_87:chararray,col_88:chararray,col_89:chararray,col_90:chararray,col_91:chararray,col_92:chararray,col_93:chararray,col_94:chararray,col_95:chararray,col_96:chararray,col_97:chararray,col_98:chararray,col_99:chararray,col_100:chararray,col_101:chararray,col_102:chararray,col_103:chararray,col_104:chararray,col_105:chararray,col_106:chararray,col_107:chararray,col_108:chararray,col_109:chararray,col_110:chararray,col_111:chararray,col_112:chararray,col_113:chararray,col_114:chararray,col_115:chararray,col_116:chararray,col_117:chararray,col_118:chararray,col_119:chararray,col_120:chararray,col_121:chararray,col_122:chararray,col_123:chararray,col_124:chararray,col_125:chararray,col_126:chararray,col_127:chararray,col_128:chararray,col_129:chararray,col_130:chararray,col_131:chararray,col_132:chararray,col_133:chararray,col_134:chararray,col_135:chararray,col_136:chararray,col_137:chararray,col_138:chararray,col_139:chararray,col_140:chararray,col_141:chararray,col_142:chararray,col_143:chararray,col_144:chararray,col_145:chararray,col_146:chararray,col_147:chararray,col_148:chararray,col_149:chararray,col_150:chararray,col_151:chararray,col_152:chararray,col_153:chararray,col_154:chararray,col_155:chararray,col_156:chararray,col_157:chararray,col_158:chararray,col_159:chararray,col_160:chararray,col_161:chararray,col_162:chararray,col_163:chararray,col_164:chararray,col_165:chararray,col_166:chararray,col_167:chararray,col_168:chararray,col_169:chararray,col_170:chararray,col_171:chararray,col_172:chararray,col_173:chararray,col_174:chararray,col_175:chararray,col_176:chararray,col_177:chararray,col_178:chararray);
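
A quick DESCRIBE in the Grunt shell confirms that Pig picked up the schemas from the AS clauses:

   grunt >  DESCRIBE users;
   users: {SWID: chararray,BIRTH_DT: chararray,GENDER_CD: chararray}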

Select only the important fields from the logs table, convert the ‘yyyy-MM-dd HH:mm:ss’ dates to ISO timestamps, and ensure that the countries and states are uppercase strings:

   grunt >  omniture = FOREACH logs GENERATE CustomFormatToISO(col_1, 'yyyy-MM-dd HH:mm:ss') AS logdate, col_7 AS ip, col_12 AS url, col_13 AS swid, col_49 AS city, col_50 AS country, col_52 AS state;
   grunt >  omniture2 = FOREACH omniture GENERATE logdate, url, ip, city, UPPER(state) AS state, UPPER(country) AS country, swid;

Convert the users table’s ‘dd-MMM-yy’ birth dates to ISO timestamps, and then calculate the age of each user. Joining the result back to the original users table with a LEFT OUTER join keeps the users whose birth date was null; their age simply remains null:

   grunt >  users2 = FILTER users BY BIRTH_DT IS NOT NULL;
   grunt >  users3 = FOREACH users2 GENERATE SWID,ISOYearsBetween('2013-01-01T02:00:00.000Z', CustomFormatToISO(BIRTH_DT, 'dd-MMM-yy')) AS AGE,GENDER_CD;
   grunt >  users4 = JOIN users BY (SWID, GENDER_CD) LEFT OUTER, users3 BY (SWID, GENDER_CD);
   grunt >  users5 = FOREACH users4 GENERATE users::SWID AS swid, users3::AGE AS age, users::GENDER_CD AS gender;

Next, join the logs table with the users table on the user id, and then join the result with the products table on the url:

   grunt >  webloganalytics1 = JOIN omniture2 BY swid LEFT OUTER, users5 BY swid;
   grunt >  webloganalytics2 = JOIN webloganalytics1 BY url LEFT OUTER, products BY url;
   grunt >  webloganalytics3 = ORDER webloganalytics2 BY logdate;
   grunt >  webloganalytics4 = FOREACH webloganalytics3 GENERATE logdate, products::url, ip, city,state, country, category, users5::age, users5::gender;

For this tutorial, instead of just saving the data to HDFS, we will load it into HBase. To do this, we first need to add a unique key to the webloganalytics table:

   grunt >  webloganalytics = RANK webloganalytics4;    
   grunt >  STORE webloganalytics INTO 'hbase://webloganalytics' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('info:logdate info:url user:ip location:city location:state location:country info:category user:age user:gender');
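
The refined rows should now be in HBase; a scan limited to a single row (standard HBase shell syntax) is a quick way to verify the store succeeded:

   hbase > scan 'webloganalytics', {LIMIT => 1}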

The first field of each tuple is now the rank id; HBaseStorage uses it as the row key, while the remaining fields map to the columns listed above.

Now, to use the foreign data wrapper’s insert, update, and delete capabilities, you need to create the corresponding Hive table:

   hive > CREATE TABLE webloganalytics ( id INT, logdate STRING, url STRING, ip STRING, city STRING, state STRING, country STRING, category STRING, age INT, gender STRING ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:logdate,info:url,user:ip,location:city,location:state,location:country,info:category,user:age,user:gender") TBLPROPERTIES("hbase.table.name" = "webloganalytics");
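
Because this Hive table is just a mapping over the HBase table, a quick query from Hive is an easy way to confirm the column mapping works:

   hive > SELECT id, logdate, url FROM webloganalytics LIMIT 1;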

Next, use the Hadoop Foreign Data Wrapper to create the corresponding Postgres table:

   postgres=# CREATE EXTENSION hadoop_fdw;

   postgres=# CREATE SERVER hadoop_server FOREIGN DATA WRAPPER hadoop_fdw OPTIONS (address '127.0.0.1', port '10000');

   postgres=# CREATE USER MAPPING FOR PUBLIC SERVER hadoop_server;

   postgres=# CREATE FOREIGN TABLE webloganalytics ( id INT, logdate TIMESTAMP, url TEXT, ip TEXT, city TEXT, state TEXT, country TEXT, category TEXT, age INT, gender TEXT ) SERVER hadoop_server OPTIONS ( table 'webloganalytics', hbase_port '9090', hbase_address 'localhost', hbase_mapping ':key,info:logdate,info:url,user:ip,location:city,location:state,location:country,info:category,user:age,user:gender' );
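
With the Hive server listening on port 10000 and the HBase Thrift server on 9090, you can confirm the foreign table is wired up before running the demo queries:

   postgres=# \d webloganalytics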

Now we can show how we are able to select, insert, update, and delete data from Postgres:

postgres=# SELECT * FROM webloganalytics WHERE state = 'CA' LIMIT 5;
   id   |       logdate       |                    url                     |       ip        |    city     | state | country | category | age | gender
--------+---------------------+--------------------------------------------+-----------------+-------------+-------+---------+----------+-----+--------
 100002 | 2012-03-13 12:36:42 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100005 | 2012-03-13 12:36:43 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100010 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119.238.163 | los angeles | CA    | USA     | shoes    |     | 
 100011 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119.238.163 | los angeles | CA    | USA     | shoes    |     | 
 100012 | 2012-03-13 12:37:10 | http://www.acme.com/SH55126545/VD55179433 | 108.198.177.186 | los angeles | CA    | USA     | shoes    |  27 | M

postgres=# DELETE FROM webloganalytics WHERE id=100010;
postgres=# SELECT * FROM webloganalytics WHERE state = 'CA' LIMIT 5;
   id   |       logdate       |                    url                     |       ip        |    city     | state | country | category | age | gender
--------+---------------------+--------------------------------------------+-----------------+-------------+-------+---------+----------+-----+--------
 100002 | 2012-03-13 12:36:42 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100005 | 2012-03-13 12:36:43 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100011 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119.238.163 | los angeles | CA    | USA     | shoes    |     | 
 100012 | 2012-03-13 12:37:10 | http://www.acme.com/SH55126545/VD55179433 | 108.198.177.186 | los angeles | CA    | USA     | shoes    |  27 | M
 100013 | 2012-03-13 12:37:13 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U

postgres=# UPDATE webloganalytics SET age=30, gender='F' WHERE id=100011;
postgres=# SELECT * FROM webloganalytics WHERE state = 'CA' LIMIT 5;                                             
   id   |       logdate       |                    url                     |       ip        |    city     | state | country | category | age | gender
--------+---------------------+--------------------------------------------+-----------------+-------------+-------+---------+----------+-----+--------
 100002 | 2012-03-13 12:36:42 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100005 | 2012-03-13 12:36:43 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100011 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119.238.163 | los angeles | CA    | USA     | shoes    |  30 | F
 100012 | 2012-03-13 12:37:10 | http://www.acme.com/SH55126545/VD55179433 | 108.198.177.186 | los angeles | CA    | USA     | shoes    |  27 | M
 100013 | 2012-03-13 12:37:13 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U

postgres=# INSERT INTO webloganalytics VALUES(100010,'2012-03-13 12:37:07','http://www.acme.com/SH55126545/VD55179433','162.119/238/163','los angeles','CA','USA','shoes',30,'F');
INSERT 0 1
postgres=# SELECT * FROM webloganalytics WHERE state = 'CA' LIMIT 5;
   id   |       logdate       |                    url                     |       ip        |    city     | state | country | category | age | gender
--------+---------------------+--------------------------------------------+-----------------+-------------+-------+---------+----------+-----+--------
 100002 | 2012-03-13 12:36:42 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100005 | 2012-03-13 12:36:43 | http://www.acme.com/SH55126545/VD55177927 | 108.0.201.188   | long beach  | CA    | USA     | clothing |  25 | U
 100010 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119/238/163 | los angeles | CA    | USA     | shoes    |  30 | F
 100011 | 2012-03-13 12:37:07 | http://www.acme.com/SH55126545/VD55179433 | 162.119.238.163 | los angeles | CA    | USA     | shoes    |  30 | F
 100012 | 2012-03-13 12:37:10 | http://www.acme.com/SH55126545/VD55179433 | 108.198.177.186 | los angeles | CA    | USA     | shoes    |  27 | M
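
Because the Postgres foreign table writes through to HBase, all of the changes above are visible on the Hadoop side as well. Assuming the rank id is stored as a plain-string row key, you can spot-check a single record from the HBase shell:

   hbase > get 'webloganalytics', '100011'

From here, the webloganalytics table can be queried from Pentaho, or any other tool that speaks to Postgres, for visualization.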