Intro
Executing statements periodically can be useful in
Overview
Scheduled queries were added in Hive 4.0 (HIVE-21884).
Hive has its scheduled query interface built into the language itself for easy access:
CREATE SCHEDULED QUERY <scheduled_query_name>
<scheduleSpecification>
[<executedAsSpec> ]
[<enableSpecification>]
<definedAsSpec>
ALTER SCHEDULED QUERY <scheduled_query_name>
(<scheduleSpec>|<executedAsSpec>|<enableSpecification>|<definedAsSpec>|<executeSpec>);
DROP SCHEDULED QUERY <scheduled_query_name>;
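As a rough illustration of how these statements fit together, the sketch below walks a single schedule through its lifecycle. The schedule name nightly_stats and the table web_logs are hypothetical placeholders; the statement forms follow the grammar above.

```sql
-- Hypothetical names: nightly_stats (schedule), web_logs (table).

-- Create a schedule that runs one statement every day at 02:00:00.
create scheduled query nightly_stats cron '0 0 2 * * ? *' as
  analyze table web_logs compute statistics for columns;

-- Change only the schedule; here it moves to 03:00:00.
alter scheduled query nightly_stats cron '0 0 3 * * ? *';

-- Change only the statement that gets executed.
alter scheduled query nightly_stats defined as
  analyze table web_logs compute statistics;

-- Remove the schedule once it is no longer needed.
drop scheduled query nightly_stats;
```

Each ALTER statement changes one property at a time, which matches the alternatives listed in the ALTER SCHEDULED QUERY grammar.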
Schedules can be specified either as CRON expressions or, for common cases, in a simpler EVERY form; in both cases the schedule is stored as a Quartz cron expression.
CRON <quartz_schedule_expression>
where quartz_schedule_expression is a quoted schedule in the Quartz format; an expression generator is available at:
https://www.freeformatter.com/cron-expression-generator-quartz.html
For example, the expression CRON '0 */10 * * * ? *' fires every 10 minutes.
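A few more Quartz expressions may help when reading or writing schedules. The sketch below is only illustrative: the schedule names are hypothetical and select 1 stands in for a real statement. Quartz fields are ordered as second, minute, hour, day-of-month, month, day-of-week, and an optional year.

```sql
-- Quartz field order: second minute hour day-of-month month day-of-week [year]
-- Hypothetical schedule names; "select 1" is a stand-in statement.

-- Every day at 02:30:00.
create scheduled query sq_daily cron '0 30 2 * * ? *' as select 1;

-- Every 15 minutes, starting at minute 0 of each hour.
create scheduled query sq_quarter_hourly cron '0 0/15 * * * ? *' as select 1;

-- At 09:00:00 on weekdays only; day-of-month is '?' because day-of-week is set.
create scheduled query sq_weekdays cron '0 0 9 ? * MON-FRI *' as select 1;
```

Quartz does not allow specific values in both the day-of-month and day-of-week fields, so one of the two is written as '?'.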
EVERY offers a more readable way to declare common schedules.
EVERY [<integer>] (SECOND|MINUTE|HOUR|DAY) [(OFFSET BY|AT) <timeOrDate>]
For example:
EVERY 2 MINUTES
EVERY HOUR AT '0:07:30'
EVERY DAY AT '11:35:30'
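To see how an EVERY schedule ends up being stored, one option is to create a schedule and then read back its schedule column. This is a minimal sketch; the schedule names are hypothetical and select 1 stands in for a real statement.

```sql
-- Hypothetical schedule names; "select 1" is a stand-in statement.
create scheduled query sq_every_2min every 2 minutes as select 1;
create scheduled query sq_daily_report every day at '11:35:30' as select 1;

-- The schedule column holds the Quartz cron expression Hive derived from EVERY.
select schedule_name, schedule
from information_schema.scheduled_queries
where schedule_name in ('sq_every_2min', 'sq_daily_report');
```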
EXECUTED AS <user_name>
By default, a scheduled query is executed as the user who declared it; users with admin privileges may be able to change the executing user.
(ENABLE[D] | DISABLE[D])
Enables or disables a schedule.
For CREATE SCHEDULED QUERY statements, the default behaviour is set by the configuration key hive.scheduled.queries.create.as.enabled.
If executions are in flight when the corresponding schedule is disabled, the already running executions will still finish, but no further executions will be triggered.
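A short sketch of pausing and resuming a schedule follows. It assumes a schedule named sc1 already exists; the name is only an example.

```sql
-- Assumes a schedule named sc1 exists.

-- Pause the schedule; executions that are already running are left to finish.
alter scheduled query sc1 disable;

-- Check the current state of the schedule.
select schedule_name, enabled, next_execution
from information_schema.scheduled_queries
where schedule_name = 'sc1';

-- Resume the schedule.
alter scheduled query sc1 enable;
```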
[DEFINED] AS <hiveQuery>
The “query” is the single statement to be scheduled for execution.
EXECUTE
Changes the schedule's next execution time to now. This can be useful during debugging and development.
Information about scheduled queries and their executions can be obtained from either the information_schema or the sysdb. The recommended way is to use the information_schema; the sysdb tables exist to build the information_schema-level views and for debugging.
Suppose we have a scheduled query defined by:
create scheduled query sc1 cron '0 */10 * * * ? *' as select 1;
Let’s take a look at it in the information_schema.scheduled_queries table by using
select * from information_schema.scheduled_queries;
The result set is transposed below so that each column can be described:
scheduled_query_id | 1 | Internally, every scheduled query also has a numeric id |
schedule_name | sc1 | The name of the schedule |
enabled | true | True if the schedule is enabled |
cluster_namespace | hive | The namespace the scheduled query belongs to |
schedule | 0 */10 * * * ? * | The schedule described in QUARTZ cron format |
user | dev | The owner/executor of the query |
query | select 1 | The query being scheduled |
next_execution | 2020-01-29 16:50:00 | Technical column; shows when the next execution should happen |
The pair (schedule_name, cluster_namespace) is unique.
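Rather than selecting every column, it is often enough to look at a handful of them. The query below is a small sketch that uses only the columns described above and lists schedules by their upcoming execution time.

```sql
-- List all schedules, soonest upcoming execution first.
select schedule_name, enabled, schedule, next_execution
from information_schema.scheduled_queries
order by next_execution;
```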
The information_schema.scheduled_executions view can be used to get information about recent scheduled query executions.
select * from information_schema.scheduled_executions;
One record in this view contains the following information:
scheduled_execution_id | 13 | Every scheduled query execution has a unique numeric id |
schedule_name | sc1 | The schedule name to which this execution belongs |
executor_query_id | dev_20200131103008_c9a39b8d-e26b-44cd-b8ae-9d054204dc07 | The query id assigned by the execution engine for the given scheduled execution |
state | FINISHED | State of the execution; can be INITED, EXECUTING, FAILED, FINISHED or TIMED_OUT |
start_time | 2020-01-31 10:30:06 | Start time of execution |
end_time | 2020-01-31 10:30:08 | End time of execution |
elapsed | 2 | (computed) end_time-start_time |
error_message | NULL | In case the query is FAILED the error message is shown here |
last_update_time | NULL | During execution, the last time the executor reported information about the state |
INITED | The scheduled execution record is created when an executor is assigned to run it |
EXECUTING | Queries in executing state are being processed by the executor; during this phase the executor reports the progress of the query in intervals defined by: hive.scheduled.queries.executor.progress.report.interval |
FAILED | The query execution was stopped by an error code (or an exception); when this state is set, the error_message is also filled |
FINISHED | The query finished without problems |
TIMED_OUT | An execution is considered timed out when it has been executing for longer than metastore.scheduled.queries.execution.timeout. The scheduled queries maintenance task checks for any timed out executions. |
The scheduled query maintenance task removes entries older than metastore.scheduled.queries.execution.max.age.
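The states above make it straightforward to look for executions that need attention. The queries below are a sketch using only the columns of the scheduled_executions view described earlier; the limit of 20 is an arbitrary choice.

```sql
-- Most recent executions that did not finish cleanly.
select scheduled_execution_id, schedule_name, state, start_time, elapsed, error_message
from information_schema.scheduled_executions
where state in ('FAILED', 'TIMED_OUT')
order by scheduled_execution_id desc
limit 20;

-- Number of retained executions per schedule and state.
select schedule_name, state, count(*) as executions
from information_schema.scheduled_executions
group by schedule_name, state;
```

Because the maintenance task removes old entries, these queries only cover executions that are still retained.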
create table t (a integer);

-- create a scheduled query; every 10 minutes insert a new row
create scheduled query sc1 cron '0 */10 * * * ? *' as insert into t values (1);

-- depending on hive.scheduled.queries.create.as.enabled the query might get created in disabled mode
-- it can be enabled using:
alter scheduled query sc1 enabled;

-- inspect scheduled queries using the information_schema
select * from information_schema.scheduled_queries s where schedule_name='sc1';
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+
| s.scheduled_query_id  | s.schedule_name  | s.enabled  | s.cluster_namespace  |    s.schedule     | s.user  |  s.query  |   s.next_execution   |
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+
| 1                     | sc1              | true       | hive                 | 0 */10 * * * ? *  | dev     | select 1  | 2020-02-03 15:10:00  |
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+

-- wait 10 minutes or execute by issuing:
alter scheduled query sc1 execute;

select * from information_schema.scheduled_executions s where schedule_name='sc1' order by scheduled_execution_id desc limit 1;
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+
| s.scheduled_execution_id  | s.schedule_name  |                s.executor_query_id                 |  s.state  |     s.start_time     |      s.end_time      | s.elapsed  | s.error_message  | s.last_update_time  |
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+
| 496                       | sc1              | dev_20200203152025_bdf3deac-0ca6-407f-b122-c637e50f99c8 | FINISHED  | 2020-02-03 15:20:23  | 2020-02-03 15:20:31  | 8          | NULL             | NULL                |
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+
Suppose you have an external table whose contents change slowly; eventually Hive will be using outdated statistics at planning time.
-- create external table
create external table t (a integer);

-- see where the table lives:
desc formatted t;
[...]
| Location: | file:/data/hive/warehouse/t | NULL |
[...]

-- in a terminal; load some data into the table directory:
seq 1 10 > /data/hive/warehouse/t/f1

-- back in hive you will see that
select count(1) from t;
10

-- meanwhile basic stats show that the table has "0" rows
desc formatted t;
[...]
| | numRows | 0 |
[...]

create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns;

-- wait some time or execute by issuing:
alter scheduled query t_analyze execute;

select * from information_schema.scheduled_executions s where schedule_name='t_analyze' order by scheduled_execution_id desc limit 3;
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+
| s.scheduled_execution_id  | s.schedule_name  |                s.executor_query_id                 |  s.state   |     s.start_time     |      s.end_time      | s.elapsed  | s.error_message  |  s.last_update_time  |
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+
| 498                       | t_analyze        | dev_20200203152640_a59bc198-3ed3-4ef2-8f63-573607c9914e | FINISHED   | 2020-02-03 15:26:38  | 2020-02-03 15:28:01  | 83         | NULL             | NULL                 |
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+

-- and the numRows value has been updated
desc formatted t;
[...]
| | numRows | 10 |
[...]

-- we don't want this running every minute anymore...
alter scheduled query t_analyze disable;
-- some settings...they might be there already
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.strict.checks.cartesian.product=false;
set hive.stats.fetch.column.stats=true;
set hive.materializedview.rewriting=true;

-- create some tables
CREATE TABLE emps (
  empid INT,
  deptno INT,
  name VARCHAR(256),
  salary FLOAT,
  hire_date TIMESTAMP)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

CREATE TABLE depts (
  deptno INT,
  deptname VARCHAR(256),
  locationid INT)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- load data
insert into emps values (100, 10, 'Bill', 10000, 1000), (200, 20, 'Eric', 8000, 500),
  (150, 10, 'Sebastian', 7000, null), (110, 10, 'Theodore', 10000, 250), (120, 10, 'Bill', 10000, 250),
  (1330, 10, 'Bill', 10000, '2020-01-02');
insert into depts values (10, 'Sales', 10), (30, 'Marketing', null), (20, 'HR', 20);

insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02');

-- create mv
CREATE MATERIALIZED VIEW mv1 AS
  SELECT empid, deptname, hire_date FROM emps
    JOIN depts ON (emps.deptno = depts.deptno)
    WHERE hire_date >= '2016-01-01 00:00:00';

EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- create a schedule to rebuild mv
create scheduled query mv_rebuild cron '0 */10 * * * ? *' defined as
  alter materialized view mv1 rebuild;

-- from this explain it will be seen that the mv1 is being used
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- insert a new record
insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02');

-- the source tables are scanned
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- wait 10 minutes or execute
alter scheduled query mv_rebuild execute;

-- run it again...the view should be rebuilt
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';
drop table if exists t;
drop table if exists s;

-- suppose that this table is an external table or something
-- which supports the pushdown of filter condition on the id column
create table s(id integer, cnt integer);

-- create an internal table and an offset table
create table t(id integer, cnt integer);
create table t_offset(offset integer);
insert into t_offset values(0);

-- pretend that data is added to s
insert into s values(1,1);

-- run an ingestion...
from (select id==offset as first,* from s
join t_offset on id>=offset) s1
insert into t select id,cnt where first = false
insert overwrite table t_offset select max(s1.id);

-- configure to run ingestion every 10 minutes
create scheduled query ingest every 10 minutes defined as
from (select id==offset as first,* from s
join t_offset on id>=offset) s1
insert into t select id,cnt where first = false
insert overwrite table t_offset select max(s1.id);

-- add some new values
insert into s values(2,2),(3,3);

-- pretend that a timeout has happened
alter scheduled query ingest execute;