背景
Geode允许用户使用OQL查询语言对regions中的数据进行查询。目前,查询引擎仅支持COUNT聚集函数,而其他的常用聚集函数,如MIN,MAX,AVG和SUM等都不支持。本文档描述Geode用来支持其他聚集函数的设计思路,采用该设计框架,还可同时支持UDA(用户自定义聚集)。
需求
加强OQL引擎以便支持:
- 聚集函数(AVG,MAX,MIN,COUNT,SUM)
- 使用GROUP BY子句
- 统一框架支持聚集函数和UDA
建议的设计
拟提供一套统一接口同时支持标准的聚集函数和用户自定义聚集函数。
OQL引擎提供通用接口"com.gemstone.gemfire.cache.query.Aggregator" ,该接口用来定义所有聚集函数和UDA所需要的方法。
本文后续的UDA章节中将详细介绍java接口”Aggregator”的细节。
OQL语法变化:
- 支持非唯一性的order by
- 支持Group By子句的语法
- 支持Group BY Path表达式的语法
- 能够在投影属性中发现聚集函数
查询的正确性标准:
如果查询包含Group By子句,则投影属性应尽包含聚集函数和其他出现在Group By子句中的列。
比如:
select col1, avg( col2) from /portfolio group by col1 // Valid query
select col1, col3, avg( col2) from /portfolio group by col1 // invalid query (col3 missing from group by clause)
如果查询不包含Group By子句,则它的投影操作可以包含也可以不包含聚集函数,或者也可以仅包含聚集函数。
比如:
select avg( col2) from /portfolio // Valid
select col1, avg ( col2) from /portfolio (col1 should not be present in projection attributes) // invalid
查询的类型和实现:
仅包含Group By子句,select列表中仅包含列名(无聚集函数)
Select pf.ID , pf.status as status from /portfolio pf where pf.ID > 100 group by pf.ID , status
以上查询可以转换为distinct-order by查询
转换后的查询语句 => select distinct pf.ID , pf.status as status from /portfolio pf where pf.ID > 100 order by pf.ID, status
Replicated Region query: 对于replicated executing,转换后的查询应该可以产出正确的结果集。
Partitioned Region: 可直接支持 (假定 PR支持distinct order by)
仅包含Group By子句,select列表包含至少一个聚集函数,也可以有其他的列
select pf.status, AVG(pf.ID) from /portfolio pf where pf.ID > 0 group by pf.status
Replicated Region query:
获取投影后的数据行(去除聚集函数)
对投影后的数据行进行排序
对于排序后的每一行,输入给聚集函数,并进行计算
当所有行都扫描完毕,准备结果集
Partitioned Region query:
对于每个 Bucket:
获取投影后的数据行(去除聚集函数)
对投影后的数据行进行排序
对于排序后的每一行,输入给聚集函数,并进行计算
当所有行都扫描完毕,准备结果集
对于所有的buckets present locally
每个bucket的计算结果需要在本地进行合并。
对于query 节点
每个节点计算的结果需要被合并,和所有buckets结果合并的方法相同。
最终结果被计算出来
对现有执行引擎的修改和需要考虑的重要因素
当前的OQL引擎需要支持非唯一的 order by子句,order by 比较器-映射到运行时迭代器的属性, 需要映射到投影属性, 因此order by 语句, 能够应用到查询节点, 而不需要再启动另外的 order by查询。从单个bucket节点返回的结果集需要执行多路归并,因为需要保证有序性。
对于PR,对于某些特定的聚集函数,在bucket 节点上的实现不能直接在本地计算,而需要延迟到查询节点上进行聚集计算。
比如.: select col1, avg( col2) from /portfolios group by col1.
为求得查询最终的平均值,不能在每个bucket上分别计算平均值。在各个bucket节点上的avg()聚集函数应把其汇聚计算的sum值和元素的个数值发送给query节点。在query节点上最终进行求平均的计算。
对于那些需要去重的查询,bucket节点上也无法自行进行汇聚计算,需要和query节点的实现进行配合。
比如: select col1, sum( distinct col2) from /portfolios group by col1.
这里,单个bucket上的sum值无法传送到query节点。因为求和的时候需要先去重,而单个bucket无法决定是否其他bucket有等值的数据。在这种情况下,sum计算必须集中到query节点进行,单个bucket上不能简单地进行汇总操作,而应该把所有值发送给query节点。
内置聚集函数:
MAX: 用法 : MAX(Expression)
举例 : MAX(pf.ID) : 其中 expression 必须返回java.lang.Comparable 类型
MIN: 用法 : MIN(Expression)
举例 : MIN(pf.ID) : 其中 expression 必须返回 java.lang.Comparable 类型
AVG: 用法 : AVG(Expression)
举例 : AVG(pf.ID) : 其中 expression 必须返回 java.lang.Number 类型.
AVG: 用法 : AVG ( distinct expression)
SUM: 用法 : SUM(Expression)
举例 : SUM(pf.ID) : 其中 expression 返回结果为 java.lang.Number 类型.
SUM: 用法 : SUM( distinct expression)
COUNT(*): 已经支持
COUNT ( expression | column)
COUNT ( distinct expression | column)
用户自定义聚集函数(UDA):
OQL引擎将提供以下接口,用来实现用户自定义聚集函数
com.gemstone.gemfire.cache.query.Aggregator
该Aggregator接口有以下这些方法:
public interface Aggregator {
/**
* Accumulate the next scalar value
* @param value
*/
public void accumulate(Object value);
/**
* Initialize the Aggregator
*/
public void init();
/**
* @return Return the result scalar value
*/
public Object terminate();
/**
* Merges the incoming aggregator from bucket nodes with the resultant aggregator
* on the query node
* @param otherAggregator
*/
public void merge(Aggregator otherAggregator);
}
在应用代码中,请使用无参数的构造函数。
使用内置聚集函数的示例
select pf.ID , SUM( pos.mktValue ) from /portfolios pf , pf.positions pos group by pf.ID
上述查询将计算出出market value的总和值
注意: 我们强制性地将group by 列作为投影属性的一部分 ( 这将要看是否我们能够在合理的时间内实现代码来释放这种约束. 例如 group by 列不是 projection 列的一部分)
UDA使用的语法建议:
创建UDA
使用 API:
QueryService qs = CacheUtils.getQueryService();
qs.createUDA(“udaAlias”, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
使用 XML 创建:
<uda-manager>
<uda name="uda2" class="com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$UDACLass2"/>
<uda name="uda3" class="com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$UDACLass3"/>
</uda-manager>
cache-9.0 xsd
<xsd:element maxOccurs="1" minOccurs="0" name="uda-manager">
<xsd:complexType>
<xsd:sequence>
<xsd:element maxOccurs="unbounded" minOccurs="0" name="uda">
<xsd:complexType>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="class" type="xsd:string" use="required" />
</xsd:complexType>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
在查询语句中使用 UDA:
String queryStr = "select p.status , myUDA(p.ID) from /portfolio p where p.ID > 0 group by p.status order by p.status";
这需要我们维护一个聚集函数定义的注册信息,并且将保证注册信息可以在新加节点中获得。
UDA Schema 共享:
UDA schema/definitions 在集群节点之间是共享的, 通过 profile 共享机制.
由 liuming 翻译,由 theseusyang 校对