Skip to content

Commit a75214d

Browse files
author
Aditya Pratap Singh
committed
added some unit tests for gobblin temporal module
1 parent 72306bb commit a75214d

File tree

6 files changed

+292
-105
lines changed

6 files changed

+292
-105
lines changed

gobblin-temporal/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@ dependencies {
6767
testCompile project(":gobblin-example")
6868

6969
testCompile externalDependency.testng
70-
testCompile externalDependency.mockito
70+
testCompile externalDependency.mockitoInline
71+
testCompile externalDependency.powerMockApi
72+
testCompile externalDependency.powerMockModule
7173
testCompile externalDependency.hadoopYarnMiniCluster
7274
testCompile externalDependency.curatorFramework
7375
testCompile externalDependency.curatorTest
7476

77+
7578
testCompile ('com.google.inject:guice:3.0') {
7679
force = true
7780
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public CommitStats commit(WUProcessingSpec workSpec) {
6666
return commitGobblinStats;
6767
}
6868

69-
private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
69+
70+
protected List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
7071
List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
7172
for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
7273
datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), entry.getValue().isSuccessfullyCommitted()));

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.temporal.util.nesting.workflow;
1919

20+
import com.google.common.annotations.VisibleForTesting;
2021
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.Collections;
@@ -115,8 +116,9 @@ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final WorkflowAddr
115116
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
116117
}
117118

119+
@VisibleForTesting
118120
/** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */
119-
protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
121+
public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
120122
// (only pause when an appreciable number of leaves)
121123
// TODO: use a configuration value, for simpler adjustment, rather than hard-code
122124
return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
@@ -130,11 +132,9 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi
130132
* List<Integer> naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
131133
* @return each sub-tree's desired size, in ascending sub-tree order
132134
*/
133-
protected static List<Integer> consolidateSubTreeGrandChildren(
134-
final int numSubTreesPerSubTree,
135-
final int numChildrenTotal,
136-
final int numSubTreeChildren
137-
) {
135+
@VisibleForTesting
136+
public static List<Integer> consolidateSubTreeGrandChildren(final int numSubTreesPerSubTree,
137+
final int numChildrenTotal, final int numSubTreeChildren) {
138138
if (numSubTreesPerSubTree <= 0) {
139139
return Lists.newArrayList();
140140
} else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {

gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java

Lines changed: 0 additions & 97 deletions
This file was deleted.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gobblin.temporal.ddm.utils;
18+
19+
import java.io.DataOutputStream;
20+
import java.io.IOException;
21+
import java.net.URI;
22+
import java.net.URISyntaxException;
23+
import java.util.Properties;
24+
25+
import org.mockito.Mockito;
26+
import org.testng.Assert;
27+
import org.testng.annotations.BeforeMethod;
28+
import org.testng.annotations.Test;
29+
import org.apache.hadoop.fs.FSDataOutputStream;
30+
import org.apache.hadoop.fs.FileSystem;
31+
import org.apache.hadoop.fs.Path;
32+
33+
import org.apache.gobblin.runtime.JobState;
34+
import org.apache.gobblin.runtime.TaskState;
35+
import org.apache.gobblin.source.Source;
36+
import org.apache.gobblin.metastore.StateStore;
37+
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
38+
39+
40+
public class JobStateUtilTest {
41+
42+
private JobState jobState;
43+
private FileSystem fileSystem;
44+
45+
@BeforeMethod
46+
public void setUp() {
47+
jobState = Mockito.mock(JobState.class);
48+
fileSystem = Mockito.mock(FileSystem.class);
49+
}
50+
51+
@Test
52+
public void testOpenFileSystem() throws IOException {
53+
54+
Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
55+
Mockito.when(jobState.getProperties()).thenReturn(new Properties());
56+
57+
FileSystem fs = JobStateUtils.openFileSystem(jobState);
58+
59+
Assert.assertNotNull(fs);
60+
Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString());
61+
}
62+
63+
@Test
64+
public void testCreateSource() throws ReflectiveOperationException {
65+
Mockito.when(jobState.getProp(Mockito.anyString()))
66+
.thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource");
67+
Source<?, ?> source = JobStateUtils.createSource(jobState);
68+
Assert.assertNotNull(source);
69+
}
70+
71+
@Test
72+
public void testOpenTaskStateStoreUncached() throws URISyntaxException {
73+
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test");
74+
Mockito.when(jobState.getJobId()).thenReturn("testJobId");
75+
Mockito.when(jobState.getJobName()).thenReturn("testJobName");
76+
Mockito.when(fileSystem.makeQualified(Mockito.any()))
77+
.thenReturn(new Path("file:///test/testJobName/testJobId/output"));
78+
Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output"));
79+
80+
StateStore<TaskState> stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem);
81+
82+
Assert.assertNotNull(stateStore);
83+
}
84+
85+
@Test
86+
public void testGetFileSystemUri() {
87+
Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
88+
URI fsUri = JobStateUtils.getFileSystemUri(jobState);
89+
Assert.assertEquals(URI.create("file:///test"), fsUri);
90+
Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString());
91+
}
92+
93+
@Test
94+
public void testGetWorkDirRoot() {
95+
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
96+
Mockito.when(jobState.getJobName()).thenReturn("testJob");
97+
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
98+
Path rootPath = JobStateUtils.getWorkDirRoot(jobState);
99+
Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath);
100+
Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString());
101+
}
102+
103+
@Test
104+
public void testGetWorkUnitsPath() {
105+
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
106+
Mockito.when(jobState.getJobName()).thenReturn("testJob");
107+
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
108+
Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState);
109+
Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath);
110+
}
111+
112+
@Test
113+
public void testGetTaskStateStorePath() throws IOException {
114+
Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path"));
115+
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
116+
Mockito.when(jobState.getJobName()).thenReturn("testJob");
117+
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
118+
Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem);
119+
Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath);
120+
}
121+
122+
@Test
123+
public void testWriteJobState() throws IOException {
124+
Path workDirRootPath = new Path("/tmp");
125+
FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class);
126+
Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos);
127+
128+
JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem);
129+
130+
Mockito.verify(fileSystem).create(Mockito.any(Path.class));
131+
Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean());
132+
}
133+
134+
@Test
135+
public void testGetSharedResourcesBroker() {
136+
Mockito.when(jobState.getProperties()).thenReturn(System.getProperties());
137+
Mockito.when(jobState.getJobName()).thenReturn("testJob");
138+
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
139+
Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState));
140+
}
141+
}

0 commit comments

Comments
 (0)