HOW TO CREATE A CUSTOM SERVER USING KATTA
=========================================

Introduction
------------

This guide explains how to write a new kind of client/server on top of
Katta. Katta manages the loading of shards, pools of nodes, replication
levels and so on. The domain-specific work is done by custom client and
server classes.

Katta ships with Lucene and MapFile clients and servers. If these do not
meet your needs then you must write your own client and server. This guide
describes one sequence of steps to achieve this, but you do not have to
follow them in this order.

The Problem
-----------

For this example our problem is grep-ing a large file for regular
expressions. We started off just using grep, but the file has grown to the
point where the latency is too high. So we need to cut the file into pieces
and distribute the work across a pool of nodes. Katta to the rescue!

Specifically, our data set is an ordered list of newline-terminated
strings. Our search key is a regular expression, and the result is the list
of strings that match the regular expression, in the order they occur in
the original list.

Katta Infrastructure
--------------------

A Katta index is a directory containing nothing but "shards" (directories).
The index name is chosen at addIndex time (below). We will use "grep" as
our index name. The name of the index is used as part of the shard names.

You can have multiple shards per node, which allows for a more even
distribution as the number of nodes changes. We decide on 10 nodes and 100
shards. So we create a directory "grep_v1" with subdirectories "shard00" ..
"shard99". The final shard names will then be "grep_shard00" ..
"grep_shard99". We will be sorting by shard name in the client, so we
zero-pad the numbers.

The contents of the shard directories are totally up to you. Katta does not
care; it just copies the directories to the server nodes and notifies them
where the directories are.
So we cut the input file into 100 equal-sized pieces and place one in each
shard directory as the file "data.txt".

See the Katta documentation for deployment options to make the index
available to the nodes. It is common to upload it to a Hadoop file system,
but this is not required. Also see the Katta documentation for setting up
Zookeeper.

At this point we will assume we have in place one or more Zookeeper nodes,
10 server machines, one or more client machines, and one machine to add the
index (you can do this on any of the other machines, or use a separate
machine). Now we need the custom software.

Custom Server
-------------

We start by defining an interface for our server. These methods will be
called by our custom client classes, not by external users, so no one but
us will see them. Hadoop RPC is used, so all the data must be Writable. We
only need one method, which takes a string as input; to be Writable, it is
passed as Text. To enable sorting in the client, we need to return the
shard names along with the list of strings for each shard. So we return a
map of Text to list of Text:

    interface IGrepServer extends VersionedProtocol {
        public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception;
    }

VersionedProtocol is required because in the client Hadoop RPC returns
proxy objects of that type. Due to replication and retries, the list of
shards passed in may be a subset of the shards on a node. In the simple
no-replication case, each node gets exactly one call with its full set of
shards listed, and no retries occur.

In the client, we will use this interface to look up the grep() method. All
of our RPC calls from the client to the server nodes are listed in this
interface.

Now we are ready to write our actual server class. This class must
implement our IGrepServer interface and also the INodeManaged interface.
INodeManaged lists all the calls our server will get from the Katta Node
class (start using shard, stop using shard, shutdown).
To keep things simple we extend AbstractServer, which implements this
interface and maintains the list of shards (directories) for us. It also
takes care of VersionedProtocol (it always returns version 0).

    public class GrepServer extends AbstractServer implements IGrepServer {

        public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception {
            Map<Text, List<Text>> result = new HashMap<Text, List<Text>>();
            for (String shard : shards) {
                // Key each list by shard name so the client can sort on it.
                result.put(new Text(shard), grepShard(regex.toString(), shard));
            }
            return result;
        }

        private List<Text> grepShard(String regex, String shard) {
            // "shard" is the shard name; it must first be resolved to the
            // shard's local directory (AbstractServer maintains these).
            File dataFile = new File(shard, "data.txt");
            // Grep file using Scanner class.
        }
    }

You could search the N shards in parallel if you wish.

Custom Client
-------------

To outside users, we want to hide the fact that we use Katta. So we define
a client interface. This is not used or required by Katta, but it is good
practice.

    public interface IGrepClient {
        public List<String> grep(String regex) throws Exception;
    }

The actual client class creates an instance of Client, uses its
broadcastToAll() method to get a result set, then throws an exception if
any error occurred. If not, it appends all the per-shard lists in
alphabetical shard order. This guarantees that the final list is in the
right order. You could pass line numbers explicitly in your data, but we
wanted to keep it simple for this example.

    public class GrepClient implements IGrepClient {

        private Client client = new Client(IGrepServer.class);

        public List<String> grep(String regex) throws Exception {
            // Call Katta.
            Method method = IGrepServer.class.getMethod("grep", Text.class, String[].class);
            ClientResult<Map<Text, List<Text>>> results =
                client.broadcastToAll(3000, true, method, 1, new Text(regex), null);
            // If there were any errors, pick one and throw it.
            if (results.isError()) {
                throw results.getException();
            }
            // Combine into 1 big map of shard --> strings, sorted by shard name.
            Map<Text, List<Text>> combined = new TreeMap<Text, List<Text>>();
            for (Map<Text, List<Text>> result : results.getResults()) {
                combined.putAll(result);
            }
            // Build results.
            List<String> strings = new ArrayList<String>();
            for (List<Text> texts : combined.values()) {
                for (Text text : texts) {
                    strings.add(text.toString());
                }
            }
            return strings;
        }
    }

Things To Notice
----------------

The only server class used by the client is IGrepServer, which contains
only the RPC methods used by the client. Normally you would cache the
method lookup.

The second argument to grep() is the array of shard names. The last 2
arguments to broadcastToAll() are the args to pass into the method. The
second of these will be overwritten by Katta, so we pass in null. The
fourth argument to broadcastToAll() tells Katta which method argument (if
any) to overwrite (1 is the second argument). If this is < 0, no
overwriting is done.

The first two arguments to broadcastToAll() say that we are willing to wait
up to 3 seconds for complete results (data or an error for all shards; due
to retries this might not be all nodes), and then terminate the call.

The ClientResult object keeps track of results on a per-set-of-shards
basis. Because we need to know the exact shard for each string (for
sorting), we pass that inside our result.

Starting The Nodes
------------------

Make sure your classes are available on the classpath. One way is to drop
your jar in Katta's lib dir. Then on all 10 server nodes tell Katta to
start a node using your server class:

    bin/katta startNode com.foo.GrepServer

Deploy The Index
----------------

    bin/katta addIndex grep

This adds an index with the name "grep". In this example the index location
would end in "/grep_v1", but the deployed index name is "grep". The
replication level should be 1 for no replication.

Using The Client
----------------

Now you are ready to use your client:

    System.out.println("Katta references: " + new GrepClient().grep("[Kk]atta"));

Conclusion
----------

I hope this has made it clear what you need to do to create a custom
client/server using Katta, and how easy it is. Please see the Katta
documentation and discussion groups for more information.

http://katta.sourceforge.net/
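
Appendix: Grep And Merge Sketch
-------------------------------

The server example leaves the body of grepShard() as a comment, and the
client merges the per-shard lists by shard name. As a rough illustration of
just those two domain-specific pieces, here is a minimal sketch that runs
without Katta or Hadoop. It is not part of Katta: the names GrepSketch,
grepLines() and mergeInShardOrder() are made up for this example, plain
String is used in place of Text, and the input is read from any Readable
rather than from a shard's "data.txt".

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper class for illustration only; not a Katta class.
final class GrepSketch {

    // Returns, in input order, every line containing a match for the regex.
    static List<String> grepLines(Readable input, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matches = new ArrayList<>();
        try (Scanner scanner = new Scanner(input)) {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if (pattern.matcher(line).find()) {
                    matches.add(line);
                }
            }
        }
        return matches;
    }

    // Concatenates the per-shard lists in ascending shard-name order.
    // Zero-padded shard names make this the original file order.
    static List<String> mergeInShardOrder(Map<String, List<String>> perShard) {
        Map<String, List<String>> sorted = new TreeMap<>(perShard);
        List<String> merged = new ArrayList<>();
        for (List<String> lines : sorted.values()) {
            merged.addAll(lines);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Results arrive in arbitrary order, as they might from different nodes.
        Map<String, List<String>> perShard = new HashMap<>();
        perShard.put("grep_shard01",
            grepLines(new StringReader("zookeeper\nKatta node\n"), "[Kk]atta"));
        perShard.put("grep_shard00",
            grepLines(new StringReader("katta rocks\nlucene\n"), "[Kk]atta"));
        System.out.println(mergeInShardOrder(perShard)); // [katta rocks, Katta node]
    }
}
```

In a real server, grepLines() would be given a reader over the shard's
"data.txt" and its results wrapped in Text before being returned over RPC.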