Index: src/main/java/net/sf/katta/node/BaseNode.java =================================================================== --- src/main/java/net/sf/katta/node/BaseNode.java (revision 445) +++ src/main/java/net/sf/katta/node/BaseNode.java Thu Apr 30 14:34:58 PDT 2009 @@ -15,22 +15,6 @@ */ package net.sf.katta.node; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; - import net.sf.katta.index.AssignedShard; import net.sf.katta.index.DeployedShard; import net.sf.katta.index.ShardError; @@ -42,15 +26,20 @@ import net.sf.katta.zk.IZkReconnectListener; import net.sf.katta.zk.ZKClient; import net.sf.katta.zk.ZkPathes; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.zookeeper.Watcher.Event.KeeperState; -public abstract class BaseNode extends BaseRpcServer implements IRequestHandler, IZkReconnectListener { +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +public abstract class BaseNode extends BaseRpcServer implements IZkReconnectListener { + protected final static Logger LOG = Logger.getLogger(BaseNode.class); public static final long _protocolVersion = 0; @@ -163,7 +152,7 @@ */ private void cleanupLocalWorkDir() throws KattaException { String node2ShardRootPath = ZkPathes.getNode2ShardRootPath(_nodeName); - List shardsToServe = Collections.EMPTY_LIST; + List shardsToServe = Collections.emptyList(); if (_zkClient.exists(node2ShardRootPath)) { shardsToServe = _zkClient.getChildren(node2ShardRootPath); @@ -450,6 +439,7 @@ @Override protected void finalize() throws Throwable { + super.finalize(); shutdown(); } Index: src/main/java/net/sf/katta/node/BaseRpcServer.java =================================================================== --- src/main/java/net/sf/katta/node/BaseRpcServer.java (revision 445) +++ src/main/java/net/sf/katta/node/BaseRpcServer.java Thu Apr 30 13:53:50 PDT 2009 @@ -55,7 +55,7 @@ serverPort++; // try again } else { - throw new RuntimeException("Tried " + tryCount + " ports and no one is free..."); + throw new RuntimeException("Tried " + tryCount + " ports and none is free..."); } } catch (final IOException e) { throw new RuntimeException("Unable to create rpc search server", e); Index: src/main/java/net/sf/katta/client/Client.java =================================================================== --- src/main/java/net/sf/katta/client/Client.java (revision 445) +++ src/main/java/net/sf/katta/client/Client.java Thu Apr 30 14:34:58 PDT 2009 @@ -15,29 +15,8 @@ */ package net.sf.katta.client; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - import net.sf.katta.index.IndexMetaData; -import net.sf.katta.node.DocumentFrequenceWritable; -import net.sf.katta.node.Hit; -import net.sf.katta.node.Hits; -import net.sf.katta.node.HitsMapWritable; -import net.sf.katta.node.IQuery; -import net.sf.katta.node.ISearch; -import net.sf.katta.node.QueryWritable; +import net.sf.katta.node.*; import net.sf.katta.util.CollectionUtil; import net.sf.katta.util.KattaException; import net.sf.katta.util.ZkConfiguration; @@ -45,7 +24,6 @@ import net.sf.katta.zk.IZkDataListener; import net.sf.katta.zk.ZKClient; import net.sf.katta.zk.ZkPathes; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.ipc.RPC; @@ -55,6 +33,11 @@ import org.apache.lucene.queryParser.QueryParser; import org.apache.lucene.search.Query; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.*; + /** * Default implementation of {@link IClient}. * @@ -216,7 +199,7 @@ public Hits search(final Query query, final String[] indexNames, final int count) throws KattaException { final Map> nodeShardsMap = getNode2ShardsMap(indexNames); final Hits result = new Hits(); - final DocumentFrequenceWritable docFreqs = getDocFrequencies(query, nodeShardsMap); + final DocumentFrequencyWritable docFreqs = getDocFrequencies(query, nodeShardsMap); List nodeInteractions = new ArrayList(); for (final String node : nodeShardsMap.keySet()) { @@ -303,9 +286,9 @@ return shards; } - private DocumentFrequenceWritable getDocFrequencies(final Query query, final Map> node2ShardsMap) + private DocumentFrequencyWritable getDocFrequencies(final Query query, final Map> node2ShardsMap) throws KattaException { - DocumentFrequenceWritable docFreqs = new DocumentFrequenceWritable(); + DocumentFrequencyWritable docFreqs = new DocumentFrequencyWritable(); List nodeInteractions = new ArrayList(); for (final String node : node2ShardsMap.keySet()) { nodeInteractions.add(new GetDocumentFrequencyInteraction(node, node2ShardsMap, query, docFreqs)); @@ -446,10 +429,10 @@ private class GetDocumentFrequencyInteraction extends NodeInteraction { private final Query _query; - private final DocumentFrequenceWritable _docFreqs; + private final DocumentFrequencyWritable _docFreqs; public GetDocumentFrequencyInteraction(String node, Map> node2ShardsMap, Query query, - DocumentFrequenceWritable docFreqs) { + DocumentFrequencyWritable docFreqs) { super(node, node2ShardsMap); _query = query; _docFreqs = docFreqs; @@ -457,7 +440,7 @@ @Override protected void doInteraction(ISearch search, String node, List shards) throws IOException { - final DocumentFrequenceWritable nodeDocFreqs = search.getDocFreqs(new QueryWritable(_query), shards + final DocumentFrequencyWritable nodeDocFreqs = search.getDocFreqs(new QueryWritable(_query), shards .toArray(new String[shards.size()])); _docFreqs.addNumDocs(nodeDocFreqs.getNumDocs()); _docFreqs.putAll(nodeDocFreqs.getAll()); @@ -513,11 +496,11 @@ private final Query _query; private final int _count; - private final DocumentFrequenceWritable _docFreqs; + private final DocumentFrequencyWritable _docFreqs; private final Hits _result; public SearchInteraction(String node, Map> node2ShardsMap, Query query, - DocumentFrequenceWritable docFreqs, Hits result, int count) { + DocumentFrequencyWritable docFreqs, Hits result, int count) { super(node, node2ShardsMap); _query = query; _docFreqs = docFreqs; Index: src/main/java/net/sf/katta/node/DocumentFrequencyWritable.java =================================================================== --- src/main/java/net/sf/katta/node/DocumentFrequencyWritable.java Thu Apr 30 13:53:50 PDT 2009 +++ src/main/java/net/sf/katta/node/DocumentFrequencyWritable.java Thu Apr 30 13:53:50 PDT 2009 @@ -0,0 +1,136 @@ +/** + * Copyright 2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.sf.katta.node; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.io.Writable; + +public class DocumentFrequencyWritable implements Writable { + private ReadWriteLock _frequenciesLock = new ReentrantReadWriteLock(true); + private Map _frequencies = new HashMap(); + + private AtomicInteger _numDocs = new AtomicInteger(); + + public void put(final String field, final String term, final int frequency) { + _frequenciesLock.writeLock().lock(); + try { + add(new TermWritable(field, term), frequency); + } finally { + _frequenciesLock.writeLock().unlock(); + } + } + + /** + * Assumes a write lock is already in place. + * @param key The item that has a frequency. + * @param frequency The frequency of the key. + */ + private void add(final TermWritable key, final int frequency) { + int result = frequency; + final Integer frequencyObject = _frequencies.get(key); + if (frequencyObject != null) { + result += frequencyObject; + } + _frequencies.put(key, result); + } + + public void putAll(final Map frequencyMap) { + _frequenciesLock.writeLock().lock(); + try { + final Set keySet = frequencyMap.keySet(); + for (final TermWritable key : keySet) { + add(key, frequencyMap.get(key)); + } + } finally { + _frequenciesLock.writeLock().unlock(); + } + } + + public Integer get(final String field, final String term) { + return get(new TermWritable(field, term)); + } + + public void addNumDocs(final int numDocs) { + _numDocs.addAndGet(numDocs); + } + + public Integer get(final TermWritable key) { + _frequenciesLock.readLock().lock(); + try { + return _frequencies.get(key); + } finally { + _frequenciesLock.readLock().unlock(); + } + } + + public Map getAll() { + return Collections.unmodifiableMap(_frequencies); + } + + public void readFields(final DataInput in) throws IOException { + _frequenciesLock.writeLock().lock(); + try { + final int size = in.readInt(); + for (int i = 0; i < size; i++) { + final TermWritable term = new TermWritable(); + term.readFields(in); + final int frequency = in.readInt(); + _frequencies.put(term, frequency); + } + _numDocs.set(in.readInt()); + } finally { + _frequenciesLock.writeLock().unlock(); + } + } + + public void write(final DataOutput out) throws IOException { + _frequenciesLock.readLock().lock(); + try { + out.writeInt(_frequencies.size()); + for (final TermWritable key : _frequencies.keySet()) { + key.write(out); + final Integer frequency = _frequencies.get(key); + out.writeInt(frequency); + } + out.writeInt(_numDocs.get()); + } finally { + _frequenciesLock.readLock().unlock(); + } + } + + public int getNumDocs() { + return _numDocs.get(); + } + + public void setNumDocs(final int numDocs) { + _numDocs.set(numDocs); + } + + @Override + public String toString() { + return "numDocs: " + getNumDocs() + getAll(); + } +} Index: src/main/java/net/sf/katta/node/Hits.java =================================================================== --- src/main/java/net/sf/katta/node/Hits.java (revision 445) +++ src/main/java/net/sf/katta/node/Hits.java Thu Apr 30 14:34:58 PDT 2009 @@ -30,9 +30,7 @@ public class Hits implements Writable { - /** - * - */ + @SuppressWarnings({"UnusedDeclaration"}) private static final long serialVersionUID = -732226190122340208L; private List> _hitsList = new Vector>(); Index: src/main/java/net/sf/katta/node/LuceneNode.java =================================================================== --- src/main/java/net/sf/katta/node/LuceneNode.java (revision 445) +++ src/main/java/net/sf/katta/node/LuceneNode.java Thu Apr 30 14:34:58 PDT 2009 @@ -15,21 +15,12 @@ */ package net.sf.katta.node; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - import net.sf.katta.util.NodeConfiguration; import net.sf.katta.zk.ZKClient; - import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; @@ -37,6 +28,13 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + /** * Implementation of a node serving lucene shards. * @@ -51,24 +49,12 @@ super(zkClient, configuration); } - /* - * (non-Javadoc) - * - * @see - * net.sf.katta.node.IRequestHandler#handle(org.apache.hadoop.io.Writable) - */ - @Override - public Writable handle(Writable request) { - // TODO Auto-generated method stub - return null; - } - public int getResultCount(final QueryWritable query, final String[] shards) throws IOException { - final DocumentFrequenceWritable docFreqs = getDocFreqs(query, shards); + final DocumentFrequencyWritable docFreqs = getDocFreqs(query, shards); return search(query, docFreqs, shards, 1).getTotalHits(); } - public HitsMapWritable search(final QueryWritable query, final DocumentFrequenceWritable freqs, + public HitsMapWritable search(final QueryWritable query, final DocumentFrequencyWritable freqs, final String[] shards, final int count) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("You are searching with the query: '" + query.getQuery() + "'"); @@ -105,7 +91,7 @@ return result; } - public HitsMapWritable search(final QueryWritable query, final DocumentFrequenceWritable freqs, final String[] shards) + public HitsMapWritable search(final QueryWritable query, final DocumentFrequencyWritable freqs, final String[] shards) throws IOException { return search(query, freqs, shards, Integer.MAX_VALUE - 1); } @@ -146,19 +132,17 @@ return result; } - public DocumentFrequenceWritable getDocFreqs(final QueryWritable input, final String[] shards) throws IOException { + public DocumentFrequencyWritable getDocFreqs(final QueryWritable input, final String[] shards) throws IOException { Query luceneQuery = input.getQuery(); final Query rewrittenQuery = _searcher.rewrite(luceneQuery, shards); - final DocumentFrequenceWritable docFreqs = new DocumentFrequenceWritable(); + final DocumentFrequencyWritable docFreqs = new DocumentFrequencyWritable(); final HashSet termSet = new HashSet(); rewrittenQuery.extractTerms(termSet); int numDocs = 0; for (final String shard : shards) { - final java.util.Iterator termIterator = termSet.iterator(); - while (termIterator.hasNext()) { - final Term term = termIterator.next(); + for (Term term : termSet) { final int docFreq = _searcher.docFreq(shard, term); docFreqs.put(term.field(), term.text(), docFreq); } Index: src/test/java/net/sf/katta/node/NodeTest.java =================================================================== --- src/test/java/net/sf/katta/node/NodeTest.java (revision 445) +++ src/test/java/net/sf/katta/node/NodeTest.java Thu Apr 30 13:53:50 PDT 2009 @@ -147,7 +147,7 @@ QueryWritable writable = new QueryWritable(query); String[] shardArray = shardNames.toArray(new String[shardNames.size()]); - DocumentFrequenceWritable freqs = node.getDocFreqs(writable, shardArray); + DocumentFrequencyWritable freqs = node.getDocFreqs(writable, shardArray); ExecutorService es = Executors.newFixedThreadPool(100); List> tasks = new ArrayList>(); @@ -198,10 +198,10 @@ private LuceneNode _node; private QueryWritable _query; - private DocumentFrequenceWritable _freqs; + private DocumentFrequencyWritable _freqs; private String[] _shards; - public QueryClient(LuceneNode node, DocumentFrequenceWritable freqs, QueryWritable query, String[] shards) { + public QueryClient(LuceneNode node, DocumentFrequencyWritable freqs, QueryWritable query, String[] shards) { _node = node; _freqs = freqs; _query = query; @@ -237,7 +237,7 @@ // final AssignedShard shard1 = new AssignedShard("bla2", // "src/test/testIndexA/bIndex"); // searchServer.addShard(shard1); - // final DocumentFrequenceWritable docFreqs = + // final DocumentFrequencyWritable docFreqs = // searchServer.getDocFreqs(query, // new String[] { shard1.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); @@ -273,7 +273,7 @@ // AssignedShard shard = new AssignedShard("bla2", // "src/test/testIndexA/bIndex"); // searchServer.addShard(shard); - // DocumentFrequenceWritable docFreqs = searchServer.getDocFreqs(query, new + // DocumentFrequencyWritable docFreqs = searchServer.getDocFreqs(query, new // String[] { shard.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); // HitsMapWritable searchHits = searchServer.search(new Query("foo: bar"), @@ -333,7 +333,7 @@ // final AssignedShard shard = new AssignedShard("bla2", // "src/test/testIndexA/bIndex"); // searchServer.addShard(shard); - // DocumentFrequenceWritable docFreqs = searchServer.getDocFreqs(query, new + // DocumentFrequencyWritable docFreqs = searchServer.getDocFreqs(query, new // String[] { shard.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); // HitsMapWritable searchHits = searchServer.search(query, new String[] { @@ -412,10 +412,10 @@ // // final Query query = new Query("foo: bar"); // - // final DocumentFrequenceWritable docFreqs = + // final DocumentFrequencyWritable docFreqs = // searchServer1.getDocFreqs(query, // new String[] { shard.getName() }); - // final DocumentFrequenceWritable docFreqs2 = + // final DocumentFrequencyWritable docFreqs2 = // searchServer2.getDocFreqs(query, new String[] { shard2.getName() }); // docFreqs.putAll(docFreqs2.getAll()); // docFreqs.addNumDocs(docFreqs2.getNumDocs()); @@ -469,7 +469,7 @@ // searchServer.addShard(shard); // // final Query query = new Query("content: the"); - // final DocumentFrequenceWritable docFreqs = + // final DocumentFrequencyWritable docFreqs = // searchServer.getDocFreqs(query, // new String[] { shard.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); @@ -509,7 +509,7 @@ // searchServer.addShard(shard); // // final Query query = new Query("content: the"); - // final DocumentFrequenceWritable docFreqs = + // final DocumentFrequencyWritable docFreqs = // searchServer.getDocFreqs(query, // new String[] { shard.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); @@ -592,7 +592,7 @@ // searchServer.addShard(shard); // // final Query query = new Query("content: the"); - // final DocumentFrequenceWritable docFreqs = + // final DocumentFrequencyWritable docFreqs = // searchServer.getDocFreqs(query, // new String[] { shard.getName() }); // searchServer.setSimilarityDocFreqs(docFreqs); Index: src/main/java/net/sf/katta/node/KattaMultiSearcher.java =================================================================== --- src/main/java/net/sf/katta/node/KattaMultiSearcher.java (revision 445) +++ src/main/java/net/sf/katta/node/KattaMultiSearcher.java Thu Apr 30 14:34:58 PDT 2009 @@ -15,45 +15,22 @@ */ package net.sf.katta.node; -import java.io.IOException; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - import org.apache.log4j.Logger; import org.apache.lucene.document.Document; import org.apache.lucene.document.FieldSelector; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DefaultSimilarity; -import org.apache.lucene.search.Explanation; -import org.apache.lucene.search.Filter; -import org.apache.lucene.search.HitCollector; -import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.*; import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.Searchable; -import org.apache.lucene.search.Searcher; -import org.apache.lucene.search.Similarity; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TopFieldDocs; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.PriorityQueue; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; + /** - * Implements search over a set of Searchables. + * Implements search over a set of named Searchables. * - *

- * Applications usually need only call the inherited {@link #search(Query)} or - * {@link #search(Query,Filter)} methods. */ public class KattaMultiSearcher { @@ -63,6 +40,8 @@ private ExecutorService _threadPool = Executors.newFixedThreadPool(100); private final String _node; + + //TODO is this needed? It is never used. private int _maxDoc = 0; public KattaMultiSearcher(final String node) { @@ -113,19 +92,18 @@ * @param max * @throws IOException */ - public final void search(final Query query, final DocumentFrequenceWritable freqs, final String[] shards, + public final void search(final Query query, final DocumentFrequencyWritable freqs, final String[] shards, final HitsMapWritable result, final int max) throws IOException { final Query rewrittenQuery = rewrite(query, shards); final int numDocs = freqs.getNumDocs(); - final CachedDfSource cacheSim = new CachedDfSource(freqs.getAll(), numDocs, new DefaultSimilarity()); - final Weight weight = rewrittenQuery.weight(cacheSim); - // we can maximal found all docs in this system or maximal the requested + final Weight weight = rewrittenQuery.weight(new CachedDfSource(freqs.getAll(), numDocs, new DefaultSimilarity())); + // limit the request to the number requested or the total number of documents, whichever is smaller final int limit = Math.min(numDocs, max); final KattaHitQueue hq = new KattaHitQueue(limit); int totalHits = 0; final int shardsCount = shards.length; - // run the search parallel on the shards with a thread pool + // run the search in parallel on the shards with a thread pool List> tasks = new ArrayList>(); for (int i = 0; i < shardsCount; i++) { SearchCall call = new SearchCall(shards[i], weight, limit); @@ -141,7 +119,7 @@ totalHits += searchResult._totalHits; scoreDocs[i] = searchResult._scoreDocs; } catch (InterruptedException e) { - throw new IOException("Multithread shard search interrupred:", e); + throw new IOException("Multithread shard search interrupted:", e); } catch (ExecutionException e) { throw new IOException("Multithread shard search could not be executed:", e); } @@ -176,13 +154,12 @@ pos++; if (scoreDoc == null) { - // we do not have any data more + // we do not have any more data break; } } - for (int i = hq.size() - 1; i >= 0; i--) { - final Hit hit = (Hit) hq.pop(); + for (Hit hit : hq) { if (hit != null) { result.addHitToShard(hit.getShard(), hit); } @@ -205,7 +182,7 @@ } /** - * Returns the lucene document of a given shard. + * Returns a specified lucene document from a given shard. * * @param shardName * @param docId @@ -213,7 +190,7 @@ * @throws CorruptIndexException * @throws IOException */ - public Document doc(final String shardName, final int docId) throws CorruptIndexException, IOException { + public Document doc(final String shardName, final int docId) throws IOException { final Searchable searchable = _searchers.get(shardName); if (searchable != null) { return searchable.doc(docId); @@ -242,7 +219,7 @@ } /** - * Returns the document frequence for a given term within a given shard. + * Returns the document frequency for a given term within a given shard. * * @param shardName * @param term @@ -266,6 +243,10 @@ } } + /** + * Implements a single thread of a search. Each shard has a separate SearchCall and they + * are run more or less in parallel. + */ private class SearchCall implements Callable { private final String _shardName; @@ -282,14 +263,12 @@ public SearchResult call() throws Exception { final IndexSearcher indexSearcher = _searchers.get(_shardName); final TopDocs docs = indexSearcher.search(_weight, null, _limit); - // totalHits += docs.totalHits; // update totalHits return new SearchResult(docs.totalHits, docs.scoreDocs); } } - private class SearchResult { - + private static class SearchResult { private final int _totalHits; private final ScoreDoc[] _scoreDocs; @@ -300,19 +279,19 @@ } - // cached document frequence source from apache lucene + // cached document frequency source from apache lucene // MultiSearcher. /** - * Document Frequency cache acting as a Dummy-Searcher. This class is no - * full-fledged Searcher, but only supports the methods necessary to + * Document Frequency cache acting as a Dummy-Searcher. This class is not a + * fully-fledged Searcher, but only supports the methods necessary to * initialize Weights. */ private static class CachedDfSource extends Searcher { - private final Map dfMap; // Map from Terms to corresponding doc freqs + private final Map dfMap; // Map from Terms to corresponding doc freqs private final int maxDoc; // document count - public CachedDfSource(final Map dfMap, final int maxDoc, final Similarity similarity) { + public CachedDfSource(final Map dfMap, final int maxDoc, final Similarity similarity) { this.dfMap = dfMap; this.maxDoc = maxDoc; setSimilarity(similarity); @@ -322,7 +301,7 @@ public int docFreq(final Term term) { int df; try { - df = ((Integer) dfMap.get(new TermWritable(term.field(), term.text()))).intValue(); + df = dfMap.get(new TermWritable(term.field(), term.text())); } catch (final NullPointerException e) { throw new IllegalArgumentException("df for term " + term.text() + " not available"); } @@ -387,7 +366,7 @@ } } - protected class KattaHitQueue extends PriorityQueue { + protected class KattaHitQueue extends PriorityQueue implements Iterable { KattaHitQueue(final int size) { initialize(size); } @@ -403,6 +382,22 @@ } return hitA.getScore() < hitB.getScore(); } + + public Iterator iterator() { + return new Iterator() { + public boolean hasNext() { + return KattaHitQueue.this.size() > 0; - } + } + public Hit next() { + return (Hit) KattaHitQueue.this.pop(); -} + } + + public void remove() { + throw new UnsupportedOperationException("Can't remove using this iterator"); + } + }; + } + } + +} Index: src/main/java/net/sf/katta/node/ISearch.java =================================================================== --- src/main/java/net/sf/katta/node/ISearch.java (revision 445) +++ src/main/java/net/sf/katta/node/ISearch.java Thu Apr 30 14:34:58 PDT 2009 @@ -25,66 +25,62 @@ /** * Returns all Hits that match the query. This might be significant slower as - * {@link #search(IQuery, DocumentFrequenceWritable, String[], int)} since we - * replace count with {@link Integer.MAX_VALUE}. + * {@link #search(QueryWritable, DocumentFrequencyWritable , String[], int)} since we + * replace count with Integer.MAX_VALUE. * - * @param query - * @param freqs - * @param shardNames - * A array of shard names to search in. - * @return - * @throws ParseException - * @throws IOException + * @param query The query to run. + * @param freqs Term frequency information for term weighting. + * @param shardNames A array of shard names to search in. + * @return A list of hits from the search. + * @throws IOException If the search had a problem reading files. */ - public HitsMapWritable search(QueryWritable query, DocumentFrequenceWritable freqs, String[] shardNames) throws IOException; + public HitsMapWritable search(QueryWritable query, DocumentFrequencyWritable freqs, String[] shardNames) throws IOException; /** - * @param query - * @param freqs - * @param shardNames - * @param count - * the top n high score hits - * @return - * @throws ParseException - * @throws IOException + * @param query The query to run. + * @param freqs Term frequency information for term weighting. + * @param shardNames A array of shard names to search in. + * @param count The top n high score hits. + * @return A list of hits from the search. + * @throws ParseException If the query is ill-formed. + * @throws IOException If the search had a problem reading files. */ - public HitsMapWritable search(QueryWritable query, DocumentFrequenceWritable freqs, String[] shardNames, int count) + public HitsMapWritable search(QueryWritable query, DocumentFrequencyWritable freqs, String[] shardNames, int count) throws IOException; /** * Returns the number of documents a term occurs in. In a distributed search * environment, we need to get this first and then query all nodes again with - * this information to ensure we compute TF IDF correctly. See {@link http - * ://lucene - * .apache.org/java/2_3_0/api/org/apache/lucene/search/Similarity.html} + * this information to ensure we compute TF IDF correctly. See + * {@link http://lucene.apache.org/java/2_3_0/api/org/apache/lucene/search/Similarity.html} * - * @param input - * @param shards - * @return - * @throws IOException - * @throws ParseException + * @param input TODO is this really just a Lucene query? + * @param shards The shards to search in. + * @return A list of hits from the search. + * @throws IOException If the search had a problem reading files. */ - public DocumentFrequenceWritable getDocFreqs(QueryWritable input, String[] shards) throws IOException; + public DocumentFrequencyWritable getDocFreqs(QueryWritable input, String[] shards) throws IOException; /** - * Returns only the request fields of a lucene document. + * Returns only the requested fields of a lucene document. The fields are returned + * as a map. * - * @param shard - * @param docId - * @param fields - * @return + * @param shard The shard to ask for the document. + * @param docId The document that is desired. + * @param fields The fields to return. + * @return TODO what does this return? A map? * @throws IOException */ public MapWritable getDetails(String shard, int docId, String[] fields) throws IOException; /** * Returns the lucene document. Each field:value tuple of the lucene document - * is pushed ito the map. In most cases + * is inserted into the returned map. In most cases * {@link #getDetails(String, int, String[])} would be a better choice for * performance reasons. * - * @param shard - * @param docId + * @param shard The shard to ask for the document. + * @param docId The document that is desired. * @return * @throws IOException */ Index: src/test/java/net/sf/katta/node/DocumentFrequenceWritableTest.java =================================================================== --- src/test/java/net/sf/katta/node/DocumentFrequenceWritableTest.java (revision 445) +++ src/test/java/net/sf/katta/node/DocumentFrequenceWritableTest.java Thu Apr 30 14:34:58 PDT 2009 @@ -15,15 +15,15 @@ */ package net.sf.katta.node; +import junit.framework.TestCase; + import java.util.ArrayList; import java.util.List; -import junit.framework.TestCase; - public class DocumentFrequenceWritableTest extends TestCase { public void testAddNumDocsMultiThreading() throws InterruptedException { - final DocumentFrequenceWritable writable = new DocumentFrequenceWritable(); + final DocumentFrequencyWritable writable = new DocumentFrequencyWritable(); runThreads(10, writable, new Runnable() { @Override @@ -38,7 +38,7 @@ } public void testAddFrequencies() throws InterruptedException { - final DocumentFrequenceWritable writable = new DocumentFrequenceWritable(); + final DocumentFrequencyWritable writable = new DocumentFrequencyWritable(); runThreads(10, writable, new Runnable() { @Override public void run() { @@ -51,7 +51,7 @@ assertEquals(10 * 10000, writable.get("field", "term").intValue()); } - private void runThreads(int numberOfThreads, final DocumentFrequenceWritable writable, Runnable runnable) throws InterruptedException { + private void runThreads(int numberOfThreads, final DocumentFrequencyWritable writable, Runnable runnable) throws InterruptedException { List threads = new ArrayList(); for (int i = 0; i < numberOfThreads; i++) { threads.add(new Thread(runnable)); Index: src/main/java/net/sf/katta/node/Hit.java =================================================================== --- src/main/java/net/sf/katta/node/Hit.java (revision 445) +++ src/main/java/net/sf/katta/node/Hit.java Thu Apr 30 14:34:58 PDT 2009 @@ -28,9 +28,7 @@ */ public class Hit implements Writable, Comparable { - /** - * - */ + @SuppressWarnings({"UnusedDeclaration"}) private static final long serialVersionUID = -4098882107088103222L; private Text _shard; @@ -75,6 +73,14 @@ return _score; } + public int getDocId() { + return _docId; + } + + public void setDocId(final int docId) { + _docId = docId; + } + public void readFields(final DataInput in) throws IOException { _score = in.readFloat(); final boolean hasNode = in.readBoolean(); @@ -116,7 +122,7 @@ int result = 1; int temp; temp = Float.floatToIntBits(_score); - result = prime * result + (temp ^ (temp >>> 32)); + result = prime * result + temp; result = prime * result + ((_node == null) ? 0 : _node.hashCode()); result = prime * result + ((_shard == null) ? 0 : _shard.hashCode()); result = prime * result + _docId; @@ -149,14 +155,6 @@ return true; } - public int getDocId() { - return _docId; - } - - public void setDocId(final int docId) { - _docId = docId; - } - @Override public String toString() { return getNode() + " " + getShard() + " " + getDocId(); Index: src/main/java/net/sf/katta/node/QueryWritable.java =================================================================== --- src/main/java/net/sf/katta/node/QueryWritable.java (revision 445) +++ src/main/java/net/sf/katta/node/QueryWritable.java Thu Apr 30 14:15:46 PDT 2009 @@ -69,6 +69,9 @@ @Override public boolean equals(Object obj) { + if (getClass() != obj.getClass()) { + return false; + } Query other = ((QueryWritable) obj).getQuery(); return _query.equals(other); }