Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9ec04fa
Initial version of FirestoreToFirestore copy pipeline
pacoavila808 Dec 2, 2025
5fb2571
reformat
pacoavila808 Dec 2, 2025
9763fb2
Fix pipeline description in pom.xml
pacoavila808 Dec 3, 2025
9d94e5a
Cleanup options, refactor transforms
pacoavila808 Dec 3, 2025
50a5fe2
Cleanup comments
pacoavila808 Dec 3, 2025
82f663e
remove classic-templates dependency
pacoavila808 Dec 4, 2025
d5321f1
Fix style issues
pacoavila808 Dec 4, 2025
59d6028
Revert changes to DatastoreConverters
pacoavila808 Dec 4, 2025
4c4cd0a
Add logging and UncaughtExceptionLogger
pacoavila808 Dec 5, 2025
f0314d1
Wrap in try catch
pacoavila808 Dec 5, 2025
0fa179f
Don't wait for pipeline to finish
pacoavila808 Dec 5, 2025
977a77f
Set collection id and clean up workers parameter
pacoavila808 Dec 5, 2025
25bf927
Fix database id bug
pacoavila808 Dec 5, 2025
8963782
Add project and database to read fn, Update beam version to 2.71.0-SN…
pacoavila808 Dec 9, 2025
ae06e74
Spotless apply
pacoavila808 Dec 9, 2025
9e9c2a2
Fix database parameters provided
pacoavila808 Dec 9, 2025
848deab
Remove unused parameter. Comment cleanup.
pacoavila808 Dec 10, 2025
2c51f39
Generate readme
pacoavila808 Dec 10, 2025
10e5e24
Cleanup docs
pacoavila808 Dec 10, 2025
832680f
Add support for multiple collections
pacoavila808 Dec 10, 2025
7ea150b
Cleanup tests
pacoavila808 Dec 10, 2025
83e6822
regen docs
pacoavila808 Dec 10, 2025
b2b8408
Default to all collections if no flag provided
pacoavila808 Dec 10, 2025
305319c
Rename variables
pacoavila808 Dec 10, 2025
743a1b3
Default to source project if not provided
pacoavila808 Dec 10, 2025
337d3ec
Require database id
pacoavila808 Dec 10, 2025
fd47b33
Add options validation
pacoavila808 Dec 10, 2025
ace3332
regen docs
pacoavila808 Dec 10, 2025
9d25a62
cleanup
pacoavila808 Dec 11, 2025
accaad4
Generate terraform template
pacoavila808 Dec 11, 2025
80a1c78
Update v2/firestore-to-firestore/src/main/java/com/google/cloud/telep…
pacoavila808 Dec 11, 2025
c105ebe
Fix comments in package-info
pacoavila808 Dec 11, 2025
a125cd8
Address gemini comments
pacoavila808 Dec 11, 2025
e51ccfc
Add integration test
pacoavila808 Dec 11, 2025
71e4983
working integration test with fixed database ids
pacoavila808 Dec 11, 2025
93ddde4
Provision databases as part of test setup
pacoavila808 Dec 11, 2025
7de98db
cleanup
pacoavila808 Dec 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions it/google-cloud-platform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-firestore</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-firestore-admin</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastream</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.apache.beam.it.gcp.firestore;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.firestore.v1.FirestoreAdminClient;
import com.google.common.base.Strings;
import com.google.firestore.admin.v1.CreateDatabaseRequest;
import com.google.firestore.admin.v1.Database;
import com.google.firestore.admin.v1.Database.DatabaseEdition;
import com.google.firestore.admin.v1.Database.DatabaseType;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.beam.it.common.ResourceManager;

