Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/case_sensitive.result
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ DESC `CASE_SENSITIVE_TABLE1`;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: DESC `CASE_SENSITIVE_TABLE1`;. Caused by: Failed to create plan, err:Table not found, table:CASE_SENSITIVE_TABLE1" })

DROP TABLE IF EXISTS case_SENSITIVE_table1;

affected_rows: 0

2 changes: 2 additions & 0 deletions integration_tests/cases/common/dml/case_sensitive.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ DESC CASE_SENSITIVE_TABLE1;
DESC `case_SENSITIVE_table1`;

DESC `CASE_SENSITIVE_TABLE1`;

DROP TABLE IF EXISTS case_SENSITIVE_table1;
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/insert_mode.result
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,7 @@ UInt64(0),Timestamp(2),UInt32(20),String("123"),UInt32(21),UInt32(22),UInt32(4),
UInt64(0),Timestamp(3),UInt32(30),String("123"),UInt32(31),UInt32(32),UInt32(5),


DROP TABLE IF EXISTS `03_dml_insert_mode_table4`;

affected_rows: 0

3 changes: 2 additions & 1 deletion integration_tests/cases/common/dml/insert_mode.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ FROM
ORDER BY
`c1` ASC;

DROP TABLE `03_dml_insert_mode_table4`;

DROP TABLE IF EXISTS `03_dml_insert_mode_table4`;
8 changes: 8 additions & 0 deletions integration_tests/cases/common/dml/issue-302.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
DROP TABLE IF EXISTS issue302;

affected_rows: 0

CREATE TABLE `issue302` (`name` string TAG NULL, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='false');

affected_rows: 0
Expand All @@ -12,3 +16,7 @@ issue302.t,COUNT(DISTINCT issue302.name),
Timestamp(1651737067000),Int64(0),


DROP TABLE IF EXISTS issue302;

affected_rows: 0

4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-302.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
DROP TABLE IF EXISTS issue302;

CREATE TABLE `issue302` (`name` string TAG NULL, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='false');

INSERT INTO issue302(t, value) VALUES(1651737067000, 100);

select `t`, count(distinct name) from issue302 group by `t`;

DROP TABLE IF EXISTS issue302;
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-59.result
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ String("logical_plan"),String("Projection: issue59.id + Int64(1), COUNT(DISTINCT
String("physical_plan"),String("ProjectionExec: expr=[issue59.id + Int64(1)@0 as issue59.id + Int64(1), COUNT(DISTINCT issue59.account)@1 as COUNT(DISTINCT issue59.account)]\n ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;

affected_rows: 0

2 changes: 2 additions & 0 deletions integration_tests/cases/common/dml/issue-59.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ GROUP BY id+1;
explain SELECT id+1, count(distinct(account))
FROM issue59
GROUP BY id+1;

DROP TABLE IF EXISTS issue59;
8 changes: 8 additions & 0 deletions integration_tests/cases/common/dml/issue-637.result
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ tsid,t,double_filed,float_filed,str_field,var_field,u64_field,u32_field,u16_fiel
UInt64(0),Timestamp(1651737067000),Double(100.0),Float(100.0),String("s"),Varbinary([118]),UInt64(100),UInt32(100),UInt16(100),UInt8(100),Int64(100),Int32(100),Int16(100),Int8(100),Boolean(false),


DROP TABLE IF EXISTS issue637;

affected_rows: 0

DROP TABLE IF EXISTS issue637_1;

affected_rows: 0

4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-637.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ VALUES
(1651737067000,100,100,"s","v",100,100,100,100,100,100,100,100,false);

SELECT * FROM `issue637_1`;

DROP TABLE IF EXISTS issue637;

DROP TABLE IF EXISTS issue637_1;
5 changes: 5 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ SELECT "level_description", location, water_level FROM "h2o_feet" where location

{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827620000,"below 3 feet","santa_monica",2.028]]}]}]}

-- SQLNESS ARG protocol=influxql
show measurements;

{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["h2o_feet"]]}]}]}

DROP TABLE IF EXISTS `h2o_feet`;

affected_rows: 0
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ SELECT * FROM "h2o_feet";
-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';

-- SQLNESS ARG protocol=influxql
show measurements;

DROP TABLE IF EXISTS `h2o_feet`;
42 changes: 31 additions & 11 deletions interpreters/src/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use regex::Regex;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use sql::{
ast::ShowCreateObject,
plan::{ShowCreatePlan, ShowPlan, ShowTablesPlan},
plan::{QueryType, ShowCreatePlan, ShowPlan, ShowTablesPlan},
};

