Index: src/main/java/net/sf/katta/client/Client.java
===================================================================
--- src/main/java/net/sf/katta/client/Client.java	(revision 478)
+++ src/main/java/net/sf/katta/client/Client.java	(working copy)
@@ -38,6 +38,7 @@
 import net.sf.katta.node.IQuery;
 import net.sf.katta.node.ISearch;
 import net.sf.katta.node.QueryWritable;
+import net.sf.katta.node.SortWritable;
 import net.sf.katta.util.CollectionUtil;
 import net.sf.katta.util.KattaException;
 import net.sf.katta.util.ZkConfiguration;
@@ -54,6 +55,7 @@
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.queryParser.QueryParser;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 /**
  * Default implementation of {@link IClient}.
@@ -231,6 +233,29 @@
     return result;
   }
 
+  public Hits search(final Query query, final String[] indexNames, 
+      final Sort sort, final int count) throws KattaException {
+    final Map<String, List<String>> nodeShardsMap = getNode2ShardsMap(indexNames);
+    final Hits result = new Hits();
+
+    List<NodeInteraction> nodeInteractions = new ArrayList<NodeInteraction>();
+    for (final String node : nodeShardsMap.keySet()) {
+      nodeInteractions.add(new SortSearchInteraction(node, nodeShardsMap, query, sort, result, count));
+    }
+    execute(nodeInteractions);
+
+    long start = 0;
+    if (LOG.isDebugEnabled()) {
+      start = System.currentTimeMillis();
+    }
+    result.sort(count);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time for sorting: " + (System.currentTimeMillis() - start) + " ms");
+    }
+    _queryCount++;
+    return result;
+  }
+
   @Deprecated
   public int count(final IQuery query, final String[] indexNames) throws KattaException {
     try {
@@ -531,6 +556,35 @@
     }
   }
 
+  private class SortSearchInteraction extends NodeInteraction {
+
+    private final Query _query;
+    private final int _count;
+    private final Sort _sort;
+    private final Hits _result;
+
+    public SortSearchInteraction(String node, 
+        Map<String, List<String>> node2ShardsMap, Query query, Sort sort,
+        Hits result, int count) {
+      super(node, node2ShardsMap);
+      _query = query;
+      _sort = sort;
+      _result = result;
+      _count = count;
+    }
+
+    @Override
+    protected void doInteraction(ISearch search, String node, List<String> shards) throws IOException {
+      Hits hits;
+      final String[] shardsArray = shards.toArray(new String[shards.size()]);
+      final HitsMapWritable shardToHits = search.search(
+          new QueryWritable(_query), new SortWritable(_sort), shardsArray, _count);
+      hits = shardToHits.getHits();
+      _result.addHits(hits.getHits());
+      _result.addTotalHits(hits.size());
+    }
+  }
+  
   /**
    * This class encapsulates an interaction with one or multiple node's in order
    * to query information for a set of shards.
Index: src/main/java/net/sf/katta/node/ISearch.java
===================================================================
--- src/main/java/net/sf/katta/node/ISearch.java	(revision 478)
+++ src/main/java/net/sf/katta/node/ISearch.java	(working copy)
@@ -20,6 +20,7 @@
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.search.Sort;
 
 public interface ISearch extends VersionedProtocol {
 
@@ -52,6 +53,19 @@
       throws IOException;
 
   /**
+   * @param query
+   * @param sort
+   * @param shardNames
+   * @param count
+   *          the top n hits according to sort
+   * @return
+   * @throws ParseException
+   * @throws IOException
+   */
+  public HitsMapWritable search(QueryWritable query, SortWritable sort, String[] shardNames, int count)
+      throws IOException;
+
+  /**
    * Returns the number of documents a term occurs in. In a distributed search
    * environment, we need to get this first and then query all nodes again with
    * this information to ensure we compute TF IDF correctly. See {@link http
Index: src/main/java/net/sf/katta/node/KattaMultiSearcher.java
===================================================================
--- src/main/java/net/sf/katta/node/KattaMultiSearcher.java	(revision 478)
+++ src/main/java/net/sf/katta/node/KattaMultiSearcher.java	(working copy)
@@ -17,6 +17,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -179,6 +181,80 @@
   }
 
   /**
+   * Search in the given shards and return max hits for given query
+   * 
+   * @param query
+   * @param freqs
+   * @param shards
+   * @param result
+   * @param max
+   * @throws IOException
+   */
+  public final void search(final Query query, 
+      final DocumentFrequenceWritable freqs, final Sort sort,
+      final String[] shards, final HitsMapWritable result, final int max) 
+  throws IOException {
+    final Query rewrittenQuery = rewrite(query, shards);
+    final int numDocs = freqs.getNumDocs();
+    final int limit = Math.min(numDocs, max);
+    
+    int totalHits = 0;
+    final int shardsCount = shards.length;
+    
+    // Order shards in descending order if Sort.reverse = true
+    if(sort.getSort()[0].getReverse()) {
+      Arrays.sort(shards, new Comparator<String>() {
+        public int compare(String o1, String o2) {return o2.compareTo(o1);}
+      });
+    } else {
+      Arrays.sort(shards);
+    }
+    
+    // run the search parallel on the shards with a thread pool
+    List<Future<SearchResult>> tasks = new ArrayList<Future<SearchResult>>();
+    for (int i = 0; i < shardsCount; i++) {
+      SortedSearchCall call = new SortedSearchCall(shards[i], rewrittenQuery, 
+          sort, limit);
+      Future<SearchResult> future = _threadPool.submit(call);
+      tasks.add(future);
+    }
+
+    final List<ShardDoc> sortedDocs = new ArrayList<ShardDoc>(limit);
+    for (int i = 0; i < shardsCount; i++) {
+      SearchResult searchResult;
+      try {
+        searchResult = tasks.get(i).get();
+        for(ScoreDoc doc : searchResult._scoreDocs) {
+          totalHits++;
+          sortedDocs.add(new ShardDoc(doc,shards[i]));
+          if(totalHits == limit) break;
+        }
+        if(totalHits == limit) break;
+      } catch (InterruptedException e) {
+        throw new IOException("Multithread shard search interrupred:", e);
+      } catch (ExecutionException e) {
+        throw new IOException("Multithread shard search could not be executed:", e);
+      }
+    }
+   
+    result.addTotalHits(totalHits);
+    
+    for(ShardDoc shardDoc : sortedDocs) {
+      result.addHitToShard(shardDoc.shard, 
+          new Hit(shardDoc.shard, _node, shardDoc.doc.score, shardDoc.doc.doc));
+    }
+  }
+  
+  private class ShardDoc {
+    ScoreDoc doc;
+    String shard;
+    ShardDoc(ScoreDoc doc, String shard) {
+      this.doc = doc;
+      this.shard = shard;
+    }
+  }
+
+  /**
    * Returns the number of documents a shard has.
    * 
    * @param shardName
@@ -276,6 +352,31 @@
     }
 
   }
+  
+  private class SortedSearchCall implements Callable<SearchResult> {
+
+    private final String _shardName;
+    private final Query _query;
+    private final Sort _sort;
+    private final int _limit;
+
+    public SortedSearchCall(String shardName, Query query, Sort sort, 
+        int limit) {
+      _shardName = shardName;
+      _query = query;
+      _sort = sort;
+      _limit = limit;
+    }
+
+    @Override
+    public SearchResult call() throws Exception {
+      final IndexSearcher indexSearcher = _searchers.get(_shardName);
+      final TopDocs docs = indexSearcher.search(_query, null, _limit, _sort);
+      // totalHits += docs.totalHits; // update totalHits
+      return new SearchResult(docs.totalHits, docs.scoreDocs);
+    }
+
+  }
 
   private class SearchResult {
 
Index: src/main/java/net/sf/katta/node/Node.java
===================================================================
--- src/main/java/net/sf/katta/node/Node.java	(revision 478)
+++ src/main/java/net/sf/katta/node/Node.java	(working copy)
@@ -64,6 +64,7 @@
 import org.apache.lucene.queryParser.QueryParser;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 public class Node implements ISearch, IZkReconnectListener {
 
@@ -454,6 +455,46 @@
     return result;
   }
 
+  @Override
+  public HitsMapWritable search(QueryWritable query, SortWritable sort,
+      String[] shards, int count) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("You are searching with the query: '" + query.getQuery() + "'");
+    }
+
+    Query luceneQuery = query.getQuery();
+    Sort luceneSort = sort.getSort();
+   
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Lucene query: " + luceneQuery.toString());
+    }
+
+    long completeSearchTime = 0;
+    final DocumentFrequenceWritable docFreqs = getDocFreqs(query, shards);
+    final HitsMapWritable result = new net.sf.katta.node.HitsMapWritable(_nodeName);
+    if (_searcher != null) {
+      long start = 0;
+      if (LOG.isDebugEnabled()) {
+        start = System.currentTimeMillis();
+      }
+      _searcher.search(luceneQuery, docFreqs, luceneSort, shards, result, count);
+      if (LOG.isDebugEnabled()) {
+        final long end = System.currentTimeMillis();
+        LOG.debug("Search took " + (end - start) / 1000.0 + "sec.");
+        completeSearchTime += (end - start);
+      }
+    } else {
+      LOG.error("No searcher for index found on '" + _nodeName + "'.");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Complete search took " + completeSearchTime / 1000.0 + "sec.");
+      final DataOutputBuffer buffer = new DataOutputBuffer();
+      result.write(buffer);
+      LOG.debug("Result size to transfer: " + buffer.getLength());
+    }
+    return result;
+  }
+  
   public long getProtocolVersion(final String protocol, final long clientVersion) throws IOException {
     return _protocolVersion;
   }
Index: src/main/java/net/sf/katta/node/SortWritable.java
===================================================================
--- src/main/java/net/sf/katta/node/SortWritable.java	(revision 0)
+++ src/main/java/net/sf/katta/node/SortWritable.java	(revision 0)
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.search.Sort;
+
+public class SortWritable implements Writable {
+
+  private Sort _sort;
+
+  public SortWritable() {
+  }
+
+  public SortWritable(Sort sort) {
+    _sort = sort;
+
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    int readInt = input.readInt();
+    byte[] bs = new byte[readInt];
+    input.readFully(bs);
+    ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(bs));
+    try {
+      _sort = (Sort) objectStream.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to deseriaize lucene query", e);
+    }
+
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+    ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+    objectStream.writeObject(_sort);
+    objectStream.close();
+    byte[] byteArray = byteArrayStream.toByteArray();
+    output.writeInt(byteArray.length);
+    output.write(byteArray);
+  }
+
+  public Sort getSort() {
+    return _sort;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    Sort other = ((SortWritable) obj).getSort();
+    return _sort.equals(other);
+  }
+}

