Wednesday, September 14, 2011

Pivoting columns into rows using Pentaho

Another interesting piece of Pentaho Data Integration functionality that I played around with is pivoting delimited fields in a file into rows. The test case is described below.

This transformation requires pivoting a column in the incoming row into multiple rows. Inputs and outputs are file based, so no database table structures are presented in this section.

Input Data:

The input file contains two data elements: an email address and the list of pages (properties) visited by that user. The second field is of variable length, and the number of properties in each line (e.g., 3 pages for the first email compared to 1 for the second) is not known beforehand. The email and the "properties" field are separated by a comma (","), while the individual elements within the second field are separated by semicolons (";").

Example:
Input data
a@sbcglobal.net,Auto;Mail;Finance
b@sbcglobal.net,Auto

Intended Transformation:

The transformation will generate two output files. The formats of the output files are described below.
The first file will contain the columns from the source pivoted into rows.
The output will look like:
Output data
a@sbcglobal.net,Auto
a@sbcglobal.net,Mail
a@sbcglobal.net,Finance
b@sbcglobal.net,Auto

The second file will contain the count of unique emails per property, so this output file will look like:
Output Data
Auto,2
Mail,1
Finance,1
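
The second output file is simply an aggregation of the first one. As a quick sanity check outside of Pentaho, the same result can be expressed with the query below, assuming the pivoted output has been loaded into a hypothetical table pivoted_output(email_address, property):

SELECT property ,
       COUNT(DISTINCT email_address) email_count
FROM   pivoted_output
GROUP BY property ;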

Generation of test data:

An Oracle PL/SQL block is used to generate the random data set with email IDs and the pages those users visited.

DECLARE
   l_fp UTL_FILE.file_type := UTL_FILE.fopen('DATA_DIR','output.txt','w') ;
BEGIN
   FOR c1 IN (SELECT email,
                     CASE
                        -- first element: everything before the first semicolon (';' is appended so a single-element list works too)
                        WHEN rn = 1 THEN TRIM(SUBSTR(property_list,1,INSTR(property_list || ';' ,';') - 1))
                        -- nth element: the text between the (n-1)th and nth semicolons of property_list || ';'
                        ELSE TRIM(SUBSTR(property_list,INSTR(property_list || ';' ,';',1,rn - 1) + 1, INSTR(property_list || ';' ,';',1,rn ) - INSTR(property_list || ';' ,';',1,rn - 1 ) - 1 ))
                     END property_name
               FROM  (SELECT a.* , b.column_value rn
                      FROM   (SELECT a.email, a.property_list, LENGTH(property_list || ';' ) - LENGTH(REPLACE(property_list || ';' ,';','')) no_of_elements
                              FROM   property_visited_ext a) a,
                             TABLE(generate_rows(a.no_of_elements)) b -- one row per list element
                     )
              )
   LOOP
      UTL_FILE.put_line(l_fp,c1.email || ',' || c1.property_name) ;
   END LOOP ;
   UTL_FILE.fclose(l_fp) ;
END ;
/
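
The block above relies on two supporting objects that are not listed in this post: property_visited_ext, an external table over the input file (email plus the semicolon-delimited property_list), and generate_rows, a pipelined function returning the integers 1..n so that each list element gets its own row. A minimal sketch of the function and its collection return type (the exact definitions are assumptions, not part of the original setup):

CREATE OR REPLACE TYPE number_ntt AS TABLE OF NUMBER ;
/
CREATE OR REPLACE FUNCTION generate_rows (p_count IN NUMBER)
   RETURN number_ntt PIPELINED
IS
BEGIN
   -- emit one row per integer from 1 to p_count
   FOR i IN 1 .. p_count
   LOOP
      PIPE ROW (i) ;
   END LOOP ;
   RETURN ;
END ;
/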

Slowly changing dimension maintenance using Pentaho

After installing the Pentaho enterprise data integration tool on my laptop, I attempted to test a few of Pentaho's features. The out-of-the-box data warehousing transformation for slowly changing dimensions appears to be very impressive, so I started playing around with it and in due course generated some random data for testing purposes. During my exploration, some shortcomings became evident.
  • The tool CAN support hybrid dimensions (type I & type II mixed).
  • It will maintain effective start and end dates in the dimension record.
  • If the dimension happens to have a version number, it can maintain that as well.
  • I wanted to have a current record indicator and didn’t find any way to maintain it automatically.
A current record indicator (allowable values Y/N) has immense value in identifying the latest version of a record, and I felt restricted by not being able to maintain it as part of the "Dimension lookup/update" step. What this means is several additional steps (updates/conditional branches) to complete such a dimension maintenance ETL job; those additional updates against a big dimension table can turn out to be quite expensive.
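
As a rough illustration of that extra work, a post-processing step would have to run something along the lines of the statement below after each load, restricting the update to the keys loaded that day. This is only a sketch; it uses the product_dim and changed_products tables defined in the next section and the convention that the active record carries effective_end_date = DATE '2500-01-01'.

UPDATE product_dim
SET    current_record_indicator = CASE
                                     WHEN effective_end_date = DATE '2500-01-01' THEN 'Y'
                                     ELSE 'N'
                                  END
WHERE  product_id IN (SELECT product_id FROM changed_products) ;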




The following section describes the test case for SCD2.

DROP TABLE product_dim PURGE ;

CREATE TABLE product_dim (
   product_sid NUMBER , /* surrogate key */
   product_id NUMBER , /* operational key */
   product_name VARCHAR2(256) , /* type I attribute. In place update of all records with same operational key when changes are detected in this field */
   manufacturer_name VARCHAR2(256) , /* type I attribute. In place update of all records with same operational key when changes are detected in this field */
   product_category VARCHAR2(64) , /* type II attribute. Create a new record with a product SID generated out of a database sequence (or similar object). End date the last record and mark current_record_indicator = 'N' for it */
   effective_start_date DATE,
   effective_end_date DATE , /* active record for an operational key will have effective_end_date = DATE '2500-01-01' */
   record_version NUMBER, /* version number of dimension record */
   current_record_indicator VARCHAR2(1) DEFAULT 'N', /* allowable values - Y/N */
   CONSTRAINT product_dim_pk PRIMARY KEY (product_sid) USING INDEX) ;

DROP TABLE changed_products ; -- daily modifications to product dimension

CREATE TABLE changed_products (
   product_id NUMBER,
   product_name VARCHAR2(256) ,
   manufacturer_name VARCHAR2(256),
   product_category VARCHAR2(64)) ;

Instead of the "changed_products" table, I modified the transformation to read a file, primarily to test the capability of reading a compressed file. An Oracle PL/SQL script is used to generate a random data set:
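
The scripts below rely on SMALL_VARCHAR_NTT, a nested table collection type that is not listed in this post (NUMBER_NTT, used in the second script, was sketched in the pivoting example above). A minimal assumed definition:

CREATE OR REPLACE TYPE small_varchar_ntt AS TABLE OF VARCHAR2(256) ;
/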

DECLARE
   l_manf_names SMALL_VARCHAR_NTT := SMALL_VARCHAR_NTT('Kellogs','General Mills','Tyson','Foster Farm','Johnson','Philips') ;
   l_prod_category SMALL_VARCHAR_NTT := SMALL_VARCHAR_NTT('Baby','Beauty','Food','Health','Drugs') ;
   l_fp UTL_FILE.file_type := UTL_FILE.FOPEN('DATA_DIR','changed_products.txt','w') ;
BEGIN
   FOR i IN 1..10000
   LOOP
      -- TRUNC(DBMS_RANDOM.value(1,7)) yields 1..6, covering all six manufacturer names; value(1,6) yields 1..5 for the five categories
      UTL_FILE.put_line(l_fp,i || ',' || 'Product - ' || i || ',' || l_manf_names(TRUNC(DBMS_RANDOM.value(1,7))) || ',' ||  l_prod_category(TRUNC(DBMS_RANDOM.value(1,6)))) ;
   END LOOP ;
   UTL_FILE.fclose(l_fp) ;
END ;
/
TRUNCATE TABLE product_dim ;


DROP SEQUENCE product_sid_s ;

CREATE SEQUENCE product_sid_s START WITH 1 ;

The number of records in the incoming flat file can easily be controlled by changing the upper limit of the loop index. The first run of the transformation creates new product dimension records. A type I attribute change requires updating that attribute with the new value in all records that share the same operational key (referred to as "punch through" in Pentaho); the equivalent SQL is sketched just below. The transformation is repeated after the incoming data file is changed to include type I and type II attribute changes; the data generation script for the second run follows the sketch.
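
In SQL terms, a type I "punch through" amounts to an in-place update of every version of the record. The statement below is a sketch only, using the product_dim table defined above, with bind variables standing in for values from the incoming record:

-- all versions sharing the operational key get the new value
UPDATE product_dim
SET    product_name      = :product_name ,
       manufacturer_name = :manufacturer_name
WHERE  product_id = :product_id ;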

DECLARE
   l_manf_names SMALL_VARCHAR_NTT := SMALL_VARCHAR_NTT('Kellogs','General Mills','Tyson','Foster Farm','Johnson','Philips') ;
   l_prod_category SMALL_VARCHAR_NTT := SMALL_VARCHAR_NTT('Baby','Beauty','Food','Health','Drugs') ;
   l_product_ids NUMBER_NTT := NUMBER_NTT() ;
   l_fp UTL_FILE.file_type := UTL_FILE.FOPEN('DATA_DIR','changed_products.txt','w') ;
BEGIN
   SELECT product_id
   BULK COLLECT INTO l_product_ids
   FROM   (SELECT product_id
           FROM   (SELECT a.* , TRUNC(DBMS_RANDOM.value(1,10000)) rec_no
                   FROM   product_dim a
                   WHERE  current_record_indicator = 'Y')
           ORDER BY rec_no)
   WHERE  rownum <= 500 ;
   FOR c1 IN (SELECT column_value product_id
             FROM   TABLE(l_product_ids))
   LOOP
      -- product_name gets a '.1' suffix (type I change); manufacturer (type I) and category (type II) are re-randomized
      UTL_FILE.put_line(l_fp,c1.product_id || ',' || 'Product - ' || c1.product_id || '.1' || ',' || l_manf_names(TRUNC(DBMS_RANDOM.value(1,7))) || ',' ||  l_prod_category(TRUNC(DBMS_RANDOM.value(1,6)))) ;
   END LOOP ;
   UTL_FILE.fclose(l_fp) ;
END ;
/
In the above script, type I and type II attributes are changed for 500 randomly selected records. Type II changes carry the additional overhead of updating the effective end date of the last record with the same operational key and inserting a new record, as sketched below. Here also, by raising the rownum limit (500) in the selection query we can make the incoming data set bigger and test the scalability of the tool. Pentaho's ability to read a compressed file is very beneficial, especially for a large data set.
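
To make that overhead concrete, the work performed for a single type II change is roughly equivalent to the statements below. This is a sketch only, following the conventions of the product_dim table above, with bind variables standing in for values from the incoming record; the current_record_indicator assignment is the part that, per the earlier observation, Pentaho does not maintain automatically:

-- end date the previously active version and flag it as no longer current
UPDATE product_dim
SET    effective_end_date       = :load_date ,
       current_record_indicator = 'N'
WHERE  product_id = :product_id
AND    effective_end_date = DATE '2500-01-01' ;

-- insert the new version as the active record
INSERT INTO product_dim (product_sid, product_id, product_name, manufacturer_name,
                         product_category, effective_start_date, effective_end_date,
                         record_version, current_record_indicator)
VALUES (product_sid_s.NEXTVAL, :product_id, :product_name, :manufacturer_name,
        :product_category, :load_date, DATE '2500-01-01', :prior_version + 1, 'Y') ;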