- Create/update objects
- Enable and search by secondary index
- Add links and walk links
- Enable and search by free text through MapReduce
Riak Configuration
A few changes to app.config are needed to enable secondary indexes and search, and to make Riak listen on all network interfaces:
- Change the storage backend to eLevelDB, which supports secondary indexes (the default Bitcask backend does not)
- Change localhost or 127.0.0.1 to 0.0.0.0 in every IP address setting so Riak listens on all network interfaces rather than only on loopback
- Enable Riak Search by setting enabled to true in the riak_search section of app.config
{riak_search, [
    %% To enable Search functionality set this 'true'.
    {enabled, true}
]}
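Taken together, the changes above might look like the following fragment. This is a sketch rather than a complete file: the section that holds pb_ip and pb_port and the default ports 8098 and 8087 are assumptions based on older Riak 1.x releases (later releases move the Protocol Buffers settings to a riak_api section), so check them against the app.config shipped with your version.

```erlang
%% app.config (fragment): only the settings discussed above are shown.
[
 {riak_core, [
     %% HTTP listener: bind to all interfaces instead of 127.0.0.1
     {http, [ {"0.0.0.0", 8098} ]}
 ]},
 {riak_kv, [
     %% Use eLevelDB so that secondary indexes are available
     {storage_backend, riak_kv_eleveldb_backend},
     %% Protocol Buffers listener (assumed location; see note above)
     {pb_ip, "0.0.0.0"},
     {pb_port, 8087}
 ]},
 {riak_search, [
     {enabled, true}
 ]}
].
```

Riak reads app.config at startup, so the node has to be restarted for these settings to take effect.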
Riak Java Client
All Riak server access is done through a Riak client.
The Riak Java library offers two types of client, which can be confusing. We found that most tasks can be accomplished with the pbc (low-level Protocol Buffers) client; the HTTP client is required only for the following:
- Enable free text search for buckets
- Link walking
How to Obtain a Riak Client
IRiakClient riakClient = RiakFactory.pbcClient(host, port);
Shut Down the Riak Client at the End
All active Riak clients must be shut down before the application or the Tomcat server itself shuts down.
riakClient.shutdown();
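When an application holds more than one client (for example a pbc client and an HTTP client), it can help to collect their shutdown calls in one place and run them from the application's shutdown path. The following is a minimal sketch using only the JDK; ClientRegistrySketch and its methods are hypothetical names, not part of the Riak client library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects shutdown actions and runs each of them once.
final class ClientRegistrySketch {
    private final List<Runnable> shutdownActions = new ArrayList<Runnable>();

    // Register the action that releases one client, e.g. a call to riakClient.shutdown().
    synchronized void register(Runnable shutdownAction) {
        shutdownActions.add(shutdownAction);
    }

    // Attempts every registered action, then forgets them so a second call is a no-op.
    // Returns how many actions were attempted.
    synchronized int shutdownAll() {
        int attempted = 0;
        for (Runnable action : shutdownActions) {
            try {
                action.run();
            } catch (RuntimeException e) {
                // Keep going so one failing client does not block the others.
            }
            attempted++;
        }
        shutdownActions.clear();
        return attempted;
    }
}
```

A caller would register one action per client and invoke shutdownAll() from whatever hook the container provides, such as a servlet context listener in Tomcat.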
Create, Update, and Lookup Object
The Riak client API offers annotations that mark fields with a special role, and we highly recommend using them rather than manipulating the metadata ourselves:
- The Riak Key field (through @RiakKey annotation)
- A Riak secondary index field (through @RiakIndex annotation)
- A Riak links collection field (through @RiakLinks annotation)
When we persist an annotated object through the Riak client, the client processes the key, secondary indexes, and links first, then hands the object to Jackson to be serialized into a JSON string, which is stored in Riak. If we choose to manage serialization and deserialization through Jackson ourselves, we must also handle the metadata changes ourselves, such as secondary indexes or links being added or removed. If this is not handled carefully, we can easily lose the existing secondary indexes and links when an object is updated.
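To make the risk concrete, here is a toy in-memory model of an update that drops metadata and one that carries it forward. It is only a sketch of the idea: StoredObject and MetadataUpdateSketch are hypothetical classes invented for illustration, and nothing here calls the Riak client.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a stored object: a JSON value plus secondary index metadata.
// Hypothetical class for illustration only; not part of the Riak client.
final class StoredObject {
    final String json;
    final Map<String, String> indexes;

    StoredObject(String json, Map<String, String> indexes) {
        this.json = json;
        this.indexes = new HashMap<String, String>(indexes); // defensive copy
    }
}

final class MetadataUpdateSketch {
    // Naive update: builds a fresh object from the new JSON alone,
    // so the index entries of the existing object are lost.
    static StoredObject naiveUpdate(StoredObject existing, String newJson) {
        return new StoredObject(newJson, new HashMap<String, String>());
    }

    // Careful update: replaces the value but copies the existing indexes forward.
    static StoredObject carefulUpdate(StoredObject existing, String newJson) {
        return new StoredObject(newJson, existing.indexes);
    }
}
```

Letting the annotations drive the metadata avoids having to write the careful variant by hand for every index and link.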
Here is an example that uses the annotations listed above:
public class JsonObject
{
    @JsonProperty
    String bucket;

    @RiakKey
    String key;

    @JsonProperty
    String name;

    @RiakLinks
    @JsonIgnore
    Collection<RiakLink> riakLinks = new ArrayList<RiakLink>();

    @RiakIndex(name = "uri")
    @JsonProperty
    String uriIndex;
}
To save or update an object:
this.riakClient.createBucket(bucket).execute().store(object).execute();
To look up an object by key:
@Override
public <T> T get(final String bucket, final String key, final Class<T> kclass)
{
    try
    {
        return this.riakClient.fetchBucket(bucket).execute().fetch(key, kclass).execute();
    }
    catch (final RiakRetryFailedException e)
    {
        throw new RuntimeException(e);
    }
}
Secondary Index Creation and Retrieval
When a field is annotated with @RiakIndex, the secondary index is created or updated automatically whenever the object is stored.
To look up the keys of objects by secondary index:
public List<String> fetchIndex(final String bucket, final String indexName, final String indexValue)
{
    try
    {
        // The query returns the keys of the matching objects, not the objects themselves.
        return this.riakClient.fetchBucket(bucket).execute().fetchIndex(BinIndex.named(indexName))
            .withValue(indexValue).execute();
    }
    catch (final RiakException e)
    {
        throw new RuntimeException(e);
    }
}
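Because the index query above returns object keys, a second lookup per key is needed to get the objects themselves. A minimal sketch of that step follows, written against a plain java.util.function.Function (Java 8 or later) so that it does not depend on the client library. IndexLookupSketch and resolve are hypothetical names, and skipping keys whose lookup returns null is an assumption about how missing objects should be handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class IndexLookupSketch {
    // Turns the keys returned by an index query into objects by calling
    // getByKey for each one, in order. Keys whose lookup returns null
    // (for example, an object deleted in the meantime) are skipped.
    static <T> List<T> resolve(List<String> keys, Function<String, T> getByKey) {
        List<T> objects = new ArrayList<T>();
        for (String key : keys) {
            T object = getByKey.apply(key);
            if (object != null) {
                objects.add(object);
            }
        }
        return objects;
    }
}
```

With the methods shown earlier, a call might look like resolve(fetchIndex(bucket, "uri", value), key -> get(bucket, key, JsonObject.class)). Each key costs one round trip, so this suits small result sets.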
Riak Search
Riak Search must be enabled at the bucket level before Riak will index the properties of the objects stored in that bucket. To enable it, install the search hook on the bucket with search-cmd:
bin/search-cmd install my_bucket_name
To execute a Riak search on a given bucket:
@Override
public Collection<JsonObject> search(final String bucket, final String criteria)
{
    try
    {
        final MapReduceResult mapReduceResult = this.riakClient
            .mapReduce(bucket, criteria)
            .addMapPhase(new NamedJSFunction("Riak.mapValuesJson")).execute();
        return mapReduceResult.getResult(JsonObject.class);
    }
    catch (final Exception e)
    {
        throw new RuntimeException(e);
    }
}
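The criteria string passed to a search like the one above can be assembled from field/value pairs. The helper below is a small sketch that assumes the Lucene-style field:value syntax used by Riak Search and single-word values that need no quoting or escaping. SearchCriteriaSketch and allOf are hypothetical names.

```java
import java.util.Map;

final class SearchCriteriaSketch {
    // Joins field/value pairs with AND, in the map's iteration order.
    // Two or more pairs are wrapped in parentheses; a single pair is returned bare.
    // Assumes values are single terms with no spaces or special characters.
    static String allOf(Map<String, String> terms) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> term : terms.entrySet()) {
            if (sb.length() > 0) {
                sb.append(" AND ");
            }
            sb.append(term.getKey()).append(':').append(term.getValue());
        }
        return terms.size() > 1 ? "(" + sb + ")" : sb.toString();
    }
}
```

Passing a LinkedHashMap keeps the clauses in insertion order, which makes the generated query easier to read in logs.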
In this method, bucket is the bucket name and criteria is the search query, such as "type:Folder" or "(type:Folder AND name:Hello)".
Riak Link Walking
Link walking appears to work only with the HTTP client, not the pbc client.
Here is sample code that walks a given number of link steps from the object identified by key:
@Override
public List<List<String>> walk(
    final String bucket, // bucket name
    final String key, // originating object key
    final String linkName, // link name
    final int steps // number of steps to walk. Riak will stop if it can't walk further
)
{
    final List<List<String>> walkResults = new ArrayList<List<String>>();
    try
    {
        final LinkWalk linkWalk = this.riakHttpClient.walk(
            this.riakHttpClient.createBucket(bucket).execute().fetch(key).execute());
        for (int i = 0; i < steps; i++)
        {
            linkWalk.addStep(bucket, linkName, true);
        }
        final WalkResult walkResult = linkWalk.execute();
        final Iterator<Collection<IRiakObject>> it = walkResult.iterator();
        while (it.hasNext())
        {
            final List<String> list = new ArrayList<String>();
            final Collection<IRiakObject> collections = it.next();
            for (final IRiakObject riakObject : collections)
            {
                list.add(riakObject.getKey());
            }
            if (list.size() > 0)
            {
                walkResults.add(list);
            }
        }
    }
    catch (final Exception e)
    {
        throw new RuntimeException(e);
    }
    return walkResults;
}
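The shape of the result (one list of keys per step, with the walk ending early when nothing more can be reached) can be illustrated with a toy in-memory model. This is only a sketch of the semantics as described above, not how Riak implements link walking. LinkWalkSketch is a hypothetical class, and the links map stands in for the links stored under a single link name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LinkWalkSketch {
    // links: object key -> keys it links to under the link name being walked.
    // Returns one list of reached keys per step, stopping early when a step
    // reaches nothing, so the result may hold fewer than `steps` lists.
    static List<List<String>> walk(Map<String, List<String>> links, String startKey, int steps) {
        List<List<String>> results = new ArrayList<List<String>>();
        Set<String> current = Collections.singleton(startKey);
        for (int i = 0; i < steps; i++) {
            Set<String> next = new LinkedHashSet<String>();
            for (String key : current) {
                List<String> targets = links.get(key);
                if (targets != null) {
                    next.addAll(targets);
                }
            }
            if (next.isEmpty()) {
                break; // nothing further to walk
            }
            results.add(new ArrayList<String>(next));
            current = next;
        }
        return results;
    }
}
```

For a graph where a links to b and c, and b links to d, asking for three steps from a yields two lists, [b, c] and [d], because the third step reaches nothing.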