Skip to content

Commit 479cd51

Browse files
authored
fix: Feature logging to Redshift is broken (#2655)
* reorder fields in log schema
  Signed-off-by: Oleksii Moskalenko <[email protected]>
* add comment
  Signed-off-by: Oleksii Moskalenko <[email protected]>
1 parent 87c194c commit 479cd51

File tree

1 file changed

+18
-12
lines changed

1 file changed

+18
-12
lines changed

sdk/python/feast/feature_logging.py

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -52,17 +52,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema:
5252
fields: Dict[str, pa.DataType] = {}
5353

5454
for projection in self._feature_service.feature_view_projections:
55-
for feature in projection.features:
56-
fields[
57-
f"{projection.name_to_use()}__{feature.name}"
58-
] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
59-
fields[
60-
f"{projection.name_to_use()}__{feature.name}__timestamp"
61-
] = PA_TIMESTAMP_TYPE
62-
fields[
63-
f"{projection.name_to_use()}__{feature.name}__status"
64-
] = pa.int32()
65-
55+
# The order of fields in the generated schema should match
56+
# the order created on the other side (inside Go logger).
57+
# Otherwise, some offline stores might not accept parquet files (produced by Go).
58+
# Go code can be found here:
59+
# https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
6660
try:
6761
feature_view = registry.get_feature_view(projection.name, self._project)
6862
except FeatureViewNotFoundException:
@@ -91,9 +85,21 @@ def get_schema(self, registry: "Registry") -> pa.Schema:
9185
from_value_type(entity.value_type)
9286
]
9387

88+
for feature in projection.features:
89+
fields[
90+
f"{projection.name_to_use()}__{feature.name}"
91+
] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
92+
fields[
93+
f"{projection.name_to_use()}__{feature.name}__timestamp"
94+
] = PA_TIMESTAMP_TYPE
95+
fields[
96+
f"{projection.name_to_use()}__{feature.name}__status"
97+
] = pa.int32()
98+
9499
# system columns
95-
fields[REQUEST_ID_FIELD] = pa.string()
96100
fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)
101+
fields[LOG_DATE_FIELD] = pa.date32()
102+
fields[REQUEST_ID_FIELD] = pa.string()
97103

98104
return pa.schema(
99105
[pa.field(name, data_type) for name, data_type in fields.items()]

0 commit comments

Comments
 (0)