[Dataset Quality] Apply chunking algorithm for getIndexBasicStats #221153
Conversation
@rStelmach I think there might be a couple of others that we are missing. I was having a look at these truncation problems and came across a comment from @weltenwort. It would be great if we could also add some error handling there, and one suggestion is that we might also have a look at using …

@mohamedhamed-ahmed
Thanks for applying the changes to the other functions. I see what you are trying to do there, but I would prefer if @awahab07 had a look at the changes to make sure that the fallbacks are what we expect there and that the approach is fine from a telemetry perspective.
awahab07
left a comment
Great to see the refactor to optimize ES calls and the handling of errors in the output.
Left some comments, as certain error handling may overfit and suppress some errors, which may result in data inconsistencies.
About where to focus which optimization, and how to move forward, I suggest:
- `addMappingsToIndices` could potentially result in errors where the output is too large, and is less likely to cause `too_long_http_line_exception`. So filtering the output is crucial in this function (chunking is still good here, though).
- `getIndexBasicStats` looks like the main culprit causing `too_long_http_line_exception` errors. So request chunking, filtering the output, and handling large-output errors are all important here.
- To ensure the applied filtering is not stripping required fields, it'd be great to have a scenario to test the before and after effect and make sure the resultant stats are intact. For this, a mix of data streams and regular indices with names conforming to log patterns can be created, with the collected fields being ingested, plus a few documents in the failure store for a few log patterns.
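For context, the chunk-then-merge pattern under discussion can be sketched roughly like this. This is a minimal illustration, not Kibana's actual `reduceAsyncChunks` implementation; the helper names `chunkByLength` and `mergeResponses` are invented for the example:

```typescript
// Split a list of index patterns into chunks whose joined length stays
// under a character budget (the request line limit is what triggers
// too_long_http_line_exception), then merge the per-chunk responses.
function chunkByLength(items: string[], maxChars: number): string[][] {
  const chunks: string[][] = [];
  let current: string[] = [];
  let length = 0;
  for (const item of items) {
    // +1 accounts for the comma separator in the request URL
    if (current.length > 0 && length + item.length + 1 > maxChars) {
      chunks.push(current);
      current = [];
      length = 0;
    }
    current.push(item);
    length += item.length + 1;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}

// Merge keyed responses (e.g. per-index mappings) from each chunk into one.
function mergeResponses<T>(parts: Array<Record<string, T>>): Record<string, T> {
  return Object.assign({}, ...parts);
}
```

Each chunk is small enough to keep the request URL within limits, while the merged result looks the same to callers as a single unchunked response.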
```ts
esClient.indices.getMapping({
  index: logsIndexPatterns.map((pattern) => pattern.pattern),
})
reduceAsyncChunks<IndicesGetMappingResponse>(patterns, (patternChunk) =>
```
I think we shouldn't provide the generic `IndicesGetMappingResponse` to `reduceAsyncChunks`. It would be safer to let it infer the return type of `safeMappingCall`; a mismatch would indicate something's wrong.
`addMappingsToIndices` is only called with the following indices atm:
```ts
[
  'filebeat-*', '*filebeat*',
  'metricbeat-*', '*metricbeat*',
  'apm-*', 'functionbeat-*',
  '*functionbeat*', 'heartbeat-*',
  '*heartbeat*', 'logstash-*',
  '*logstash*', 'fluentd*',
  'telegraf*', 'prometheusbeat*',
  'fluent-bit*', 'fluentbit*',
  '*nginx*', '*apache*',
  'logs-*-*', '*logs*'
]
```
This makes ~220 chars, so it was unlikely to be causing the `too_long_http_line_exception`. Here, filtering the output is more important, though chunking is still good in case the indices grow in the future.
```ts
info.indices.forEach((index) => {
  if (mappings[index]) {
    info.mapping = { ...(info.mapping ?? {}), [index]: mappings[index] };
    const mappingsRecord = mappings as IndicesGetMappingResponse;
```
Same here: we shouldn't assert the type, as it may hide type mismatches, if any.
```ts
esClient,
indices,
breatheDelay,
breatheDelay = 100,
```
Better not to make it optional or provide a default; let the callers decide the value.
```ts
if (indices.length === 0) {
  return of([]);
}
```
🚀
```ts
esClient.indices.stats({
  index: indexChunk,
  metric: ['docs'],
  filter_path: ['indices.*.total.docs'],
})
```
Great to specify the `metric` and `filter_path`; this can significantly reduce the output payload.
However, we need to make sure the following are still returned, as they are used later in calculations:
- `.primaries?.docs?.count`
- `.primaries?.store?.size_in_bytes`
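For illustration, a request along these lines would keep both fields through the filtering. This is a hedged sketch, not the PR's actual code: the index pattern is a placeholder, and the exact `filter_path` entries are an assumption based on the `_stats` response shape:

```typescript
// Illustrative _stats request body only. 'store' is added to metric so
// that primaries.store.size_in_bytes is not filtered away alongside the
// docs metric; the filter_path entries preserve exactly the two fields
// the calculations need.
const statsRequest = {
  index: 'logs-*', // placeholder pattern
  metric: ['docs', 'store'],
  filter_path: [
    'indices.*.primaries.docs.count',
    'indices.*.primaries.store.size_in_bytes',
  ],
};
```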
```ts
  index: chunk,
  filter_path: ['*.mappings'],
});
return (result as any).body ?? result;
```
The return of `getMapping` should be well typed, so `any` shouldn't be needed here.
```ts
  filter_path: ['*.mappings'],
});
return (result as any).body ?? result;
} catch (err: any) {
```
See if it's possible to provide the error type(s) we want to handle here instead of marking it as `any`.
```ts
const type = err?.meta?.body?.error?.type;
if (
  retries > 0 &&
  (type === 'too_long_http_line_exception' || err.message?.includes('DeserializationError'))
```
Since we are checking errors for the response here: isn't `too_long_http_line_exception` a request-time error that should already be circumvented by chunking, and don't we want to handle the `circuit_breaking_exception` error here instead?
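The distinction the reviewer draws could be sketched as a retry predicate like the following. This is illustrative only, not the PR's code: the error shape mirrors the JS client's `err.meta.body.error.type`, and `shouldRetry` is a hypothetical helper name:

```typescript
// Sketch: chunking the request already prevents the request-line error
// (too_long_http_line_exception), so the response-side error worth
// handling/retrying at this point would be circuit_breaking_exception.
interface EsLikeError {
  meta?: { body?: { error?: { type?: string } } };
  message?: string;
}

function shouldRetry(err: EsLikeError, retriesLeft: number): boolean {
  if (retriesLeft <= 0) return false;
  const type = err.meta?.body?.error?.type;
  // Retry only on response-size pressure, not on request-line errors,
  // which chunking is supposed to have eliminated upstream.
  return type === 'circuit_breaking_exception';
}
```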
```ts
const indices = Array.isArray(pattern.pattern) ? pattern.pattern : [pattern.pattern];
try {
  const resp = await reduceAsyncChunks<IndicesGetResponse>(indices, (indexChunk) =>
    safeEsCall(() =>
      esClient.indices.get({
        index: indexChunk,
        features: ['mappings'],
        filter_path: ['*.mappings', '*._meta'],
      })
    )
  );
```
`getIndicesInfoForPattern` calls each pattern one by one, so chunking isn't needed here. However, filtering the output for features and path is useful (provided the filtered data still has the necessary fields).
```ts
    meta: indexInfo.mappings?._meta,
  };
});
} catch (error) {
```
Catching and returning empty values here may skew the reports and aggregations. Better to let the error be thrown.
@elasticmachine merge upstream
awahab07
left a comment
Thanks for the changes.
I've tested the generated telemetry events payload on my end and it looks identical before and after the PR commits.
Before:
[
{
"beat": [
],
"doc_count": 16,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 2,
"field_existence": {
"@timestamp": 16
},
"index_count": 1,
"managed_by": [
],
"namespace_count": 0,
"package_name": [
],
"pattern_name": "filebeat",
"shipper": "filebeat",
"size_in_bytes": 18021,
"structure_level": {
"1": 16
}
},
{
"beat": [
],
"doc_count": 1440,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 79,
"field_existence": {
"@timestamp": 1440,
"cloud.provider": 1440,
"container.id": 1440,
"data_stream.dataset": 1440,
"data_stream.namespace": 1440,
"data_stream.type": 1440,
"event.dataset": 1440,
"host.name": 1440,
"kubernetes.pod.uid": 1440,
"log.level": 1440,
"message": 1440,
"service.name": 1440
},
"index_count": 1,
"managed_by": [
],
"namespace_count": 1,
"package_name": [
],
"pattern_name": "apache",
"size_in_bytes": 2940697,
"structure_level": {
"6": 1440
}
},
{
"beat": [
],
"doc_count": 55606,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 105,
"field_existence": {
"@timestamp": 55606,
"cloud.provider": 5760,
"container.id": 5760,
"data_stream.dataset": 55606,
"data_stream.namespace": 55606,
"data_stream.type": 55606,
"event.dataset": 5760,
"host.name": 55606,
"kubernetes.pod.uid": 5760,
"log.level": 55600,
"message": 5760,
"service.name": 55600
},
"index_count": 8,
"managed_by": [
],
"namespace_count": 1,
"package_name": [
],
"pattern_name": "dsns-logs",
"size_in_bytes": 26329771,
"structure_level": {
"2": 49846,
"6": 5760
}
}
]

After:

[
{
"beat": [
],
"doc_count": 16,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 2,
"field_existence": {
"@timestamp": 16
},
"index_count": 1,
"managed_by": [
],
"namespace_count": 0,
"package_name": [
],
"pattern_name": "filebeat",
"shipper": "filebeat",
"size_in_bytes": 18021,
"structure_level": {
"1": 16
}
},
{
"beat": [
],
"doc_count": 1440,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 79,
"field_existence": {
"@timestamp": 1440,
"cloud.provider": 1440,
"container.id": 1440,
"data_stream.dataset": 1440,
"data_stream.namespace": 1440,
"data_stream.type": 1440,
"event.dataset": 1440,
"host.name": 1440,
"kubernetes.pod.uid": 1440,
"log.level": 1440,
"message": 1440,
"service.name": 1440
},
"index_count": 1,
"managed_by": [
],
"namespace_count": 1,
"package_name": [
],
"pattern_name": "apache",
"size_in_bytes": 2940697,
"structure_level": {
"6": 1440
}
},
{
"beat": [
],
"doc_count": 55606,
"failure_store_doc_count": 0,
"failure_store_index_count": 0,
"field_count": 105,
"field_existence": {
"@timestamp": 55606,
"cloud.provider": 5760,
"container.id": 5760,
"data_stream.dataset": 55606,
"data_stream.namespace": 55606,
"data_stream.type": 55606,
"event.dataset": 5760,
"host.name": 55606,
"kubernetes.pod.uid": 5760,
"log.level": 55600,
"message": 5760,
"service.name": 55600
},
"index_count": 8,
"managed_by": [
],
"namespace_count": 1,
"package_name": [
],
"pattern_name": "dsns-logs",
"size_in_bytes": 26329771,
"structure_level": {
"2": 49846,
"6": 5760
}
}
]
The test data was generated via:
```shell
node scripts/synthtrace logs_traces_hosts --from=now-1w --to=now \
  --scenarioOpts='{
    "ingestHosts": true,
    "ingestTraces": true,
    "numSpaces": 3,
    "datasets": [
      "mysql.error", "mysql.slowlog",
      "kubernetes.audit_logs", "kubernetes.container_logs",
      "nginx.access", "nginx.error",
      "system.auth", "system.syslog",
      "apache.error", "apache.access",
      "postgresql.error", "postgresql.log",
      "redis.error", "redis.log",
      "mongodb.error", "mongodb.log",
      "haproxy.error", "haproxy.log",
      "activemq.audit", "activemq.log",
      "elasticsearch.audit", "elasticsearch.slowlog",
      "elasticsearch.server", "elasticsearch.deprecation", "elasticsearch.gc",
      "akamai.siem",
      "auditd.log", "auditd_manager.auditd",
      "cloud_security_posture.findings", "cloud_security_posture.vulnerabilities",
      "docker.container_logs",
      "elastic_agent.apm_server", "elastic_agent.filebeat", "elastic_agent.heartbeat",
      "fleet_server.output_health", "fleet_server.logs",
      "kibana.audit", "kibana.log",
      "microsoft_sqlserver.audit", "microsoft_sqlserver.log",
      "network_traffic.http", "network_traffic.dns"
    ]
  }'
```

The dataset names could be changed, however, to match all the collected patterns for a more robust before-and-after comparison.
💛 Build succeeded, but was flaky
Starting backport for target branches: 8.19
…astic#221153) After adding a chunking algorithm for `getIndexBasicStats`, the errors are not appearing anymore.

For testing purposes I created 600 indices using this command:

```shell
for i in {1..600}; do
  curl -u <username>:<password> -X POST "<elasticsearch_url>/<index_base_name>-$i/_doc" \
    -H 'Content-Type: application/json' \
    -d '{"@timestamp": "<timestamp>", "message": "<message>"}'
done
```

and created this test for testing the telemetry functions:

```ts
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Client } from '@elastic/elasticsearch';
import { lastValueFrom } from 'rxjs';
import {
  getIndexBasicStats,
  addMappingsToIndices,
  getIndicesInfoForPattern,
} from '@kbn/dataset-quality-plugin/server/services/data_telemetry/helpers';

export type IndexName = string;
export type IndicesGetResponse = Record<IndexName, any>;

async function runTest() {
  const client = new Client({
    node: 'http://localhost:9200',
    auth: { username: '', password: '' },
  });

  const results = {
    addMappings: await testAddMappingsToIndicesTypeSafety(client),
    indicesInfo: await testGetIndicesInfoForPattern(client),
    basicStats: await testGetIndexBasicStats(client),
  };

  process.stdout.write('Test results:\n');
  process.stdout.write(`- addMappingsToIndices: ${results.addMappings ? 'SUCCESS' : 'FAILED'}\n`);
  process.stdout.write(
    `- getIndicesInfoForPattern: ${results.indicesInfo ? 'SUCCESS' : 'FAILED'}\n`
  );
  process.stdout.write(`- getIndexBasicStats: ${results.basicStats ? 'SUCCESS' : 'FAILED'}\n`);
}

async function testAddMappingsToIndicesTypeSafety(client: Client): Promise<boolean> {
  try {
    const logsIndexPatterns = [{ patternName: 'logs-*', shipper: 'test', pattern: 'logs-*' }];
    const dataStreamsInfo = [
      {
        patternName: 'logs-*',
        shipper: 'test',
        isDataStream: false,
        name: 'logs-test-1',
        indices: ['logs-test-1'],
        mapping: undefined,
      },
    ];
    const observable = addMappingsToIndices({
      esClient: client as unknown as ElasticsearchClient,
      dataStreamsInfo,
      logsIndexPatterns,
    });
    const result = await lastValueFrom(observable);

    const hasMapping = result.length > 0 && !!result[0].mapping;
    process.stdout.write(
      `addMappingsToIndices ${hasMapping ? 'found mappings' : 'no mapping found'}\n`
    );
    if (hasMapping) {
      const keys = Object.keys(result[0].mapping!);
      process.stdout.write(`  Keys: ${keys.join(', ')}\n`);
    }
    return true;
  } catch (error) {
    process.stdout.write(
      `addMappingsToIndices error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

async function testGetIndicesInfoForPattern(client: Client): Promise<boolean> {
  try {
    const pattern = { patternName: 'logs-*', shipper: 'test', pattern: 'logs-*' };
    const result = await getIndicesInfoForPattern({
      esClient: client as unknown as ElasticsearchClient,
      pattern,
    });

    if (result.length > 0) {
      process.stdout.write(
        `getIndicesInfoForPattern found ${result.length} indices, sample: ${result[0].name}\n`
      );
      return true;
    } else {
      process.stdout.write('getIndicesInfoForPattern returned no results\n');
      return false;
    }
  } catch (error) {
    process.stdout.write(
      `getIndicesInfoForPattern error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

async function testGetIndexBasicStats(client: Client): Promise<boolean> {
  try {
    const response = await client.cat.indices({
      format: 'json',
      h: 'index',
      index: 'logs-*',
    });

    const indicesList = response as Array<{ index: string }>;

    if (indicesList.length === 0) {
      process.stdout.write('No indices found to test with\n');
      return false;
    }

    const indexBasicInfos = indicesList.map((indexObj) => ({
      patternName: 'logs-*',
      shipper: 'test',
      isDataStream: false,
      name: indexObj.index,
      indices: [indexObj.index],
      mapping: undefined,
    }));

    process.stdout.write(`Testing with ${indexBasicInfos.length} indices\n`);

    const observable = getIndexBasicStats({
      esClient: client as unknown as ElasticsearchClient,
      indices: indexBasicInfos,
      breatheDelay: 0,
    });

    const result = await lastValueFrom(observable);
    process.stdout.write(`getIndexBasicStats processed ${result.length} indices\n`);
    return result.length > 0;
  } catch (error) {
    process.stdout.write(
      `getIndexBasicStats error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

runTest().catch((err) => {
  process.stdout.write(`Unhandled error in test: ${err}\n`);
});
```

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
Co-authored-by: Abdul Zahid <[email protected]>

(cherry picked from commit 2616174)
💚 All backports created successfully
Note: Successful backport PRs will be merged automatically after passing CI.

Questions? Please refer to the Backport tool documentation
…ats (#221153) (#223380)

# Backport

This will backport the following commits from `main` to `8.19`:
- [[Dataset Quality] Apply chunking algorithm for getIndexBasicStats (#221153)](#221153)

### Questions ?
Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport)
error.message : String(error)}\\n`\n );\n return false;\n }\n}\n\nasync function testGetIndicesInfoForPattern(client: Client): Promise<boolean> {\n try {\n const pattern = { patternName: 'logs-*', shipper: 'test', pattern: 'logs-*' };\n const result = await getIndicesInfoForPattern({\n esClient: client as unknown as ElasticsearchClient,\n pattern,\n });\n\n if (result.length > 0) {\n process.stdout.write(\n `getIndicesInfoForPattern found ${result.length} indices, sample: ${result[0].name}\\n`\n );\n return true;\n } else {\n process.stdout.write('getIndicesInfoForPattern returned no results\\n');\n return false;\n }\n } catch (error) {\n process.stdout.write(\n `getIndicesInfoForPattern error: ${error instanceof Error ? error.message : String(error)}\\n`\n );\n return false;\n }\n}\n\nasync function testGetIndexBasicStats(client: Client): Promise<boolean> {\n try {\n const response = await client.cat.indices({\n format: 'json',\n h: 'index',\n index: 'logs-*',\n });\n\n const indicesList = response as Array<{ index: string }>;\n\n if (indicesList.length === 0) {\n process.stdout.write('No indices found to test with\\n');\n return false;\n }\n\n const indexBasicInfos = indicesList.map((indexObj) => ({\n patternName: 'logs-*',\n shipper: 'test',\n isDataStream: false,\n name: indexObj.index,\n indices: [indexObj.index],\n mapping: undefined,\n }));\n\n process.stdout.write(`Testing with ${indexBasicInfos.length} indices\\n`);\n\n const observable = getIndexBasicStats({\n esClient: client as unknown as ElasticsearchClient,\n indices: indexBasicInfos,\n breatheDelay: 0,\n });\n\n const result = await lastValueFrom(observable);\n process.stdout.write(`getIndexBasicStats processed ${result.length} indices\\n`);\n return result.length > 0;\n } catch (error) {\n process.stdout.write(\n `getIndexBasicStats error: ${error instanceof Error ? 
error.message : String(error)}\\n`\n );\n return false;\n }\n}\n\nrunTest().catch((err) => {\n process.stdout.write('Unhandled error in test:', err);\n});\n \n```\n\n---------\n\nCo-authored-by: kibanamachine <[email protected]>\nCo-authored-by: Elastic Machine <[email protected]>\nCo-authored-by: Abdul Zahid <[email protected]>","sha":"26161740f8b1557d97d5e734a580cca33e67a874"}},{"branch":"8.19","label":"v8.19.0","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> Co-authored-by: Robert Stelmach <[email protected]> Co-authored-by: Elastic Machine <[email protected]> Co-authored-by: Abdul Zahid <[email protected]>
[Dataset Quality] Apply chunking algorithm for getIndexBasicStats (elastic#221153)

After adding a chunking algorithm for getIndexBasicStats, the errors no longer appear.
For testing purposes I created 600 indices using this command:

```shell
for i in {1..600}; do
  curl -u <username>:<password> -X POST \
    "<elasticsearch_url>/<index_base_name>-$i/_doc" \
    -H 'Content-Type: application/json' \
    -d '{"@timestamp": "<timestamp>", "message": "<message>"}'
done
```

and created this test for testing the telemetry functions:

```typescript
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Client } from '@elastic/elasticsearch';
import { lastValueFrom } from 'rxjs';
import {
  getIndexBasicStats,
  addMappingsToIndices,
  getIndicesInfoForPattern,
} from '@kbn/dataset-quality-plugin/server/services/data_telemetry/helpers';

export type IndexName = string;
export type IndicesGetResponse = Record<IndexName, any>;

async function runTest() {
  const client = new Client({
    node: 'http://localhost:9200',
    auth: { username: '', password: '' },
  });

  const results = {
    addMappings: await testAddMappingsToIndicesTypeSafety(client),
    indicesInfo: await testGetIndicesInfoForPattern(client),
    basicStats: await testGetIndexBasicStats(client),
  };

  process.stdout.write('Test results:\n');
  process.stdout.write(`- addMappingsToIndices: ${results.addMappings ? 'SUCCESS' : 'FAILED'}\n`);
  process.stdout.write(
    `- getIndicesInfoForPattern: ${results.indicesInfo ? 'SUCCESS' : 'FAILED'}\n`
  );
  process.stdout.write(`- getIndexBasicStats: ${results.basicStats ? 'SUCCESS' : 'FAILED'}\n`);
}

async function testAddMappingsToIndicesTypeSafety(client: Client): Promise<boolean> {
  try {
    const logsIndexPatterns = [{ patternName: 'logs-*', shipper: 'test', pattern: 'logs-*' }];
    const dataStreamsInfo = [
      {
        patternName: 'logs-*',
        shipper: 'test',
        isDataStream: false,
        name: 'logs-test-1',
        indices: ['logs-test-1'],
        mapping: undefined,
      },
    ];
    const observable = addMappingsToIndices({
      esClient: client as unknown as ElasticsearchClient,
      dataStreamsInfo,
      logsIndexPatterns,
    });
    const result = await lastValueFrom(observable);

    const hasMapping = result.length > 0 && !!result[0].mapping;
    process.stdout.write(
      `addMappingsToIndices ${hasMapping ? 'found mappings' : 'no mapping found'}\n`
    );
    if (hasMapping) {
      const keys = Object.keys(result[0].mapping!);
      process.stdout.write(`  Keys: ${keys.join(', ')}\n`);
    }
    return true;
  } catch (error) {
    process.stdout.write(
      `addMappingsToIndices error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

async function testGetIndicesInfoForPattern(client: Client): Promise<boolean> {
  try {
    const pattern = { patternName: 'logs-*', shipper: 'test', pattern: 'logs-*' };
    const result = await getIndicesInfoForPattern({
      esClient: client as unknown as ElasticsearchClient,
      pattern,
    });

    if (result.length > 0) {
      process.stdout.write(
        `getIndicesInfoForPattern found ${result.length} indices, sample: ${result[0].name}\n`
      );
      return true;
    } else {
      process.stdout.write('getIndicesInfoForPattern returned no results\n');
      return false;
    }
  } catch (error) {
    process.stdout.write(
      `getIndicesInfoForPattern error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

async function testGetIndexBasicStats(client: Client): Promise<boolean> {
  try {
    const response = await client.cat.indices({
      format: 'json',
      h: 'index',
      index: 'logs-*',
    });

    const indicesList = response as Array<{ index: string }>;

    if (indicesList.length === 0) {
      process.stdout.write('No indices found to test with\n');
      return false;
    }

    const indexBasicInfos = indicesList.map((indexObj) => ({
      patternName: 'logs-*',
      shipper: 'test',
      isDataStream: false,
      name: indexObj.index,
      indices: [indexObj.index],
      mapping: undefined,
    }));

    process.stdout.write(`Testing with ${indexBasicInfos.length} indices\n`);

    const observable = getIndexBasicStats({
      esClient: client as unknown as ElasticsearchClient,
      indices: indexBasicInfos,
      breatheDelay: 0,
    });

    const result = await lastValueFrom(observable);
    process.stdout.write(`getIndexBasicStats processed ${result.length} indices\n`);
    return result.length > 0;
  } catch (error) {
    process.stdout.write(
      `getIndexBasicStats error: ${error instanceof Error ? error.message : String(error)}\n`
    );
    return false;
  }
}

runTest().catch((err) => {
  process.stdout.write(`Unhandled error in test: ${err}\n`);
});
```

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
Co-authored-by: Abdul Zahid <[email protected]>
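The `too_long_http_line_exception` this PR addresses happens when too many index names are comma-joined into a single `_stats` request path. The PR's actual helper isn't reproduced above, but the general idea can be sketched as follows; the function name `chunkIndexNames` and the byte budget are illustrative assumptions, not values taken from the PR:

```typescript
// Hypothetical sketch: split index names into batches whose comma-joined
// length stays under a URL budget, so each `GET /<names>/_stats` request
// keeps its first HTTP line within the server's limit.
function chunkIndexNames(indexNames: string[], maxChars = 3072): string[][] {
  const chunks: string[][] = [];
  let current: string[] = [];
  let currentLength = 0; // length of current.join(',')

  for (const name of indexNames) {
    // +1 accounts for the comma separator added when joining.
    const added = name.length + (current.length > 0 ? 1 : 0);
    if (current.length > 0 && currentLength + added > maxChars) {
      chunks.push(current);
      current = [];
      currentLength = 0;
    }
    current.push(name);
    currentLength += current.length === 1 ? name.length : added;
  }
  if (current.length > 0) {
    chunks.push(current);
  }
  return chunks;
}

// Example with 600 generated index names, mirroring the test setup above.
const names = Array.from({ length: 600 }, (_, i) => `logs-test-${i + 1}`);
const chunks = chunkIndexNames(names, 1024);
console.log(`${chunks.length} chunks, all within budget`);
```

Each batch can then be fetched in its own `indices.stats` call (with the per-call results merged afterwards), which trades one oversized request for several small ones.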