use crate::{
Expand Down Expand Up @@ -127,17 +127,37 @@ impl ShowInterpreter {
.map(|t| t.name().to_string())
.collect::<Vec<_>>(),
};
let schema = DataSchema::new(vec![Field::new(
SHOW_TABLES_COLUMN_SCHEMA,
DataType::Utf8,
false,
)]);
let record_batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?;

let record_batch = match plan.query_type {
QueryType::Sql => {
let schema = DataSchema::new(vec![Field::new(
SHOW_TABLES_COLUMN_SCHEMA,
DataType::Utf8,
false,
)]);

RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?
}
QueryType::InfluxQL => {
// TODO: refactor those constants
let schema = DataSchema::new(vec![
Field::new("ceresdb::measurement", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
]);

let measurements = vec!["measurements".to_string(); tables_names.len()];
let measurements = Arc::new(StringArray::from(measurements));
RecordBatch::try_new(
Arc::new(schema),
vec![measurements, Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?
}
};
let record_batch = record_batch.try_into().context(ToCommonRecordType)?;

Ok(Output::Records(vec![record_batch]))
Expand Down
6 changes: 4 additions & 2 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl From<&str> for Precision {
/// Query string parameters for query api(by influxql)
///
/// It's derived from query string parameters of query described in
/// doc of influxdb 1.8:
/// doc of influxdb 1.8:
/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-1
///
/// NOTE:
Expand Down Expand Up @@ -311,6 +311,8 @@ impl InfluxqlResultBuilder {
}
);

// Query like `show measurements`, there will be not timestamp.
let has_timestamp = column_schemas.iter().any(|c| c.data_type.is_timestamp());
Comment thread
jiacai2050 marked this conversation as resolved.
// Find the tags part and columns part from schema.
let mut group_by_col_idxs = Vec::new();
let mut value_col_idxs = Vec::new();
Expand All @@ -324,7 +326,7 @@ impl InfluxqlResultBuilder {
});

// The group by tags will be placed after measurement and before time column.
let mut searching_group_by_tags = true;
let mut searching_group_by_tags = has_timestamp;
for (idx, col) in col_iter {
if col.data_type.is_timestamp() {
searching_group_by_tags = false;
Expand Down
18 changes: 14 additions & 4 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
use influxql_parser::{
common::{MeasurementName, QualifiedMeasurementName},
select::{MeasurementSelection, SelectStatement},
show_measurements::ShowMeasurementsStatement,
statement::Statement as InfluxqlStatement,
};
use snafu::{ensure, ResultExt};

use crate::{
influxql::{error::*, provider::InfluxSchemaProviderImpl},
plan::{Plan, QueryPlan},
plan::{Plan, QueryPlan, QueryType, ShowPlan, ShowTablesPlan},
provider::{ContextProviderAdapter, MetaProvider},
};

Expand All @@ -36,15 +37,15 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
/// and build plan then.
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
match &stmt {
match stmt {
InfluxqlStatement::Select(_) => self.select_to_plan(stmt),
InfluxqlStatement::ShowMeasurements(stmt) => self.show_measurements_to_plan(*stmt),
InfluxqlStatement::CreateDatabase(_)
| InfluxqlStatement::ShowDatabases(_)
| InfluxqlStatement::ShowRetentionPolicies(_)
| InfluxqlStatement::ShowTagKeys(_)
| InfluxqlStatement::ShowTagValues(_)
| InfluxqlStatement::ShowFieldKeys(_)
| InfluxqlStatement::ShowMeasurements(_)
| InfluxqlStatement::Delete(_)
| InfluxqlStatement::DropMeasurement(_)
| InfluxqlStatement::Explain(_) => Unimplemented {
Expand All @@ -54,7 +55,16 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
}
}

pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
// TODO: support offset/limit/match in stmt
fn show_measurements_to_plan(self, _stmt: ShowMeasurementsStatement) -> Result<Plan> {
let plan = ShowTablesPlan {
pattern: None,
query_type: QueryType::InfluxQL,
};
Ok(Plan::Show(ShowPlan::ShowTablesPlan(plan)))
}

fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
if let InfluxqlStatement::Select(select_stmt) = &stmt {
check_select_statement(select_stmt)?;
} else {
Expand Down
7 changes: 7 additions & 0 deletions sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,17 @@ pub struct ShowCreatePlan {
pub obj_type: ShowCreateObject,
}

#[derive(Debug, PartialEq, Eq)]
pub enum QueryType {
Sql,
InfluxQL,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ShowTablesPlan {
/// Like pattern
pub pattern: Option<String>,
pub query_type: QueryType,
}

#[derive(Debug)]
Expand Down
5 changes: 4 additions & 1 deletion sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ use crate::{
partition::PartitionParser,
plan::{
AlterTableOperation, AlterTablePlan, CreateTablePlan, DescribeTablePlan, DropTablePlan,
ExistsTablePlan, InsertPlan, Plan, QueryPlan, ShowCreatePlan, ShowPlan, ShowTablesPlan,
ExistsTablePlan, InsertPlan, Plan, QueryPlan, QueryType, ShowCreatePlan, ShowPlan,
ShowTablesPlan,
},
promql::{ColumnNames, Expr as PromExpr},
provider::{ContextProviderAdapter, MetaProvider},
Expand Down Expand Up @@ -992,6 +993,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
fn show_tables_to_plan(&self, show_tables: ShowTables) -> Result<Plan> {
let plan = ShowTablesPlan {
pattern: show_tables.pattern,
query_type: QueryType::Sql,
};
Ok(Plan::Show(ShowPlan::ShowTablesPlan(plan)))
}
Expand Down Expand Up @@ -2193,6 +2195,7 @@ mod tests {
ShowTablesPlan(
ShowTablesPlan {
pattern: None,
query_type: Sql,
},
),
)"#,
Expand Down