Skip to content

Commit 6ea0b71

Browse files
authored
Merge pull request #264 from moov-io/web-listen-for-acceptFile
incoming/web: listen for response to acceptFile
2 parents e2369f2 + 5a8e280 commit 6ea0b71

File tree

10 files changed

+171
-66
lines changed

10 files changed

+171
-66
lines changed

internal/environment.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/moov-io/achgateway"
3030
"github.com/moov-io/achgateway/internal/events"
3131
"github.com/moov-io/achgateway/internal/files"
32+
"github.com/moov-io/achgateway/internal/incoming"
3233
"github.com/moov-io/achgateway/internal/incoming/odfi"
3334
"github.com/moov-io/achgateway/internal/incoming/stream"
3435
"github.com/moov-io/achgateway/internal/incoming/web"
@@ -212,7 +213,8 @@ func NewEnvironment(env *Environment) (*Environment, error) {
212213
env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute)
213214

214215
// append HTTP routes
215-
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter)
216+
queueFileResponses := make(chan incoming.QueueACHFileResponse, 1000)
217+
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, queueFileResponses, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter)
216218

217219
// shard mapping HTTP routes
218220
shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository)

internal/incoming/models.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ func (f ACHFile) Validate() error {
4242
return nil
4343
}
4444

