Objects in Riak. Riak stores values as opaque binary objects. The most convenient way to store objects is as JSON-encoded documents, e.g. a "book" object. (Diagram labels: Riak KV, Yokozuna, Webmachine HTTP. Image courtesy of Eric Redmond, "A Little Riak Book".)
Riak Handbook, by Mathias Meyer. Revision 27e1e7fb. Contents include "How to read the book" and "Using Riak's Built-in MapReduce Functions". Basho Technologies today announced the immediate availability of the second edition of Riak Handbook (CAMBRIDGE, MA, June 1).
We could either install our own functions or use Riak's built-ins; we already used a bunch of them. Changelog: Version 1.
A Little Riak Book. You can download different formats from rendered. You must have ruby installed, tested mostly on 1. Thanks to that team. It now not only serves requests for its own data.
Some databases and commercial products take consistent hashing even further to reduce the risk of overloading nodes and spreading data unevenly. Once again, the solution is rather simple. Dealing with Overload and Data Loss: every node that joins the cluster grabs not just one slice of the ring but several smaller ones. This simple concept is called a virtual node, and it solves two problems at once. For one, it helps to spread request load evenly across the cluster.
The goal was for it to survive network partitions and be easily replaceable even across data centers. It's like an evenly sliced pizza.
Dynamo is an accumulation of techniques and technologies. All of its design goals stemmed from actual business requirements. Dynamo takes it a step further.
Amazon's Dynamo: One of the more influential products and papers in the field has been Amazon's Dynamo. We already came across virtual nodes as a solution to spread load in a cluster using consistent hashing. The result is a distributed key-value store. When a cluster is defined, its partitioning scheme is fixed.
Dynamo takes concepts like eventual consistency and combines them; let's go through the most important ones. Basics: Dynamo is meant to scale linearly as you add and remove nodes. Virtual Nodes: Dynamo takes the idea of consistent hashing and adds virtual nodes to the mix. Say you have a cluster with 3 nodes and 32 partitions: each node then hosts 10 or 11 of them.
A partition is hosted by a virtual node. (Figure: a hash ring with equally sized partitions.) As the cluster grows and shrinks, a virtual node may or may not move to another physical node. When you bring a fourth node into the ring, it takes over some of the existing partitions. Master-less Cluster: No node in a Dynamo cluster is special. Every client can request data from any node and write data to any node. Whenever a client requests data from a node and the data is stored in a partition on a different node, the receiving node forwards the request to the node that holds it.
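To make the 3-node, 32-partition example concrete, here is a minimal sketch of a fixed ring with virtual nodes. It makes several assumptions. It uses a 160-bit ring, which is the SHA-1 output space. Keys are hashed as a plain "bucket/key" string, which is a simplification of what Riak really hashes. Partitions are claimed round-robin at first. The helper names `partitionFor`, `initialClaim` and `join` are hypothetical.

```javascript
const crypto = require("crypto");

// Assumed setup: a 160-bit ring (SHA-1 output space) in 32 equal partitions.
const NUM_PARTITIONS = 32;
const PARTITION_WIDTH = (2n ** 160n) / BigInt(NUM_PARTITIONS);

// Hash a bucket/key pair onto the ring and return its partition index.
// Simplification: the plain string "bucket/key" is hashed here.
function partitionFor(bucket, key) {
  const hex = crypto.createHash("sha1").update(`${bucket}/${key}`).digest("hex");
  return Number(BigInt("0x" + hex) / PARTITION_WIDTH);
}

// Initial claim: hand out partitions round-robin to the physical nodes.
function initialClaim(nodes) {
  return Array.from({ length: NUM_PARTITIONS }, (_, i) => nodes[i % nodes.length]);
}

// A joining node takes over partitions from the busiest nodes until all
// nodes own about the same number. The ring itself never changes.
function join(owners, newNode) {
  const result = owners.slice();
  const target = Math.floor(NUM_PARTITIONS / (new Set(owners).size + 1));
  for (let i = 0; i < target; i++) {
    const counts = new Map();
    for (const node of result) counts.set(node, (counts.get(node) || 0) + 1);
    const busiest = [...counts.entries()].sort((a, b) => b[1] - a[1])[0][0];
    result[result.indexOf(busiest)] = newNode;
  }
  return result;
}

function countPerNode(owners) {
  const counts = {};
  for (const node of owners) counts[node] = (counts[node] || 0) + 1;
  return counts;
}

const before = initialClaim(["node1", "node2", "node3"]);
const after = join(before, "node4");
console.log(countPerNode(before)); // 11, 11 and 10 partitions
console.log(countPerNode(after)); // 8 partitions each
console.log("moved:", before.filter((owner, i) => owner !== after[i]).length);
```

A key keeps hashing to the same partition throughout the cluster's life. Only the partition's owner changes, and only 8 of the 32 partitions move when the fourth node joins.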
The advantage of choosing a partitioning scheme like that is that the ring setup is known and constant throughout the cluster's life. As I mentioned already, every node in the cluster has knowledge of the partitioning scheme, so they don't need to keep track of a table of partitions and their respective nodes. Conflict Resolution using Vector Clocks: Before you nod off from all the theory we're going through here, bear with me. Read Repair and Hinted Handoff: Read repair is a way to ensure consistency of data between replicas.
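Read repair can be sketched in a few lines. The sketch assumes that each replica answers with a `{ value, version }` record carrying a plain integer version. Riak actually compares vector clocks, which are covered under conflict resolution. The function name `readWithRepair` is hypothetical.

```javascript
// Minimal read-repair sketch with integer versions (a simplification).
function readWithRepair(replicas) {
  // The read itself: pick the newest version among the replies.
  const newest = replicas.reduce((best, r) => (r.version > best.version ? r : best));
  // Passive repair: replicas that answered with an older version get the
  // newest value written back as a side effect of the read.
  let repaired = 0;
  for (const r of replicas) {
    if (r.version < newest.version) {
      r.value = newest.value;
      r.version = newest.version;
      repaired++;
    }
  }
  return { value: newest.value, repaired };
}

const replicas = [
  { node: "node1", value: "tweet v2", version: 2 },
  { node: "node2", value: "tweet v1", version: 1 }, // missed the last write
  { node: "node3", value: "tweet v2", version: 2 },
];
console.log(readWithRepair(replicas)); // { value: 'tweet v2', repaired: 1 }
```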
Read repair is a passive process that kicks in during a read operation to ensure all replicas have an up-to-date view of the data. Hinted handoff, on the other hand, is an active process: while a node is down, another node accepts writes on its behalf. A vector clock is a list of pairs, each consisting of a server identifier and a version. Quorum-based Replication: Say every piece of data is replicated three times across the cluster. This has the added benefit that clients don't need to know about the way data is partitioned. (Figure legend: PN, the number of physical nodes.)
When the node comes back up, the data collected on its behalf is handed back to it. Quorum-based Replication: As explained above in the section on consistent hashing, clients simply ask any node for the data they're interested in. In a distributed database system, read and write consistency have to be traded off against each other, and Amazon leaves it up to each engineering team's preference how to deal with that in their particular setup.
A physical node may hold not only the data in the partitions it picked, but also replicas of data from other partitions. The quorum is the consistency-availability tuning knob in a Dynamo cluster.
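The tuning knob is commonly summarized as N replicas, R replicas that must answer a read, and W replicas that must acknowledge a write. The sketch below checks by brute force that R + W > N guarantees that every read quorum overlaps every write quorum. The helper names are hypothetical.

```javascript
// All subsets of `items` with exactly `size` elements.
function subsets(items, size) {
  if (size === 0) return [[]];
  if (items.length < size) return [];
  const [first, ...rest] = items;
  const withFirst = subsets(rest, size - 1).map((s) => [first, ...s]);
  return withFirst.concat(subsets(rest, size));
}

// True if every set of W writers shares at least one replica with every
// set of R readers, i.e. a read always reaches a fresh copy.
function quorumsAlwaysOverlap(n, r, w) {
  const replicas = Array.from({ length: n }, (_, i) => i);
  const writeSets = subsets(replicas, w);
  const readSets = subsets(replicas, r);
  return writeSets.every((ws) => readSets.every((rs) => rs.some((x) => ws.includes(x))));
}

const N = 3;
for (const [r, w] of [[1, 1], [2, 1], [2, 2], [1, 3]]) {
  console.log(`N=${N} R=${r} W=${w}:`, quorumsAlwaysOverlap(N, r, w), "| R + W > N:", r + w > N);
}
```

Lower R and W buy availability and latency at the price of possibly stale reads. R + W > N buys overlap at the price of waiting for more replicas.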
All of Bob's updates descend from one another. Like the conflicts created by Alice and then Carol in the picture above. There are two great summaries on the Basho blog.
Conflict Resolution using Vector Clocks: Whenever a client updates an object, it provides the vector clock it's referring to. As long as the path through the pairs is the same, the new version descends from the old one.
We've run into a conflict. The basic idea of vector clocks goes way back to the seventies, but it wasn't until later that the idea of vector clocks that include both time and a secondary means of deriving ordering was published.
(Figure: a simplified view of a vector clock.) Let's have a look at an example. When the object is updated, the coordinating node adds a new pair with server identifier and version, so each client adds a new identifier to the list of pairs. The fun starts when two different clients update the same object. We now have two vector clocks that aren't descendants of each other. Dynamo doesn't really bother with the conflict itself and leaves resolving it to the client. Vector clocks can be pretty mind-bending. Cassandra also drew some inspiration from Dynamo.
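The following is a minimal vector clock sketch using the Bob, Alice and Carol example. It stores the list of pairs as an object that maps an identifier to a counter. The function names `increment`, `descends`, `conflict` and `merge` are illustrative and are not Riak's API.

```javascript
// Bump the counter for one actor (client or server identifier).
function increment(clock, actor) {
  return { ...clock, [actor]: (clock[actor] || 0) + 1 };
}

// a descends from b if a has seen every update that b has seen.
function descends(a, b) {
  return Object.keys(b).every((actor) => (a[actor] || 0) >= b[actor]);
}

// Two clocks conflict when neither descends from the other.
function conflict(a, b) {
  return !descends(a, b) && !descends(b, a);
}

// Merging takes the maximum counter per actor, e.g. when a client writes
// back a value it reconciled from two siblings.
function merge(a, b) {
  const result = { ...a };
  for (const [actor, n] of Object.entries(b)) result[actor] = Math.max(result[actor] || 0, n);
  return result;
}

const bob1 = increment({}, "bob");
const bob2 = increment(bob1, "bob"); // Bob's updates descend from one another
const alice = increment(bob2, "alice"); // Alice and Carol both update bob2
const carol = increment(bob2, "carol");
console.log(descends(bob2, bob1)); // true
console.log(conflict(alice, carol)); // true
console.log(merge(alice, carol)); // { bob: 2, alice: 1, carol: 1 }
```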
Vector clocks are confusing, but they're just a means for a database to discover conflicting updates. Conclusion: Dynamo throws quite a punch. It's a great collection of different algorithms and technologies.
Even though it's a lot to take in, there have been several open source implementations, among them Project Voldemort. And Then Some: There's more to Riak than meets the eye, though. One of the first things Basho added was the ability to have links between objects stored in Riak.
What is Riak? Riak does one thing. It's pretty agnostic about what you store in it, and a Riak cluster can scale in a linear and predictable fashion. Usually when I say Riak. But when I mention Riak KV.
At the very core, Riak is an implementation of Amazon's Dynamo. To no-one's surprise, the basic way to store data is by specifying a key and a value for it. Simple as that. A value stored with a key can be anything. Throw on top the whole shebang of fault tolerance. Over time, Riak's feature set has grown beyond just storing keys and values. As a means of indexing and querying data, Riak offers full-text search and secondary indexes. Another noteworthy feature is MapReduce. We're looking at the basic feature set of Riak KV first. Riak requires Erlang R14B03 or newer. Basho provides a bunch of binary packages for common systems like Debian, and all of them neatly include the Erlang distribution required to run Riak. Be aware that, as of this writing, Riak doesn't run on Windows. Riak's ease of operation includes the installation process too.
That saves you the trouble of dealing with Linux distributions that come with outdated versions of Erlang. When properly installed, Riak is started using riak start. Running npm install http: First, we'll just use the tweet's identifier to reference tweets.
Installation: Riak is known to be easy to handle from an operational perspective. Installing Riak using Binary Packages: On a Mac, you can use Homebrew and a simple brew install riak to install Riak.
The beauty of this holistic approach to packaging Riak is that it's easy to automate. We'll talk to Riak using Node.js. A bucket is nothing more than a way to logically separate physical data. This allows you to set stricter rules for objects that matter more in terms of consistency and replication than for data where a lack of immediate replication is acceptable. Important details: links. Fetching Objects: Now that we got that out of the way, let's get objects back out of Riak.
Talking to Riak: We'll start off with the basics, using both a client library and curl. So when you're on Ubuntu or Debian. Buckets: Other than a key and a value, Riak divides data into buckets. A bucket is also a way to set different properties, for things like replication, for different types of data.
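Per-bucket properties are set over HTTP. The sketch below builds such a request and assumes a Riak node with its HTTP interface on localhost:8098 and the `/riak/<bucket>` URL scheme of Riak releases from that era. The `n_val` property is the number of replicas. The bucket name `users` and the helper names are hypothetical.

```javascript
const http = require("http");

// Build a PUT that stores bucket properties as {"props": {...}}.
function bucketPropsRequest(bucket, props) {
  const body = JSON.stringify({ props });
  return {
    options: {
      host: "localhost",
      port: 8098,
      method: "PUT",
      path: `/riak/${encodeURIComponent(bucket)}`,
      headers: { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(body) },
    },
    body,
  };
}

// Send a prepared request; only call this with a Riak node running.
function send({ options, body }, callback) {
  const req = http.request(options, (res) => {
    res.resume(); // discard the response body
    callback(null, res.statusCode);
  });
  req.on("error", callback);
  req.end(body);
}

// More replicas for important data, fewer for data that may lag behind.
const important = bucketPropsRequest("users", { n_val: 5 });
const casual = bucketPropsRequest("tweets", { n_val: 2 });
console.log(important.options.path, important.body); // /riak/users {"props":{"n_val":5}}
console.log(casual.options.path, casual.body); // /riak/tweets {"props":{"n_val":2}}
```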
Creating Objects: If you haven't done so already, install and start Riak. To create or update an object using riak-js, you save it under a bucket and a key. After this example, I'm assuming the riak library is loaded in the Node.js console. Here's how you'd do that using curl; you can try this yourself. Type Ctrl-D after you're done with the body to send the request. By default, Riak stores some metadata with every object; examples are the vector clock.
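The curl request above can be mirrored in Node.js. The sketch only builds the request and does not send it. It assumes Riak's HTTP interface on localhost:8098 and the `/riak/<bucket>/<key>` URL scheme. The tweet's fields, its id and the `putObjectRequest` helper are made up for illustration.

```javascript
// Build a PUT request that stores a JSON value under bucket/key.
function putObjectRequest(bucket, key, value) {
  const body = JSON.stringify(value);
  return {
    options: {
      host: "localhost",
      port: 8098,
      method: "PUT",
      path: `/riak/${encodeURIComponent(bucket)}/${encodeURIComponent(key)}`,
      headers: { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(body) },
    },
    body,
  };
}

const tweet = {
  id_str: "41399579391950848",
  user: "example_user",
  text: "Using the tweet's identifier as the key",
};

// Use the string id as the key: tweet ids exceed Number.MAX_SAFE_INTEGER,
// so JavaScript numbers can't represent all of them exactly.
const request = putObjectRequest("tweets", tweet.id_str, tweet);
console.log(request.options.method, request.options.path); // PUT /riak/tweets/41399579391950848
```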
Try updating our tweet a few times. Custom Metadata: You can specify a set of custom metadata yourself. Let's attach some location information to the tweet. We could of course simply store the original tweet's identifier in a reply. If you fancy object-oriented programming, links let you create logical trees or even graphs of objects, and a link can be tagged to give the connection context. When using HTTP, you store a link by sending it in the Link header.
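Over HTTP, custom metadata travels as `X-Riak-Meta-*` headers. The sketch below converts between a plain object and those headers. The metadata names `latitude` and `longitude` and the helper names are hypothetical.

```javascript
// Turn { name: value } into X-Riak-Meta-Name headers.
function metadataHeaders(meta) {
  const headers = {};
  for (const [name, value] of Object.entries(meta)) {
    headers[`X-Riak-Meta-${name.charAt(0).toUpperCase()}${name.slice(1)}`] = String(value);
  }
  return headers;
}

// Read metadata back from response headers (Node lowercases header names).
function metadataFromHeaders(headers) {
  const meta = {};
  for (const [name, value] of Object.entries(headers)) {
    const match = /^x-riak-meta-(.+)$/i.exec(name);
    if (match) meta[match[1].toLowerCase()] = value;
  }
  return meta;
}

const headers = metadataHeaders({ latitude: 52.52, longitude: 13.41 });
console.log(headers);
console.log(metadataFromHeaders(headers)); // { latitude: '52.52', longitude: '13.41' }
```

Metadata values come back as strings, so numbers need to be parsed again on the way out.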
Linking Objects: Linking objects is one of the neat additions Riak makes over and above Dynamo. Riak doesn't enforce any referential integrity on links, though. riak-js supports links too, which makes using it all the better. It's not recommended to have a very large number of links on a single object.
The tag in this case identifies this tweet as a reply to the one we had before. Now when you fetch the new object via HTTP, or generally when you request a single object, the links are returned in the Link header of the response.
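Here is a small sketch for building and reading Riak-style Link headers. It assumes the `/riak/<bucket>/<key>` URL scheme. The tag `in_reply_to` and the helper names are hypothetical. Riak also returns a `rel="up"` link to the bucket, and the parser deliberately skips it.

```javascript
// Build a Link header value from { bucket, key, tag } entries.
function linkHeader(links) {
  return links
    .map(({ bucket, key, tag }) => `</riak/${bucket}/${key}>; riaktag="${tag}"`)
    .join(", ");
}

// Extract tagged object links; links without a key (like rel="up") are ignored.
function parseLinkHeader(header) {
  const links = [];
  const pattern = /<\/riak\/([^/>]+)\/([^>]+)>;\s*riaktag="([^"]*)"/g;
  let match;
  while ((match = pattern.exec(header)) !== null) {
    links.push({ bucket: match[1], key: match[2], tag: match[3] });
  }
  return links;
}

const header = linkHeader([{ bucket: "tweets", key: "41399579391950848", tag: "in_reply_to" }]);
console.log(header); // </riak/tweets/41399579391950848>; riaktag="in_reply_to"
console.log(parseLinkHeader(`</riak/tweets>; rel="up", ${header}`));
```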
This way we can store entire conversations as a combination of links and key-value pairs. You can fetch the links with riak-js too. Keep in mind that the number of links on an object also adds to its total size; given the nature of a Twitter conversation, there will usually be just one per tweet. Walking Links: So now that we have links in place, let's walk them. To follow links further, just add more link specifications to the URL. Play along to see what the results look like.
The following query will simply return all linked objects. We'll do this in our Node.js console. Walking Nested Links: Let's have a look at an example. riak-js doesn't yet support walking links from objects in this way.
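Link-walking URLs append one `bucket,tag,keep` specification per step, and `_` acts as a wildcard. The sketch below assembles such URLs. It assumes the `/riak/<bucket>/<key>` scheme, and `linkWalkPath` is a hypothetical helper.

```javascript
// Each step is { bucket, tag, keep }; omitted parts become "_".
function linkWalkPath(bucket, key, steps) {
  const specs = steps.map(({ bucket: b = "_", tag = "_", keep = "_" }) => `${b},${tag},${keep}`);
  return `/riak/${bucket}/${key}/${specs.join("/")}`;
}

// All objects linked from this tweet.
console.log(linkWalkPath("tweets", "41399579391950848", [{}]));
// -> /riak/tweets/41399579391950848/_,_,_

// Nested walk: follow in_reply_to twice, keeping the results of both steps.
console.log(
  linkWalkPath("tweets", "41399579391950848", [
    { bucket: "tweets", tag: "in_reply_to", keep: 1 },
    { bucket: "tweets", tag: "in_reply_to", keep: 1 },
  ])
);
```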
When you run the command above, you'll receive a multi-part response from Riak, which is not exactly pretty to look at. The response includes all the objects that are linked to from this tweet. It defaults to 0. So when you think of a bucket, think of a name: a bucket is not a physical entity. It's just a name in Riak, one that allows you to set some configuration properties like data distribution.
A bucket is nothing like a table in a relational database; it's just a namespace. Whenever you reference a bucket-key pair in Riak to fetch a value, Riak uses both the bucket and the key to look up the data. All this has a couple of implications, mind-bending in a way: to find all keys in a bucket, Riak has to go through its entire dataset.
The Anatomy of a Bucket: There is great confusion around what a bucket in Riak actually is. A table is a physical entity that's stored in a different location from other tables; a bucket is not. The configuration for every bucket created over the lifetime of a cluster is part of the ring configuration that all nodes in a Riak cluster share.
Listing all keys in a bucket without streaming will cause the whole cluster to load the keys and return them in one go. Over HTTP this gets slow, because listing all keys generates a really long header with links to all the objects in the bucket. Performance also depends on the storage backend chosen.
Depending on the number of keys in your cluster, listing them can take a while. Going through millions of keys is not a feat done in one second, and with tens of millions of keys it takes longer still. You'll probably want to use the streaming version of listing keys, as shown further down; with streaming, you won't get all keys in one response. Now that we got the caveats out of the way, there's one thing to be aware of when deleting based on listing keys. I'll give secondary indexes and search the attention they deserve in the next section.
List All Of The Keys: When you list keys with streaming, the node coordinating the request will send the keys to the client as they are sent by all the other nodes. In riak-js, the streaming version takes an EventEmitter object. We'll do the simplest thing possible and dump the keys onto the console. In general, use key listings cautiously, and with streaming. You can also keep a list of keys as a separate Riak object.
To stream keys, set the parameter keys to the value stream. The approach of using key listings to delete data has certainly been used in the past, but there's a catch: you may see ghost keys showing up when listing keys immediately after deleting. Keeping track of keys externally is an alternative, and Redis has been used for this in the past. So you need to do your homework. We already covered how you can get an object out of Riak.
The short version: there is no built-in way of getting the exact number of keys in a bucket. The latter is simple enough. You could use a range large enough. The downside of this approach is that it may not catch all keys.
If you need statistics on the number of objects, you can stream the keys in a bucket to the client and keep counting the result. The list of keys is always just an indication, though. Keeping a counter yourself isn't trivial either: atomically incrementing an integer value is a feat that's not easy to achieve in a distributed system, as it requires coordination. Querying Data: Now that we got the basics out of the way, let's look at querying data.
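Counting a streamed key listing on the client can look like the sketch below. It assumes the stream emits a `"keys"` event with an array of keys for each chunk and an `"end"` event when the listing is done. These event names stand in for whatever your client library actually emits. The keys are made up.

```javascript
const { EventEmitter } = require("events");

// Count keys chunk by chunk; the result is an indication, not an exact figure.
function countKeys(stream, callback) {
  let count = 0;
  stream.on("keys", (keys) => {
    count += keys.length; // chunks may be empty
  });
  stream.on("end", () => callback(count));
}

// Simulated listing: the coordinating node forwards chunks as other nodes send them.
const listing = new EventEmitter();
countKeys(listing, (count) => console.log(`counted ${count} keys`)); // counted 3 keys
listing.emit("keys", ["41399579391950848"]);
listing.emit("keys", []);
listing.emit("keys", ["41399579391950849", "41399579391950850"]);
listing.emit("end");
```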
The longer version involves either building indices or keeping track of the number of objects through some external means. You could even use built-in mechanisms. You could also feed the numbers into a monitoring tool like Graphite or Munin. If ad-hoc numbers are what you need, keep such queries rare; otherwise you'll pay with decreased performance.
MapReduce: Speaking of Riak's MapReduce as a means to query data is actually a bit of a lie. There are some inherent problems involved in running a query across the entire data set stored in a Riak cluster, so a word of warning up-front: there are some caveats involved. It's somewhat the dilemma of using a key-value store: you fetch an object simply by using its key. Retrofitting solutions gets harder and harder the more data you store in Riak, but that won't help you right now. For the examples, I'm assuming you have a whole bunch of tweets in your local Riak. Because Justin Bieber is so wildly popular, I whipped up a script that uses Twitter's streaming API to fetch all the tweets mentioning him. You can change the search term to anything you want. The script will also store replies as proper links.
The problem with that approach is that you have to know the key. Leave the script running for an hour or so; you'll still have at least a hundred tweets as a result. Should you run into an error running the examples below, see the intermission on Riak's configuration files. Let's start by building a simple map function and running it. The first part of a MapReduce request is usually specifying an input. If a map phase is supposed to be chained with a subsequent map phase, it has to return bucket/key pairs.
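A Riak JavaScript map function receives the stored object, its key data and the phase argument, and it must return a list. The object's JSON body sits in `value.values[0].data`. The sketch keeps tweets that mention "love". The local harness uses hand-built objects with made-up texts in place of real Riak data.

```javascript
function mapLoveTweets(value, keyData, arg) {
  var doc = JSON.parse(value.values[0].data);
  if (doc.text && doc.text.match(/love/i)) {
    return [doc];
  }
  return [];
}

// Local stand-in for the objects Riak hands to a map function.
function fakeRiakObject(key, doc) {
  return { bucket: "tweets", key: key, values: [{ data: JSON.stringify(doc) }] };
}

var results = [
  fakeRiakObject("1", { text: "I love this song" }),
  fakeRiakObject("2", { text: "Meh" }),
].map(function (obj) {
  return mapLoveTweets(obj, null, null);
});
console.log(results); // [ [ { text: 'I love this song' } ], [] ]
```

Riak concatenates the lists returned for all inputs, so a map function that returns an empty list effectively filters that object out.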
I don't have the slightest doubt that you'll see a result here. Using Reduce to Count Tweets: What if we want to count the tweets using the output we got from the map function above? To aggregate the data in that list, we need a reduce function.
The result is weird. Re-reducing for Great Good: Riak's MapReduce doesn't feed all results into the reduce function in one go. This may or may not happen, depending on how many results there are. We can fix this, no problem.
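It's not unlikely that a map function will return a pretty large number of results. For efficiency reasons, Riak splits them into chunks, feeds each chunk to the reduce function, and then feeds those results back into the reduce function together with the rest. Say the list of tweets returned by the map function is split into chunks. In general, your reduce function should be prepared to receive two different inputs: values from the map phase and results of earlier reduce calls. All we really need to do to make it work is make it aware that values can be either objects or numbers. When you rerun the query, you'll get a sensible count.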
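A re-reduce-safe counting function can look like the following sketch. It treats numbers as partial counts from earlier reduce calls and anything else as a single tweet. The chunking at the bottom only simulates what Riak may do.

```javascript
function reduceCount(values, arg) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    // A number is an intermediate count from a previous reduce call;
    // anything else is one tweet from the map phase.
    total += typeof values[i] === "number" ? values[i] : 1;
  }
  return [total];
}

// Simulate chunking: reduce two chunks separately, then re-reduce the
// partial counts together with a leftover tweet.
var tweets = [{ text: "a" }, { text: "b" }, { text: "c" }, { text: "d" }, { text: "e" }];
var first = reduceCount(tweets.slice(0, 2))[0]; // 2
var second = reduceCount(tweets.slice(2, 4))[0]; // 2
console.log(reduceCount([first, second, tweets[4]])); // [ 5 ]
```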
I'm sure you'll now have a reasonable number of "love" tweets as a result. I'm sure you'll agree that the number is particularly crazy in comparison to the total number of tweets.
Each chunk is fed into the reduce function as an array, and the results can be fed back in as well. This can be the cause of great confusion, but there's a reason for it: you could reduce a list of results from a previous reduce function even further. When a value is a number, it's the result of such an earlier reduce. Why don't we do that right away? Let's determine the particular hour of the day from a tweet and then group the results on that hour.
Counting all Tweets: We'll build a map function that returns 1 for every document. Since the return value from our map function is already a number, the reduce function only needs to sum up the values, and the resulting method is something that can easily be reused. Now let's run a MapReduce using the map function above and this reduce function. For grouping by hour, the map function parses the date and time.
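The sketch below shows the count-all pair and the JSON body you might POST to Riak's HTTP `/mapred` endpoint. In the body, the whole `tweets` bucket is the input and each phase ships its function as source. As an alternative, it shows a phase that names Riak's built-in `Riak.reduceSum` instead. Treat the exact request layout as an assumption to check against your Riak version's documentation.

```javascript
function mapOne(value, keyData, arg) {
  return [1];
}

// Summing numbers is naturally safe to re-reduce.
function reduceSum(values, arg) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) {
    sum += values[i];
  }
  return [sum];
}

var query = {
  inputs: "tweets",
  query: [
    { map: { language: "javascript", source: mapOne.toString() } },
    { reduce: { language: "javascript", source: reduceSum.toString() } },
  ],
};

// Alternative reduce phase that references the built-in by name.
var builtInReduce = { reduce: { language: "javascript", name: "Riak.reduceSum" } };

console.log(JSON.stringify(query, null, 2));
// Local check: two chunks, then a re-reduce of their partial sums.
console.log(reduceSum([reduceSum([1, 1])[0], reduceSum([1])[0]])); // [ 3 ]
```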
In case you're wondering by now if I'm a big fan of his: I'm really not. The reduce phase iterates over all values and all attributes in each value. If you let the script run for more than an hour, you'll see counts for more than one hour. Chaining Reduce Phases: Chaining reduce phases is a common pattern in aggregation. So let's add something that will extract the top five busiest Bieber hours in the day.
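Here is a sketch of the hour-grouping map and its merging reduce. It assumes each tweet has a `created_at` field that `Date` can parse, so adjust the parsing to your data. The sample timestamps are made up. Because the reduce takes and returns the same hour-to-count shape, re-reducing is harmless.

```javascript
function mapHour(value, keyData, arg) {
  var doc = JSON.parse(value.values[0].data);
  var result = {};
  result[new Date(doc.created_at).getUTCHours()] = 1;
  return [result];
}

// Merge objects of hour -> count into one object.
function reduceHours(values, arg) {
  var totals = {};
  for (var i = 0; i < values.length; i++) {
    for (var hour in values[i]) {
      totals[hour] = (totals[hour] || 0) + values[i][hour];
    }
  }
  return [totals];
}

function fake(doc) {
  return { values: [{ data: JSON.stringify(doc) }] };
}

var mapped = ["2011-03-01T13:05:00Z", "2011-03-01T13:45:00Z", "2011-03-02T21:10:00Z"].map(
  function (ts) {
    return mapHour(fake({ created_at: ts }), null, null)[0];
  }
);
console.log(reduceHours(mapped)); // [ { '13': 2, '21': 1 } ]
```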
The result is then sliced to get the top five elements from the list. You could argue that the transformation into an array of arrays should probably be done in a map function. Note that so far we're pretty oblivious as to whether the input is coming from a map or a reduce function. Both map and reduce functions can accept arguments, and you can specify additional arguments for every phase separately.
Now let's chain us some reduce functions for great good. You should see a nice list of sorted hours and the number of tweets for each as a result. Parameterizing MapReduce Queries: Let's say we want to be able to fetch the top three instead. We can simply modify the reduce function to accept a second argument, which we can then specify when making the initial MapReduce request.
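The chained, parameterized reduce might look like the sketch below. It turns hour-to-count objects into `[hour, count]` pairs, sorts them busiest first, and keeps the top `arg` entries, with a default of five. In the request, the argument would go into the phase as `arg`, for example `{ reduce: { language: "javascript", source: ..., arg: 3 } }`. The function also accepts its own pairs in case they are fed back in. The sample counts are made up.

```javascript
function reduceTopHours(values, arg) {
  var limit = arg || 5;
  var totals = {};
  for (var i = 0; i < values.length; i++) {
    var value = values[i];
    if (Array.isArray(value)) {
      // A [hour, count] pair from an earlier call of this function.
      totals[value[0]] = (totals[value[0]] || 0) + value[1];
    } else {
      // An hour -> count object from the previous reduce phase.
      for (var hour in value) totals[hour] = (totals[hour] || 0) + value[hour];
    }
  }
  var pairs = [];
  for (var h in totals) pairs.push([h, totals[h]]);
  pairs.sort(function (a, b) {
    return b[1] - a[1];
  });
  return pairs.slice(0, limit);
}

var hourly = [{ "9": 4, "13": 12, "17": 7, "21": 15, "23": 2, "3": 1 }];
console.log(reduceTopHours(hourly, 3)); // [ [ '21', 15 ], [ '13', 12 ], [ '17', 7 ] ]
```

Slicing before all counts are merged would drop data. That is why this phase runs after the merging reduce, whose output is already a single object.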
First, it transforms the objects returned by the previous phase into an array of arrays. Notice the second parameter we added to the second reduce phase. Let's run it real quick to verify that it actually works. No problem. A map function that's part of a chain can't just return any data. To understand why this is the case, consider how a MapReduce request is spread throughout the cluster. The simple rule is this: if your inputs reference two objects on two different nodes, the map phase runs on both of those nodes, next to the data. The coordinating node has a timeout kicking in after a configurable amount of time, and the timeout can be specified on a per-request basis too.
The coordinating node collects all the results from the nodes that ran the map phases. Chaining map phases requires a bit more attention. But first things first.
The node accepting the request is called the coordinating node. Depending on the amount of data you're sifting through with a MapReduce request and the load on the cluster, a request can take a while. And that was pretty much all there is to it.
MapReduce in a Riak Cluster: If the coordinating node detects that a map phase is followed by another, the output of the first phase is used as the input for the next, so it has to identify objects rather than carry arbitrary data. How could we put this to good use with our collection of tweets? As you can see, we can almost reuse the original function without any changes. When its condition is true, the function returns the object's value.
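A map phase that feeds another map phase returns bucket/key pairs. The next phase then runs against those objects where they are stored. The sketch below follows a reply to the tweet it answers. It assumes that replies carry Twitter's `in_reply_to_status_id_str` field, and the sample tweet is made up.

```javascript
function mapInReplyTo(value, keyData, arg) {
  var doc = JSON.parse(value.values[0].data);
  if (doc.in_reply_to_status_id_str) {
    // Hand on [bucket, key] so the next map phase can load that tweet.
    return [[value.bucket, doc.in_reply_to_status_id_str]];
  }
  return [];
}

var reply = {
  bucket: "tweets",
  key: "2",
  values: [{ data: JSON.stringify({ text: "@someone me too", in_reply_to_status_id_str: "1" }) }],
};
console.log(mapInReplyTo(reply, null, null)); // [ [ 'tweets', '1' ] ]
```

This is very close to walking an `in_reply_to` link, except that the relationship comes from the document body instead of a Link header.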
We can use the arguments passed to each function for just that. In a key-value store, the user is identified by his login name. Say you have a user object with an email address as an attribute. You hand in that object to a map phase. You can then use the address to generate a new key to fetch more details about the email address. This turns out to be very similar to link walking. The results of this will once again depend on the number of tweets you have in your Riak bucket. Efficiency of Buckets as Inputs: There's a caveat when running a MapReduce request with just a bucket as input. Gathering all the data from the whole cluster is a very expensive operation.
It's a recommended practice to specify a restricted set of inputs on a production system instead of using a bucket. Using a bucket requires every node to walk its entire key space and load all data in the bucket from disk.
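A restricted input set is a list of `[bucket, key]` pairs in place of a bucket name. In the sketch, the keys are made up; in practice they might come from an index you maintain yourself. The phase uses Riak's built-in `Riak.mapValuesJson`, and `mapredInputs` is a hypothetical helper.

```javascript
function mapredInputs(bucket, keys) {
  return keys.map(function (key) {
    return [bucket, key];
  });
}

var request = {
  inputs: mapredInputs("tweets", ["41399579391950848", "41399579391950849"]),
  query: [{ map: { language: "javascript", name: "Riak.mapValuesJson" } }],
};
console.log(JSON.stringify(request.inputs)); // [["tweets","41399579391950848"],["tweets","41399579391950849"]]
```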
When would you use this kind of MapReduce magic? The above example shows you one use case: we run some sort of analysis or query on a set of data identified by keys or ranges of keys. Say we want to fetch the above tweets. Key filters are specified together with a bucket to restrict the initial set of keys that needs to be sifted through.
Key Filters: Enter key filters. We could use the keys' names as a way to restrict the input to the map phase properly. A key filter specification consists of optional preprocessing filters followed by a list of one or more matching filters. Let's build another filter. Be aware that the results won't include the usual metadata you'll get when fetching a single object.
Key filters can be used to fetch only a certain range of keys that match a regular expression or fall into a certain range of integers or strings. The keys can be transformed by splitting them up based on a token. Back on our original track: let's say we want to fetch all tweets whose keys start with a certain prefix. Say we want to do the same based on a numeric range. Just like the top-level list of key filters, you can combine any number of transformation filters.
To combine a number of matching filters, use the logical filters and, or, and not. It's an oddity with the Twitter API that came up when they changed their way of generating tweet identifiers. Mostly it's a matter of speed when resorting to using the built-in functions.
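Key filters travel with the bucket in the MapReduce inputs. The evaluator below is a hypothetical local illustration of how a handful of filters compose: transformations change the key, matchers accept or reject it, and `and` applies several filter lists to the current value. Riak evaluates key filters on the server, and this code is not its implementation.

```javascript
var inputs = {
  bucket: "tweets",
  key_filters: [["string_to_int"], ["and", [["greater_than", 100]], [["less_than", 200]]]],
};

function applyFilters(key, filters) {
  var value = key;
  for (var i = 0; i < filters.length; i++) {
    var f = filters[i];
    switch (f[0]) {
      case "tokenize": value = value.split(f[1])[f[2] - 1]; break; // 1-based token
      case "string_to_int": value = parseInt(value, 10); break;
      case "starts_with": if (value.indexOf(f[1]) !== 0) return false; break;
      case "greater_than": if (!(value > f[1])) return false; break;
      case "less_than": if (!(value < f[1])) return false; break;
      case "and":
        for (var j = 1; j < f.length; j++) if (!applyFilters(value, f[j])) return false;
        break;
      default:
        throw new Error("filter not covered by this sketch: " + f[0]);
    }
  }
  return true;
}

var keys = ["42", "150", "199", "250"];
console.log(keys.filter(function (k) { return applyFilters(k, inputs.key_filters); })); // [ '150', '199' ]
console.log(applyFilters("2011-03-01-abc", [["tokenize", "-", 1], ["starts_with", "2011"]])); // true
```

Real tweet ids exceed JavaScript's safe integer range, so numeric filters on such keys deserve extra care. The small numbers here only keep the example readable.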
We already did the latter by using Riak. Intermission: Riak's Configuration Files. Before we go any further, a word on configuration. Riak has two relevant configuration files, and they'll be important in the following section. The first file of interest is app. It contains the configuration for Riak and all its components, such as the IP addresses to bind to. Here's a small snippet of the Riak Core section; I removed several bits of Erlang for brevity. The other relevant file is vm. It contains flags that are handed over to the Erlang process when Riak is started. All nodes in the same cluster should use the same cookie, or they will not be able to communicate. Instead of Riak returning a result, you may see the error shown here. Such errors come up at irregular intervals.
The bug still exists in the current version of Riak. To reduce the likelihood of the error popping up, raise the relevant setting in the configuration; the default is shown below. Note that this will also increase the amount of memory required to run Riak. Restart Riak using the riak restart command. Ideally you'd put them in version control and check them out into a directory on every server running Riak. Then you tell Riak where to find them and restart it.
Then change the line to something like shown below. Using Erlang for MapReduce: I've got nothing against Erlang myself, and the Riak code itself is well readable and nicely documented too. That said, even if you're not going to write your own Erlang code to run as a map or reduce phase, the Erlang console is a pretty neat tool. You can bring it up with the command riak attach. Hit return when you see the first output message. You can hit Ctrl-D anytime to quit the console.
Let's stick with a simple map function for now. You can type the following lines into the console; don't forget to end every statement with a period. First of all, fetch a local client. Using it, you can fetch an object, and the resulting output is the Erlang object as it is stored in Riak. An Erlang map function takes three arguments: the object, its key data, and the phase arguments. The first line defines an anonymous function; we're ditching the key data and phase arguments. The second line extracts the value from the object stored in Riak. It's immediately decoded using mochijson2.
Here's the full code of the function.
Here's how you fetch an object. I'll omit the console sugar to focus on the Erlang code. We're using pattern matching to get rid of the struct term. The third line extracts a value from the leftover property list, a list of key-values. Erlang map functions are expected to return lists too. To run the function, you pass it using the qfun term; for the built-in functions, the code specifies modfun instead of qfun. You can take this a lot further.
You can also run built-in Erlang map and reduce functions this way. Here's an example of a full MapReduce with two built-in functions. Phases are specified in a list of tuples too. The second line specifies a map phase. It's a nice touch to store intermediate results back into Riak.
The first line specifies the input for the MapReduce request. The resulting output should include a list of just one tweet body. That's it, no magic. If you're working with Riak in production, the console comes in handy every now and then.
Here's how to run our function on a single tweet. On Full-Bucket MapReduce and Key-Filters Performance: Before we move on to investigate more options to query data in Riak, a word on the general performance implications of using MapReduce and key filters on the whole data set. The simple version is that running a MapReduce query on all objects in a bucket requires Riak to go through all the keys stored in a cluster. See the section on the anatomy of a Riak bucket for a deeper explanation of why that is.
The same is true for key filters. Both actually work very much alike. For a full bucket MapReduce query Riak needs to go through its entire set of keys to find the ones belonging to that particular bucket, tweets in our example.
For key filters, Riak also goes through the entire set, matching not only the bucket but also the conditions you specified to the key name. This process works reasonably well when you only have a small-ish number of keys, up to