From badb1b2ecb36295f20181decb646a8d048a5bd8b Mon Sep 17 00:00:00 2001
From: Johannes Zillmann <jz@leo.(none)>
Date: Mon, 9 Nov 2009 22:32:54 +0100
Subject: [PATCH 1/2] initial version of lucene field sort capability

---
 .../java/net/sf/katta/client/ILuceneClient.java    |   27 ++-
 .../java/net/sf/katta/client/LuceneClient.java     |   33 +++-
 .../katta/client/lucene/FieldSortComparator.java   |  113 ++++++++++
 src/main/java/net/sf/katta/node/Hit.java           |   47 ++++-
 src/main/java/net/sf/katta/node/Hits.java          |   17 ++
 .../java/net/sf/katta/node/HitsMapWritable.java    |   53 +++++-
 src/main/java/net/sf/katta/node/ILuceneServer.java |   14 ++
 src/main/java/net/sf/katta/node/LuceneServer.java  |  140 +++++++++---
 src/main/java/net/sf/katta/node/SortWritable.java  |   76 +++++++
 src/main/java/net/sf/katta/util/WritableType.java  |  108 +++++++++
 .../java/net/sf/katta/LuceneComplianceTest.java    |  234 ++++++++++++++++++++
 .../java/net/sf/katta/client/LuceneClientTest.java |   64 +++++-
 .../client/lucene/FieldSortComparatorTest.java     |   97 ++++++++
 .../net/sf/katta/testutil/ExtendedTestCase.java    |   23 ++
 .../java/net/sf/katta/util/WritableTypeTest.java   |  100 +++++++++
 15 files changed, 1089 insertions(+), 57 deletions(-)
 create mode 100644 src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
 create mode 100644 src/main/java/net/sf/katta/node/SortWritable.java
 create mode 100644 src/main/java/net/sf/katta/util/WritableType.java
 create mode 100644 src/test/java/net/sf/katta/LuceneComplianceTest.java
 create mode 100644 src/test/java/net/sf/katta/client/lucene/FieldSortComparatorTest.java
 create mode 100644 src/test/java/net/sf/katta/util/WritableTypeTest.java

