Hadoop Hive UDTF Tutorial – Extending Apache Hive with Table Functions

Source: http://beekeeperdata.com/posts/hadoop/2015/07/26/Hive-UDTF-Tutorial.html

Author: Matthew Rathbone
Co-author: Elena Akhmatova

 


Working with both primitive types and embedded data structures was discussed in part one, but the UDF interfaces covered there are limited to a single output value per input row.

In this post we will look at user defined table functions, represented by the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF interface. This function type is more complex, but it allows us to output multiple rows and multiple columns for a single input (nifty!).

Code

All code and data used in this post can be found in my hive examples GitHub repository.

Demonstration Data

The table that will be used for demonstration is called people. It has one column – name, which contains names of individuals and couples.

It is stored in a file called people.txt

~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy

We can upload this file to Hadoop, into a directory called people:

hadoop fs -mkdir people
hadoop fs -put ./people.txt people

Then load up the Hive shell and create the Hive table:

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS 
  TERMINATED BY '\t' 
  ESCAPED BY '' 
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE 
LOCATION '/user/matthew/people';
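
As a quick sanity check (an extra step, not in the original post), we can confirm the table reads the file correctly before moving on:

SELECT * FROM people;
OK
John Smith
John and Ann White
Ted Green
Dorothy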

The Value of UDTF

The UDF and GenericUDF functions from the previous article manipulate a single row of data. They return one element, and they must return a value.

This is not convenient for all data processing tasks. Since Hive can store data of many kinds, sometimes we do not want exactly one row of output for a given input. Perhaps we wish to output a few rows per input row, or no rows at all. As an example, think of what the function explode (a Hive built-in function) can do.
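
For instance, explode turns a single array value into one output row per element. A minimal illustration (on Hive 0.13+, which allows a SELECT without a FROM clause):

SELECT explode(array('a', 'b', 'c')) AS letter;
OK
a
b
c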

Similarly, perhaps we also wish to output several columns of data, instead of simply returning a single value.

Both these things we can accomplish with a UDTF.

A Practical Example

Let's suppose that we would like to create a cleaner table of people's names. The new table will have:

  • Separate columns for first name and surname.
  • No records that do not contain both a first and a last name (i.e. rows with no separating white space are dropped).
  • Separate rows for each person in a couple (e.g. Nick and Nicole Smith).

To accomplish this goal, we will implement the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF API.

We have to override 3 methods:

 

// in this method we specify input and output parameters: input ObjectInspector and an output struct
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException; 

// here we process an input record and write out any resulting records 
abstract void process(Object[] record) throws HiveException;

// called to notify the UDTF that there are no more rows to process. Clean-up code or additional output can be produced here.
abstract void close() throws HiveException;

Full Implementation

 

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class NameParserGenericUDTF extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {

      if (args.length != 1) {
        throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
      }

      if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
          || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
        throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
      }
        
      // input inspectors
      stringOI = (PrimitiveObjectInspector) args[0];

      // output inspectors -- an object with two fields!
      List<String> fieldNames = new ArrayList<String>(2);
      List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
      fieldNames.add("name");
      fieldNames.add("surname");
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
      return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    	  
    public ArrayList<Object[]> processInputRecord(String name){
        ArrayList<Object[]> result = new ArrayList<Object[]>();
      
        // ignoring null or empty input
        if (name == null || name.isEmpty()) {
          return result;
        }
        
        String[] tokens = name.split("\\s+");
        
        if (tokens.length == 2) {
          result.add(new Object[] { tokens[0], tokens[1] });
        } else if (tokens.length == 4 && tokens[1].equals("and")) {
          result.add(new Object[] { tokens[0], tokens[3] });
          result.add(new Object[] { tokens[2], tokens[3] });
        }
        
        return result;
    }
    
    @Override
    public void process(Object[] record) throws HiveException {

      // guard against NULL column values before converting to a Java String
      final String name = (record[0] == null) ? null : stringOI.getPrimitiveJavaObject(record[0]).toString();

      ArrayList<Object[]> results = processInputRecord(name);

      Iterator<Object[]> it = results.iterator();
      
      while (it.hasNext()) {
        Object[] r = it.next();
        forward(r);
      }
    }

    @Override
    public void close() throws HiveException {
      // do nothing
    }
}

Please check the GitHub repository for the full code.

Code Walkthrough

The UDTF takes a string as a parameter and returns a struct with two fields. As with the GenericUDF, we have to manually configure all of the input and output object inspectors that Hive needs in order to understand the inputs and outputs.

We identify a PrimitiveObjectInspector for the input string.

stringOI = (PrimitiveObjectInspector) args[0];

Defining the output object inspectors requires us to define both field names, and the object inspectors required to read each field (in our case, both fields are strings).

 

List<String> fieldNames = new ArrayList<String>(2);
fieldNames.add("name");
fieldNames.add("surname");

List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);

The bulk of our logic resides in the processInputRecord method, which is fairly straightforward. Keeping this logic separate from the object-inspector plumbing makes it much easier to test.

Finally, once we have our results we can forward each of them; calling forward registers the object as an output record for Hive to process.

    while (it.hasNext()) {
      Object[] r = it.next();
      forward(r);
    }

Using our function

We can build our function and use it in Hive:

mvn package
cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar

Then use it from Hive:

 

ADD JAR ./ext.jar;

CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF'; 

SELECT
  adTable.name,
  adTable.surname
FROM people
  LATERAL VIEW process_names(name) adTable AS name, surname;
OK
John	Smith
John	White
Ann	White
Ted	Green
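
The rows produced by the lateral view behave like any other table, so standard SQL can be layered on top of them. As an extra illustration (not part of the original example), we could count people per surname:

SELECT adTable.surname, count(*) AS people
FROM people
  LATERAL VIEW process_names(name) adTable AS name, surname
GROUP BY adTable.surname;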

Testing

It is best to divide testing of a UDTF into two parts: testing the data-processing logic itself, and then testing the function as a whole in Hive. Testing in Hive is always recommended because of the complexity introduced by the interplay of different elements, input formats, and data.
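For the in-Hive half, even a small smoke test from the shell goes a long way. A minimal sketch, assuming ext.jar has been built as shown above:

ADD JAR ./ext.jar;
CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF';

SELECT t.name, t.surname
FROM people
  LATERAL VIEW process_names(name) t AS name, surname
WHERE people.name = 'John and Ann White';

This should produce exactly two rows: John White and Ann White.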

Below is an example unit test for splitting a person's name into first name and surname; again, this can be found in full on GitHub:

import java.util.ArrayList;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.junit.Assert;
import org.junit.Test;

public class NameParserGenericUDTFTest {

    @Test
    public void testUDTFOneSpace() {

      // set up the models we need
      NameParserGenericUDTF example = new NameParserGenericUDTF();
      
      ObjectInspector[] inputOI = {PrimitiveObjectInspectorFactory.javaStringObjectInspector};
     
      // create the actual UDF arguments
      String name = "John Smith";

      // initialize the UDTF; any exception here should fail the test
      try {
        example.initialize(inputOI);
      } catch (Exception ex) {
        Assert.fail("initialize() threw: " + ex.getMessage());
      }
   
      ArrayList<Object[]> results = example.processInputRecord(name);
      Assert.assertEquals(1, results.size());
      Assert.assertEquals("John", results.get(0)[0]);
      Assert.assertEquals("Smith", results.get(0)[1]);
    }
}
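
With the project's Maven setup, this test runs as part of mvn package (or mvn test on its own).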

Finishing up

By now you should be a pro at customizing Hive functions.

If you need more resources you can check out my personal blog post for a walkthrough of building regular user defined functions, or take a look at the Apache Hive Book.