public class FirestoreAdminResourceManager implements ResourceManager {

private final FirestoreAdminClient firestoreAdminClient;

private final String projectId;
private final String region;

private final Set<String> databaseIds;

private FirestoreAdminResourceManager(Builder builder) {
try {
this.firestoreAdminClient = FirestoreAdminClient.create();
} catch (Exception e) {
throw new RuntimeException("Failed to create FirestoreAdminClient", e);
}
this.projectId = builder.projectId;
this.region = builder.region;
this.databaseIds = new HashSet<>();
}

@Override
public void cleanupAll() {
try {
for (String databaseId : databaseIds) {
deleteDatabase(databaseId);
}
} catch (Exception e) {
throw new FirestoreAdminResourceManagerException("Error cleaning up Firestore resources", e);
} finally {
databaseIds.clear();
try {
firestoreAdminClient.close();
} catch (Exception e) {
throw new FirestoreAdminResourceManagerException("Error closing Firestore client", e);
}
}
}

public void createDatabase(String databaseId, DatabaseType type, DatabaseEdition edition) {
try {
firestoreAdminClient
.createDatabaseAsync(
CreateDatabaseRequest.newBuilder()
.setParent("projects/" + projectId)
.setDatabaseId(databaseId)
.setDatabase(
Database.newBuilder()
.setName(databaseId)
.setType(type)
.setDatabaseEdition(edition)
.setLocationId(region))
.build())
.get();
} catch (InterruptedException | ExecutionException e) {
throw new FirestoreAdminResourceManagerException("Error creating Firestore database", e);
}
}

public void deleteDatabase(String databaseId) {
try {
firestoreAdminClient.deleteDatabaseAsync(databaseId).get();
} catch (InterruptedException | ExecutionException e) {
throw new FirestoreAdminResourceManagerException("Error deleting Firestore database", e);
}
}

public static FirestoreAdminResourceManager.Builder builder(String testId) {
checkArgument(!Strings.isNullOrEmpty(testId), "testId can not be empty");
return new FirestoreAdminResourceManager.Builder(testId);
}

/** Builder for {@link FirestoreAdminResourceManager}. */
public static final class Builder {

private final String testId;
private String projectId;
private String region;

private Builder(String testId) {
this.testId = testId;
}

public FirestoreAdminResourceManager.Builder setProject(String projectId) {
this.projectId = projectId;
return this;
}

public FirestoreAdminResourceManager.Builder setRegion(String region) {
this.region = region;
return this;
}

public FirestoreAdminResourceManager build() {
if (projectId == null) {
throw new IllegalArgumentException("Project ID must be provided");
}
if (region == null) {
throw new IllegalArgumentException("Region must be provided");
}
return new FirestoreAdminResourceManager(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.beam.it.gcp.firestore;

/** Custom exception for {@link FirestoreAdminResourceManagerException} operations. */
public class FirestoreAdminResourceManagerException extends RuntimeException {

public FirestoreAdminResourceManagerException(String message) {
super(message);
}

public FirestoreAdminResourceManagerException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.it.gcp.firestore;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.core.ApiFuture;
import com.google.auth.Credentials;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;
import com.google.cloud.firestore.QueryDocumentSnapshot;
import com.google.cloud.firestore.QuerySnapshot;
import com.google.cloud.firestore.WriteBatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.beam.it.common.ResourceManager;

/** Default implementation of {@link FirestoreResourceManager}. */
public class FirestoreResourceManager implements ResourceManager {

private final String projectId;
private final String databaseId;
private final Firestore firestore;
private final Set<String> collectionIds;

private FirestoreResourceManager(Builder builder) {
this.projectId = builder.projectId;
this.databaseId = builder.databaseId;
this.collectionIds = new HashSet<>();
this.firestore =
FirestoreOptions.newBuilder()
.setCredentials(builder.credentials)
.setProjectId(projectId)
.setDatabaseId(databaseId)
.build()
.getService();
}

@VisibleForTesting
FirestoreResourceManager(Firestore firestore) {
this.projectId = firestore.getOptions().getProjectId();
this.databaseId = firestore.getOptions().getDatabaseId();
this.collectionIds = new HashSet<>();
this.firestore = firestore;
}

public static Builder builder(String testId) {
checkArgument(!Strings.isNullOrEmpty(testId), "testId can not be empty");
return new Builder(testId);
}

public void write(String collectionName, Map<String, Map<String, Object>> documents) {
collectionIds.add(collectionName);
WriteBatch batch = firestore.batch();

for (Map.Entry<String, Map<String, Object>> entry : documents.entrySet()) {
DocumentReference docRef = firestore.collection(collectionName).document(entry.getKey());
batch.set(docRef, entry.getValue());
}

try {
batch.commit().get();
} catch (InterruptedException | ExecutionException e) {
throw new FirestoreResourceManagerException("Error writing documents to Firestore", e);
}
}

public List<QueryDocumentSnapshot> read(String collectionName) {
try {
ApiFuture<QuerySnapshot> query = firestore.collection(collectionName).get();
return query.get().getDocuments();
} catch (InterruptedException | ExecutionException e) {
throw new FirestoreResourceManagerException("Error reading documents from Firestore", e);
}
}

@Override
public void cleanupAll() {
try {
for (String collectionName : collectionIds) {
deleteCollection(collectionName);
}
} catch (Exception e) {
throw new FirestoreResourceManagerException("Error cleaning up Firestore resources", e);
} finally {
collectionIds.clear();
try {
firestore.close();
} catch (Exception e) {
throw new FirestoreResourceManagerException("Error closing Firestore client", e);
}
}
}

private void deleteCollection(String collectionName)
throws ExecutionException, InterruptedException {
CollectionReference collection = firestore.collection(collectionName);
int batchSize = 100;
ApiFuture<QuerySnapshot> future = collection.limit(batchSize).get();
List<QueryDocumentSnapshot> documents = future.get().getDocuments();

while (!documents.isEmpty()) {
WriteBatch batch = firestore.batch();
for (QueryDocumentSnapshot document : documents) {
batch.delete(document.getReference());
}
batch.commit().get();
future = collection.limit(batchSize).get();
documents = future.get().getDocuments();
}
}

/** Builder for {@link FirestoreResourceManager}. */
public static final class Builder {

private final String testId;
private String projectId;
private String databaseId;
private Credentials credentials;

private Builder(String testId) {
this.testId = testId;
}

public Builder setProject(String projectId) {
this.projectId = projectId;
return this;
}

public Builder setDatabase(String databaseId) {
this.databaseId = databaseId;
return this;
}

public Builder setCredentials(Credentials credentials) {
this.credentials = credentials;
return this;
}

public FirestoreResourceManager build() {
if (credentials == null) {
throw new IllegalArgumentException("Credentials must be provided");
}
if (projectId == null) {
throw new IllegalArgumentException("Project ID must be provided");
}
if (databaseId == null) {
throw new IllegalArgumentException("Database ID must be provided");
}
return new FirestoreResourceManager(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.beam.it.gcp.firestore;

/** Custom exception for {@link FirestoreResourceManager} operations. */
public class FirestoreResourceManagerException extends RuntimeException {

public FirestoreResourceManagerException(String message) {
super(message);
}

public FirestoreResourceManagerException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Package for managing Firestore resources within integration tests. */
package org.apache.beam.it.gcp.firestore;
Loading