diff --git a/src/main/java/net/sf/katta/client/ILuceneClient.java b/src/main/java/net/sf/katta/client/ILuceneClient.java
index b91991b..17d0055 100644
--- a/src/main/java/net/sf/katta/client/ILuceneClient.java
+++ b/src/main/java/net/sf/katta/client/ILuceneClient.java
@@ -24,15 +24,16 @@ import net.sf.katta.util.KattaException;
 
 import org.apache.hadoop.io.MapWritable;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 /**
  * Client for searching document indices deployed on a katta cluster.
  * <p>
  * 
- * You provide a {@link IQuery} and the name of the deployed indices, and get
+ * You provide a {@link Query} and the name of the deployed indices, and get
  * back {@link Hits} which contains multiple {@link Hit} objects as the results.
  * <br>
- * See {@link #search(IQuery, String[], int)}.
+ * See {@link #search(Query, String[], int)}.
  * <p>
  * 
  * The details of a hit-document can be retrieved through the
@@ -40,7 +41,8 @@ import org.apache.lucene.search.Query;
  * 
  * @see Hit
  * @see Hits
- * @see IQuery
+ * @see Query
+ * @see Sort
  */
 public interface ILuceneClient {
 
@@ -80,7 +82,24 @@ public interface ILuceneClient {
 
   @Deprecated
   public Hits search(IQuery query, String[] indexNames, int count) throws KattaException;
-
+  
+  /**
+   * Searches with a given query in the supplied indexes for a limited amount of
+   * results and sorts the results based upon the sort parameter.
+   * 
+   * @param query
+   *          The query to search with.
+   * @param indexNames
+   *          A list of index names to search in.
+   * @param count
+   *          The count of results that should be returned.
+   * @param sort
+   *          Sort criteria for returned hits
+   * @return A object that capsulates all results.
+   * @throws KattaException
+   */
+  public Hits search(Query query, String[] indexNames, int count, Sort sort) throws KattaException;
+  
   /**
    * Gets all the details to a hit.
    * 
diff --git a/src/main/java/net/sf/katta/client/LuceneClient.java b/src/main/java/net/sf/katta/client/LuceneClient.java
index 6deddd9..cd75e93 100644
--- a/src/main/java/net/sf/katta/client/LuceneClient.java
+++ b/src/main/java/net/sf/katta/client/LuceneClient.java
@@ -32,6 +32,7 @@ import net.sf.katta.node.HitsMapWritable;
 import net.sf.katta.node.ILuceneServer;
 import net.sf.katta.node.IQuery;
 import net.sf.katta.node.QueryWritable;
+import net.sf.katta.node.SortWritable;
 import net.sf.katta.util.KattaException;
 import net.sf.katta.util.ZkConfiguration;
 
@@ -41,6 +42,7 @@ import org.apache.lucene.analysis.KeywordAnalyzer;
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.queryParser.QueryParser;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 /**
  * Default implementation of {@link ILuceneClient}.
@@ -107,6 +109,7 @@ public class LuceneClient implements ILuceneClient {
   }
 
   private static final Method SEARCH_METHOD;
+  private static final Method SORTED_SEARCH_METHOD;
   private static final int SEARCH_METHOD_SHARD_ARG_IDX = 2;
   static {
     try {
@@ -115,12 +118,30 @@ public class LuceneClient implements ILuceneClient {
     } catch (NoSuchMethodException e) {
       throw new RuntimeException("Could not find method search() in ILuceneSearch!");
     }
+    try {
+      SORTED_SEARCH_METHOD = ILuceneServer.class.getMethod("search", new Class[] { QueryWritable.class,
+              DocumentFrequencyWritable.class, String[].class, Integer.TYPE , SortWritable.class});
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("Could not find method search() in ILuceneSearch!");
+    }
   }
 
   public Hits search(final Query query, final String[] indexNames, final int count) throws KattaException {
+    return search(query, indexNames, count, null);
+  }
+
+  public Hits search(final Query query, final String[] indexNames, final int count, final Sort sort)
+      throws KattaException {
     final DocumentFrequencyWritable docFreqs = getDocFrequencies(query, indexNames);
-    ClientResult<HitsMapWritable> results = kattaClient.broadcastToIndices(TIMEOUT, true, SEARCH_METHOD,
-            SEARCH_METHOD_SHARD_ARG_IDX, indexNames, new QueryWritable(query), docFreqs, null, Integer.valueOf(count));
+    ClientResult<HitsMapWritable> results;
+    if (sort == null) {
+      results = kattaClient.broadcastToIndices(TIMEOUT, true, SEARCH_METHOD, SEARCH_METHOD_SHARD_ARG_IDX, indexNames,
+          new QueryWritable(query), docFreqs, null, Integer.valueOf(count));
+    } else {
+      results = kattaClient.broadcastToIndices(TIMEOUT, true, SORTED_SEARCH_METHOD, SEARCH_METHOD_SHARD_ARG_IDX,
+          indexNames, new QueryWritable(query), docFreqs, null, Integer.valueOf(count), new SortWritable(sort));
+
+    }
     if (results.isError()) {
       throw results.getKattaException();
     }
@@ -134,13 +155,17 @@ public class LuceneClient implements ILuceneClient {
     if (LOG.isDebugEnabled()) {
       start = System.currentTimeMillis();
     }
-    result.sort(count);
+    if (sort == null) {
+      result.sort(count);
+    } else {
+      result.fieldSort(sort, results.getResults().iterator().next().getSortFieldTypes(), count);
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Time for sorting: " + (System.currentTimeMillis() - start) + " ms");
     }
     return result;
   }
-
+  
   // public int getResultCount(QueryWritable query, String[] shards) throws
   // IOException;
 
diff --git a/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java b/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
new file mode 100644
index 0000000..ad3265e
--- /dev/null
+++ b/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.client.lucene;
+
+import java.util.Comparator;
+
+import net.sf.katta.node.Hit;
+import net.sf.katta.util.WritableType;
+
+import org.apache.lucene.search.FieldSortedHitQueue;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+
+/**
+ * Implementation of an {@link Comparator} that compares two {@link Hit} objects
+ * based on a given {@link Sort} specification. This comparator helps sorting a
+ * result list by field terms rather then by sore.
+ * 
+ * This code leans on the lucene code from {@link FieldSortedHitQueue}
+ * 
+ */
+public class FieldSortComparator implements Comparator<Hit> {
+
+  private static Comparator<Comparable> COMPARABLE_COMPARATOR = new ComparableComparator();
+  private static Comparator<Comparable> REVERSED_COMPARABLE_COMPARATOR = new ComparableComparator(true);
+
+  private final SortField[] _sortFields;
+  private final WritableType[] _fieldTypes;
+  private final Comparator<Comparable>[] _fieldComparators;
+
+  public FieldSortComparator(SortField[] sortFields, WritableType[] fieldTypes) {
+    _sortFields = sortFields;
+    _fieldTypes = fieldTypes;
+    _fieldComparators = new Comparator[sortFields.length];
+
+    // prepare a array of comparators, for each field one. For type-information
+    // we use the user provided SortField[] and the WritableType[] which are
+    // auto-detected on the node side.
+    for (int i = 0; i < sortFields.length; i++) {
+      if (_fieldTypes[i] == WritableType.TEXT && sortFields[i].getLocale() != null) {
+        throw new UnsupportedOperationException("locale-sensitive comparison not supported yet");
+        // jz: therefore we could use java.text.Collator class (see lucenes
+        // FieldSortedHitQueue)
+      }
+      if (_sortFields[i].getType() == SortField.SCORE) {
+        _fieldComparators[i] = REVERSED_COMPARABLE_COMPARATOR;
+      } else {
+        _fieldComparators[i] = COMPARABLE_COMPARATOR;
+      }
+    }
+  }
+
+  public SortField[] getSortFields() {
+    return _sortFields;
+  }
+
+  public WritableType[] getFieldTypes() {
+    return _fieldTypes;
+  }
+
+  @Override
+  public int compare(Hit hit1, Hit hit2) {
+    return compare(hit1.getSortFields(), hit2.getSortFields());
+  }
+
+  public int compare(Comparable[] fields1, Comparable[] fields2) {
+    int n = _sortFields.length;
+    int c = 0;
+    for (int i = 0; i < n && c == 0; ++i) {
+      Comparable fieldTerm1 = fields1[i];
+      Comparable fieldTerm2 = fields2[i];
+      c = (_sortFields[i].getReverse()) ? _fieldComparators[i].compare(fieldTerm2, fieldTerm1) : _fieldComparators[i]
+          .compare(fieldTerm1, fieldTerm2);
+    }
+    return c;
+  }
+
+  static class ComparableComparator implements Comparator<Comparable> {
+
+    private final boolean _reverse;
+
+    public ComparableComparator() {
+      this(false);
+    }
+
+    public ComparableComparator(boolean reverse) {
+      _reverse = reverse;
+    }
+
+    @Override
+    public int compare(Comparable o1, Comparable o2) {
+      if (_reverse) {
+        return o2.compareTo(o1);
+      }
+      return o1.compareTo(o2);
+    }
+
+  }
+
+}
diff --git a/src/main/java/net/sf/katta/node/Hit.java b/src/main/java/net/sf/katta/node/Hit.java
index b1b44d3..39ce0f2 100644
--- a/src/main/java/net/sf/katta/node/Hit.java
+++ b/src/main/java/net/sf/katta/node/Hit.java
@@ -19,8 +19,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import net.sf.katta.util.WritableType;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Note: this class has a natural ordering that is inconsistent with equals.
@@ -37,8 +40,23 @@ public class Hit implements Writable, Comparable<Hit> {
   private float _score;
 
   private int _docId;
+  
+  private WritableComparable[] _sortFields;
+
+  private WritableType[] _sortFieldTypes;
 
+  public Hit() {
+    // needed for serialization
+  }
+  
   public Hit(final String shard, final String node, final float score, final int id) {
+    this(shard, node, score, id, null);
+  }
+
+  /**
+   * Construct a hit object with information about the types of the sort fields.
+   */
+  public Hit(final String shard, final String node, final float score, final int id, WritableType[] sortFieldTypes) {
     _shard = new Text(shard);
     if (node != null) {
       _node = new Text(node);
@@ -47,10 +65,7 @@ public class Hit implements Writable, Comparable<Hit> {
     }
     _score = score;
     _docId = id;
-  }
-
-  public Hit() {
-    // needed for serialization
+    _sortFieldTypes = sortFieldTypes;
   }
 
   // public Hit(Text shardName, Text serverName, float score, int docId) {
@@ -79,6 +94,14 @@ public class Hit implements Writable, Comparable<Hit> {
   public void setDocId(final int docId) {
     _docId = docId;
   }
+  
+  public void setSortFields(WritableComparable[] sortFields) {
+    _sortFields = sortFields;
+  }
+
+  public WritableComparable[] getSortFields() {
+    return _sortFields;
+  }
 
   public void readFields(final DataInput in) throws IOException {
     _score = in.readFloat();
@@ -92,6 +115,14 @@ public class Hit implements Writable, Comparable<Hit> {
     _shard = new Text();
     _shard.readFields(in);
     _docId = in.readInt();
+    byte sortFieldsLen = in.readByte();
+    if (sortFieldsLen > 0) {
+      _sortFields = new WritableComparable[sortFieldsLen];
+      for (int i = 0; i < sortFieldsLen; i++) {
+        _sortFields[i] = _sortFieldTypes[i].newWritableComparable();
+        _sortFields[i].readFields(in);
+      }
+    }
   }
 
   public void write(final DataOutput out) throws IOException {
@@ -104,6 +135,14 @@ public class Hit implements Writable, Comparable<Hit> {
     }
     _shard.write(out);
     out.writeInt(_docId);
+    if (_sortFields == null) {
+      out.writeByte(0);
+    } else {
+      out.writeByte(_sortFields.length);
+      for (Writable writable : _sortFields) {
+        writable.write(out);
+      }
+    }
   }
 
   public int compareTo(final Hit o) {
diff --git a/src/main/java/net/sf/katta/node/Hits.java b/src/main/java/net/sf/katta/node/Hits.java
index e51d9fb..80c222a 100644
--- a/src/main/java/net/sf/katta/node/Hits.java
+++ b/src/main/java/net/sf/katta/node/Hits.java
@@ -24,9 +24,12 @@ import java.util.List;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import net.sf.katta.client.lucene.FieldSortComparator;
 import net.sf.katta.util.MergeSort;
+import net.sf.katta.util.WritableType;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.lucene.search.Sort;
 
 public class Hits implements Writable {
 
@@ -93,6 +96,20 @@ public class Hits implements Writable {
     sortCollection(count);
   }
 
+  public void fieldSort(Sort sort, WritableType[] fieldTypes, int count) {
+    // TODO merge sort does not work due KATTA-93
+    final ArrayList<Hit> list = new ArrayList<Hit>(count);
+    final int size = _hitsList.size();
+    for (int i = 0; i < size; i++) {
+      list.addAll(_hitsList.remove(0));
+    }
+    _hitsList = new ArrayList<List<Hit>>();
+    if (!list.isEmpty()) {
+      Collections.sort(list, new FieldSortComparator(sort.getSort(), fieldTypes));
+    }
+    _sortedList = list.subList(0, Math.min(count, list.size()));
+  }
+
   @SuppressWarnings("unchecked")
   public void sortMerge() {
     final List<Hit>[] array = _hitsList.toArray(new List[_hitsList.size()]);
diff --git a/src/main/java/net/sf/katta/node/HitsMapWritable.java b/src/main/java/net/sf/katta/node/HitsMapWritable.java
index 8798121..d00110f 100644
--- a/src/main/java/net/sf/katta/node/HitsMapWritable.java
+++ b/src/main/java/net/sf/katta/node/HitsMapWritable.java
@@ -24,7 +24,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import net.sf.katta.util.WritableType;
+
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
 public class HitsMapWritable implements Writable {
@@ -34,6 +37,7 @@ public class HitsMapWritable implements Writable {
   private String _serverName;
   private final Map<String, List<Hit>> _hitToShard = new ConcurrentHashMap<String, List<Hit>>();
   private int _totalHits;
+  private WritableType[] _sortFieldTypes;
 
   public HitsMapWritable() {
     // for serialization
@@ -50,6 +54,13 @@ public class HitsMapWritable implements Writable {
     }
     _serverName = in.readUTF();
     _totalHits = in.readInt();
+    byte sortFieldTypesLen = in.readByte();
+    if (sortFieldTypesLen > 0) {
+      _sortFieldTypes = new WritableType[sortFieldTypesLen];
+      for (int i = 0; i < sortFieldTypesLen; i++) {
+        _sortFieldTypes[i] = WritableType.values()[in.readByte()];
+      }
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("HitsMap reading start at: " + start + " for server " + _serverName);
     }
@@ -60,8 +71,22 @@ public class HitsMapWritable implements Writable {
       for (int j = 0; j < hitSize; j++) {
         final float score = in.readFloat();
         final int docId = in.readInt();
-        final Hit hit = new Hit(shardName, _serverName, score, docId);
+        final Hit hit;
+        if (sortFieldTypesLen > 0) {
+          hit = new Hit(shardName, _serverName, score, docId, _sortFieldTypes);
+        } else {
+          hit = new Hit(shardName, _serverName, score, docId);
+        }
         addHitToShard(shardName, hit);
+        byte sortFieldsLen = in.readByte();
+        if (sortFieldsLen > 0) {
+          WritableComparable[] sortFields = new WritableComparable[sortFieldsLen];
+          for (int k = 0; k < sortFieldsLen; k++) {
+            sortFields[k] = _sortFieldTypes[k].newWritableComparable();
+            sortFields[k].readFields(in);
+          }
+          hit.setSortFields(sortFields);
+        }
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -77,6 +102,14 @@ public class HitsMapWritable implements Writable {
     }
     out.writeUTF(_serverName);
     out.writeInt(_totalHits);
+    if (_sortFieldTypes == null) {
+      out.writeByte(0);
+    } else {
+      out.writeByte(_sortFieldTypes.length);
+      for (WritableType writableType : _sortFieldTypes) {
+        out.writeByte(writableType.ordinal());
+      }
+    }
     final Set<String> keySet = _hitToShard.keySet();
     out.writeInt(keySet.size());
     for (final String key : keySet) {
@@ -86,6 +119,15 @@ public class HitsMapWritable implements Writable {
       for (final Hit hit : list) {
         out.writeFloat(hit.getScore());
         out.writeInt(hit.getDocId());
+        WritableComparable[] sortFields = hit.getSortFields();
+        if (sortFields == null) {
+          out.writeByte(0);
+        } else {
+          out.writeByte(sortFields.length);
+          for (Writable writable : sortFields) {
+            writable.write(out);
+          }
+        }
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -124,4 +166,13 @@ public class HitsMapWritable implements Writable {
   public int getTotalHits() {
     return _totalHits;
   }
+
+  public WritableType[] getSortFieldTypes() {
+    return _sortFieldTypes;
+  }
+
+  public void setSortFieldTypes(WritableType[] sortFieldTypes) {
+    _sortFieldTypes = sortFieldTypes;
+  }
+  
 }
diff --git a/src/main/java/net/sf/katta/node/ILuceneServer.java b/src/main/java/net/sf/katta/node/ILuceneServer.java
index db83057..09a6b63 100644
--- a/src/main/java/net/sf/katta/node/ILuceneServer.java
+++ b/src/main/java/net/sf/katta/node/ILuceneServer.java
@@ -52,6 +52,20 @@ public interface ILuceneServer extends VersionedProtocol {
       throws IOException;
 
   /**
+   * Sorts the returned hits based on the sort parameter.
+   * 
+   * @param query         The query to run.
+   * @param freqs         Term frequency information for term weighting.
+   * @param shardNames    A array of shard names to search in.
+   * @param count         The top n high score hits.
+   * @param sort          sort criteria for returned hits
+   * @return A list of hits from the search.
+   * @throws IOException     If the search had a problem reading files.
+   */
+  public HitsMapWritable search(QueryWritable query, DocumentFrequencyWritable freqs, String[] shardNames, int count,
+      SortWritable sort) throws IOException;
+  
+  /**
    * Returns the number of documents a term occurs in. In a distributed search
    * environment, we need to get this first and then query all nodes again with
    * this information to ensure we compute TF IDF correctly. See
diff --git a/src/main/java/net/sf/katta/node/LuceneServer.java b/src/main/java/net/sf/katta/node/LuceneServer.java
index 83c6368..fe1d89e 100644
--- a/src/main/java/net/sf/katta/node/LuceneServer.java
+++ b/src/main/java/net/sf/katta/node/LuceneServer.java
@@ -31,6 +31,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import net.sf.katta.client.lucene.FieldSortComparator;
+import net.sf.katta.util.WritableType;
+
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.MapWritable;
@@ -45,6 +48,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.search.DefaultSimilarity;
 import org.apache.lucene.search.Explanation;
+import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.HitCollector;
 import org.apache.lucene.search.IndexSearcher;
@@ -201,8 +205,14 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
    * @throws ParseException  If the query is ill-formed.
    * @throws IOException     If the search had a problem reading files.
    */
-  public HitsMapWritable search(final QueryWritable query, final DocumentFrequencyWritable freqs, final String[] shards,
-          final int count) throws IOException {
+  public HitsMapWritable search(final QueryWritable query, final DocumentFrequencyWritable freqs,
+      final String[] shards, final int count) throws IOException {
+    return search(query, freqs, shards, count, null);
+  }
+
+  @Override
+  public HitsMapWritable search(QueryWritable query, DocumentFrequencyWritable freqs, String[] shards, int count,
+      SortWritable sortWritable) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("You are searching with the query: '" + query.getQuery() + "'");
     }
@@ -219,7 +229,11 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
     if (LOG.isDebugEnabled()) {
       start = System.currentTimeMillis();
     }
-    search(luceneQuery, freqs, shards, result, count);
+    Sort sort = null;
+    if (sortWritable != null) {
+      sort = sortWritable.getSort();
+    }
+    search(luceneQuery, freqs, shards, result, count, sort);
     if (LOG.isDebugEnabled()) {
       final long end = System.currentTimeMillis();
       LOG.debug("Search took " + (end - start) / 1000.0 + "sec.");
@@ -344,7 +358,6 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
     return search(query, docFreqs, shards, 1).getTotalHits();
   }
 
-
   /**
    * Search in the given shards and return max hits for given query
    * 
@@ -356,31 +369,34 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
    * @throws IOException
    */
   protected final void search(final Query query, final DocumentFrequencyWritable freqs, final String[] shards,
-          final HitsMapWritable result, final int max) throws IOException {
+      final HitsMapWritable result, final int max, Sort sort) throws IOException {
     final Query rewrittenQuery = rewrite(query, shards);
     final int numDocs = freqs.getNumDocs();
     final Weight weight = rewrittenQuery.weight(new CachedDfSource(freqs.getAll(), numDocs, new DefaultSimilarity()));
     // Limit the request to the number requested or the total number of documents, whichever is smaller.
     final int limit = Math.min(numDocs, max);
-    final KattaHitQueue hq = new KattaHitQueue(limit);
     int totalHits = 0;
     final int shardsCount = shards.length;
 
     // Run the search in parallel on the shards with a thread pool.
     List<Future<SearchResult>> tasks = new ArrayList<Future<SearchResult>>();
     for (int i = 0; i < shardsCount; i++) {
-      SearchCall call = new SearchCall(shards[i], weight, limit);
+      SearchCall call = new SearchCall(shards[i], weight, limit, sort);
       Future<SearchResult> future = _threadPool.submit(call);
       tasks.add(future);
     }
 
     final ScoreDoc[][] scoreDocs = new ScoreDoc[shardsCount][];
+    ScoreDoc scoreDocExample = null;
     for (int i = 0; i < shardsCount; i++) {
       SearchResult searchResult;
       try {
         searchResult = tasks.get(i).get();
         totalHits += searchResult._totalHits;
         scoreDocs[i] = searchResult._scoreDocs;
+        if (scoreDocExample == null && scoreDocs[i].length > 0) {
+          scoreDocExample = scoreDocs[i][0];
+        }
       } catch (InterruptedException e) {
         throw new IOException("Multithread shard search interrupted:", e);
       } catch (ExecutionException e) {
@@ -390,39 +406,51 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
 
     result.addTotalHits(totalHits);
 
-    int pos = 0;
-    BitSet done = new BitSet(shardsCount);
-    while (done.cardinality() != shardsCount) {
-      ScoreDoc scoreDoc = null;
-      for (int i = 0; i < shardsCount; i++) {
-        // only process this shard if it is not yet done.
-        if (!done.get(i)) {
-          final ScoreDoc[] docs = scoreDocs[i];
-          if (pos < docs.length) {
-            scoreDoc = docs[pos];
-            final Hit hit = new Hit(shards[i], _nodeName, scoreDoc.score, scoreDoc.doc);
-            if (!hq.insert(hit)) {
-              // no doc left that has a higher score than the lowest score in
-              // the queue
+    final Iterable<Hit> finalHitList;
+    if (sort == null || totalHits == 0) {
+      final KattaHitQueue hq = new KattaHitQueue(limit);
+      int pos = 0;
+      BitSet done = new BitSet(shardsCount);
+      while (done.cardinality() != shardsCount) {
+        ScoreDoc scoreDoc = null;
+        for (int i = 0; i < shardsCount; i++) {
+          // only process this shard if it is not yet done.
+          if (!done.get(i)) {
+            final ScoreDoc[] docs = scoreDocs[i];
+            if (pos < docs.length) {
+              scoreDoc = docs[pos];
+              final Hit hit = new Hit(shards[i], _nodeName, scoreDoc.score, scoreDoc.doc);
+              if (!hq.insert(hit)) {
+                // no doc left that has a higher score than the lowest score in
+                // the queue
+                done.set(i, true);
+              }
+            } else {
+              // no docs left in this shard
               done.set(i, true);
             }
-          } else {
-            // no docs left in this shard
-            done.set(i, true);
           }
         }
+        // we always wait until we got all hits from this position in all
+        // shards.
+
+        pos++;
+        if (scoreDoc == null) {
+          // we do not have any more data
+          break;
+        }
       }
-      // we always wait until we got all hits from this position in all shards.
-      
-      
-      pos++;
-      if (scoreDoc == null) {
-        // we do not have any more data
-        break;
-      }
+      finalHitList = hq;
+    } else {
+      WritableType[] sortFieldsTypes = null;
+      FieldDoc fieldDoc = (FieldDoc) scoreDocExample;
+      sortFieldsTypes = WritableType.detectWritableTypes(fieldDoc.fields);
+      result.setSortFieldTypes(sortFieldsTypes);
+      finalHitList = mergeFieldSort(new FieldSortComparator(sort.getSort(), sortFieldsTypes), limit, scoreDocs, shards,
+          _nodeName);
     }
 
-    for (Hit hit : hq) {
+    for (Hit hit : finalHitList) {
       if (hit != null) {
         result.addHitToShard(hit.getShard(), hit);
       }
@@ -430,6 +458,40 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
   }
 
   /**
+   * Merges the already sorted sub-lists to one big sorted list.
+   */
+  private final static List<Hit> mergeFieldSort(FieldSortComparator comparator, int count,
+      ScoreDoc[][] sortedFieldDocs, String[] shards, String nodeName) {
+    int[] arrayPositions = new int[sortedFieldDocs.length];
+    final List<Hit> sortedResult = new ArrayList<Hit>(count);
+
+    BitSet listDone = new BitSet(sortedFieldDocs.length);
+    do {
+      int fieldDocArrayWithSmallestFieldDoc = -1;
+      FieldDoc smallestFieldDoc = null;
+      for (int subListIndex = 0; subListIndex < arrayPositions.length; subListIndex++) {
+        if (!listDone.get(subListIndex)) {
+          FieldDoc hit = (FieldDoc) sortedFieldDocs[subListIndex][arrayPositions[subListIndex]];
+          if (smallestFieldDoc == null || comparator.compare(hit.fields, smallestFieldDoc.fields) < 0) {
+            smallestFieldDoc = hit;
+            fieldDocArrayWithSmallestFieldDoc = subListIndex;
+          }
+        }
+      }
+      ScoreDoc[] smallestElementList = sortedFieldDocs[fieldDocArrayWithSmallestFieldDoc];
+      FieldDoc fieldDoc = (FieldDoc) smallestElementList[arrayPositions[fieldDocArrayWithSmallestFieldDoc]];
+      arrayPositions[fieldDocArrayWithSmallestFieldDoc]++;
+      final Hit hit = new Hit(shards[fieldDocArrayWithSmallestFieldDoc], nodeName, fieldDoc.score, fieldDoc.doc);
+      hit.setSortFields(WritableType.convertComparable(comparator.getFieldTypes(), fieldDoc.fields));
+      sortedResult.add(hit);
+      if (arrayPositions[fieldDocArrayWithSmallestFieldDoc] >= smallestElementList.length) {
+        listDone.set(fieldDocArrayWithSmallestFieldDoc, true);
+      }
+    } while (sortedResult.size() < count && listDone.cardinality() < arrayPositions.length);
+    return sortedResult;
+  }
+
+  /**
    * Returns a specified lucene document from a given shard.
    * 
    * @param shardName
@@ -500,20 +562,26 @@ public class LuceneServer implements INodeManaged, ILuceneServer {
     private final String _shardName;
     private final Weight _weight;
     private final int _limit;
+    private final Sort _sort;
 
-    public SearchCall(String shardName, Weight weight, int limit) {
+    public SearchCall(String shardName, Weight weight, int limit, Sort sort) {
       _shardName = shardName;
       _weight = weight;
       _limit = limit;
+      _sort = sort;
     }
 
     @Override
     public SearchResult call() throws Exception {
       final IndexSearcher indexSearcher = _searchers.get(_shardName);
-      final TopDocs docs = indexSearcher.search(_weight, null, _limit);
+      final TopDocs docs;
+      if (_sort != null) {
+        docs = indexSearcher.search(_weight, null, _limit, _sort);
+      } else {
+        docs = indexSearcher.search(_weight, null, _limit);
+      }
       return new SearchResult(docs.totalHits, docs.scoreDocs);
     }
-
   }
 
   private static class SearchResult {
diff --git a/src/main/java/net/sf/katta/node/SortWritable.java b/src/main/java/net/sf/katta/node/SortWritable.java
new file mode 100644
index 0000000..bc79421
--- /dev/null
+++ b/src/main/java/net/sf/katta/node/SortWritable.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.node;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.search.Sort;
+
+public class SortWritable implements Writable {
+
+  private Sort _sort;
+
+  public SortWritable() {
+    // default constructor
+  }
+
+  public SortWritable(Sort sort) {
+    _sort = sort;
+
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    int readInt = input.readInt();
+    byte[] bs = new byte[readInt];
+    input.readFully(bs);
+    ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(bs));
+    try {
+      _sort = (Sort) objectStream.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to deseriaize lucene query", e);
+    }
+
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+    ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+    objectStream.writeObject(_sort);
+    objectStream.close();
+    byte[] byteArray = byteArrayStream.toByteArray();
+    output.writeInt(byteArray.length);
+    output.write(byteArray);
+  }
+
+  public Sort getSort() {
+    return _sort;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    Sort other = ((SortWritable) obj).getSort();
+    return _sort.equals(other);
+  }
+}
diff --git a/src/main/java/net/sf/katta/util/WritableType.java b/src/main/java/net/sf/katta/util/WritableType.java
new file mode 100644
index 0000000..7cf39fc
--- /dev/null
+++ b/src/main/java/net/sf/katta/util/WritableType.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.util;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Helper class for dealing with hadoop writable <-> java primitive wrapper
+ * conversion.
+ * 
+ * @see Writable
+ */
+public enum WritableType {
+
+  TEXT, INT, LONG, FLOAT, DOUBLE;
+
+  public static WritableType detectWritableType(Comparable comparable) {
+    if (comparable instanceof Integer) {
+      return WritableType.INT;
+    } else if (comparable instanceof String) {
+      return WritableType.TEXT;
+    } else if (comparable instanceof Float) {
+      return WritableType.FLOAT;
+    } else if (comparable instanceof Long) {
+      return WritableType.LONG;
+    } else if (comparable instanceof Double) {
+      return WritableType.DOUBLE;
+    }
+    throw new IllegalArgumentException("no conversion rule for comparable of type " + comparable.getClass().getName());
+  }
+
+  public static WritableType[] detectWritableTypes(Comparable[] comparables) {
+    WritableType[] writablTypes = new WritableType[comparables.length];
+    for (int i = 0; i < comparables.length; i++) {
+      writablTypes[i] = detectWritableType(comparables[i]);
+    }
+    return writablTypes;
+  }
+
+  public WritableComparable newWritableComparable() {
+    switch (this) {
+    case TEXT:
+      return new Text();
+    case INT:
+      return new IntWritable();
+    case LONG:
+      return new LongWritable();
+    case FLOAT:
+      return new FloatWritable();
+    case DOUBLE:
+      return new DoubleWritable();
+    }
+    throw getUnhandledTypeException();
+  }
+
+  /**
+   * Convert a java primitive type wrapper (like String, Integer, Float, etc...)
+   * to the corresponding hadoop {@link WritableComparable}.
+   */
+  public WritableComparable convertComparable(Comparable comparable) {
+    switch (this) {
+    case TEXT:
+      return new Text((String) comparable);
+    case INT:
+      return new IntWritable(((Integer) comparable).intValue());
+    case LONG:
+      return new LongWritable((((Long) comparable).longValue()));
+    case FLOAT:
+      return new FloatWritable(((Float) comparable).floatValue());
+    case DOUBLE:
+      return new DoubleWritable(((Double) comparable).doubleValue());
+    }
+    throw getUnhandledTypeException();
+  }
+
+  public static WritableComparable[] convertComparable(WritableType[] writableTypes, Comparable[] comparables) {
+    WritableComparable[] writableComparables = new WritableComparable[comparables.length];
+    for (int i = 0; i < writableComparables.length; i++) {
+      writableComparables[i] = writableTypes[i].convertComparable(comparables[i]);
+
+    }
+    return writableComparables;
+  }
+
+  private RuntimeException getUnhandledTypeException() {
+    return new IllegalArgumentException("type " + this + " not handled");
+  }
+
+}
diff --git a/src/test/java/net/sf/katta/LuceneComplianceTest.java b/src/test/java/net/sf/katta/LuceneComplianceTest.java
new file mode 100644
index 0000000..9187ec5
--- /dev/null
+++ b/src/test/java/net/sf/katta/LuceneComplianceTest.java
@@ -0,0 +1,234 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import net.sf.katta.client.DefaultNodeSelectionPolicy;
+import net.sf.katta.client.DeployClient;
+import net.sf.katta.client.IDeployClient;
+import net.sf.katta.client.ILuceneClient;
+import net.sf.katta.client.LuceneClient;
+import net.sf.katta.index.IndexMetaData.IndexState;
+import net.sf.katta.master.Master;
+import net.sf.katta.node.Hit;
+import net.sf.katta.node.Hits;
+import net.sf.katta.node.LuceneServer;
+import net.sf.katta.node.Node;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
+import org.apache.lucene.queryParser.QueryParser;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.TopDocs;
+
+/**
+ * Test common lucene operations on sharded indices through katta interface
+ * versus pure lucene interface one big index.
+ * 
+ */
+public class LuceneComplianceTest extends AbstractKattaTest {
+
+  private static Node _node1;
+  private static Node _node2;
+  private static Master _master;
+  private static IDeployClient _deployClient;
+  private static ILuceneClient _client;
+
+  // index related fields
+  private static String FIELD_NAME = "text";
+  private static File kattaIndex;
+  private static File luceneIndex;
+  private static List<Document> documents1;
+  private static List<Document> documents2;
+
+  public LuceneComplianceTest() {
+    super(false);
+  }
+
+  @Override
+  protected void onBeforeClass() throws Exception {
+    MasterStartThread masterStartThread = startMaster();
+    _master = masterStartThread.getMaster();
+
+    NodeStartThread nodeStartThread1 = startNode(new LuceneServer());
+    NodeStartThread nodeStartThread2 = startNode(new LuceneServer());
+    _node1 = nodeStartThread1.getNode();
+    _node2 = nodeStartThread2.getNode();
+    masterStartThread.join();
+    nodeStartThread1.join();
+    nodeStartThread2.join();
+    waitOnNodes(masterStartThread, 2);
+
+    // generate 3 index (2 shards + once combined index)
+    _deployClient = new DeployClient(masterStartThread.getZkClient(), _conf);
+    kattaIndex = createClassWideFile("kattaIndex");
+    File shard1 = new File(kattaIndex, "shard1");
+    File shard2 = new File(kattaIndex, "shard2");
+    luceneIndex = createClassWideFile("luceneIndex");
+    documents1 = createSimpleNumberDocuments(FIELD_NAME, 123);
+    documents2 = createSimpleNumberDocuments(FIELD_NAME, 78);
+
+    writeIndex(shard1, documents1);
+    writeIndex(shard2, documents2);
+    writeIndex(luceneIndex, combineDocuments(documents1, documents2));
+
+    // deploy 2 indexes to katta
+    deployIndexToKatta(_deployClient, kattaIndex, 2);
+
+    _client = new LuceneClient(new DefaultNodeSelectionPolicy(), _conf);
+  }
+
+  @Override
+  protected void onAfterClass() throws Exception {
+    _client.close();
+    _node1.shutdown();
+    _node2.shutdown();
+    _master.shutdown();
+  }
+
+  public void testScoreSort() throws Exception {
+    // query and compare
+    IndexSearcher indexSearcher = new IndexSearcher(luceneIndex.getAbsolutePath());
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "0", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "1", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "2", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "15", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "23", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "2 23", null);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "nothing", null);
+  }
+
+  public void testFieldSort() throws Exception {
+    // query and compare
+    IndexSearcher indexSearcher = new IndexSearcher(luceneIndex.getAbsolutePath());
+    Sort sort = new Sort(new SortField[] { new SortField(FIELD_NAME) });
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "0", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "1", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "2", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "15", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "23", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "2 23", sort);
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "nothing", sort);
+  }
+
+  private void checkQueryResults(IndexSearcher indexSearcher, String kattaIndexName, String fieldName,
+      String queryTerm, Sort sort) throws Exception {
+    // check all documents
+    checkQueryResults(indexSearcher, kattaIndexName, fieldName, queryTerm, Short.MAX_VALUE, sort);
+
+    // check top n documents
+    checkQueryResults(indexSearcher, kattaIndexName, fieldName, queryTerm, (documents1.size() + documents2.size()) / 2,
+        sort);
+  }
+
+  private void checkQueryResults(IndexSearcher indexSearcher, String kattaIndexName, String fieldName,
+      String queryTerm, int resultCount, Sort sort) throws Exception {
+    final Query query = new QueryParser("", new KeywordAnalyzer()).parse(fieldName + ": " + queryTerm);
+    final TopDocs searchResultsLucene;
+    final Hits searchResultsKatta;
+    if (sort == null) {
+      searchResultsLucene = indexSearcher.search(query, resultCount);
+      searchResultsKatta = _client.search(query, new String[] { kattaIndexName }, resultCount);
+    } else {
+      searchResultsLucene = indexSearcher.search(query, null, resultCount, sort);
+      searchResultsKatta = _client.search(query, new String[] { kattaIndexName }, resultCount, sort);
+    }
+
+    assertEquals(searchResultsLucene.totalHits, searchResultsKatta.size());
+
+    ScoreDoc[] scoreDocs = searchResultsLucene.scoreDocs;
+    List<Hit> hits = searchResultsKatta.getHits();
+    if (sort == null) {
+      for (int i = 0; i < scoreDocs.length; i++) {
+        assertEquals(scoreDocs[i].score, hits.get(i).getScore());
+      }
+    } else {
+      System.err.println("------------------------");
+      for (int i = 0; i < scoreDocs.length; i++) {
+        Comparable[] luceneFields = ((FieldDoc) scoreDocs[i]).fields;
+        WritableComparable[] kattaFields = hits.get(i).getSortFields();
+        System.err.println(Arrays.asList(luceneFields) + " / " + Arrays.asList(kattaFields));
+        assertEquals(luceneFields.length, kattaFields.length);
+        for (int j = 0; j < luceneFields.length; j++) {
+          assertEquals(luceneFields[j].toString(), kattaFields[j].toString());
+        }
+
+        // Arrays.equals(scoreDocs, kattaFields);
+      }
+    }
+  }
+
+  private List<Document> createSimpleNumberDocuments(String textFieldName, int count) {
+    List<Document> documents = new ArrayList<Document>();
+    for (int i = 0; i < count; i++) {
+      String fieldContent = i + " " + (count - i);
+      if (i % 2 == 0) {
+        fieldContent += " 2";
+      } else {
+        fieldContent += " 1";
+      }
+      Document document = new Document();
+      document.add(new Field(textFieldName, fieldContent, Store.NO, Index.ANALYZED));
+      documents.add(document);
+    }
+    return documents;
+  }
+
+  private List<Document> combineDocuments(List<Document>... documentLists) {
+    ArrayList<Document> list = new ArrayList<Document>();
+    for (List<Document> documentsList : documentLists) {
+      list.addAll(documentsList);
+    }
+
+    return list;
+  }
+
+  private void writeIndex(File file, List<Document> documents) throws IOException {
+    IndexWriter indexWriter = new IndexWriter(file, new StandardAnalyzer(), true, MaxFieldLength.UNLIMITED);
+    for (Document document : documents) {
+      indexWriter.addDocument(document);
+    }
+    indexWriter.optimize();
+    indexWriter.close();
+
+  }
+
+  private void deployIndexToKatta(IDeployClient deployClient, File file, int replicationLevel)
+      throws InterruptedException {
+    IndexState indexState = deployClient.addIndex(file.getName(), file.getAbsolutePath(), replicationLevel)
+        .joinDeployment();
+    assertEquals(IndexState.DEPLOYED, indexState);
+    Thread.sleep(1000);// wait until lucene client is aware of the index
+  }
+
+}
diff --git a/src/test/java/net/sf/katta/client/LuceneClientTest.java b/src/test/java/net/sf/katta/client/LuceneClientTest.java
index 7eae72e..166d422 100644
--- a/src/test/java/net/sf/katta/client/LuceneClientTest.java
+++ b/src/test/java/net/sf/katta/client/LuceneClientTest.java
@@ -15,10 +15,12 @@
  */
 package net.sf.katta.client;
 
+import java.io.File;
 import java.util.List;
 import java.util.Set;
 
 import net.sf.katta.AbstractKattaTest;
+import net.sf.katta.index.IndexMetaData.IndexState;
 import net.sf.katta.master.Master;
 import net.sf.katta.node.Hit;
 import net.sf.katta.node.Hits;
@@ -32,9 +34,17 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriter.MaxFieldLength;
 import org.apache.lucene.queryParser.ParseException;
 import org.apache.lucene.queryParser.QueryParser;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 
 /**
  * Test for {@link LuceneClient}.
@@ -72,12 +82,9 @@ public class LuceneClientTest extends AbstractKattaTest {
     waitOnNodes(masterStartThread, 2);
 
     _deployClient = new DeployClient(masterStartThread.getZkClient(), _conf);
-    _deployClient.addIndex(INDEX1, TestResources.INDEX1.getAbsolutePath(), 1)
-        .joinDeployment();
-    _deployClient.addIndex(INDEX2, TestResources.INDEX1.getAbsolutePath(), 1)
-        .joinDeployment();
-    _deployClient.addIndex(INDEX3, TestResources.INDEX1.getAbsolutePath(), 1)
-        .joinDeployment();
+    _deployClient.addIndex(INDEX1, TestResources.INDEX1.getAbsolutePath(), 1).joinDeployment();
+    _deployClient.addIndex(INDEX2, TestResources.INDEX1.getAbsolutePath(), 1).joinDeployment();
+    _deployClient.addIndex(INDEX3, TestResources.INDEX1.getAbsolutePath(), 1).joinDeployment();
     _client = new LuceneClient(new DefaultNodeSelectionPolicy(), _conf);
   }
 
@@ -136,8 +143,49 @@ public class LuceneClientTest extends AbstractKattaTest {
     }
     assertEquals(8, hits.size());
     assertEquals(8, hits.getHits().size());
-    for (final Hit hit : hits.getHits()) {
-      LOG.info(hit.getNode() + " -- " + hit.getScore() + " -- " + hit.getDocId());
+  }
+
+  public void testSortedSearch() throws Exception {
+    // write and deploy test index
+    File sortIndex = createFile("sortIndex");
+    String queryTerm = "2";
+    String sortFieldName = "sortField";
+    String textFieldName = "textField";
+    IndexWriter indexWriter = new IndexWriter(sortIndex, new StandardAnalyzer(), true, MaxFieldLength.UNLIMITED);
+    for (int i = 0; i < 20; i++) {
+      Document document = new Document();
+      document.add(new Field(sortFieldName, "" + i, Store.NO, Index.NOT_ANALYZED));
+      String textField = "sample text";
+      if (i % 2 == 0) {// produce some different scores
+        for (int j = 0; j < i; j++) {
+          textField += " " + queryTerm;
+        }
+      }
+      document.add(new Field(textFieldName, textField, Store.NO, Index.ANALYZED));
+      indexWriter.addDocument(document);
+    }
+    indexWriter.optimize();
+    indexWriter.close();
+    IndexState indexState = _deployClient.addIndex(sortIndex.getName(), sortIndex.getParentFile().getAbsolutePath(), 1)
+        .joinDeployment();
+    assertEquals(IndexState.DEPLOYED, indexState);
+    Thread.sleep(1000);// wait until lucene client is aware of the index
+
+    // query and compare results
+    final Query query = new QueryParser("", new KeywordAnalyzer()).parse(textFieldName + ": " + queryTerm);
+    Sort sort = new Sort(new String[] { "sortField" });
+    final Hits hits = _client.search(query, new String[] { sortIndex.getName() }, 20, sort);
+    assertNotNull(hits);
+    List<Hit> hitsList = hits.getHits();
+    for (final Hit hit : hitsList) {
+      writeToLog(hit);
+    }
+    assertEquals(9, hits.size());
+    assertEquals(9, hitsList.size());
+    assertTrue("results ordered after score", hitsList.get(0).getScore() < hitsList.get(1).getScore());
+    for (int i = 0; i < hitsList.size() - 1; i++) {
+      int compareTo = hitsList.get(i).getSortFields()[0].compareTo(hitsList.get(i + 1).getSortFields()[0]);
+      assertTrue("results not after field", compareTo == 0 || compareTo == -1);
     }
   }
 
diff --git a/src/test/java/net/sf/katta/client/lucene/FieldSortComparatorTest.java b/src/test/java/net/sf/katta/client/lucene/FieldSortComparatorTest.java
new file mode 100644
index 0000000..865b78a
--- /dev/null
+++ b/src/test/java/net/sf/katta/client/lucene/FieldSortComparatorTest.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.client.lucene;
+
+import junit.framework.TestCase;
+import net.sf.katta.node.Hit;
+import net.sf.katta.util.WritableType;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.lucene.search.SortField;
+
+public class FieldSortComparatorTest extends TestCase {
+
+  public void testSingleIntFieldCompare() {
+    SortField[] sortFields = new SortField[] { new SortField("intField", SortField.INT) };
+    WritableType[] sortFieldTypes = new WritableType[] { WritableType.INT };
+    FieldSortComparator fieldSortComparator = new FieldSortComparator(sortFields, sortFieldTypes);
+
+    Hit hit1 = new Hit("shard", "node", 0.0f, 1, sortFieldTypes);
+    Hit hit2 = new Hit("shard", "node", 0.0f, 2, sortFieldTypes);
+
+    hit1.setSortFields(new WritableComparable[] { new IntWritable(1) });
+    hit2.setSortFields(new WritableComparable[] { new IntWritable(2) });
+
+    assertEquals(0, fieldSortComparator.compare(hit1, hit1));
+    assertEquals(1, fieldSortComparator.compare(hit2, hit1));
+    assertEquals(-1, fieldSortComparator.compare(hit1, hit2));
+  }
+
+  public void testTwoIntFieldCompare() {
+    SortField[] sortFields = new SortField[] { new SortField("intField1", SortField.INT),
+        new SortField("intField2", SortField.INT) };
+    WritableType[] sortFieldTypes = new WritableType[] { WritableType.INT, WritableType.INT };
+    FieldSortComparator fieldSortComparator = new FieldSortComparator(sortFields, sortFieldTypes);
+
+    Hit hit1 = new Hit("shard", "node", 0.0f, 1, sortFieldTypes);
+    Hit hit2 = new Hit("shard", "node", 0.0f, 2, sortFieldTypes);
+
+    hit1.setSortFields(new WritableComparable[] { new IntWritable(1), new IntWritable(1) });
+    hit2.setSortFields(new WritableComparable[] { new IntWritable(1), new IntWritable(2) });
+
+    assertEquals(0, fieldSortComparator.compare(hit1, hit1));
+    assertEquals(1, fieldSortComparator.compare(hit2, hit1));
+    assertEquals(-1, fieldSortComparator.compare(hit1, hit2));
+  }
+
+  public void testStringFieldDocIdCompare() {
+    SortField[] sortFields = new SortField[] { new SortField("stringField", SortField.STRING),
+        new SortField("docIdField", SortField.DOC) };
+    WritableType[] sortFieldTypes = new WritableType[] { WritableType.TEXT, WritableType.INT };
+    FieldSortComparator fieldSortComparator = new FieldSortComparator(sortFields, sortFieldTypes);
+
+    Hit hit1 = new Hit("shard", "node", 0.0f, 1, sortFieldTypes);
+    Hit hit2 = new Hit("shard", "node", 0.0f, 2, sortFieldTypes);
+
+    hit1.setSortFields(new WritableComparable[] { new Text("a"), new IntWritable(1) });
+    hit2.setSortFields(new WritableComparable[] { new Text("a"), new IntWritable(2) });
+
+    assertEquals(0, fieldSortComparator.compare(hit1, hit1));
+    assertEquals(1, fieldSortComparator.compare(hit2, hit1));
+    assertEquals(-1, fieldSortComparator.compare(hit1, hit2));
+  }
+
+  public void testStringFieldScoreCompare() {
+    SortField[] sortFields = new SortField[] { new SortField("stringField", SortField.STRING),
+        new SortField("scoreField", SortField.SCORE) };
+    WritableType[] sortFieldTypes = new WritableType[] { WritableType.TEXT, WritableType.FLOAT };
+    FieldSortComparator fieldSortComparator = new FieldSortComparator(sortFields, sortFieldTypes);
+
+    Hit hit1 = new Hit("shard", "node", 0.5f, 1, sortFieldTypes);
+    Hit hit2 = new Hit("shard", "node", 0.2f, 2, sortFieldTypes);
+
+    hit1.setSortFields(new WritableComparable[] { new Text("a"), new FloatWritable(hit1.getScore()) });
+    hit2.setSortFields(new WritableComparable[] { new Text("a"), new FloatWritable(hit2.getScore()) });
+
+    assertEquals(0, fieldSortComparator.compare(hit1, hit1));
+    assertEquals(1, fieldSortComparator.compare(hit2, hit1));
+    assertEquals(-1, fieldSortComparator.compare(hit1, hit2));
+  }
+
+}
diff --git a/src/test/java/net/sf/katta/testutil/ExtendedTestCase.java b/src/test/java/net/sf/katta/testutil/ExtendedTestCase.java
index db77439..082b040 100644
--- a/src/test/java/net/sf/katta/testutil/ExtendedTestCase.java
+++ b/src/test/java/net/sf/katta/testutil/ExtendedTestCase.java
@@ -20,7 +20,9 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import junit.framework.TestCase;
 import net.sf.katta.util.FileUtil;
@@ -45,6 +47,8 @@ public abstract class ExtendedTestCase extends TestCase {
 
   private Map<String, File> _testName2rootFolder = new HashMap<String, File>();
 
+  private Set<File> _classWideRootFolders = new HashSet<File>();
+
   private int countTestMethods() {
     int testMethodCount = 0;
     for (Method method : getClass().getMethods()) {
@@ -93,6 +97,12 @@ public abstract class ExtendedTestCase extends TestCase {
 
   protected void afterClass() throws Exception {
     // subclasses may override
+
+    // TDOD jz: introduce a onAfterClass() method so nobody overwrites this by
+    // accident
+    for (File file : _classWideRootFolders) {
+      FileUtil.deleteFolder(file);
+    }
   }
 
   /**
@@ -104,6 +114,19 @@ public abstract class ExtendedTestCase extends TestCase {
     return new File(rootFolder, name);
   }
 
+  protected File createClassWideFile(String name) {
+    String testName = this.getClass().getSimpleName();
+    try {
+      File rootFolder = File.createTempFile(testName + "_", ".tmp");
+      rootFolder.delete();
+      rootFolder.mkdirs();
+      _classWideRootFolders.add(rootFolder);
+      return new File(rootFolder, name);
+    } catch (IOException e) {
+      throw new RuntimeException("could not create tmp file", e);
+    }
+  }
+
   /**
    * Creates a path in a temporary empty folder. On tearing the test down this
    * folder will be deleted.
diff --git a/src/test/java/net/sf/katta/util/WritableTypeTest.java b/src/test/java/net/sf/katta/util/WritableTypeTest.java
new file mode 100644
index 0000000..23e5fa0
--- /dev/null
+++ b/src/test/java/net/sf/katta/util/WritableTypeTest.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.sf.katta.util;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+public class WritableTypeTest extends TestCase {
+
+  private final static Comparable[] COMPARABLES;
+
+  private final static WritableComparable[] CONVERTED_COMPARABLES;
+
+  static {
+    // filled corresponding with WritableTypes array
+    List<Comparable> comparables = new ArrayList();
+    comparables.add("as");
+    comparables.add(new Integer(3));
+    comparables.add(new Long(3));
+    comparables.add(new Float(3));
+    comparables.add(new Double(3));
+    COMPARABLES = comparables.toArray(new Comparable[comparables.size()]);
+
+    // corresponds to the above list
+    List<WritableComparable> writableComparables = new ArrayList();
+    writableComparables.add(new Text("as"));
+    writableComparables.add(new IntWritable(3));
+    writableComparables.add(new LongWritable(3));
+    writableComparables.add(new FloatWritable(3));
+    writableComparables.add(new DoubleWritable(3));
+    CONVERTED_COMPARABLES = writableComparables.toArray(new WritableComparable[writableComparables.size()]);
+  }
+
+  public void testDetectWritableType() {
+    for (int i = 0; i < COMPARABLES.length; i++) {
+      assertEquals(WritableType.values()[i], WritableType.detectWritableType(COMPARABLES[i]));
+    }
+  }
+
+  public void testDetectWritableTypes() {
+    WritableType[] writableTypes = WritableType.detectWritableTypes(COMPARABLES);
+    for (int i = 0; i < writableTypes.length; i++) {
+      assertEquals(WritableType.values()[i], writableTypes[i]);
+    }
+  }
+
+  public void testNewWritableComparable() {
+    Comparable[] comparables = COMPARABLES;
+    for (Comparable comparable : comparables) {
+      assertNotNull(comparable);
+    }
+  }
+
+  public void testConvertComparable() {
+    for (int i = 0; i < COMPARABLES.length; i++) {
+      assertEquals(CONVERTED_COMPARABLES[i], WritableType.values()[i].convertComparable(COMPARABLES[i]));
+    }
+  }
+
+  public void testConvertComparables() {
+    WritableComparable[] writableComparables = WritableType.convertComparable(WritableType.values(), COMPARABLES);
+    for (int i = 0; i < writableComparables.length; i++) {
+      assertEquals(CONVERTED_COMPARABLES[i], writableComparables[i]);
+    }
+  }
+
+  public void testUnhandledTypes() {
+    try {
+      WritableType.detectWritableType(new Date());
+      fail("should throw exception");
+    } catch (Exception e) {
+      // expected
+    }
+
+  }
+
+}
-- 
1.6.1


From 0b3ae2ba1c6637da5fe7e326f7cc3546fe38abef Mon Sep 17 00:00:00 2001
From: Johannes Zillmann <jz@leo.(none)>
Date: Tue, 10 Nov 2009 14:13:28 +0100
Subject: [PATCH 2/2] add byte as field sort type\nthrow UnssupportedOperationException if a custom field sort is specified

---
 .../katta/client/lucene/FieldSortComparator.java   |    5 ++++-
 src/main/java/net/sf/katta/util/WritableType.java  |   11 +++++++++--
 .../java/net/sf/katta/LuceneComplianceTest.java    |   13 +++++++++----
 .../java/net/sf/katta/util/WritableTypeTest.java   |    3 +++
 4 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java b/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
index ad3265e..d7574de 100644
--- a/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
+++ b/src/main/java/net/sf/katta/client/lucene/FieldSortComparator.java
@@ -51,10 +51,13 @@ public class FieldSortComparator implements Comparator<Hit> {
     // auto-detected on the node side.
     for (int i = 0; i < sortFields.length; i++) {
       if (_fieldTypes[i] == WritableType.TEXT && sortFields[i].getLocale() != null) {
-        throw new UnsupportedOperationException("locale-sensitive comparison not supported yet");
+        throw new UnsupportedOperationException("locale-sensitive field sort currently not supported");
         // jz: therefore we could use java.text.Collator class (see lucenes
         // FieldSortedHitQueue)
+      } else if (sortFields[i].getType() == SortField.CUSTOM) {
+        throw new UnsupportedOperationException("custom field sort currently not supported");
       }
+
       if (_sortFields[i].getType() == SortField.SCORE) {
         _fieldComparators[i] = REVERSED_COMPARABLE_COMPARATOR;
       } else {
diff --git a/src/main/java/net/sf/katta/util/WritableType.java b/src/main/java/net/sf/katta/util/WritableType.java
index 7cf39fc..627d2b1 100644
--- a/src/main/java/net/sf/katta/util/WritableType.java
+++ b/src/main/java/net/sf/katta/util/WritableType.java
@@ -15,6 +15,7 @@
  */
 package net.sf.katta.util;
 
+import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -31,10 +32,12 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public enum WritableType {
 
-  TEXT, INT, LONG, FLOAT, DOUBLE;
+  TEXT, BYTE, INT, LONG, FLOAT, DOUBLE;
 
   public static WritableType detectWritableType(Comparable comparable) {
-    if (comparable instanceof Integer) {
+    if (comparable instanceof Byte) {
+      return WritableType.BYTE;
+    } else if (comparable instanceof Integer) {
       return WritableType.INT;
     } else if (comparable instanceof String) {
       return WritableType.TEXT;
@@ -60,6 +63,8 @@ public enum WritableType {
     switch (this) {
     case TEXT:
       return new Text();
+    case BYTE:
+      return new ByteWritable();
     case INT:
       return new IntWritable();
     case LONG:
@@ -80,6 +85,8 @@ public enum WritableType {
     switch (this) {
     case TEXT:
       return new Text((String) comparable);
+    case BYTE:
+      return new ByteWritable(((Byte) comparable).byteValue());
     case INT:
       return new IntWritable(((Integer) comparable).intValue());
     case LONG:
diff --git a/src/test/java/net/sf/katta/LuceneComplianceTest.java b/src/test/java/net/sf/katta/LuceneComplianceTest.java
index 9187ec5..862d81f 100644
--- a/src/test/java/net/sf/katta/LuceneComplianceTest.java
+++ b/src/test/java/net/sf/katta/LuceneComplianceTest.java
@@ -18,7 +18,6 @@ package net.sf.katta;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import net.sf.katta.client.DefaultNodeSelectionPolicy;
@@ -129,7 +128,7 @@ public class LuceneComplianceTest extends AbstractKattaTest {
   }
 
   public void testFieldSort() throws Exception {
-    // query and compare
+    // query and compare (auto types)
     IndexSearcher indexSearcher = new IndexSearcher(luceneIndex.getAbsolutePath());
     Sort sort = new Sort(new SortField[] { new SortField(FIELD_NAME) });
     checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "0", sort);
@@ -139,6 +138,14 @@ public class LuceneComplianceTest extends AbstractKattaTest {
     checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "23", sort);
     checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "2 23", sort);
     checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "nothing", sort);
+
+    // check for explicit types
+    sort = new Sort(new SortField[] { new SortField(FIELD_NAME, SortField.BYTE) });
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "1", sort);
+    sort = new Sort(new SortField[] { new SortField(FIELD_NAME, SortField.INT) });
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "1", sort);
+    sort = new Sort(new SortField[] { new SortField(FIELD_NAME, SortField.LONG) });
+    checkQueryResults(indexSearcher, kattaIndex.getName(), FIELD_NAME, "1", sort);
   }
 
   private void checkQueryResults(IndexSearcher indexSearcher, String kattaIndexName, String fieldName,
@@ -173,11 +180,9 @@ public class LuceneComplianceTest extends AbstractKattaTest {
         assertEquals(scoreDocs[i].score, hits.get(i).getScore());
       }
     } else {
-      System.err.println("------------------------");
       for (int i = 0; i < scoreDocs.length; i++) {
         Comparable[] luceneFields = ((FieldDoc) scoreDocs[i]).fields;
         WritableComparable[] kattaFields = hits.get(i).getSortFields();
-        System.err.println(Arrays.asList(luceneFields) + " / " + Arrays.asList(kattaFields));
         assertEquals(luceneFields.length, kattaFields.length);
         for (int j = 0; j < luceneFields.length; j++) {
           assertEquals(luceneFields[j].toString(), kattaFields[j].toString());
diff --git a/src/test/java/net/sf/katta/util/WritableTypeTest.java b/src/test/java/net/sf/katta/util/WritableTypeTest.java
index 23e5fa0..45f419b 100644
--- a/src/test/java/net/sf/katta/util/WritableTypeTest.java
+++ b/src/test/java/net/sf/katta/util/WritableTypeTest.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -38,6 +39,7 @@ public class WritableTypeTest extends TestCase {
     // filled corresponding with WritableTypes array
     List<Comparable> comparables = new ArrayList();
     comparables.add("as");
+    comparables.add(new Byte((byte) 3));
     comparables.add(new Integer(3));
     comparables.add(new Long(3));
     comparables.add(new Float(3));
@@ -47,6 +49,7 @@ public class WritableTypeTest extends TestCase {
     // corresponds to the above list
     List<WritableComparable> writableComparables = new ArrayList();
     writableComparables.add(new Text("as"));
+    writableComparables.add(new ByteWritable((byte) 3));
     writableComparables.add(new IntWritable(3));
     writableComparables.add(new LongWritable(3));
     writableComparables.add(new FloatWritable(3));
-- 
1.6.1


