diff --git a/.gitignore b/.gitignore index d53d616..ec15ad1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,8 @@ uv.lock # IDE settings (optional) .vscode/ .idea/ +.specstory/ +.cursor/ # Distribution directories *.dist-info/ diff --git a/README.md b/README.md index 0033771..457021f 100644 --- a/README.md +++ b/README.md @@ -14,17 +14,30 @@ The server doesn't expose any resources. The server doesn't provide any prompts. ### Tools -The server offers three core tools: +The server offers different tools for IoTDB Tree Model and Table Model. You can choose between them by setting the "IOTDB_SQL_DIALECT" configuration to either "tree" or "table". -#### Query Tools +#### Tree Model +- `metadata_query` + - Execute SHOW/COUNT queries to read metadata from the database + - Input: + - `query_sql` (string): The SHOW/COUNT SQL query to execute + - Returns: Query results as array of objects +- `select_query` + - Execute SELECT queries to read data from the database + - Input: + - `query_sql` (string): The SELECT SQL query to execute + - Returns: Query results as array of objects + +#### Table Model + +##### Query Tools - `read_query` - Execute SELECT queries to read data from the database - Input: - `query` (string): The SELECT SQL query to execute - Returns: Query results as array of objects - -#### Schema Tools +##### Schema Tools - `list_tables` - Get a list of all tables in the database - No input required @@ -37,7 +50,6 @@ The server offers three core tools: - Returns: Array of column definitions with names and types - ## Claude Desktop Integration ## Prerequisites @@ -90,7 +102,8 @@ Location: `%APPDATA%/Claude/claude_desktop_config.json` "IOTDB_PORT": "6667", "IOTDB_USER": "root", "IOTDB_PASSWORD": "root", - "IOTDB_DATABASE": "test" + "IOTDB_DATABASE": "test", + "IOTDB_SQL_DIALECT": "table" } } } diff --git a/src/iotdb_mcp_server/config.py b/src/iotdb_mcp_server/config.py index db6875a..c571e2b 100644 --- a/src/iotdb_mcp_server/config.py +++ b/src/iotdb_mcp_server/config.py @@ -49,7 +49,12 @@ class Config: database: str """ - IoTDB database name + IoTDB database + """ + + sql_dialect: str + """ + SQL dialect: tree or table """ @staticmethod @@ -73,13 +78,6 @@ def from_env_arguments() -> "Config": default=os.getenv("IOTDB_PORT", 6667), ) - parser.add_argument( - "--database", - type=str, - help="IoTDB connect database name", - default=os.getenv("IOTDB_DATABASE", "test"), - ) - parser.add_argument( "--user", type=str, @@ -94,11 +92,26 @@ def from_env_arguments() -> "Config": default=os.getenv("IOTDB_PASSWORD", "root"), ) + parser.add_argument( + "--database", + type=str, + help="IoTDB connect database name", + default=os.getenv("IOTDB_DATABASE", "test"), + ) + + parser.add_argument( + "--sql-dialect", + type=str, + help="SQL dialect: tree or table", + default=os.getenv("IOTDB_SQL_DIALECT", "table"), + ) + args = parser.parse_args() return Config( host=args.host, port=args.port, - database=args.database, user=args.user, password=args.password, + database=args.database, + sql_dialect=args.sql_dialect, ) diff --git a/src/iotdb_mcp_server/server.py b/src/iotdb_mcp_server/server.py index 0527bc2..7e4b9ff 100644 --- a/src/iotdb_mcp_server/server.py +++ b/src/iotdb_mcp_server/server.py @@ -17,10 +17,13 @@ # import logging +import datetime +from iotdb.Session import Session +from iotdb.SessionPool import SessionPool, PoolConfig +from iotdb.utils.SessionDataSet import SessionDataSet from iotdb.table_session import TableSession from iotdb.table_session_pool import TableSessionPool, TableSessionPoolConfig -from iotdb.utils.SessionDataSet import SessionDataSet from mcp.server.fastmcp import FastMCP from mcp.types import ( TextContent, @@ -46,85 +49,230 @@ "user": config.user, "password": config.password, "database": config.database, + "sql_dialect": config.sql_dialect, } logger.info(f"IoTDB Config: {db_config}") -session_pool_config = TableSessionPoolConfig( - node_urls=[str(config.host) + ":" + str(config.port)], - username=config.user, - password=config.password, - database=None if len(config.database) == 0 else config.database, -) +if config.sql_dialect == "tree": + + pool_config = PoolConfig( + node_urls=[str(config.host) + ":" + str(config.port)], + user_name=config.user, + password=config.password, + fetch_size=1024, + time_zone="UTC+8", + max_retry=3 + ) + max_pool_size = 5 + wait_timeout_in_ms = 3000 + session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms) + + @mcp.tool() + async def metadata_query(query_sql: str) -> list[TextContent]: + """Execute metadata queries on IoTDB to explore database structure and statistics. + + Args: + query_sql: The metadata query to execute. Supported queries: + - SHOW DATABASES [path]: List all databases or databases under a specific path + - SHOW TIMESERIES [path]: List all time series or time series under a specific path + - SHOW CHILD PATHS [path]: List child paths under a specific path + - SHOW CHILD NODES [path]: List child nodes under a specific path + - SHOW DEVICES [path]: List all devices or devices under a specific path + - COUNT TIMESERIES [path]: Count time series under a specific path + - COUNT NODES [path]: Count nodes under a specific path + - COUNT DEVICES [path]: Count devices under a specific path + - if path is not provided, the query will be applied to root.** + + Examples: + SHOW DATABASES root.** + SHOW TIMESERIES root.ln.** + SHOW CHILD PATHS root.ln + SHOW CHILD PATHS root.ln.*.* + SHOW CHILD NODES root.ln + SHOW DEVICES root.ln.** + COUNT TIMESERIES root.ln.** + COUNT NODES root.ln + COUNT DEVICES root.ln + """ + session = session_pool.get_session() + try: + stmt = query_sql.strip().upper() + + # 处理SHOW DATABASES + if ( + stmt.startswith("SHOW DATABASES") + or stmt.startswith("SHOW TIMESERIES") + or stmt.startswith("SHOW CHILD PATHS") + or stmt.startswith("SHOW CHILD NODES") + or stmt.startswith("SHOW DEVICES") + or stmt.startswith("COUNT TIMESERIES") + or stmt.startswith("COUNT NODES") + or stmt.startswith("COUNT DEVICES") + ): + res = session.execute_query_statement(query_sql) + return prepare_res(res, session) + else: + raise ValueError("Unsupported metadata query. Please use one of the supported query types.") + + except Exception as e: + session.close() + raise e + + @mcp.tool() + async def select_query(query_sql: str) -> list[TextContent]: + """Execute a SELECT query on the IoTDB tree SQL dialect. + + Args: + query_sql: The SQL query to execute (using TREE dialect) + + SQL Syntax: + SELECT [LAST] selectExpr [, selectExpr] ... + [INTO intoItem [, intoItem] ...] + FROM prefixPath [, prefixPath] ... + [WHERE whereCondition] + [GROUP BY { + ([startTime, endTime), interval [, slidingStep]) | + LEVEL = levelNum [, levelNum] ... | + TAGS(tagKey [, tagKey] ... | + VARIATION(expression[,delta][,ignoreNull=true/false]) | + CONDITION(expression,[keep>/>=/=/ 2017-11-01T00:05:00.000 and time < 2017-11-01T00:12:00.000) or (time >= 2017-11-01T16:35:00.000 and time <= 2017-11-01T16:37:00.000) + select * from root.ln.** where time > 1 order by time desc limit 10; -session_pool = TableSessionPool(session_pool_config) + Supported Aggregate Functions: + SUM + COUNT + MAX_VALUE + MIN_VALUE + AVG + VARIANCE + MAX_TIME + MIN_TIME + ... + """ + session = session_pool.get_session() + res = session.execute_query_statement(query_sql) + stmt = query_sql.strip().upper() + # Regular SELECT queries + if ( + stmt.startswith("SELECT") + ): + return prepare_res(res, session) + # Non-SELECT queries + else: + raise ValueError("Only SELECT queries are allowed for read_query") -@mcp.tool() -async def read_query(query_sql: str) -> list[TextContent]: - """Execute a SELECT query on the IoTDB. Please use table sql_dialect when generating SQL queries. + def prepare_res( + _res: SessionDataSet, _session: Session + ) -> list[TextContent]: + columns = _res.get_column_names() + result = [] + while _res.has_next(): + record = _res.next() + if columns[0] == "Time": + timestamp = record.get_timestamp() + # formatted_time = datetime.datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + row = record.get_fields() + result.append(timestamp + "," + ",".join(map(str, row))) + else: + row = record.get_fields() + result.append(",".join(map(str, row))) + _session.close() + return [ + TextContent( + type="text", + text="\n".join([",".join(columns)] + result), + ) + ] - Args: - query_sql: The SQL query to execute (using TABLE dialect) - """ - table_session = session_pool.get_session() - res = table_session.execute_query_statement(query_sql) +elif config.sql_dialect == "table": + + session_pool_config = TableSessionPoolConfig( + node_urls=[str(config.host) + ":" + str(config.port)], + username=config.user, + password=config.password, + database=None if len(config.database) == 0 else config.database, + ) + session_pool = TableSessionPool(session_pool_config) + + @mcp.tool() + async def read_query(query_sql: str) -> list[TextContent]: + """Execute a SELECT query on the IoTDB. Please use table sql_dialect when generating SQL queries. + + Args: + query_sql: The SQL query to execute (using TABLE dialect) + """ + table_session = session_pool.get_session() + res = table_session.execute_query_statement(query_sql) + + stmt = query_sql.strip().upper() + # Regular SELECT queries + if ( + stmt.startswith("SELECT") + or stmt.startswith("DESCRIBE") + or stmt.startswith("SHOW") + ): + return prepare_res(res, table_session) + # Non-SELECT queries + else: + raise ValueError("Only SELECT queries are allowed for read_query") + + + @mcp.tool() + async def list_tables() -> list[TextContent]: + """List all tables in the IoTDB database.""" + table_session = session_pool.get_session() + res = table_session.execute_query_statement("SHOW TABLES") + + result = ["Tables_in_" + db_config["database"]] # Header + while res.has_next(): + result.append(str(res.next().get_fields()[0])) + table_session.close() + return [TextContent(type="text", text="\n".join(result))] + + + @mcp.tool() + async def describe_table(table_name: str) -> list[TextContent]: + """Get the schema information for a specific table + Args: + table_name: name of the table to describe + """ + table_session = session_pool.get_session() + res = table_session.execute_query_statement("DESC " + table_name) - stmt = query_sql.strip().upper() - # Regular SELECT queries - if ( - stmt.startswith("SELECT") - or stmt.startswith("DESCRIBE") - or stmt.startswith("SHOW") - ): return prepare_res(res, table_session) - # Non-SELECT queries - else: - raise ValueError("Only SELECT queries are allowed for read_query") - - -@mcp.tool() -async def list_tables() -> list[TextContent]: - """List all tables in the IoTDB database.""" - table_session = session_pool.get_session() - res = table_session.execute_query_statement("SHOW TABLES") - - result = ["Tables_in_" + db_config["database"]] # Header - while res.has_next(): - result.append(str(res.next().get_fields()[0])) - table_session.close() - return [TextContent(type="text", text="\n".join(result))] - - -@mcp.tool() -async def describe_table(table_name: str) -> list[TextContent]: - """Get the schema information for a specific table - Args: - table_name: name of the table to describe - """ - table_session = session_pool.get_session() - res = table_session.execute_query_statement("DESC " + table_name) - - return prepare_res(res, table_session) - - -def prepare_res( - _res: SessionDataSet, _table_session: TableSession -) -> list[TextContent]: - columns = _res.get_column_names() - result = [] - while _res.has_next(): - row = _res.next().get_fields() - result.append(",".join(map(str, row))) - _table_session.close() - return [ - TextContent( - type="text", - text="\n".join([",".join(columns)] + result), - ) - ] + def prepare_res( + _res: SessionDataSet, _table_session: TableSession + ) -> list[TextContent]: + columns = _res.get_column_names() + result = [] + while _res.has_next(): + row = _res.next().get_fields() + result.append(",".join(map(str, row))) + _table_session.close() + return [ + TextContent( + type="text", + text="\n".join([",".join(columns)] + result), + ) + ] + if __name__ == "__main__": logger.info("iotdb_mcp_server running with stdio transport") # Initialize and run the server