Thursday, July 12, 2012

Riak Java Client Distilled

In this blog, we will show how to use Riak Java client to,
  • Create/update objects
  • Enable and search by secondary index
  • Add links and walk links
  • Enable and search by free text through MapReduce

Riak Configuration

There are a few configuration changes that we will need to make to app.config to enable secondary index and listening for all ports,
  1. Change back end to use ELevelDB, the only storage engine that supports secondary index
  2. Change localhost or 127.0.0.1 to 0.0.0.0 for all IP addresses so Riak will listen on all ports
  3. Enable Riak for search by modifying app.config file
  4. {riak_search, [
                    %% To enable Search functionality set this 'true'.
                    {enabled, true}
                   ]}
    

Riak Java Client

All Riak server access is done through a Riak client.

Which Riak Client to Use
Riak Java library offers two types of Riak clients, which is very confusion. We found that most tasks can be accomplished using the pbc (low level protocol buffer client) client, except for the following exceptions that one must use the HPTT client,
  • Enable free text search for buckets
How to Obtain a Riak Client

 RiakClient riakClient = RiakFactory.pbcClient(host, port);  

Shutdown Riak Client in the End

One must shutdown all active risk client before shutting down the application/Tomcat server itself.
 riakClient.shutdown();  

Create, Update, and Lookup Object

Riak Client API offers a few annotations to indicate a particular field and we highly recommend use them rather than playing around the metadata ourselves,

  • The Riak Key field (through @RiakKey annotation)
  • A Riak secondary index field (through @RiakIndex annotation)
  • A Riak links collection field (through @RiakLinks annotation)
When we persist an annotated object through Riak client, Riak client will process the key, secondary indices, and links first before handling the object to Jackson for serializing into JSON string and storing the JSON string in Riak. If we choose to manage the object serialization/deserialization through Jackson ourselves, we must also handle the metadata changes like a new secondary index is added/removed or new links are added or removed. If not handled carefully, we could easily lose the existing secondary indices/links when an object is updated.

Here is an example highlighting the usage of above annotations,

 public class JsonObject  
 {  
   @JsonProperty  
   String bucket;  
   
   @RiakKey  
   String key;  
   
   @JsonProperty  
   String name;  
   
     
   @RiakLinks  
   @JsonIgnore  
   Collection<RiakLink> riakLinks = new ArrayList<RiakLink>();  
   
     
   @RiakIndex(name = "uri")  
   @JsonProperty  
   String uriIndex;  
   
  }  

To save/update an object,

 this.riakClient.createBucket(bucket).execute().store(object).execute();  
   

To lookup an object by key,
   
 @Override  
   public <T> T get(final String bucket, final String key, final Class<T> kclass)  
   {  
     try  
     {  
       return this.riakClient.fetchBucket(bucket).execute().fetch(key, kclass).execute();  
     }  
     catch (final RiakRetryFailedException e)  
     {  
       throw new RuntimeException(e);  
     }  
   }  

Secondary Index Creation and Retrieval
When an object's field has @RiakIndex annotated, secondary index is automatically created/updated when the object is stored or updated.

To look up an object based on secondary index,

 public List<String> fetchIndex(final String bucket, final String indexName, final String indexValue)  
   {  
     try  
     {  
   
       return this.riakClient.fetchBucket(bucket).execute().fetchIndex(BinIndex.named(indexName))  
           .withValue(indexValue).execute();  
     }  
     catch (final RiakException e)  
     {  
       throw new RuntimeException(e);  
     }  
   
     // Collection<String> collection = results.getResult(String.class);  
   }  

Riak Search
Riak search must be enabled at the bucket level before Riak will index properties on all objects in the bucket. To

 bin/search-cmd install my_bucket_name  
To execute a Riak search on a given bucket,
 @Override  
   public Collection<JsonObject> search(final String bucket, final String criteria)  
   {  
     try  
     {  
       final MapReduceResult mapReduceResult = this.riakClient.  
           mapReduce(bucket, criteria)  
           .addMapPhase(new NamedJSFunction("Riak.mapValuesJson")).execute();  
       return mapReduceResult.getResult(JsonObject.class);  
     }  
     catch (final Exception e)  
     {  
       throw new RuntimeException(e);  
     }  
   }  
Where parameter bucket is the bucket name and the criteria is the search criteria like "type=Folder" or "(type=Folder AND name=Hello)".

Riak Link Walking
Riak link walking apparently only with HTTP client, not the pbc client for some reason.

Here is a sample code to link walk a specific number of steps from the current object, identified by key.
  @Override  
   public List<List<String>> walk(  
                   final String bucket, // bucket name  
                   final String key,  // originating object key  
                   final String linkName,  // link name  
                   final int steps   // number of steps to walk. Riak will stop if it can't walk further  
                   )  
   {  
     final List<List<String>> walkResults = new ArrayList<List<String>>();  
   
     try  
     {  
       final LinkWalk linkWalk = this.riakHttpClient.walk(this.riakHttpClient.createBucket(bucket).execute().fetch(key)  
           .execute());  
   
       for (int i = 0; i < steps; i++)  
       {  
         linkWalk.addStep(bucket, linkName, true);  
       }  
       final WalkResult walkResult = linkWalk.execute();  
   
       final Iterator<Collection<IRiakObject>> it = walkResult.iterator();  
       while (it.hasNext())  
       {  
         final List<String> list = new ArrayList<String>();  
         final Collection<IRiakObject> collections = it.next();  
   
         for (final IRiakObject riakObject : collections)  
         {  
           list.add(riakObject.getKey());  
         }  
         if (list.size() > 0)  
         {  
           walkResults.add(list);  
         }  
       }  
     }  
     catch (final Exception e)  
     {  
       throw new RuntimeException(e);  
     }  
   
     return walkResults;  
   }  

No comments:

Post a Comment