Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class ServiceMetricNames {

public static final String CREATE_FLOW_METER = "CreateFlow";
public static final String DELETE_FLOW_METER = "DeleteFlow";
public static final String FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW = "flowSpecExistsForAdhocFlow";
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows";
Expand Down Expand Up @@ -96,6 +97,8 @@ public class ServiceMetricNames {
// Meter-name prefixes that DagProcessingEngineMetrics registers once per dag action type.
// Each prefix ends with '.', presumably so the action type appended at registration time is
// separated from the prefix (TODO confirm against registerMetricForEachDagActionType).
public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
// The two FlowSpec-removal prefixes carry the same trailing '.' as their siblings so the
// generated meter names follow one format.
public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded.";
public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalFailed.";
public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
public static final String DAG_ACTIONS_DELETE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
public static final String DAG_ACTIONS_DELETE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class FlowConfigsV2ResourceHandler implements FlowConfigsResourceHandlerI
protected FlowCatalog flowCatalog;
protected final ContextAwareMeter createFlow;
protected final ContextAwareMeter deleteFlow;
protected final ContextAwareMeter flowSpecExistsForAdhocFlow;
protected final ContextAwareMeter runImmediatelyFlow;

@Inject
Expand All @@ -100,6 +101,8 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
this.runImmediatelyFlow = metricContext.contextAwareMeter(
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
// Register this meter under its own name; it previously reused RUN_IMMEDIATELY_FLOW_METER,
// so it was not reported as a separate "flowSpecExistsForAdhocFlow" metric.
this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter(
    MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW));
}

public FlowConfig getFlowConfig(FlowId flowId)
Expand Down Expand Up @@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
// Return conflict and take no action if flowSpec has already been created
if (this.flowCatalog.exists(flowSpec.getUri())) {
log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri());
return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
"FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
try {
FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
if (!storedFlowSpec.isScheduled()) {
log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri());
if (!flowSpec.isScheduled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the stored spec was an adhoc flow (`!storedFlowSpec.isScheduled()`) and its FlowSpec has not been deleted, that is a sufficient condition to mark the metric for unexpected behaviour. We don't need to check whether the current flowSpec is scheduled or not.

flowSpecExistsForAdhocFlow.mark();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there’s an expected case where a FlowSpec isn't deleted for an adhoc flow, specifically when a new flow is triggered before the previous one is launched. In that case, we throw a TooSoonToRerunSameFlowException from Orchestrator.onAddSpec. We should exclude these scenarios, since these are valid, and only flag cases where the FlowSpec wasn't deleted even after the DAG was launched, which is unexpected.

This would mark the metric for both the scenarios and we would have false/non-actionable alerts in such cases

}
} else {
log.error("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri());
}
} catch (SpecNotFoundException e) {
log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri());
Comment on lines +264 to +265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not required, since the if block has already checked for existence of flowSpec

} finally {
return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
"FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
}
}

Map<String, AddSpecResponse> responseMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class DagProcessingEngineMetrics {
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsActSucceededMeterByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeFailedMeterByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsDeleteFailedMeterByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>();
private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>();
Expand Down Expand Up @@ -89,6 +91,8 @@ public void registerAllMetrics() {
registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED);
registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED);
registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS);
Expand Down Expand Up @@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b
}
}

/**
 * Records the outcome of a FlowSpec removal for the given dag action type.
 *
 * @param dagActionType the dag action type whose meter is updated
 * @param succeeded {@code true} to update the removal-succeeded meter map,
 *                  {@code false} to update the removal-failed meter map
 */
public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) {
  // Select the success or failure meter map first, then issue a single update call.
  HashMap<DagActionStore.DagActionType, ContextAwareMeter> meterByDagActionType = succeeded
      ? this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType
      : this.dagActionsConcludeFlowSpecRemovalFailedMetreByDagActionType;
  updateMetricForDagActionType(meterByDagActionType, dagActionType);
}

public void markDagActionsDeleted(DagActionStore.DagActionType dagActionType, boolean succeeded) {
if (succeeded) {
updateMetricForDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, dagActionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
protected final DagManagementStateStore dagManagementStateStore;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
protected final DagProcessingEngineMetrics dagProcEngineMetrics;

public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagManagementStateStore dagManagementStateStore, DagProcessingEngineMetrics dagProcEngineMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ public final boolean conclude() {
FlowSpec flowSpec =
this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
if (!flowSpec.isScheduled()) {
dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
try {
//This can throw Runtime, IllegalState and IO Exceptions which are not caught here.
dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
} catch (Exception e) {
super.dagProcEngineMetrics.markDagActionsConflowFlowSpecRemoval(this.dagAction.getDagActionType(), false);
log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please log the exception as well using log.error(..., e), so that stack trace is captured.

return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier the RuntimeException was not caught here, so it was caught in DagProcessingEngine, which marked `dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();`. Now we are swallowing the exception, so it will no longer be handled in DagProcessingEngine. We should re-throw the exception here.

}
}
return true;
}
Expand Down