@@ -6,9 +6,8 @@

package com.ibm.fhir.bulkdata.jbatch.export.fast;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
@@ -50,6 +49,7 @@
import com.ibm.fhir.persistence.ResourcePayload;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.search.date.DateTimeHandler;

/**
@@ -147,7 +147,7 @@ public class ResourcePayloadReader extends AbstractItemReader {
private long txEndTime;

// The buffer space we collect data into until we hit the threshold to push to COS
private ByteArrayOutputStream outputStream;
private InputOutputByteStream ioBuffer;

// The sum of the multi-part sizes for parts that have been uploaded
private long currentObjectSize;
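
The new InputOutputByteStream class itself is not part of this diff. Judging only from the call sites in this change (outputStream(), inputStream(), size(), reset(), and a (byte[], length) constructor used later in ResourceDAOImpl), it presumably has roughly the shape sketched below: a single growable buffer that can be written through an OutputStream and then read back through an InputStream without the defensive copy that ByteArrayOutputStream.toByteArray() makes. This is a hypothetical sketch, not the real class.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Hypothetical sketch only - not the real com.ibm.fhir.persistence.util.InputOutputByteStream.
    public class InputOutputByteStreamSketch {
        private byte[] buffer;
        private int count; // number of valid bytes currently in the buffer

        public InputOutputByteStreamSketch(int initialCapacity) {
            this.buffer = new byte[initialCapacity];
        }

        public InputOutputByteStreamSketch(byte[] data, int length) {
            this.buffer = data;   // adopt the caller's array; no copy
            this.count = length;
        }

        public int size() {
            return count;
        }

        public void reset() {
            count = 0;            // reuse the same array for the next part
        }

        public OutputStream outputStream() {
            return new OutputStream() {
                @Override
                public void write(int b) {
                    ensureCapacity(count + 1);
                    buffer[count++] = (byte) b;
                }

                @Override
                public void write(byte[] b, int off, int len) {
                    ensureCapacity(count + len);
                    System.arraycopy(b, off, buffer, count, len);
                    count += len;
                }
            };
        }

        public InputStream inputStream() {
            // read the accumulated bytes in place - no defensive copy
            return new ByteArrayInputStream(buffer, 0, count);
        }

        private void ensureCapacity(int needed) {
            if (needed > buffer.length) {
                byte[] bigger = new byte[Math.max(needed, buffer.length * 2)];
                System.arraycopy(buffer, 0, bigger, 0, count);
                buffer = bigger;
            }
        }
    }
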
@@ -186,7 +186,7 @@ public class ResourcePayloadReader extends AbstractItemReader {
*/
public ResourcePayloadReader() {
super();
this.outputStream = new ByteArrayOutputStream(this.initialBufferSize);
this.ioBuffer = new InputOutputByteStream(this.initialBufferSize);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Max resources Per Object: " + resourcesPerObject);
logger.fine("Part Upload Trigger Size: " + partUploadTriggerSize);
@@ -253,7 +253,7 @@ public void open(Serializable checkpoint) throws Exception {

// Just in case the framework tries to reopen an existing instance,
// make sure we start with an empty output stream
this.outputStream.reset();
this.ioBuffer.reset();
}

// Transient user data is required to signal completion of this partition
@@ -403,11 +403,12 @@ public Boolean processPayload(ResourcePayload t) {
completeCurrentUpload();
}

// Accumulate the payload in the outputStream buffer
if (this.outputStream.size() > 0) {
this.outputStream.write(NDJSON_LINE_SEPARATOR);
// Accumulate the payload in the output buffer
OutputStream outputStream = ioBuffer.outputStream();
if (this.ioBuffer.size() > 0) {
outputStream.write(NDJSON_LINE_SEPARATOR);
}
this.currentObjectSize += t.transferTo(this.outputStream);
this.currentObjectSize += t.transferTo(outputStream);
this.currentObjectResourceCount++;

// upload now if we have reached the Goldilocks threshold size for a part
@@ -461,7 +462,7 @@ private void uploadWhenReady() throws Exception {
// the total number of parts), but not so large that the
// upload would take too long and exceed our transaction
// timeout.
if (this.outputStream.size() > this.partUploadTriggerSize) {
if (this.ioBuffer.size() > this.partUploadTriggerSize) {
uploadPart();
}
}
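
To put the "Goldilocks" trade-off in concrete numbers: the S3 limit of 10,000 part numbers (cited in uploadPart() below) means the trigger size also caps the largest object a single export can produce, while keeping each in-transaction upload bounded. The trigger size in this small illustration is purely hypothetical; the real value comes from the job configuration.

    public class PartSizingExample {
        public static void main(String[] args) {
            long partUploadTriggerSize = 200L * 1024 * 1024; // hypothetical 200 MiB trigger
            long maxParts = 10_000;                          // S3: part numbers must be 1..10000
            long maxObjectSize = partUploadTriggerSize * maxParts;
            System.out.println("largest object one upload can reach ~ " + maxObjectSize + " bytes (~2 TiB)");
        }
    }
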
@@ -474,15 +475,15 @@ private void uploadPart() throws Exception {
// S3 API: Part number must be an integer between 1 and 10000
int currentObjectPartNumber = uploadedParts.size() + 1;
if (logger.isLoggable(Level.FINE)) {
logger.fine(logPrefix() + " Uploading part# " + currentObjectPartNumber + " ["+ outputStream.size() + " bytes] for uploadId '" + uploadId + "'");
logger.fine(logPrefix() + " Uploading part# " + currentObjectPartNumber + " ["+ ioBuffer.size() + " bytes] for uploadId '" + uploadId + "'");
}

byte[] buffer = outputStream.toByteArray();
InputStream is = new ByteArrayInputStream(buffer);
// The ioBuffer can provide us with an InputStream without having to copy the byte-buffer
InputStream is = ioBuffer.inputStream();
PartETag uploadResult = BulkDataUtils.multiPartUpload(cosClient, cosBucketName, currentObjectName,
uploadId, is, buffer.length, currentObjectPartNumber);
uploadId, is, ioBuffer.size(), currentObjectPartNumber);
uploadedParts.add(uploadResult);
outputStream.reset();
ioBuffer.reset();
}

/**
@@ -494,7 +495,7 @@ private void completeCurrentUpload() throws Exception {
}

// upload any final amount of data we have in the buffer
if (this.outputStream.size() > 0) {
if (this.ioBuffer.size() > 0) {
logger.fine(logPrefix() + " uploading final part for '" + this.uploadId + "'");
uploadPart();
}
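
The practical effect of the swap in uploadPart(): ByteArrayOutputStream.toByteArray() returns a fresh copy of the accumulated bytes, so the old code briefly held two full copies of every part before handing it to BulkDataUtils.multiPartUpload, while the new inputStream() reads the same buffer in place. A small standalone illustration of the old behaviour; the 64 MiB figure is only an example.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class CopyVersusWrap {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            outputStream.write(new byte[64 * 1024 * 1024]); // pretend 64 MiB of NDJSON is buffered

            // Old path: toByteArray() allocates and fills a second 64 MiB array before the upload
            byte[] copy = outputStream.toByteArray();
            InputStream is = new ByteArrayInputStream(copy);
            System.out.println("extra bytes allocated before upload: " + copy.length);
            is.close();
            // New path (conceptually): ioBuffer.inputStream() wraps the existing buffer, so memory stays flat.
        }
    }
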
----- next changed file -----
@@ -49,6 +49,7 @@
import com.ibm.fhir.persistence.jdbc.util.ResourceTypesCache;
import com.ibm.fhir.persistence.jdbc.util.ResourceTypesCacheUpdater;
import com.ibm.fhir.persistence.jdbc.util.SqlQueryData;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.schema.control.FhirSchemaConstants;

/**
@@ -66,6 +67,15 @@ public class ResourceDAOImpl extends FHIRDbDAOImpl implements ResourceDAO {

public static final String DEFAULT_VALUE_REINDEX_TSTAMP = "1970-01-01 00:00:00";

// column indices for all our resource reading queries
public static final int IDX_RESOURCE_ID = 1;
public static final int IDX_LOGICAL_RESOURCE_ID = 2;
public static final int IDX_VERSION_ID = 3;
public static final int IDX_LAST_UPDATED = 4;
public static final int IDX_IS_DELETED = 5;
public static final int IDX_DATA = 6;
public static final int IDX_LOGICAL_ID = 7;

// Read the current version of the resource
private static final String SQL_READ = "SELECT R.RESOURCE_ID, R.LOGICAL_RESOURCE_ID, R.VERSION_ID, R.LAST_UPDATED, R.IS_DELETED, R.DATA, LR.LOGICAL_ID " +
"FROM %s_RESOURCES R, %s_LOGICAL_RESOURCES LR WHERE " +
@@ -244,12 +254,15 @@ protected Resource createDTO(ResultSet resultSet) throws FHIRPersistenceDataAccessException {
Resource resource = new Resource();

try {
resource.setData(resultSet.getBytes("DATA"));
resource.setId(resultSet.getLong("RESOURCE_ID"));
resource.setLastUpdated(resultSet.getTimestamp("LAST_UPDATED"));
resource.setLogicalId(resultSet.getString("LOGICAL_ID"));
resource.setVersionId(resultSet.getInt("VERSION_ID"));
resource.setDeleted(resultSet.getString("IS_DELETED").equals("Y") ? true : false);
byte[] payloadData = resultSet.getBytes(IDX_DATA);
if (payloadData != null) {
resource.setDataStream(new InputOutputByteStream(payloadData, payloadData.length));
}
resource.setId(resultSet.getLong(IDX_RESOURCE_ID));
resource.setLastUpdated(resultSet.getTimestamp(IDX_LAST_UPDATED));
resource.setLogicalId(resultSet.getString(IDX_LOGICAL_ID));
resource.setVersionId(resultSet.getInt(IDX_VERSION_ID));
resource.setDeleted(resultSet.getString(IDX_IS_DELETED).equals("Y") ? true : false);
} catch (Throwable e) {
FHIRPersistenceDataAccessException fx = new FHIRPersistenceDataAccessException("Failure creating Resource DTO.");
throw severe(log, fx, e);
@@ -556,13 +569,13 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
stmt.setString(2, resource.getLogicalId());

// Check for large objects, and branch around it.
boolean large = FhirSchemaConstants.STORED_PROCEDURE_SIZE_LIMIT < resource.getData().length;
boolean large = FhirSchemaConstants.STORED_PROCEDURE_SIZE_LIMIT < resource.getDataStream().size();
if (large) {
// Outside of the normal flow we have a BIG JSON or XML
stmt.setNull(3, Types.BLOB);
} else {
// Normal Flow, we set the data
stmt.setBytes(3, resource.getData());
stmt.setBinaryStream(3, resource.getDataStream().inputStream());
}

lastUpdated = resource.getLastUpdated();
@@ -582,7 +595,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
String largeStmtString = String.format(LARGE_BLOB, resource.getResourceType());
try (PreparedStatement ps = connection.prepareStatement(largeStmtString)) {
// Use the long id to update the record in the database with the large object.
ps.setBytes(1, resource.getData());
ps.setBinaryStream(1, resource.getDataStream().inputStream());
ps.setLong(2, versionedResourceRowId);
long dbCallStartTime2 = System.nanoTime();
int numberOfRows = -1;
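
The large-resource branch spans two statements, and the middle of the method is collapsed in this view: the insert procedure is called with a NULL BLOB so the row exists without its payload, and the payload is then streamed into that row with a separate UPDATE keyed by the returned row id. A rough sketch of the second step follows; the LARGE_BLOB statement text and the way versionedResourceRowId comes back from the procedure are assumptions, since neither is shown in this diff.

    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.PreparedStatement;

    public class LargePayloadUpdateSketch {
        // Assumed shape of the LARGE_BLOB constant - the real text is defined elsewhere
        private static final String LARGE_BLOB =
                "UPDATE %s_RESOURCES SET DATA = ? WHERE RESOURCE_ID = ?";

        public void updateLargePayload(Connection connection, String resourceType,
                long versionedResourceRowId, InputStream payload) throws Exception {
            String sql = String.format(LARGE_BLOB, resourceType);
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                ps.setBinaryStream(1, payload);      // driver reads until end of stream (JDBC 4.0+)
                ps.setLong(2, versionedResourceRowId);
                ps.executeUpdate();
            }
        }
    }
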
----- next changed file -----
@@ -8,6 +8,7 @@

import static com.ibm.fhir.persistence.jdbc.JDBCConstants.UTC;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -122,7 +123,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramet
long resourceId = this.storeResource(resource.getResourceType(),
parameters,
resource.getLogicalId(),
resource.getData(),
resource.getDataStream().inputStream(),
lastUpdated,
resource.isDeleted(),
sourceKey,
@@ -196,7 +197,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramet
* @return the resource_id for the entry we created
* @throws Exception
*/
public long storeResource(String tablePrefix, List<ExtractedParameterValue> parameters, String p_logical_id, byte[] p_payload, Timestamp p_last_updated, boolean p_is_deleted,
public long storeResource(String tablePrefix, List<ExtractedParameterValue> parameters, String p_logical_id, InputStream p_payload, Timestamp p_last_updated, boolean p_is_deleted,
String p_source_key, Integer p_version, Connection conn, ParameterDAO parameterDao) throws Exception {

final String METHODNAME = "storeResource() for " + tablePrefix + " resource";
@@ -413,7 +414,7 @@ public long storeResource(String tablePrefix, List<ExtractedParameterValue> para
stmt.setLong(1, v_resource_id);
stmt.setLong(2, v_logical_resource_id);
stmt.setInt(3, v_insert_version);
stmt.setBytes(4, p_payload);
stmt.setBinaryStream(4, p_payload);
stmt.setTimestamp(5, p_last_updated, UTC);
stmt.setString(6, p_is_deleted ? "Y" : "N");
stmt.executeUpdate();
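
One note on the setBinaryStream calls introduced throughout this change: the two-argument overload (JDBC 4.0) lets the driver read the payload to end of stream, which suits a payload whose length lives in the buffer rather than in a byte array. If a particular driver ever insists on an explicit length, the three-argument overload can be fed the same way. A small sketch with illustrative table and column names, not taken from this codebase:

    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.PreparedStatement;

    public class PayloadBindSketch {
        public void bind(Connection conn, InputStream payload, long payloadLength) throws Exception {
            String sql = "UPDATE example_resources SET data = ? WHERE resource_id = ?"; // illustrative
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setBinaryStream(1, payload);                  // as in this diff: read to end of stream
                // stmt.setBinaryStream(1, payload, payloadLength); // alternative when a length is required
                stmt.setLong(2, 123L);
                stmt.executeUpdate();
            }
        }
    }
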
----- next changed file -----
@@ -8,20 +8,25 @@

import java.sql.Timestamp;

import com.ibm.fhir.persistence.util.InputOutputByteStream;

/**
* This class defines the Data Transfer Object representing a row in the FHIR Resource table.
*/
public class Resource {

private long id;
private String logicalId;
private int versionId;
private String resourceType;
private Timestamp lastUpdated;
private byte[] data;

// The buffer holding the payload data
private InputOutputByteStream dataStream;

private boolean deleted;


public Resource() {
super();
}
@@ -66,27 +71,32 @@ public void setVersionId(int versionId) {
this.versionId = versionId;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}

public boolean isDeleted() {
return deleted;
}

public void setDeleted(boolean deleted) {
this.deleted = deleted;
}

@Override
public String toString() {
return "Resource [id=" + id + ", logicalId=" + logicalId + ", versionId=" + versionId + ", resourceType="
+ resourceType + ", lastUpdated=" + lastUpdated + ", deleted=" + deleted + "]";
}

/**
* @return the dataStream
*/
public InputOutputByteStream getDataStream() {
return dataStream;
}

/**
* @param dataStream the dataStream to set
*/
public void setDataStream(InputOutputByteStream dataStream) {
this.dataStream = dataStream;
}
}
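
Usage-wise, the DTO's payload now travels as a buffer rather than an array: the DAO wraps the raw column bytes with the (byte[], length) constructor on read, and callers pull an InputStream back out. A tiny illustration, where Resource is the DTO defined above and the payload bytes are fake stand-ins:

    import java.io.InputStream;

    import com.ibm.fhir.persistence.util.InputOutputByteStream;

    public class ResourcePayloadExample {
        public static void main(String[] args) throws Exception {
            byte[] payloadData = new byte[] { 0x1f, (byte) 0x8b, 0x08 }; // stand-in for gzip'd resource bytes

            Resource dto = new Resource();
            dto.setDataStream(new InputOutputByteStream(payloadData, payloadData.length));

            // Consumers read the payload back as a stream, without copying the array
            try (InputStream in = dto.getDataStream().inputStream()) {
                System.out.println("first payload byte: " + in.read()
                        + ", total bytes: " + dto.getDataStream().size());
            }
        }
    }
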

----- next changed file -----
@@ -13,8 +13,6 @@
import static com.ibm.fhir.model.type.String.string;
import static com.ibm.fhir.model.util.ModelSupport.getResourceType;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
@@ -142,6 +140,7 @@
import com.ibm.fhir.persistence.jdbc.util.SqlQueryData;
import com.ibm.fhir.persistence.jdbc.util.TimestampPrefixedUUID;
import com.ibm.fhir.persistence.util.FHIRPersistenceUtil;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.persistence.util.LogicalIdentityProvider;
import com.ibm.fhir.schema.control.FhirSchemaConstants;
import com.ibm.fhir.search.SearchConstants;
@@ -166,6 +165,7 @@
public class FHIRPersistenceJDBCImpl implements FHIRPersistence, SchemaNameSupplier {
private static final String CLASSNAME = FHIRPersistenceJDBCImpl.class.getName();
private static final Logger log = Logger.getLogger(CLASSNAME);
private static final int DATA_BUFFER_INITIAL_SIZE = 10*1024; // 10KiB

protected static final String TXN_JNDI_NAME = "java:comp/UserTransaction";
public static final String TRX_SYNCH_REG_JNDI_NAME = "java:comp/TransactionSynchronizationRegistry";
@@ -338,7 +338,8 @@ public <T extends Resource> SingleResourceResult<T> create(FHIRPersistenceContex
final String METHODNAME = "create";
log.entering(CLASSNAME, METHODNAME);

ByteArrayOutputStream stream = new ByteArrayOutputStream();
// Most resources are well under 10K after being serialized and compressed
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);
String logicalId;

// We need to update the meta in the resource, so we need a modifiable version
@@ -378,10 +379,10 @@
resourceDTO.setResourceType(updatedResource.getClass().getSimpleName());

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator( Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

// The DAO objects are now created on-the-fly (not expensive to construct) and
@@ -494,7 +495,7 @@ public <T extends Resource> SingleResourceResult<T> update(FHIRPersistenceContex

Class<? extends Resource> resourceType = resource.getClass();
com.ibm.fhir.persistence.jdbc.dto.Resource existingResourceDTO;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);

// Resources are immutable, so we need a new builder to update it (since R4)
Resource.Builder resultResourceBuilder = resource.toBuilder();
@@ -569,10 +570,10 @@ public <T extends Resource> SingleResourceResult<T> update(FHIRPersistenceContex
resourceDTO.setResourceType(updatedResource.getClass().getSimpleName());

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator(Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

// Persist the Resource DTO.
@@ -802,7 +803,7 @@ public <T extends Resource> SingleResourceResult<T> delete(FHIRPersistenceContex

com.ibm.fhir.persistence.jdbc.dto.Resource existingResourceDTO = null;
T existingResource = null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);

Resource.Builder resourceBuilder;

@@ -856,10 +857,10 @@ public <T extends Resource> SingleResourceResult<T> delete(FHIRPersistenceContex
resourceDTO.setVersionId(newVersionNumber);

// Serialize and compress the Resource
GZIPOutputStream zipStream = new GZIPOutputStream(stream);
GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
FHIRGenerator.generator(Format.JSON, false).generate(updatedResource, zipStream);
zipStream.finish();
resourceDTO.setData(stream.toByteArray());
resourceDTO.setDataStream(ioStream);
zipStream.close();

Timestamp timestamp = FHIRUtilities.convertToTimestamp(lastUpdated.getValue());
@@ -1741,7 +1742,7 @@ private <T extends Resource> T convertResourceDTO(com.ibm.fhir.persistence.jdbc.
InputStream in = null;
try {
if (resourceDTO != null) {
in = new GZIPInputStream(new ByteArrayInputStream(resourceDTO.getData()));
in = new GZIPInputStream(resourceDTO.getDataStream().inputStream());
if (elements != null) {
// parse/filter the resource using elements
resource = FHIRParser.parser(Format.JSON).as(FHIRJsonParser.class).parseAndFilter(in, elements);
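
Putting the write and read halves of FHIRPersistenceJDBCImpl side by side, the payload now makes a full round trip through one buffer type: serialize and gzip into an InputOutputByteStream on create/update/delete, then gunzip and parse straight from the same buffer in convertResourceDTO. A condensed sketch of that pattern; the import paths are assumed from the class names used in this diff, and the plain parse(in) call stands in for the elided no-elements branch.

    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    import com.ibm.fhir.model.format.Format;
    import com.ibm.fhir.model.generator.FHIRGenerator;
    import com.ibm.fhir.model.parser.FHIRParser;
    import com.ibm.fhir.model.resource.Resource;
    import com.ibm.fhir.persistence.util.InputOutputByteStream;

    public class PayloadRoundTrip {
        private static final int DATA_BUFFER_INITIAL_SIZE = 10 * 1024; // matches the 10KiB comment above

        // Write side, as in create/update/delete: serialize and compress into the buffer
        public InputOutputByteStream serialize(Resource resource) throws Exception {
            InputOutputByteStream ioStream = new InputOutputByteStream(DATA_BUFFER_INITIAL_SIZE);
            GZIPOutputStream zipStream = new GZIPOutputStream(ioStream.outputStream());
            FHIRGenerator.generator(Format.JSON, false).generate(resource, zipStream);
            zipStream.finish();
            zipStream.close();
            return ioStream;
        }

        // Read side, as in convertResourceDTO: decompress and parse straight from the same buffer
        public <T extends Resource> T deserialize(InputOutputByteStream ioStream) throws Exception {
            try (GZIPInputStream in = new GZIPInputStream(ioStream.inputStream())) {
                return FHIRParser.parser(Format.JSON).parse(in);
            }
        }
    }
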
----- next changed file -----
@@ -108,7 +108,7 @@ public Resource insert(Resource resource, List<ExtractedParameterValue> paramete
stmt = connection.prepareCall(stmtString);
stmt.setString(1, resource.getResourceType());
stmt.setString(2, resource.getLogicalId());
stmt.setBytes(3, resource.getData());
stmt.setBinaryStream(3, resource.getDataStream().inputStream());

lastUpdated = resource.getLastUpdated();
stmt.setTimestamp(4, lastUpdated, UTC);