Apache Solr Documentation

*** As of June 2017, the latest Solr Ref Guide is located at https://lucene.apache.org/solr/guide ***

Please note comments on these pages have now been disabled for all users.

This Page Has Been Removed from the Guide

This page has been replaced with a newer version for Solr 6, found at Streaming Expressions. The content on this page is valid for Solr 5 ONLY.

Streaming Expressions provide a simple query language for SolrCloud that merges search with parallel computing. Under the covers, Streaming Expressions are backed by a Java Streaming API that provides a fast map/reduce implementation for SolrCloud.

Streaming Expressions provide a higher-level query language, so you don't have to be a Java programmer to access the parallel computing capabilities of the Streaming API.

Both Streaming Expressions and the Streaming API are new, experimental features, and their APIs are subject to change.

The Language Basics

  • Streaming Expressions are composed of functions.
  • All functions behave like streams, which means that they don't hold all the data in memory at once.
  • Functions are designed to work with entire result sets rather than the top N results like a normal search. The /export handler was developed to support this.
  • Functions emit a stream of Tuples (key/value Maps).
  • Functions can be compounded, or wrapped, to form complex operations.
  • Functions can be parallelized across a worker collection.
  • Under the covers, SolrCloud supports map/reduce "shuffling" to partition result sets across worker nodes.
  • Under the covers, all functions are mapped to, and compiled into, Streaming API Java objects.
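To make the pull model in the list above concrete, here is a minimal Java sketch, assuming Tuples are plain Map<String, Object> values and that end of stream is signalled by a Tuple holding "EOF": true (as the /stream handler does further down). SimpleTupleStream, StreamModel and the keepFields wrapper are hypothetical names for illustration only; they are not the Streaming API's classes, and keepFields is not a Solr 5 function.

```java
import java.util.*;

// Hypothetical, simplified model (NOT the SolrJ Streaming API): a stream hands out
// one Tuple per read() call and signals the end with an EOF Tuple.
interface SimpleTupleStream {
    Map<String, Object> read();
}

final class StreamModel {
    static final Map<String, Object> EOF = Map.of("EOF", true);

    static boolean isEof(Map<String, Object> tuple) {
        return Boolean.TRUE.equals(tuple.get("EOF"));
    }

    // Source stream standing in for search(): reads lazily from an iterator.
    static SimpleTupleStream source(Iterator<Map<String, Object>> it) {
        return () -> it.hasNext() ? it.next() : EOF;
    }

    // Hypothetical wrapping function: keeps only the listed fields of each Tuple.
    // It pulls from the wrapped stream one Tuple at a time and passes EOF through.
    static SimpleTupleStream keepFields(SimpleTupleStream inner, Set<String> fields) {
        return () -> {
            Map<String, Object> t = inner.read();
            if (isEof(t)) return t;
            Map<String, Object> out = new LinkedHashMap<>();
            for (String f : fields) {
                if (t.containsKey(f)) out.put(f, t.get(f));
            }
            return out;
        };
    }

    // Drains a stream for demonstration; real clients handle each Tuple as it arrives.
    static List<Map<String, Object>> drain(SimpleTupleStream s) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> t = s.read(); !isEof(t); t = s.read()) out.add(t);
        return out;
    }
}
```

Because each wrapper only calls read() on the stream it wraps, wrappers can be nested to any depth without buffering the result set.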

Functions

search

Searches a SolrCloud collection and emits a stream of Tuples that match the query.

Params

  • Collection: (Mandatory) The collection being searched.
  • zkHost: (Optional) The ZooKeeper host, needed only if the collection being searched is registered with a different ZooKeeper ensemble than the local /stream handler.
  • qt: (Optional) Specifies the query type (request handler). Set this to "/export" to work with large result sets. Defaults to "/select".
  • rows: (Mandatory with the /select handler) Specifies how many rows to return. This parameter is only needed with the /select handler, because the /export handler always returns all rows.
  • q: (Mandatory) The query.
  • fl: (Mandatory) The field list.
  • sort: (Mandatory) The sort criteria.
  • partitionKeys: A comma-separated list of fields used to partition search results across worker nodes. This parameter is only required with the parallel() expression (see below).

Syntax

search(collection1, zkHost="localhost:9983", qt="/export", q="*:*", fl="id,a_s,a_i,a_f", sort="a_f asc, a_i asc") 
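The sort parameter is a comma-separated list of "field direction" pairs. A minimal Java sketch of how such a string can define a Tuple ordering, assuming Tuples are Map<String, Object> and every sort field holds a non-null Comparable value of the same type in every Tuple; SortSpec is a hypothetical helper, not part of SolrJ.

```java
import java.util.*;

// Hypothetical helper: builds a Tuple comparator from a sort spec such as "a_f asc, a_i asc".
// Assumes each sort field holds a non-null Comparable value of the same type in every Tuple.
final class SortSpec {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Comparator<Map<String, Object>> parse(String spec) {
        Comparator<Map<String, Object>> result = null;
        for (String clause : spec.split(",")) {
            String[] parts = clause.trim().split("\\s+"); // e.g. ["a_f", "asc"]
            if (parts.length != 2) throw new IllegalArgumentException("Bad sort clause: " + clause);
            String field = parts[0];
            Comparator<Map<String, Object>> c =
                (a, b) -> ((Comparable) a.get(field)).compareTo(b.get(field));
            String dir = parts[1].toLowerCase(Locale.ROOT);
            if (dir.equals("desc")) {
                c = c.reversed();
            } else if (!dir.equals("asc")) {
                throw new IllegalArgumentException("Bad sort direction: " + parts[1]);
            }
            // Later clauses only break ties left by earlier ones.
            result = (result == null) ? c : result.thenComparing(c);
        }
        return result;
    }
}
```

With sort="a_f asc, a_i asc", Tuples are ordered by a_f first, and a_i only decides between Tuples with equal a_f values.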

merge

Merges two Streaming Expressions and maintains the ordering of the underlying streams. Because the ordering is preserved, the sorts of the underlying streams must line up with the on parameter provided to the merge function.

Params

  • StreamExpression A
  • StreamExpression B
  • on : Sort criteria for performing the merge

Syntax

merge( search(collection1,  q="id:(0 3 4)",  fl="id,a_s,a_i,a_f",  sort="a_f asc"), search(collection1,  q="id:(1)",  fl="id,a_s,a_i,a_f",  sort="a_f asc"), on="a_f asc") 
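merge can work without buffering because both inputs are already sorted by the on criteria, so at each step only the head Tuple of each stream needs to be compared. A minimal Java sketch of that ordered two-way merge, assuming Tuples are Map<String, Object> and the comparator encodes the on parameter; MergeSketch is a hypothetical name, and results are collected into a list only for brevity.

```java
import java.util.*;

// Hypothetical sketch of an ordered two-way merge. Both inputs must already be sorted
// by the same criteria as `on`; only their current head Tuples are compared.
final class MergeSketch {
    static List<Map<String, Object>> merge(Iterator<Map<String, Object>> a,
                                           Iterator<Map<String, Object>> b,
                                           Comparator<Map<String, Object>> on) {
        List<Map<String, Object>> out = new ArrayList<>(); // a real stream would emit instead
        Map<String, Object> ha = a.hasNext() ? a.next() : null;
        Map<String, Object> hb = b.hasNext() ? b.next() : null;
        while (ha != null || hb != null) {
            // Take from A when B is exhausted or A's head sorts first (ties favor A).
            if (hb == null || (ha != null && on.compare(ha, hb) <= 0)) {
                out.add(ha);
                ha = a.hasNext() ? a.next() : null;
            } else {
                out.add(hb);
                hb = b.hasNext() ? b.next() : null;
            }
        }
        return out;
    }
}
```

If one input were not sorted by the on criteria, this loop would still run but the output order would be wrong, which is why the sorts must line up.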

group

Wraps a Streaming Expression and groups tuples by common fields. The group function emits one group head Tuple per group, and the group head Tuple contains a list of all the Tuples within the group. The by parameter must match up with the sort order of the underlying stream. The group function implements a non-co-located grouping algorithm, which means that records from the same group do not need to be co-located on the same shard. When executed in parallel, the partitionKeys parameter must be the same as the group by field so that records from the same group are shuffled to the same worker.

Params

  • StreamExpression
  • by : grouping criteria

Syntax

 group( search(collection1,  q="*:*", qt="/export",  fl="id,a_s,a_i,a_f",  sort="a_s asc, a_f asc"), by="a_s asc")
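Because the underlying stream is sorted on the by field, all Tuples of a group arrive consecutively, so only the current group needs to be buffered. A minimal Java sketch, assuming Tuples are Map<String, Object> and that by is given as a bare field name rather than a "field direction" pair; GroupSketch is a hypothetical name, and storing the group under a "tuples" key in the head Tuple is an assumption of this sketch, not a documented field name.

```java
import java.util.*;

// Hypothetical sketch of grouping a stream that is already sorted on `byField`.
final class GroupSketch {
    static List<Map<String, Object>> group(Iterator<Map<String, Object>> sorted, String byField) {
        List<Map<String, Object>> heads = new ArrayList<>();
        List<Map<String, Object>> current = new ArrayList<>();
        while (sorted.hasNext()) {
            Map<String, Object> t = sorted.next();
            // A change in the by value closes the current group.
            if (!current.isEmpty() && !Objects.equals(current.get(0).get(byField), t.get(byField))) {
                heads.add(head(current));
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) heads.add(head(current));
        return heads;
    }

    // Group head: the first Tuple's fields plus the whole group under "tuples" (assumed key).
    private static Map<String, Object> head(List<Map<String, Object>> group) {
        Map<String, Object> h = new LinkedHashMap<>(group.get(0));
        h.put("tuples", group);
        return h;
    }
}
```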

unique

Wraps a Streaming Expression and emits a unique stream of Tuples based on the over parameter. The unique function relies on the sort order of the underlying stream, so the over parameter must match up with that sort order. The unique function implements a non-co-located unique algorithm, which means that records with the same over field value do not need to be co-located on the same shard. When executed in parallel, the partitionKeys parameter must be the same as the unique over field so that records with the same keys are shuffled to the same worker.

Params

  • StreamExpression
  • over: unique criteria

Syntax

 unique( search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_f asc, a_i asc"), over="a_f asc")
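Since the input is sorted on the over field, duplicates are adjacent, and uniqueness only requires remembering the previous value. A minimal Java sketch, assuming Tuples are Map<String, Object>, over is a single bare field name, and the first Tuple of each run is the one kept; UniqueSketch is a hypothetical name.

```java
import java.util.*;

// Hypothetical sketch: emits the first Tuple of each run of equal `overField` values.
// Correct only because the input is sorted on `overField`, so duplicates are adjacent.
final class UniqueSketch {
    static List<Map<String, Object>> unique(Iterator<Map<String, Object>> sorted, String overField) {
        List<Map<String, Object>> out = new ArrayList<>();
        Object last = null;
        boolean first = true;
        while (sorted.hasNext()) {
            Map<String, Object> t = sorted.next();
            Object v = t.get(overField);
            if (first || !Objects.equals(last, v)) {
                out.add(t);
                last = v;
                first = false;
            }
        }
        return out;
    }
}
```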

top

Wraps a Streaming Expression and re-orders the Tuples. The top function emits only the top N tuples in the new sort order. Because top re-orders the underlying stream, its sort criteria do not have to match the sort order of the underlying stream.

Params

  • n : Number of top tuples to return
  • StreamExpression
  • sort: Sort criteria for selecting the top N tuples.

Syntax

The expression below finds the top 3 results of the underlying search. Notice that it reverses the sort order. The top function re-orders the results of the underlying stream.

 top(n=3, search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_f desc, a_i desc"), sort="a_f asc, a_i asc")
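top can re-order its input because it only needs to remember n Tuples at a time. One common way to do that is a bounded priority queue, used in the Java sketch below, assuming Tuples are Map<String, Object> and the comparator encodes the sort parameter. TopSketch is a hypothetical name, and this is not necessarily how Solr's RankStream is implemented.

```java
import java.util.*;

// Hypothetical sketch of top-N selection with a bounded priority queue.
final class TopSketch {
    static List<Map<String, Object>> top(int n, Iterator<Map<String, Object>> in,
                                         Comparator<Map<String, Object>> sort) {
        // Reversed comparator: the queue head is the kept Tuple that sorts LAST.
        PriorityQueue<Map<String, Object>> heap = new PriorityQueue<>(sort.reversed());
        while (in.hasNext()) {
            heap.add(in.next());
            if (heap.size() > n) heap.poll(); // evict the Tuple that sorts last
        }
        List<Map<String, Object>> out = new ArrayList<>(heap);
        out.sort(sort); // emit the survivors in the requested order
        return out;
    }
}
```

Memory use is bounded by n regardless of how many Tuples the wrapped search returns.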

parallel

Wraps a Streaming Expression and sends it to N worker nodes to be processed in parallel. The parallel function requires that the partitionKeys parameter be provided to the underlying searches. The partitionKeys parameter partitions the search results (Tuples) across the worker nodes: Tuples with the same values in the partitionKeys fields are shuffled to the same worker node. The parallel function maintains the sort order of the Tuples returned by the worker nodes, so its sort criteria must match up with the sort order of the Tuples returned by the workers.

Params

  • Collection: Name of the worker collection to send the StreamExpression to.
  • StreamExpression: Expression to send to the worker collection.
  • workers: Number of workers in the worker collection to send the expression to.
  • zkHost: The ZooKeeper host where the worker collection resides.
  • sort: sort criteria for ordering Tuples returned by the worker nodes.

Syntax

parallel(collection1, group( search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_s desc,a_f asc", partitionKeys="a_s"), by="a_s desc"), workers="2", zkHost="localhost:9983", sort="a_s desc")
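The shuffling driven by partitionKeys is a form of hash partitioning: each worker receives the Tuples whose key values hash to it, so equal keys always meet on the same worker, which is what lets group and unique run in parallel. A minimal Java sketch, assuming a List.hashCode()-based hash chosen only for illustration; Solr computes its own hash, so real worker assignments will differ, and PartitionSketch is a hypothetical name.

```java
import java.util.*;

// Hypothetical sketch of hash partitioning on partitionKeys. The hash here is
// List.hashCode(), NOT Solr's actual hash function.
final class PartitionSketch {
    static int workerFor(Map<String, Object> tuple, List<String> partitionKeys, int workers) {
        List<Object> key = new ArrayList<>();
        for (String k : partitionKeys) key.add(tuple.get(k));
        return Math.floorMod(key.hashCode(), workers); // floorMod keeps the index non-negative
    }

    static List<List<Map<String, Object>>> partition(List<Map<String, Object>> tuples,
                                                     List<String> partitionKeys, int workers) {
        List<List<Map<String, Object>>> perWorker = new ArrayList<>();
        for (int i = 0; i < workers; i++) perWorker.add(new ArrayList<>());
        for (Map<String, Object> t : tuples) {
            perWorker.get(workerFor(t, partitionKeys, workers)).add(t);
        }
        return perWorker;
    }
}
```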

HTTP Interface: Stream Handler

Solr has a new /stream handler that takes Streaming Expression requests and returns the Tuples as a JSON stream. The stream HTTP parameter is used to specify the Streaming Expression. For example, this curl command URL-encodes a simple search() expression and POSTs it to the /stream handler:

curl --data-urlencode 'stream=search(sample, q="*:*", fl="id,field_i", sort="field_i asc")' http://localhost:8901/solr/sample/stream

For the example above, the /stream handler responded with the following JSON response (shown without formatting):

{ "responseHeader": { "status": 0, "QTime": 1 }, "tuples": { "numFound": -1, "start": -1, "docs": [ { "id": "doc1", "field_i": 1 }, { "id": "doc2", "field_i": 2 }, { "EOF": true } ] } }

This response needs to be handled differently than a normal search response. First, notice that numFound and start are both set to -1. In the first release these values are unsupported, and they may be removed in future releases. numFound is often impossible to know because the streams are transformed after they leave Solr, so you can only be sure the stream is finished after the final EOF Tuple has been read. The start header is also unsupported because in the future paging will likely be implemented above Solr, in a Streaming Expression.

Also notice the final doc, which contains only "EOF": true. This is the EOF Tuple, which marks the end of the stream. Your code will need a streaming JSON parser, because Streaming Expressions return the entire result set, which may be millions of results. In your JSON client, iterate over each doc (Tuple) and check for the EOF Tuple to detect the end of the stream.
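A /stream client therefore has two chores: send the URL-encoded expression, and process docs until the EOF Tuple arrives. A minimal Java sketch of both, assuming the JSON has already been turned into one Map<String, Object> per doc by a streaming JSON parser (not shown). StreamClientSketch and its methods are hypothetical names; "stream" is the Solr 5 parameter name used in the curl example.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Consumer;

// Hypothetical client-side helpers for the /stream handler (Solr 5 "stream" parameter).
final class StreamClientSketch {
    // Form body equivalent to curl --data-urlencode 'stream=...'.
    static String formBody(String expression) {
        return "stream=" + URLEncoder.encode(expression, StandardCharsets.UTF_8);
    }

    // Hands each Tuple to `handler` as it arrives and returns how many were seen.
    // A stream that ends without the EOF Tuple is treated as truncated.
    static long forEachTuple(Iterator<Map<String, Object>> docs, Consumer<Map<String, Object>> handler) {
        long count = 0;
        while (docs.hasNext()) {
            Map<String, Object> doc = docs.next();
            if (Boolean.TRUE.equals(doc.get("EOF"))) return count; // end of stream reached
            handler.accept(doc);
            count++;
        }
        throw new IllegalStateException("Stream ended before the EOF Tuple was read");
    }
}
```

Processing each Tuple through a callback, rather than collecting them, keeps memory flat even for very large result sets.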

In the future the EOF Tuple will also hold aggregations that are gathered by Streaming Expressions.

The org.apache.solr.client.solrj.io package provides Java classes that compile Streaming Expressions into live Streaming API objects. These classes can be used to execute Streaming Expressions from inside a Java application. For example:

import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

// ZooKeeper address of the SolrCloud cluster (example value)
String zkHost = "localhost:9983";

// Map each expression function name to the Streaming API class that implements it
StreamFactory streamFactory = new StreamFactory()
    .withCollectionZkHost("collection1", zkHost)
    .withStreamFunction("search", CloudSolrStream.class)
    .withStreamFunction("unique", UniqueStream.class)
    .withStreamFunction("top", RankStream.class)
    .withStreamFunction("group", ReducerStream.class)
    .withStreamFunction("parallel", ParallelStream.class);

ParallelStream pstream = (ParallelStream) streamFactory.constructStream(
    "parallel(collection1, "
    + "group(search(collection1, q=\"*:*\", fl=\"id,a_s,a_i,a_f\", sort=\"a_s asc,a_f asc\", partitionKeys=\"a_s\"), by=\"a_s asc\"), "
    + "workers=\"2\", zkHost=\"" + zkHost + "\", sort=\"a_s asc\")");

Request Handlers

The search expression lets you specify a request handler using the qt parameter. By default, the /select handler is used. The /select handler is convenient for simple, rapid prototyping of expressions. For production you will most likely want the /export handler, which is designed to sort and export entire result sets. The /export handler is not the default because it has much stricter requirements than the /select handler, which makes it harder to get started with. For details on those requirements, see the /export handler documentation.

Worker Collections

The parallel expression sends a Streaming Expression to a worker collection to be executed in parallel. A worker collection can be any SolrCloud collection that has the /stream handler configured. Unlike normal SolrCloud collections, worker collections don't have to hold any data: they can be empty collections that exist only to execute Streaming Expressions.

Data Requirements

The initial release of Streaming Expressions and the Streaming API requires that all sort fields contain non-null data. The /export handler also has a number of data requirements concerning sort and fl fields; see the /export handler documentation for details.

19 Comments

  1. Just a few more sections to fill in.

  2. Ok, very close to finished. Just need to add the curl example and do a final proofing.

  3. Ok, finished I think. Just need to decide where to put the docs.

  4. Joel:

    I seem to have to specify qt="export"; qt="/export" produces errors. Is this expected?

    1. The SQLHandler uses /export:

      Map<String, String> params = new HashMap();
      params.put(CommonParams.FL, fl);
      params.put(CommonParams.Q, sqlVisitor.query);
      //Always use the /export handler for Group By Queries because it requires exporting full result sets.
      params.put(CommonParams.QT, "/export");

      Do you want to post the stack trace to a JIRA issue?

      1. This expression passed for me in a test case:

        expression = StreamExpressionParser.parse("parallel(collection1,"
         + "rollup("
         + "search(collection1, q=*:*, fl=\"a_s,a_i,a_f\", sort=\"a_s asc\", partitionKeys=\"a_s\", qt=\"/export\"),"
         + "over=\"a_s\","
         + "sum(a_i),"
         + "sum(a_f),"
         + "min(a_i),"
         + "min(a_f),"
         + "max(a_i),"
         + "max(a_f),"
         + "avg(a_i),"
         + "avg(a_f),"
         + "count(*)"
         + "),"
         + "workers=\"2\", zkHost=\""+zkServer.getZkAddress()+"\", sort=\"a_s asc\")"
         );
        1. Just read through CloudSolrStream and there is no special handling of the "qt" param. This means that it's passed directly to Solr. 

          So, I suspect your configuration names the export handler "export" rather than "/export".

  5. The recent change for SOLR-8443 has changed the name of the HTTP parameter that carries the expression from "stream=" to "expr=". The curl command under the "HTTP Interface: Stream Handler" section should probably be updated to reflect this.

    i.e.

    curl --data-urlencode 'expr=search(sample,
                                         q="*:*",
                                         fl="id,field_i",
                                         sort="field_i asc")' http://localhost:8901/solr/sample/stream

    1. This is only in trunk. The HTTP param is still 'stream' for Solr 5.

  6. Should the section above describing the parameters for the search() function contain an entry for 'partitionKeys'?  Maybe something like:

    • partitionKeys: a field to use when partitioning search results across search-workers. Typically only used with the parallel() expression (see below).

    Relatedly, can I request permission to edit this wiki page? Or is that a right reserved for committers or more experienced contributors?

    1. Yes, we should document the behavior of the partition keys under the search section. Currently the cwiki is maintained only by committers.

  7. Joel Bernstein or Dennis Gove - what is the difference between this page and the page with nearly the same name that's already published with the Ref Guide (Streaming Expressions (Solr 5))? If this page is for Solr 6, I think there should be a note at the top explaining the differences between the pages.

    1. Cassandra Targett, to which page are you referring? The one you've linked to (https://cwiki.apache.org/confluence/display/solr/Streaming+Expressions) appears to be this same page.

      1. Ha, yeah, you're proving my point (wink). I meant the other page with pretty much the exact same name Streaming Expressions. <- That page needs a note about what the differences are.

        1. Yep, we've got some work to do. I think I counted 20 expressions for Solr 6. I planned on working some on this this week. When we're done the Solr 6 page will look very different.

          1. OK, I will add a note to that page to that effect. I assume the new stuff will be interspersed with the stuff that exists today - if not, one option is to only document what's new for Solr 6 on the other page and the new sections can be copied into the published doc in the 6.0 version. If it's all intertwined inline, that wouldn't make sense, but if it's separate sections, it might be easier to see what's different.

            1. I think the page will be changing quite a bit. Makes sense to have a separate page to work with.

        2. What I mean is that the one you've linked to (with link https://cwiki.apache.org/confluence/display/solr/Streaming+Expressions) is the same as the link for this page https://cwiki.apache.org/confluence/display/solr/Streaming+Expressions, unless I'm missing something.

          1. Yeah, because they have almost exactly the same name and nearly exactly the same content, I lost where I was and I made the comment on the wrong page.