@aga9900 aga9900 commented Aug 26, 2025

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    This PR does two things:
    1. Sends metrics when a 409 is returned to users because of conflicting flows.
    2. Sends metrics when flow spec deletion fails for an adhoc flow.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    This PR only adds new metrics and logging.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@aga9900 aga9900 changed the title [GOBBLIN-2220]: Send Metrics When an adhoc Flow Spec Already Exists For Adhoc Flow (edited) [GOBBLIN-2220]: Send Metrics When an adhoc Flow Spec Already Exists For Adhoc Flow Aug 26, 2025
@aga9900 aga9900 changed the title [GOBBLIN-2220]: Send Metrics When an adhoc Flow Spec Already Exists For Adhoc Flow [GOBBLIN-2220]: Send Metrics When Flow Spec Already Exists For An Adhoc Flow Aug 26, 2025
public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded.";

Success is the expected default case, so a success metric gives us no additional signal. The failure metric is actionable, and tracking failures should be sufficient here. I would suggest dropping the success metric for this one, as it just adds noise.

dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
} catch (Exception e) {
super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false);
log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());

Please log the exception as well, using log.error(..., e), so that the stack trace is captured.
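To illustrate the difference this suggestion makes, here is a minimal sketch. It uses `java.util.logging` as a standard-library stand-in so it runs without dependencies; the PR's `log.error(message, e)` is the analogous call on its own logger. The class name `LogWithCause`, the helper names, and the sample URI are hypothetical and not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.logging.StreamHandler;

// Hypothetical helper, not from the PR: renders the same failure with and
// without the Throwable attached so the two outputs can be compared.
final class LogWithCause {

  /** Returns {output without the cause, output with the cause}. */
  static String[] logBothWays(String uri, Exception cause) {
    return new String[] {render(uri, cause, false), render(uri, cause, true)};
  }

  private static String render(String uri, Exception cause, boolean attachCause) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    StreamHandler handler = new StreamHandler(sink, new SimpleFormatter());
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false); // keep the sketch's output off the console
    log.addHandler(handler);

    String message = "Failed to remove the FlowSpec for adhoc flow with URI: " + uri;
    if (attachCause) {
      // Passing the Throwable makes the formatter append the stack trace.
      log.log(Level.SEVERE, message, cause);
    } else {
      // Message only: the exception type and stack trace are lost.
      log.log(Level.SEVERE, message);
    }
    handler.flush();
    return sink.toString();
  }
}
```

Only the second rendering contains the exception class and stack frames, which is what makes a removal failure diagnosable from the logs.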

} catch (Exception e) {
super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false);
log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());
return false;

Earlier, the RuntimeException was not caught here, so it propagated to DagProcessingEngine, which marked dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();. Now we are swallowing the exception, so DagProcessingEngine never gets to handle it. We should re-throw the exception here.
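A minimal sketch of the pattern being asked for: mark the failure metric, then re-throw so the caller's handling still runs. The class `RethrowAfterMarking`, the `FlowSpecRemover` interface, and the `AtomicLong` counter are hypothetical stand-ins for the PR's metrics and state-store types, used only to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the PR's code: a counter stands in for the
// failure meter and a functional interface stands in for the state store.
final class RethrowAfterMarking {

  /** Stand-in for the failure metric; only failures are counted. */
  static final AtomicLong removalFailures = new AtomicLong();

  /** Stand-in for the call that removes a flow spec and may throw. */
  interface FlowSpecRemover {
    void remove(String uri);
  }

  static void removeFlowSpec(FlowSpecRemover remover, String uri) {
    try {
      remover.remove(uri);
    } catch (RuntimeException e) {
      // Record the failure locally...
      removalFailures.incrementAndGet();
      // ...then propagate, so the upstream handler still sees the exception.
      throw e;
    }
    // No success metric: success is the default case and adds no signal.
  }
}
```

With this shape, a failing removal increments the counter exactly once and the original exception still reaches the caller unchanged.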

log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());
return false;
}
super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), true);

IMO, we don't need to mark success.

Comment on lines +264 to +265
} catch (SpecNotFoundException e) {
log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri());

This is not required, since the if block has already checked that the flowSpec exists.

flowSpecExistsForAdhocFlow.mark();
}
} else {
log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri());

Please move the log from line 253 to here, since we are now logging twice.

FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
if (!storedFlowSpec.isScheduled()) {
log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri());
if (!flowSpec.isScheduled()) {

If the stored spec was an adhoc flow (!storedFlowSpec.isScheduled()) and the flowSpec was not deleted, that is a sufficient condition to mark the metric for unexpected behaviour. We don't need to check whether the current flowSpec is scheduled.

if (!storedFlowSpec.isScheduled()) {
log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri());
if (!flowSpec.isScheduled()) {
flowSpecExistsForAdhocFlow.mark();

There's an expected case where a FlowSpec isn't deleted for an adhoc flow: when a new flow is triggered before the previous one is launched. In that case, we throw a TooSoonToRerunSameFlowException from Orchestrator.onAddSpec. We should exclude these scenarios, since they are valid, and only flag cases where the FlowSpec wasn't deleted even after the DAG was launched, which is unexpected.

As written, this marks the metric in both scenarios, so we would get false, non-actionable alerts.
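The condition described in the last two comments could be sketched roughly as below. This is only an illustration: `AdhocConflictClassifier` and both parameters are hypothetical, and how the code would actually determine that the previous DAG was already launched is an assumption left open here.

```java
// Hypothetical sketch of when to mark the "flow spec still exists" metric.
final class AdhocConflictClassifier {

  /**
   * @param storedSpecIsScheduled whether the spec already in the catalog is a scheduled flow
   * @param dagAlreadyLaunched    assumed signal that the previous run's DAG was launched
   * @return true only for the unexpected case: a leftover adhoc spec after launch
   */
  static boolean shouldMarkUnexpected(boolean storedSpecIsScheduled, boolean dagAlreadyLaunched) {
    // A stored scheduled flow is a normal conflict, not a deletion failure.
    // A stored adhoc flow whose DAG has not launched yet is the expected
    // "too soon to rerun" case and should not raise an alert.
    return !storedSpecIsScheduled && dagAlreadyLaunched;
  }
}
```

Note that the incoming flowSpec's own scheduled or adhoc status does not appear in the condition, in line with the earlier comment that checking it is unnecessary.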
