Purpose
Griffin DSL is designed for DQ measurement as a SQL-like language that describes requests in the DQ domain.
It covers several kinds of measurement types:
- accuracy
- profiling
- uniqueness
- timeliness
- ...
It covers several kinds of data source types:
- batch
  - hive
  - avro
  - ...
- streaming
  - kafka
It covers several kinds of data format types:
- structured data
  - hive table
  - json string
  - avro
- unstructured data
Idea
Griffin DSL is sql-like, customized to fit the Apache Griffin use case.
To calculate accuracy metrics, users only need to provide a comparison rule like a where clause:
source.uid = target.uid and source.itemid = target.itemid and source.timestamp = target.timestamp
Apache Griffin will calculate the metrics for users.
To calculate profiling metrics, users only need to provide a sql-like rule (in which the "select" and "from" clauses can be omitted):
source.id.count(), source.age.max() where source.age > 10
Examples
Example 1. accuracy between different source and target
source.id = target.id AND source.name = target.name AND source.age = target.age
Example 2. profiling of source
SELECT source.country, source.id.count() AS count, source.age.avg() AS avg_age FROM source GROUP BY source.country SORT BY count DESC LIMIT 5
Syntax BNF
-- literal --
<literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
<literal-string> ::= <any-string>
<literal-number> ::= <integer> | <double>
<literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
<literal-boolean> ::= true | false
<literal-null> ::= null
<literal-nan> ::= nan

-- selection --
<selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
<selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
<field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
<index-sel> ::= "[" <arg> "]"
<function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
<arg> ::= <math-expr>

-- as alias --
<as-alias> ::= <as> <field-name>

-- math expr --
<math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
<unary-math-expr> ::= [<unary-opr>]* <math-factor>
<binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
<math-expr> ::= <binary-math-expr>

-- logical expr --
<in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
<between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
<range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
<like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
<is-null-expr> ::= <math-expr> <is> [<not>]? <null>
<is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
<logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
<unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
<binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
<logical-expr> ::= <binary-logical-expr>

-- expression --
<expr> ::= <math-expr> | <logical-expr>

-- function expr --
<function> ::= <function-name> "(" [<arg>]? [, <arg>]* ")" [<as-alias>]?
<function-name> ::= ("function name registered")
<arg> ::= <expr>

-- clauses --
<select-clause> ::= <expr> [, <expr>]*
<where-clause> ::= <where> <expr>
<from-clause> ::= <from> ("data source name registered")
<having-clause> ::= <having> <expr>
<groupby-clause> ::= <group> <by> <expr> [ <having-clause> ]?
<orderby-item> ::= <expr> [ <DESC> ]?
<orderby-clause> ::= <order> <by> <orderby-item> [ , <orderby-item> ]*
<limit-clause> ::= <limit> <expr>

-- combined clauses --
<combined-clauses> ::= <select-clause> [ <from-clause> ]? [ <where-clause> ]? [ <groupby-clause> ]? [ <orderby-clause> ]? [ <limit-clause> ]?
Syntax description
a Supporting process
logical operation: not, and, or, in, between, like, is null, is nan, =, !=, <>, <=, >=, <, >
mathematical operation: +, -, *, /, %
sql statement: as, where, group by, having, order by, limit
b Keywords
- null, nan, true, false
- not, and, or
- in, between, like, is
- select, distinct, from, as, where, group, by, having, order, desc, asc, limit
c Operators
- !, &&, ||, =, !=, <, >, <=, >=, <>
- +, -, *, /, %
- (, )
- ., [, ]
d Literals
- string: any string surrounded by a pair of " or ', with the escape character \ if needed. e.g. "test", 'string 1', "hello \" world \" "
- number: a double or integer number. e.g. 123, 33.5
- time: an integer with a unit suffix, translated to an integer number of milliseconds. e.g. 3d, 5h, 4ms
- boolean: a boolean value literal. e.g. true, false
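As a runnable sketch of the time-literal rule above, the conversion of a literal like "3d" into milliseconds could look like this (function name and regex are illustrative, not Griffin's actual implementation):

```python
import re

# milliseconds per unit, matching the units the DSL accepts: d, h, m, s, ms
UNIT_MS = {"d": 86_400_000, "h": 3_600_000, "m": 60_000, "s": 1_000, "ms": 1}

def time_literal_to_ms(literal: str) -> int:
    """Translate a DSL time literal (e.g. "3d", "5h", "4ms") to milliseconds."""
    # try "ms" before "m" so "4ms" is not parsed as minutes
    m = re.fullmatch(r"(\d+)(ms|d|h|m|s)", literal)
    if m is None:
        raise ValueError(f"not a time literal: {literal!r}")
    value, unit = m.groups()
    return int(value) * UNIT_MS[unit]

print(time_literal_to_ms("3d"))   # 259200000
print(time_literal_to_ms("4ms"))  # 4
```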
e Selections
- selection head: data source name. e.g. source, target, `my table name`
- all field selection: * or with the data source name ahead. e.g. *, source.*, target.*
- field selection: field name or with the data source name ahead. e.g. source.age, target.name, user_id
- index selection: an integer between square brackets "[]", with a field name ahead. e.g. source.attributes[3]
- function selection: function name with brackets "()", with a field name ahead or not. e.g. count(*), *.count(), source.user_id.count(), max(source.age)
- alias: declare an alias after a selection. e.g. source.user_id as id, target.user_name as name
f Math expressions
- math factor: a literal, function, selection, or math expression in brackets. e.g. 123, max(1, 2, 3, 4), source.age, (source.age + 13)
- unary math expression: unary math operator with a factor. e.g. -(100 - source.score)
- binary math expression: math factors with binary math operators. e.g. source.age + 13, score * 2 + ratio
g Logical expression
- in: in clause like sql. e.g. source.country in ("USA", "CHN", "RSA")
- between: between clause like sql. e.g. source.age between 3 and 30, source.age between (3, 30)
- like: like clause like sql. e.g. source.name like "%abc%"
- is null: is null operator like sql. e.g. source.desc is not null
- is nan: checks whether the value is not a number; the syntax is like is null. e.g. source.age is not nan
- logical factor: a math expression, one of the logical expressions above, or another logical expression in brackets. e.g. (source.user_id = target.user_id AND source.age > target.age)
- unary logical expression: unary logical operator with a factor. e.g. NOT source.has_data, !(source.age = target.age)
- binary logical expression: logical factors with binary logical operators, including and, or and comparison operators. e.g. source.age = target.age OR source.ticket = target.tck
h Expression
- expression: a logical expression or a math expression.
i Function
- argument: an expression.
- function: function name with arguments between brackets. e.g. max(source.age, target.age), count(*)
j Clause
- select clause: the result columns, like a sql select clause; the keyword "select" can be omitted in Griffin DSL. e.g. select user_id.count(), age.max() as max, source.user_id.count() as cnt, source.age.min()
- from clause: the table name, like a sql from clause; the data source name must be one of the data source names or the output table name of a former rule step. This clause can be omitted by configuring the data source name. e.g. from source, from `target`
- where clause: the filter condition, like a sql where clause; optional. e.g. where source.id = target.id and source.age = target.age
- group-by clause: like the group-by clause in sql; optional. An optional having clause may follow. e.g. group by cntry, group by gender having count(*) > 50
- order-by clause: like the order-by clause in sql; optional. e.g. order by name, order by first_name desc, age asc
- limit clause: like the limit clause in sql; optional. e.g. limit 5
k Accuracy Rule
- An accuracy rule expression in Griffin DSL is a logical expression describing the mapping relation between data sources. e.g. source.id = target.id and source.name = target.name and source.age between (target.age, target.age + 5)
l Profiling Rule
- A profiling rule expression in Griffin DSL is a sql-like expression: a select clause first, followed by optional from, where, group-by, order-by, and limit clauses in order. e.g. source.gender, source.id.count() where source.age > 20 group by source.gender; select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5
m Uniqueness Rule
- A uniqueness rule expression in Griffin DSL is a comma-separated list of selection expressions indicating the columns to measure for duplication. e.g. name, (age + 1) as next_age
n Timeliness Rule
- A timeliness rule expression in Griffin DSL is a comma-separated list of selection expressions indicating the input time and output time columns (the calculation time is used by default if the output time is not set). e.g. ts
Griffin DSL translation to SQL
Griffin DSL is defined for DQ measurement, to describe problems in the DQ domain.
In Griffin, we take Griffin DSL rules and translate them into spark-sql rules for calculation in the spark-sql engine.
The DQ domain has multiple dimensions, and we need to translate them in different ways.
a Accuracy
For accuracy, we need to get the match count between source and target; the rule describes the mapping relation between the data sources. Griffin needs to translate the dsl rule into multiple sql rules.
For example, the dsl rule "source.id = target.id and source.name = target.name" represents the match condition of accuracy. After the translation, the sql rules are as below:
- get miss items from source:
SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL)
saved as table miss_items.
- get miss count:
SELECT COUNT(*) AS miss FROM miss_items
saved as table miss_count.
- get total count from source:
SELECT COUNT(*) AS total FROM source
saved as table total_count.
- get accuracy metric:
SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count
saved as table accuracy.
After the translation, the metrics will be persisted in table accuracy.
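The four translated steps above can be exercised end to end with a small runnable sketch. SQLite stands in here for the Spark SQL engine, and the sample rows are illustrative; the final cross join replaces the FULL JOIN, which is safe only because both intermediate tables hold exactly one row.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE source(id INTEGER, name TEXT);
CREATE TABLE target(id INTEGER, name TEXT);
INSERT INTO source VALUES (1,'a'),(2,'b'),(3,'c');
INSERT INTO target VALUES (1,'a'),(2,'b');
""")

# step 1: get miss items from source
con.execute("""
CREATE TABLE miss_items AS
SELECT source.* FROM source LEFT JOIN target
ON coalesce(source.id, '') = coalesce(target.id, '')
AND coalesce(source.name, '') = coalesce(target.name, '')
WHERE (NOT (source.id IS NULL AND source.name IS NULL))
AND (target.id IS NULL AND target.name IS NULL)
""")
# step 2: get miss count
con.execute("CREATE TABLE miss_count AS SELECT COUNT(*) AS miss FROM miss_items")
# step 3: get total count from source
con.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")
# step 4: get accuracy metric (cross join in place of FULL JOIN; both tables have one row)
row = con.execute("""
SELECT miss_count.miss, total_count.total,
       (total_count.total - miss_count.miss) AS matched
FROM miss_count, total_count
""").fetchone()
print(row)  # (1, 3, 2): one missed record, three total, two matched
```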
b Profiling
For profiling, the request is always an aggregation of the data. The rule is mostly the same as sql, supporting only the select, from, where, group-by, having, order-by, and limit clauses, which can describe most profiling requests. For more complicated requests, you can use a sql rule directly.
For example, the dsl rule "source.cntry, source.id.count(), source.age.max() group by source.cntry" represents the profiling request. After the translation, the sql rule is as below:
- profiling sql rule:
SELECT source.cntry, count(source.id), max(source.age) FROM source GROUP BY source.cntry
saved as table profiling.
After the translation, the metrics will be persisted in table profiling.
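The translated profiling rule can be tried out with a runnable sketch; SQLite stands in for Spark SQL, the sample rows are illustrative, and an ORDER BY is added only to make the output deterministic.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE source(cntry TEXT, id INTEGER, age INTEGER)")
con.executemany("INSERT INTO source VALUES (?,?,?)",
                [("US", 1, 30), ("US", 2, 40), ("CN", 3, 25)])

# the translated profiling rule, grouped by country
rows = con.execute("""
SELECT source.cntry, count(source.id), max(source.age)
FROM source GROUP BY source.cntry
ORDER BY source.cntry
""").fetchall()
print(rows)  # [('CN', 1, 25), ('US', 2, 40)]
```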
c Uniqueness
Uniqueness, also called duplicate detection, finds the duplicate items in the data and rolls up the item counts grouped by the number of duplications.
For example, the dsl rule "name, age" represents the duplicate request; in this case, source and target are the same data set. After the translation, the sql rules are as below:
- get distinct items from source: SELECT DISTINCT name, age FROM source, saved as table src.
- get all items from target: SELECT name, age FROM target, saved as table tgt.
- join two tables: SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, ''), save as table joined.
- get items duplication: SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age, save as table grouped.
- get total metric: SELECT count(*) FROM source, save as table total_metric.
- get unique record: SELECT * FROM grouped WHERE dup = 0, save as table unique_record.
- get unique metric: SELECT count(*) FROM unique_record, save as table unique_metric.
- get duplicate record: SELECT * FROM grouped WHERE dup > 0, save as table dup_record.
- get duplicate metric: SELECT dup, count(*) AS num FROM dup_record GROUP BY dup, save as table dup_metric.
After the translation, the metrics will be persisted in table dup_metric.
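The uniqueness pipeline above can be sketched end to end as follows. SQLite stands in for Spark SQL, the data set is illustrative, and the document's RIGHT JOIN is rewritten as the equivalent LEFT JOIN for portability across SQLite versions.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE source(name TEXT, age INTEGER)")
con.executemany("INSERT INTO source VALUES (?,?)",
                [("a", 1), ("a", 1), ("b", 2)])
# source and target are the same data set in this case
con.execute("CREATE TABLE target AS SELECT * FROM source")

con.executescript("""
-- distinct items from source, all items from target
CREATE TABLE src AS SELECT DISTINCT name, age FROM source;
CREATE TABLE tgt AS SELECT name, age FROM target;
-- join the two tables (RIGHT JOIN rewritten as an equivalent LEFT JOIN)
CREATE TABLE joined AS
  SELECT src.name, src.age FROM src LEFT JOIN tgt
  ON coalesce(src.name, '') = coalesce(tgt.name, '')
  AND coalesce(src.age, '') = coalesce(tgt.age, '');
-- duplication count per distinct item
CREATE TABLE grouped AS
  SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age;
""")

total = con.execute("SELECT count(*) FROM source").fetchone()[0]
unique = con.execute("SELECT count(*) FROM grouped WHERE dup = 0").fetchone()[0]
dup_metric = con.execute(
    "SELECT dup, count(*) AS num FROM grouped WHERE dup > 0 GROUP BY dup"
).fetchall()
print(total, unique, dup_metric)  # 3 1 [(1, 1)]: one item duplicated once
```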
d Timeliness
Timeliness measures the latency of each item and computes statistics over the latencies.
For example, for the dsl rule "ts, out_ts", the first column is the input time of the item and the second is the output time; if not set, "__tmst" is used as the default output time column. After the translation, the sql rules are as below:
- get input and output time column: SELECT *, ts AS _bts, out_ts AS _ets FROM source, save as table origin_time.
- get latency: SELECT *, (_ets - _bts) AS latency FROM origin_time, save as table lat.
- get timeliness metric: SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat, save as table time_metric.
After the translation, the metrics will be persisted in table time_metric.
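The three timeliness steps can be sketched with runnable code as well; SQLite stands in for Spark SQL and the timestamps are illustrative epoch milliseconds.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE source(ts INTEGER, out_ts INTEGER)")
con.executemany("INSERT INTO source VALUES (?,?)",
                [(1000, 1500), (2000, 2100), (3000, 3900)])

con.executescript("""
-- get input and output time columns
CREATE TABLE origin_time AS SELECT *, ts AS _bts, out_ts AS _ets FROM source;
-- get latency per item
CREATE TABLE lat AS SELECT *, (_ets - _bts) AS latency FROM origin_time;
""")
# get timeliness metric
row = con.execute("""
SELECT CAST(AVG(latency) AS BIGINT) AS avg,
       MAX(latency) AS max, MIN(latency) AS min FROM lat
""").fetchone()
print(row)  # (500, 900, 100)
```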
Alternative Rules
You can simply use a Griffin DSL rule to describe your problem in the DQ domain; for some complicated requirements, you can also use the alternative rule types supported by Griffin.
a Spark sql
Griffin supports spark-sql directly, you can write rule in sql like this:
{
  "dsl.type": "spark-sql",
  "name": "source",
  "rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source"
}
Griffin will calculate it in spark-sql engine directly.
b Data frame operation
Griffin supports some other operations on data frames in spark, such as converting a json string data frame into an extracted data frame with the extracted object schema. For example:
{
  "dsl.type": "df-opr",
  "name": "ext_source",
  "rule": "from_json",
  "details": {
    "df.name": "json_source"
  }
}
Griffin will do the operation to extract json strings.
Actually, you can also extend the df-opr engine and df-opr adaptor in Griffin to support more types of data frame operations.
Tips
The Griffin engine runs on spark and works in two phases: the pre-proc phase and the run phase.
a Pre-proc phase
Griffin processes the data sources directly to get an appropriate data format, as preparation for the DQ calculation. In this phase, you can use df-opr and spark-sql rules.
After preparation, to support streaming DQ calculation, a timestamp column is added to each row of data, so the data frame in the run phase contains an extra column named "__tmst".
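A minimal sketch (not Griffin's actual implementation) of what this preparation step produces: every row gains a "__tmst" column carrying the batch timestamp in milliseconds. The function name and row representation are illustrative.

```python
import time

def add_tmst(rows, tmst_ms=None):
    """Attach a "__tmst" timestamp column to each row, as the pre-proc phase does."""
    # default to the current epoch time in milliseconds if no timestamp is given
    tmst_ms = tmst_ms if tmst_ms is not None else int(time.time() * 1000)
    return [{**row, "__tmst": tmst_ms} for row in rows]

rows = [{"id": 1, "age": 30}, {"id": 2, "age": 25}]
print(add_tmst(rows, tmst_ms=1549197300000))
```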
b Run phase
Griffin calculates the DQ metrics from the prepared data. In this phase, you can use griffin-dsl and spark-sql rules, and a subset of df-opr rules.
For a griffin-dsl rule, griffin translates it into a spark-sql rule with a group-by condition on column "__tmst", which is especially useful for streaming DQ calculation. A spark-sql rule, however, is used directly, so you need to add the "__tmst" column in your spark-sql rule explicitly, or you won't get correct metric results after the calculation.
Comments
neil wang
Hi Lionel,
I have one question about the Griffin DSL translation of Accuracy. According to my understanding of the following url, you will find the unique key of the data. After this optimization, what will the sql become? Will the unique key become the join key?
Other question, if both the source and target have duplicate records, how to calculate the accuracy of the target data? This optimization makes me confused.
https://github.com/apache/incubator-griffin/blob/master/griffin-doc/measure/measures.md
Thanks,
Neil Wang
Lionel Liu
Hi Neil,
We don't do any other optimization for accuracy now, we just translate it as an ON clause in spark sql, and that works well, as spark sql has some optimization when joining with an ON clause. We had to find the unique keys of data in the old version, because we implemented the accuracy algorithm ourselves before, so we needed some optimization for it. That document explains the definition of accuracy based on our old algorithm, but we can compute it with much less information from users now.
About the sql translation, you can just refer to the contents in this document:
For example, the dsl rule is "source.id = target.id and source.name = target.name", which represents the match condition of accuracy. After the translation, the sql rules are as below:
SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL)
, saved as table miss_items.
SELECT COUNT(*) AS miss FROM miss_items
, saved as table miss_count.
SELECT COUNT(*) AS total FROM source
, saved as table total_count.
SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count
, saved as table accuracy.
For your second question, accuracy measures count(records in source that can also be found in target) / count(all records in source). Here are some examples to explain the duplicate conditions.
A. source: [1, 2, 3, 3, 3], target: [1, 2, 3], the accuracy will be 5 / 5, which means all the records in source could be found in target. There are some duplicate records in source, but accuracy doesn't assume their uniqueness; it only cares about the match percentage.
B. source: [1, 2, 3], target: [1, 2, 3, 3, 3], the accuracy will be 3 / 3, which also means all source records could be found in target.
C. source: [1, 2, 3, 3, 3], target: [1, 2], the accuracy will be 2 / 5. We don't know whether the duplicate source records are legal or not, so we need to keep their information.
There might be some unexpected duplication in source, but in accuracy, the duplication can not be found.
Thanks,
Lionel, Liu
neil wang
Hi Lionel,
Thanks, I have understood the translation of accuracy. In other words, uniqueness must be used with accuracy in the duplicate conditions.
About uniqueness, the translation steps may have some errors. Please have a check.
Thanks,
Neil Wang
Lionel Liu
Aha, got it, I missed one row, thanks for the reminder~
Lionel Liu
Btw, I think uniqueness is not necessary before accuracy, only if you want to ignore the duplicate ones. But there comes another question, how to get total count of source.
e.g.: source: [1, 2, 3, 3, 3], target: [1, 2, 3], if ignore the duplicate records from source, the accuracy should be 3 / 3 or 3 / 5 ?
3 / 3 seems to tell you all records from source are found in target, but the total count is a little strange. 3 / 5 seems to have a correct total count, but the missing 2 records can also be found in target, that destroys accuracy definition.
If you do want to keep the distinct records in source, I think 3 / 3 might be a better result in this case, if so, the distinct process is just something like a preprocess of data source, not need to be in the accuracy calculation. Which means, in accuracy, the source should be [1, 2, 3], preprocessed from [1, 2, 3, 3, 3]
Gaurav Malhotra
Hello,
I wish to integrate Apache Griffin with AWS S3 in place of HDFS (Hadoop). How can we do that? Looking at the code, the framework seems to be written specifically for HDFS/Hadoop.
Looking forward to hearing from you.
Regards,
Gaurav
Lionel Liu
Hi Gaurav,
Griffin works on HDFS in these places:
Based on this, I think you can integrate it with AWS. If you need streaming mode, you can implement temporary storage based on AWS S3, or you can skip this for batch mode only. If you want to save metrics in AWS S3, you may need such a metric persist solution.
Thanks,
Lionel
Gaurav Malhotra
Thanks Lionel. I am relatively new to Griffin. I would like to explore option 1, but I am a bit confused now. I see the Spark job for the Griffin DQ metrics is configured with HDFS. How can we change it to S3 and keep all the cool features of Griffin like the UI, REST API etc.? A polite request: if you have an S3 sample available, it would be great if you could share it with me or give me a pointer to where/how we can change the storage of Hive from HDFS to AWS S3.
Thanks again,
Gaurav
Lionel Liu
Which fields in the configuration are you concerned with? Would you please list them on our mailing list? That might be a better place for the discussion.
Gaurav Malhotra
Hello,
We have made good progress integrating Griffin with AWS S3 and Livy. Everything is running on the K8s cluster like a charm since we extended the Apache Griffin image. The change was what you suggested: we simply pointed Hive to S3. With this integration, it's able to read data and run Griffin measure jobs.
But we have hit a roadblock when we tried to recreate the same idea on the k8 cluster. We are facing the following exception and we have tried for a week to fix with various ways - with no success :-(
Below is the exception we are getting in the livy logs:
"Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: hdfs:///griffin/griffin-measure.jar"
Our Analysis
==========
The problem seems to be that Griffin is still trying to look for the measure jar on HDFS, but it is not there because we do not have HDFS. We know that this jar is on the Griffin docker image under the name measure-0.5.0-SNAPSHOT.jar (correct me if I'm wrong). We cannot figure out how to make use of this jar without HDFS.
Griffin is running on a docker image with only nodejs, postgresql and npm on there. This image is pointing at our spark and hive docker images.
Further, I think the above problem is caused by the default sparkProperties.json configuration.
Do I have to manually make changes to this file and build the jar, or is there another mechanism?
Could you help us make this connection?
Looking forward to your suggestion. We are really stuck.
Cheers,
Gaurav Malhotra
Lionel Liu
When livy submits a job to the spark cluster, it is submitted in cluster mode, which means any spark node might be the application driver. Therefore, each spark node should be able to access the files required by the job, including the jar package and hive-site.xml (required by spark to access your Hive). HDFS is a natural solution for global access from each spark node, and HDFS is usually supported in spark clusters; that's why we leverage it by default.
But in your case, there's no HDFS, maybe we can solve this issue like spark cluster, just put the necessary jar packages and files in the same path on each spark node, and update the path in the configuration file sparkProperties.json, then the files could be found by any node via the same path.
Gaurav Malhotra
Thanks for suggestion.
Short term approach
================
Change sparkProperties.json and also make sure griffin-measure.jar is present on all the spark worker nodes;
similarly with hive-site.xml.
Better approach
============
Add an s3 approach to Griffin by extending PropertiesConfig (the method below) and adding s3.
Please validate my thought process.
Thanks
Gaurav Malhotra
Since FileUtil uses:
I think a location like file://blah will work, or do we have to give the complete file path?
Lionel Liu
I think a complete file path would be better; I'm not sure what the relative path should be relative to, the jar package or the spark worker, maybe the latter.
Lionel Liu
The configuration looks good to me.
You can have a try about this, not sure it would work based on this document: https://community.hortonworks.com/articles/151164/how-to-submit-spark-application-through-livy-rest.html, it says that for livy:
The “application-jar” should be reachable by remote cluster manager, which means this “application-jar” should be put onto a distributed file system like HDFS. This is different from “spark-submit” because “spark-submit” also handles uploading jars from local disk, but Livy REST APIs doesn’t do jar uploading.
However, in this document: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/bk_spark-component-guide/content/livy-batch-job.html, it seems like livy supports the same paths as spark submit shell.
You can have a try first, if not work, you can refer to the livy community.
Gaurav Malhotra
Thanks. We uploaded griffin-measure.jar to all the Spark worker nodes and gave the absolute path in sparkProperties.json.
The Spark job submit works, and in the Griffin UI it shows successful. But the Livy logs show error code 251 and no metrics.
I also see following warning in the logs:
19/02/04 12:35:18 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
19/02/04 12:35:18 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
19/02/04 12:35:26 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
data source timeRanges: source -> (1549197300000, 1549197300000], target -> (1549197300000, 1549197300000]
Also one more observation, which could be the cause of the problem: it's regarding hive-site.xml in sparkProperties.json.
I am not using yarn, so how do I configure hive-site.xml?
I seem to be a bit lost. Any pointers on how to debug this further?
Gaurav Malhotra
Last hurdle. Please validate my understanding. The griffin-measure.jar writes the metrics to the following sinks:
ConsoleSink
ElasticSearchSink
HdfsSink
MongoSink
The problem is, I don't see the metrics. I did configure ConsoleSink and was hoping to see the metrics on the console, but I don't.
I may sound a little silly; my problem is that I am not able to validate whether my integration is working or not.
Gaurav Malhotra
We got Griffin working end to end with Hive pointing to S3, running on the K8s cluster like a charm.
One last thing: currently I am hardcoding the sinks in the Measure class - sinksList = Arrays.asList("CONSOLE");
Question: how can we configure the sinks from outside?
Thanks
sujeeth
Gaurav, I am looking at doing the same thing you tried. Are you willing to open up the implementation on github, or help get us started in any way?