45+
type QueueACHFileResponse struct {
46+
FileID string `json:"id"`
47+
ShardKey string `json:"shardKey"`
48+
Error string `json:"error"`
49+
}
50+
4551
type CancelACHFile struct {
4652
FileID string `json:"id"`
4753
ShardKey string `json:"shardKey"`

internal/incoming/web/api_files.go

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,28 @@ import (
4343
"gocloud.dev/pubsub"
4444
)
4545

46-
func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher, cancellationResponses chan models.FileCancellationResponse) *FilesController {
46+
func NewFilesController(
47+
logger log.Logger,
48+
cfg service.HTTPConfig,
49+
pub stream.Publisher,
50+
queueFileResponses chan incoming.QueueACHFileResponse,
51+
cancellationResponses chan models.FileCancellationResponse,
52+
) *FilesController {
4753
controller := &FilesController{
4854
logger: logger,
4955
cfg: cfg,
5056
publisher: pub,
5157

58+
activeQueueFiles: make(map[string]chan incoming.QueueACHFileResponse),
59+
queueFileResponses: queueFileResponses,
60+
5261
activeCancellations: make(map[string]chan models.FileCancellationResponse),
5362
cancellationResponses: cancellationResponses,
5463
}
64+
65+
controller.listenForQueueACHFileResponses()
5566
controller.listenForCancellations()
67+
5668
return controller
5769
}
5870

@@ -61,11 +73,45 @@ type FilesController struct {
6173
cfg service.HTTPConfig
6274
publisher stream.Publisher
6375

76+
queueFileLock sync.Mutex
77+
activeQueueFiles map[string]chan incoming.QueueACHFileResponse
78+
queueFileResponses chan incoming.QueueACHFileResponse
79+
6480
cancellationLock sync.Mutex
6581
activeCancellations map[string]chan models.FileCancellationResponse
6682
cancellationResponses chan models.FileCancellationResponse
6783
}
6884

85+
func (c *FilesController) listenForQueueACHFileResponses() {
86+
c.logger.Info().Log("listening for QueueACHFile responses")
87+
go func() {
88+
for {
89+
// Wait for a message
90+
resp := <-c.queueFileResponses
91+
logger := c.logger.Info().With(log.Fields{
92+
"file_id": log.String(resp.FileID),
93+
"shard_key": log.String(resp.ShardKey),
94+
})
95+
96+
if resp.Error != "" {
97+
logger.Error().LogErrorf("problem with QueueACHFile: %v", resp.Error)
98+
} else {
99+
logger.Info().Logf("received QueueACHFile response")
100+
}
101+
102+
fileID := strings.TrimSuffix(resp.FileID, ".ach")
103+
104+
c.queueFileLock.Lock()
105+
out, exists := c.activeQueueFiles[fileID]
106+
if exists {
107+
out <- resp
108+
delete(c.activeQueueFiles, fileID)
109+
}
110+
c.queueFileLock.Unlock()
111+
}
112+
}()
113+
}
114+
69115
func (c *FilesController) listenForCancellations() {
70116
c.logger.Info().Log("listening for cancellation responses")
71117
go func() {
@@ -121,9 +167,14 @@ func (c *FilesController) CreateFileHandler(w http.ResponseWriter, r *http.Reque
121167
))
122168
defer span.End()
123169

170+
logger := c.logger.With(log.Fields{
171+
"shard_key": log.String(shardKey),
172+
"file_id": log.String(fileID),
173+
})
174+
124175
bs, err := c.readBody(r)
125176
if err != nil {
126-
c.logger.LogErrorf("error reading file: %v", err)
177+
logger.LogErrorf("error reading file: %v", err)
127178
w.WriteHeader(http.StatusBadRequest)
128179
return
129180
}
@@ -139,17 +190,30 @@ func (c *FilesController) CreateFileHandler(w http.ResponseWriter, r *http.Reque
139190
file = *f
140191
}
141192

142-
if err := c.publishFile(ctx, shardKey, fileID, &file); err != nil {
143-
c.logger.With(log.Fields{
144-
"shard_key": log.String(shardKey),
145-
"file_id": log.String(fileID),
146-
}).LogErrorf("publishing file", err)
193+
waiter := make(chan incoming.QueueACHFileResponse, 1)
194+
if err := c.publishFile(ctx, shardKey, fileID, &file, waiter); err != nil {
195+
logger.LogErrorf("publishing file", err)
147196

148197
w.WriteHeader(http.StatusInternalServerError)
149198
return
150199
}
151200

201+
var response incoming.QueueACHFileResponse
202+
select {
203+
case resp := <-waiter:
204+
response = resp
205+
206+
case <-time.After(10 * time.Second):
207+
response = incoming.QueueACHFileResponse{
208+
FileID: fileID,
209+
ShardKey: shardKey,
210+
Error: "timeout exceeded",
211+
}
212+
}
213+
214+
w.Header().Set("Content-Type", "application/json")
152215
w.WriteHeader(http.StatusOK)
216+
json.NewEncoder(w).Encode(response)
153217
}
154218

155219
func (c *FilesController) readBody(req *http.Request) ([]byte, error) {
@@ -167,7 +231,11 @@ func (c *FilesController) readBody(req *http.Request) ([]byte, error) {
167231
return compliance.Reveal(c.cfg.Transform, bs)
168232
}
169233

170-
func (c *FilesController) publishFile(ctx context.Context, shardKey, fileID string, file *ach.File) error {
234+
func (c *FilesController) publishFile(ctx context.Context, shardKey, fileID string, file *ach.File, waiter chan incoming.QueueACHFileResponse) error {
235+
c.queueFileLock.Lock()
236+
c.activeQueueFiles[fileID] = waiter
237+
c.queueFileLock.Unlock()
238+
171239
bs, err := compliance.Protect(c.cfg.Transform, models.Event{
172240
Event: incoming.ACHFile{
173241
FileID: fileID,
@@ -207,7 +275,6 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque
207275
defer span.End()
208276

209277
waiter := make(chan models.FileCancellationResponse, 1)
210-
211278
err := c.cancelFile(ctx, shardKey, fileID, waiter)
212279
if err != nil {
213280
c.logger.With(log.Fields{

internal/incoming/web/api_files_test.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,38 @@ import (
4242
func TestCreateFileHandler(t *testing.T) {
4343
topic, sub := streamtest.InmemStream(t)
4444

45+
queueFileResponses := make(chan incoming.QueueACHFileResponse)
4546
cancellationResponses := make(chan models.FileCancellationResponse)
46-
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
47+
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, queueFileResponses, cancellationResponses)
4748
r := mux.NewRouter()
4849
controller.AppendRoutes(r)
4950

51+
// Setup the response
52+
go func() {
53+
time.Sleep(time.Second)
54+
55+
queueFileResponses <- incoming.QueueACHFileResponse{
56+
FileID: "f1",
57+
}
58+
}()
59+
5060
// Send a file over HTTP
5161
bs, _ := os.ReadFile(filepath.Join("..", "..", "..", "testdata", "ppd-valid.json"))
5262
req := httptest.NewRequest("POST", "/shards/s1/files/f1", bytes.NewReader(bs))
5363

5464
w := httptest.NewRecorder()
55-
r.ServeHTTP(w, req)
56-
65+
r.ServeHTTP(w, req) // blocking call
5766
require.Equal(t, http.StatusOK, w.Code)
5867

59-
// Verify our subscription receives a message
68+
// Wait for the subscription to receive the QueueACHFile
6069
msg, err := sub.Receive(context.Background())
6170
require.NoError(t, err)
6271

6372
var file incoming.ACHFile
64-
require.NoError(t, models.ReadEvent(msg.Body, &file))
73+
err = models.ReadEvent(msg.Body, &file)
74+
require.NoError(t, err)
6575

76+
// Verify the file details
6677
require.Equal(t, "f1", file.FileID)
6778
require.Equal(t, "s1", file.ShardKey)
6879
require.Equal(t, "231380104", file.File.Header.ImmediateDestination)
@@ -72,11 +83,48 @@ func TestCreateFileHandler(t *testing.T) {
7283
require.True(t, validateOpts.PreserveSpaces)
7384
}
7485

86+
func TestCreateFileHandler_Error(t *testing.T) {
87+
topic, _ := streamtest.InmemStream(t)
88+
89+
queueFileResponses := make(chan incoming.QueueACHFileResponse)
90+
cancellationResponses := make(chan models.FileCancellationResponse)
91+
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, queueFileResponses, cancellationResponses)
92+
r := mux.NewRouter()
93+
controller.AppendRoutes(r)
94+
95+
// Setup the response
96+
go func() {
97+
time.Sleep(time.Second)
98+
99+
queueFileResponses <- incoming.QueueACHFileResponse{
100+
FileID: "f1",
101+
Error: "bad thing",
102+
}
103+
}()
104+
105+
// Send a file over HTTP
106+
bs, _ := os.ReadFile(filepath.Join("..", "..", "..", "testdata", "ppd-debit.ach"))
107+
req := httptest.NewRequest("POST", "/shards/s1/files/f1", bytes.NewReader(bs))
108+
109+
w := httptest.NewRecorder()
110+
r.ServeHTTP(w, req) // blocking call
111+
require.Equal(t, http.StatusOK, w.Code)
112+
113+
var resp models.QueueACHFileResponse
114+
err := json.Unmarshal(w.Body.Bytes(), &resp)
115+
require.NoError(t, err)
116+
117+
require.Equal(t, "f1", resp.FileID)
118+
require.Equal(t, "", resp.ShardKey)
119+
require.Equal(t, "bad thing", resp.Error)
120+
}
121+
75122
func TestCreateFileHandlerErr(t *testing.T) {
76123
topic, _ := streamtest.InmemStream(t)
77124

125+
queueFileResponses := make(chan incoming.QueueACHFileResponse)
78126
cancellationResponses := make(chan models.FileCancellationResponse)
79-
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
127+
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, queueFileResponses, cancellationResponses)
80128
r := mux.NewRouter()
81129
controller.AppendRoutes(r)
82130

@@ -92,8 +140,9 @@ func TestCreateFileHandlerErr(t *testing.T) {
92140
func TestCancelFileHandler(t *testing.T) {
93141
topic, sub := streamtest.InmemStream(t)
94142

143+
queueFileResponses := make(chan incoming.QueueACHFileResponse)
95144
cancellationResponses := make(chan models.FileCancellationResponse)
96-
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
145+
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, queueFileResponses, cancellationResponses)
97146
r := mux.NewRouter()
98147
controller.AppendRoutes(r)
99148

@@ -103,6 +152,7 @@ func TestCancelFileHandler(t *testing.T) {
103152
// Setup the response
104153
go func() {
105154
time.Sleep(time.Second)
155+
106156
cancellationResponses <- models.FileCancellationResponse{
107157
FileID: "f2.ach",
108158
ShardKey: "s2",

internal/pipeline/aggregate.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,18 @@ func (xfagg *aggregator) Shutdown() {
185185
}
186186
}
187187

188-
func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) error {
189-
return xfagg.merger.HandleXfer(ctx, msg)
188+
func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) (incoming.QueueACHFileResponse, error) {
189+
err := xfagg.merger.HandleXfer(ctx, msg)
190+
191+
resp := incoming.QueueACHFileResponse{
192+
FileID: msg.FileID,
193+
ShardKey: msg.ShardKey,
194+
}
195+
if err != nil {
196+
resp.Error = err.Error()
197+
}
198+
199+
return resp, err
190200
}
191201

192202
func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) {

internal/pipeline/aggregate_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,15 @@ func TestAggregateACHFile(t *testing.T) {
7373
file, err := ach.ReadFile(filepath.Join("..", "..", "testdata", "ppd-debit.ach"))
7474
require.NoError(t, err)
7575

76-
err = xfagg.acceptFile(context.Background(), incoming.ACHFile{
76+
response, err := xfagg.acceptFile(context.Background(), incoming.ACHFile{
7777
FileID: "ppd-file1",
7878
ShardKey: "test",
7979
File: file,
8080
})
8181
require.NoError(t, err)
82+
require.Equal(t, "ppd-file1", response.FileID)
83+
require.Equal(t, "test", response.ShardKey)
84+
require.Empty(t, response.Error)
8285

8386
require.NotNil(t, merge.LatestFile)
8487
require.Equal(t, "ppd-file1", merge.LatestFile.FileID)

internal/pipeline/file_receiver.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type FileReceiver struct {
6464
httpFiles stream.Subscription
6565
streamFiles stream.Subscription
6666

67+
QueueFileResponses chan incoming.QueueACHFileResponse
6768
CancellationResponses chan models.FileCancellationResponse
6869

6970
transformConfig *models.TransformConfig
@@ -89,6 +90,7 @@ func newFileReceiver(
8990
shardAggregators: shardAggregators,
9091
fileRepository: fileRepository,
9192
httpFiles: httpFiles,
93+
QueueFileResponses: make(chan incoming.QueueACHFileResponse, 1000),
9294
CancellationResponses: make(chan models.FileCancellationResponse, 1000),
9395
transformConfig: transformConfig,
9496
}
@@ -117,24 +119,6 @@ func (fr *FileReceiver) reconnect() error {
117119
return nil
118120
}
119121

120-
func (fr *FileReceiver) ReplaceStreamFiles(ctx context.Context, sub stream.Subscription) {
121-
fr.mu.Lock()
122-
defer fr.mu.Unlock()
123-
124-
fr.logger.Info().Log("replacing stream subscription")
125-
126-
// Shut down receiving messages
127-
fr.cancel()
128-
129-
// Close an existing stream subscription
130-
if fr.streamFiles != nil {
131-
fr.streamFiles.Shutdown(context.Background())
132-
}
133-
fr.streamFiles = sub
134-
135-
go fr.Start(ctx)
136-
}
137-
138122
func (fr *FileReceiver) Start(ctx context.Context) {
139123
for {
140124
// Create a context that will be shutdown by its parent or after a read iteration
@@ -362,9 +346,11 @@ func (fr *FileReceiver) processMessage(ctx context.Context, msg *pubsub.Message)
362346
}
363347
return err
364348
}
349+
365350
if !committed {
366351
msg.Ack()
367352
}
353+
368354
return nil
369355
}
370356
}
@@ -487,11 +473,13 @@ func (fr *FileReceiver) processACHFile(ctx context.Context, file incoming.ACHFil
487473
}
488474
logger.Log("begin handling of received ACH file")
489475

490-
err = agg.acceptFile(ctx, file)
476+
response, err := agg.acceptFile(ctx, file)
491477
if err != nil {
492478
return logger.Error().LogErrorf("problem accepting file under shardName=%s", agg.shard.Name).Err()
493479
}
494480

481+
fr.QueueFileResponses <- response
482+
495483
// Record the file as accepted
496484
pendingFiles.With("shard", agg.shard.Name).Add(1)
497485
logger.Log("finished handling ACH file")

0 commit comments

Comments
 (0)