-
Notifications
You must be signed in to change notification settings - Fork 9.2k
Hadoop-19676: ABFS: Aggregated Metrics Manager with Multiple Emission Criteria #8137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
============================================================
|
| abfsConfiguration.isBackoffRetryMetricsEnabled()); | ||
| break; | ||
| case INTERNAL_FOOTER_METRIC_FORMAT: | ||
| initializeReadFooterMetrics(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
break missing here
|
|
||
| @Override | ||
| public String toString() { | ||
| String metric = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use empty string constant
| public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name"; | ||
| public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key"; | ||
| public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri"; | ||
| public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add jvadoc for all these with {@value} tag.
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
Show resolved
Hide resolved
| public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name"; | ||
| public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key"; | ||
| public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri"; | ||
| public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep all the metric related configs name consistent with same prefix.
fs.azure.metrics....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metric account name, key and format have "metric", other configurations have "metrics". I have kept it intentionally. Do you want to change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think we should have common prefix for all the metric related configs
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBackoffMetrics.java
Show resolved
Hide resolved
| metricAccountKey)) { | ||
| int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT); | ||
| if (dotIndex <= 0) { | ||
| throw new InvalidUriException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a test around this exception if not already there
| final AbfsConfiguration abfsConfiguration, | ||
| final EncryptionContextProvider encryptionContextProvider, | ||
| final AbfsClientContext abfsClientContext, | ||
| final String fileSystemId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its better to pass it as a part of client context similar to other client related fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, make sense. Will do this change.
| this.metricsEmitScheduler | ||
| = Executors.newSingleThreadScheduledExecutor(); | ||
| // run every 1 minute to check the metrics count | ||
| this.metricsEmitScheduler.scheduleAtFixedRate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 separate schedulers being added here seems like.
Each client has its own scheduler and then the singleton metric manager class also has one?
Is this as per design?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is as per design, each file system will emit the metrics to manager class at regular interval if not closed. The singleton manager class will do actual API call to send those collected metrics.
| if (isMetricCollectionEnabled && runningTimerTask != null) { | ||
| runningTimerTask.cancel(); | ||
| timer.cancel(); | ||
| if (isMetricCollectionEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not already done, verify that after FS close all the threads are properly getting shutdown and no leak is there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are few tests which already cover this. Will check if more test cases are needed to cover more scenario.
| private static final Logger LOG = LoggerFactory.getLogger( | ||
| AbfsReadFooterMetrics.class); | ||
|
|
||
| private static final String FOOTER_LENGTH = "20"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment as to why 20?
| operationType, failureReason, retryCount, retryPolicy.getAbbreviation(), retryInterval); | ||
| if (abfsBackoffMetrics != null) { | ||
| updateBackoffTimeMetrics(retryCount, sleepDuration); | ||
| updateBackoffTimeMetrics(retryCount, retryInterval); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Earlier sleepDuration was wrongly passed in updateBackoffTimeMetrics, actually it is retryInterval which tells us the delay between two retries.
Line 341 in 4a82720
| updateBackoffTimeMetrics(retryCount, sleepDuration); |
| } | ||
| StringBuilder metricBuilder = new StringBuilder(); | ||
| getRetryMetrics(metricBuilder); | ||
| if (isRetryMetricEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should always have retry metrics added as a part of these aggregate metrics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made this change as per the discussion we had with the team. We can discuss this offline and if agreed by all I will revert the change.
| private static volatile AggregateMetricsManager instance; | ||
|
|
||
| // Rate limiter to control the rate of dispatching metrics. | ||
| private static volatile SimpleRateLimiter rateLimiter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rateLimiter is declared as static but is initialized in the constructor using permitsPerSecond.
This means the rate limiter ends up being global to the JVM, even though its value comes from instance-level configuration. In practice, whichever code initializes the manager first decides the rate, and any later calls to get() with different values are silently ignored.
|
|
||
| boolean isRemoved = bucket.deregisterClient(abfsClient); | ||
|
|
||
| if (bucket.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a small race window between isEmpty() and remove(). Another thread may concurrently register a new client for the same account and reuse the bucket, but it can still be removed based on the earlier emptiness check. This makes the behavior timing-dependent and hard to reason about under concurrency.
You can use buckets.computeIfPresent() to perform the emptiness check and removal atomically, which avoids this race and keeps the map state consistent.
| // Add shutdown hook to dispatch remaining metrics on JVM shutdown. | ||
| Runtime.getRuntime().addShutdownHook(new Thread(() -> { | ||
| dispatchMetrics(); | ||
| scheduler.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we wait for dispatch metrics to finish before scheduler is shutdown ?
| }); | ||
|
|
||
| // Schedule periodic dispatching of metrics. | ||
| this.scheduler.scheduleAtFixedRate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use scheduleWithFixedDelay as scheduleAtFixedRate can overlap executions.
| * @param permitsPerSecond Rate limit for dispatching metrics. | ||
| * @return Singleton instance of AggregateMetricsManager. | ||
| */ | ||
| public static AggregateMetricsManager get(final long dispatchIntervalInMins, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get method name is very generic, use better naming
| @Override | ||
| public void initializeMetrics(MetricFormat metricFormat) { | ||
| public void initializeMetrics(final MetricFormat metricFormat, | ||
| final AbfsConfiguration abfsConfiguration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of passing AbfsConfiguration here, better to only pass isRetryMetricsEnabled.
Whole object is not needed
| if (abfsReadFooterMetrics == null) { | ||
| abfsReadFooterMetrics = new AbfsReadFooterMetrics(); | ||
| } else { | ||
| //In case metrics is emitted based on total count, there could be a chance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo, space after //
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger( | ||
| AbfsReadFooterMetrics.class); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Empty line changes can be reverted.
|
|
||
| /** | ||
| * Constructor to initialize the IOStatisticsStore with counters and mean statistics. | ||
| * Updates the file type based on the metrics collected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explain in javadoc what logic is used to infer file type
| //In case metrics is emitted based on total count, there could be a chance | ||
| // that file type for which we have calculated the type will be lost. | ||
| // To avoid that, creating a new instance with existing map. | ||
| abfsReadFooterMetrics = new AbfsReadFooterMetrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not able to get this.
How can the file type will be lost? It must be a part of AbfsReadFooterMetrics object only
| /** | ||
| * Initialize the read footer metrics. | ||
| * In case the metrics are already initialized, | ||
| * create a new instance with the existing map. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why create a new instance?
| /** | ||
| * Metrics to client request id header. | ||
| */ | ||
| AV0("av0", 3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add what 3 fields will be there in comment as added in other versions javadoc
| checkPrerequisites(); | ||
| } | ||
|
|
||
| private void checkPrerequisites(){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these moved somewhere else?
| : Stream.of(readFooterMetricsEnum.getName())) | ||
| .toArray(String[]::new); | ||
| private boolean haveEqualValues(String value) { | ||
| String[] parts = value.split("_"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add constant for underscore
| long nextReadPos, | ||
| int len, | ||
| long contentLength) { | ||
| if (nextReadPos |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition should be in same line
| return false; | ||
| } | ||
|
|
||
| if (clients.size() == 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code checks whether the client is the last one before removing it, so another thread can change the client list in between, causing metrics to be sent at the wrong time or using a closing client.
Solution: first remove the client, then check if the client list is empty, and only then drain and send the remaining metrics.
| rateLimiter.acquire(); // Rate limiting | ||
| try { | ||
| client.getMetricCall(chunk); | ||
| } catch (IOException ignored) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can add a debug line here instead of ignoring exception
| // Send outside synchronized block | ||
| if (client != null && batchToSend != null && !batchToSend.isEmpty()) { | ||
| for (String chunk : splitListBySize(batchToSend, MAX_HEADER_SIZE)) { | ||
| rateLimiter.acquire(); // Rate limiting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can wait forever to acquire, should be time based ?
| List<String> result = new ArrayList<>(); | ||
| StringBuilder sb = new StringBuilder(); | ||
|
|
||
| for (String s : new ArrayList<>(items)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of copy here, you can directly iterate as for (String s: items)
| } | ||
|
|
||
| // Check if there are no registered clients | ||
| public boolean isEmpty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be synchronized
| StringBuilder sb = new StringBuilder(); | ||
|
|
||
| for (String s : new ArrayList<>(items)) { | ||
| String wrapped = "[" + s + "]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use constants for colon and brackets
| /** | ||
| * Acquires a permit from the rate limiter, blocking until one is available. | ||
| */ | ||
| public synchronized void acquire() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method can block forever because it waits in an infinite loop while holding a synchronized lock, which can stall other threads and delay shutdown.
It also handles interrupts weakly and offers no timeout or non-blocking option, making it unsafe for critical paths.
| "Metric collection should be enabled even if metric account is not set") | ||
| .isTrue(); | ||
|
|
||
| Assertions.assertThat( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If its INTERNAL_METRIC_FORMAT it should have both backoff and footer metrics right ?
| tracingContext.constructHeader(abfsHttpOperation, null, | ||
| EXPONENTIAL_RETRY_POLICY_ABBREVIATION); | ||
| assertThat(abfsHttpOperation.getClientRequestId()) | ||
| .describedAs("ClientRequestId should be contains Backoff metrics") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should be containing
| public void testAggregatedMetricsManagerWithJVMExit0() | ||
| throws IOException, InterruptedException { | ||
| // ------------------------------- | ||
| // Program 1 (kept exactly as you asked) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this comment
| private void runProgramAndCaptureOutput(String program, | ||
| boolean expectMetricsFlush, int expectedExitCode) | ||
| throws IOException, InterruptedException { | ||
| Path tempFile = Files.createTempFile("ShutdownTestProg", ".java"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These temporary files are never deleted
| .start(); | ||
|
|
||
| String compileOutput = readProcessOutput(javac); | ||
| javac.waitFor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add timed wait like
if (!javac.waitFor(30, TimeUnit.SECONDS)) {
javac.destroyForcibly();
throw new AssertionError("javac timed out");
}
| .start(); | ||
|
|
||
| String output = readProcessOutput(javaProc); | ||
| int exitCode = javaProc.waitFor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
| initializeReadFooterMetrics(); | ||
| break; | ||
| default: | ||
| break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: break is redundant
| case INTERNAL_METRIC_FORMAT: | ||
| abfsBackoffMetrics = new AbfsBackoffMetrics( | ||
| abfsConfiguration.isBackoffRetryMetricsEnabled()); | ||
| initializeReadFooterMetrics(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can do sm like
abfsReadFooterMetrics = new AbfsReadFooterMetrics(
abfsReadFooterMetrics == null ? null : abfsReadFooterMetrics.getFileTypeMetricsMap()
);
we would have a single constructor for AbfsReadFooterMetrics then
| * @throws IOException if URL is malformed. | ||
| */ | ||
| private void setMetricsUrl(String urlString) throws IOException { | ||
| metricUrl = UriUtils.changeUrlFromBlobToDfs(new URL(urlString)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why cant we send it for blob endpoints?
| * | ||
| * @param fileTypeMetricsMap the map to track file type metrics | ||
| */ | ||
| public AbfsReadFooterMetrics(Map<String, FileTypeMetrics> fileTypeMetricsMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can try combining the 2 constructors from the above comment
| private final long intervalNanos; | ||
|
|
||
| // Next allowed time to acquire a permit in nanoseconds. | ||
| private long nextAllowedTime; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have this as atomic/volatile?
|
|
||
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; | ||
|
|
||
| public final class SimpleRateLimiter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can add some class definition here and if its local to JVM
| ALL_ID_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id> | ||
| // :<primary-req-id>:<stream-id>:<hdfs-operation>:<retry-count> | ||
|
|
||
| AGGREGATED_METRICS_FORMAT; // <client-correlation-id>:<filesystem-id> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can have the comment in same line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
This PR introduces a centralized Aggregated Metrics Manager and defines the conditions under which aggregated metrics are emitted from individual file systems.
Key Changes
Aggregated metrics are emitted based on the following conditions:
Time-based interval - Each file system periodically emits its collected metrics at a fixed interval. After emission, metric collection is reset.
Threshold-based emission - A scheduler runs at regular intervals to check whether the total number of operations has exceeded a configured threshold. This prevents the aggregated metrics string from growing too large to be safely sent as an HTTP request header. If the threshold is reached, the collected metrics are emitted immediately, and metric collection is reset.
Idle-period emission - If a file system remains idle for a configured duration, any accumulated metrics are emitted, and metric collection is reset.
File system close - When a file system is closed, all remaining collected metrics are emitted to ensure no data is lost.
All file systems now push their aggregated metrics to a shared Aggregated Metrics Manager. This manager evaluates the configured emission criteria and determines whether metrics should be emitted immediately or deferred until a later time.
This will also rate limit the number of metrics calls per second.