Skip to content

Commit 40d522f

Browse files
thisisaaronlandsfomuseumbot
andauthored
Add "follower" processing queue (#19)
* snapshot: block out process follow queue stuff, untested * rewrite process follower stuff to work with follower IDs * Makefile tweaks * process_follower is the new process_follow * copy * docs * fix typos and stuff * docs --------- Co-authored-by: sfomuseumbot <sfomuseumbot@localhost>
1 parent 14cd6eb commit 40d522f

20 files changed

+838
-83
lines changed

Makefile

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ deliver-pubsub:
120120
-insecure \
121121
-verbose
122122

123-
setup-accounts:
123+
local-accounts:
124124
go run cmd/add-account/main.go \
125125
-accounts-database-uri '$(ACCOUNTS_DB_URI)' \
126126
-aliases-database-uri '$(ALIASES_DB_URI)' \
@@ -266,7 +266,7 @@ list-inbox:
266266
SERVER_DISABLED=false
267267
SERVER_VERBOSE=true
268268

269-
server:
269+
local-server:
270270
go run cmd/server/main.go \
271271
-accounts-database-uri '$(ACCOUNTS_DB_URI)' \
272272
-aliases-database-uri '$(ALIASES_DB_URI)' \
@@ -281,6 +281,7 @@ server:
281281
-likes-database-uri '$(LIKES_DB_URI)' \
282282
-properties-database-uri '$(PROPERTIES_DB_URI)' \
283283
-process-message-queue-uri 'stdout://' \
284+
-process-follow-queue-uri 'slog://' \
284285
-allow-remote-icon-uri \
285286
-allow-create \
286287
-verbose=$(SERVER_VERBOSE) \
@@ -304,12 +305,17 @@ retrieve:
304305
-verbose \
305306
-insecure
306307

307-
dynamo-tables-local:
308+
local-tables:
308309
go run -mod vendor cmd/create-dynamodb-tables/main.go \
309310
-refresh \
310311
-table-prefix '$(TABLE_PREFIX)' \
311312
-dynamodb-client-uri 'awsdynamodb://?region=localhost&credentials=anon:&local=true'
312313

314+
local-setup:
315+
@make local-tables
316+
@make local-accounts
317+
@make local-server
318+
313319
# I haven't been able to get this to work yet...
314320
# https://dev.mysql.com/doc/mysql-installation-excerpt/8.3/en/docker-mysql-getting-started.html#docker-starting-mysql-server
315321

app/follower/process/example/flags.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package example
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"os"
7+
8+
"github.com/sfomuseum/go-flags/flagset"
9+
"github.com/sfomuseum/go-flags/multi"
10+
)
11+
12+
var mode string
13+
var follower_ids multi.MultiInt64
14+
15+
var accounts_database_uri string
16+
var properties_database_uri string
17+
var activities_database_uri string
18+
var messages_database_uri string
19+
var notes_database_uri string
20+
var posts_database_uri string
21+
var post_tags_database_uri string
22+
var deliveries_database_uri string
23+
var followers_database_uri string
24+
25+
var delivery_queue_uri string
26+
27+
var max_attempts int
28+
var hostname string
29+
var insecure bool
30+
31+
func DefaultFlagSet() *flag.FlagSet {
32+
33+
fs := flagset.NewFlagSet("activitypub")
34+
35+
fs.StringVar(&mode, "mode", "cli", "The mode of operation. Valid options are: cli, lambda.")
36+
fs.Var(&follower_ids, "id", "One or more follower IDs to process (if -mode=cli).")
37+
38+
fs.StringVar(&accounts_database_uri, "accounts-database-uri", "", "A registered sfomuseum/go-activitypub/database.AccountsDatabase URI.")
39+
fs.StringVar(&properties_database_uri, "properties-database-uri", "", "A registered sfomuseum/go-activitypub/database.PropertiesDatabase URI.")
40+
fs.StringVar(&activities_database_uri, "activities-database-uri", "", "A registered sfomuseum/go-activitypub/database.ActivitiesDatabase URI.")
41+
fs.StringVar(&messages_database_uri, "messages-database-uri", "", "A registered sfomuseum/go-activitypub/database.MessagesDatabase URI.")
42+
fs.StringVar(&notes_database_uri, "notes-database-uri", "", "A registered sfomuseum/go-activitypub/database.NotesDatabase URI.")
43+
fs.StringVar(&posts_database_uri, "posts-database-uri", "", "A registered sfomuseum/go-activitypub/database.PostsDatabase URI.")
44+
fs.StringVar(&post_tags_database_uri, "post-tags-database-uri", "", "A registered sfomuseum/go-activitypub/database.PostTagsDatabase URI.")
45+
fs.StringVar(&deliveries_database_uri, "deliveries-database-uri", "", "A registered sfomuseum/go-activitypub/database.DeliveriesDatabase URI.")
46+
fs.StringVar(&followers_database_uri, "followers-database-uri", "", "A registered sfomuseum/go-activitypub/database.FollowersDatabase URI.")
47+
48+
fs.StringVar(&delivery_queue_uri, "delivery-queue-uri", "synchronous://", "A registered sfomuseum/go-activitypub/queue.DeliveryQueue URI.")
49+
50+
fs.IntVar(&max_attempts, "max-attempts", 5, "The maximum number of attempts to try delivering activities.")
51+
fs.StringVar(&hostname, "hostname", "", "The hostname of the ActivityPub server delivering activities.")
52+
fs.BoolVar(&insecure, "insecure", false, "A boolean flag indicating the ActivityPub server delivering activities is insecure (not using TLS).")
53+
54+
fs.Usage = func() {
55+
fmt.Fprintf(os.Stderr, "An example application for processing messages delivered through a go-activitypub/queue.ProcessMessageQueue publisher.\n")
56+
fmt.Fprintf(os.Stderr, "Usage:\n\t %s [options]\n", os.Args[0])
57+
fmt.Fprintf(os.Stderr, "Valid options are:\n")
58+
fs.PrintDefaults()
59+
}
60+
61+
return fs
62+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package example
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
8+
"github.com/sfomuseum/go-activitypub/uris"
9+
"github.com/sfomuseum/go-flags/flagset"
10+
)
11+
12+
type RunOptions struct {
13+
Mode string
14+
FollowerIds []int64
15+
MessagesDatabaseURI string
16+
NotesDatabaseURI string
17+
AccountsDatabaseURI string
18+
PropertiesDatabaseURI string
19+
ActivitiesDatabaseURI string
20+
PostsDatabaseURI string
21+
PostTagsDatabaseURI string
22+
DeliveriesDatabaseURI string
23+
FollowersDatabaseURI string
24+
DeliveryQueueURI string
25+
MaxAttempts int
26+
URIs *uris.URIs
27+
}
28+
29+
func OptionsFromFlagSet(ctx context.Context, fs *flag.FlagSet) (*RunOptions, error) {
30+
31+
flagset.Parse(fs)
32+
33+
err := flagset.SetFlagsFromEnvVars(fs, "ACTIVITYPUB")
34+
35+
if err != nil {
36+
return nil, fmt.Errorf("Failed to derive flags from environment variables, %w", err)
37+
}
38+
39+
uris_table := uris.DefaultURIs()
40+
uris_table.Hostname = hostname
41+
uris_table.Insecure = insecure
42+
43+
opts := &RunOptions{
44+
Mode: mode,
45+
FollowerIds: follower_ids,
46+
MessagesDatabaseURI: messages_database_uri,
47+
NotesDatabaseURI: notes_database_uri,
48+
AccountsDatabaseURI: accounts_database_uri,
49+
PropertiesDatabaseURI: properties_database_uri,
50+
ActivitiesDatabaseURI: activities_database_uri,
51+
PostsDatabaseURI: posts_database_uri,
52+
PostTagsDatabaseURI: post_tags_database_uri,
53+
DeliveriesDatabaseURI: deliveries_database_uri,
54+
FollowersDatabaseURI: followers_database_uri,
55+
DeliveryQueueURI: delivery_queue_uri,
56+
URIs: uris_table,
57+
MaxAttempts: max_attempts,
58+
}
59+
60+
return opts, nil
61+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package example
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"flag"
7+
"fmt"
8+
"log/slog"
9+
10+
"github.com/aws/aws-lambda-go/events"
11+
"github.com/aws/aws-lambda-go/lambda"
12+
"github.com/sfomuseum/go-activitypub/database"
13+
"github.com/sfomuseum/go-activitypub/queue"
14+
)
15+
16+
func Run(ctx context.Context) error {
17+
fs := DefaultFlagSet()
18+
return RunWithFlagSet(ctx, fs)
19+
}
20+
21+
func RunWithFlagSet(ctx context.Context, fs *flag.FlagSet) error {
22+
23+
opts, err := OptionsFromFlagSet(ctx, fs)
24+
25+
if err != nil {
26+
return fmt.Errorf("Failed to derive options from flagset, %w", err)
27+
}
28+
29+
return RunWithOptions(ctx, opts)
30+
}
31+
32+
func RunWithOptions(ctx context.Context, opts *RunOptions) error {
33+
34+
messages_db, err := database.NewMessagesDatabase(ctx, opts.MessagesDatabaseURI)
35+
36+
if err != nil {
37+
return fmt.Errorf("Failed to create new messages database, %w", err)
38+
}
39+
40+
defer messages_db.Close(ctx)
41+
42+
notes_db, err := database.NewNotesDatabase(ctx, opts.NotesDatabaseURI)
43+
44+
if err != nil {
45+
return fmt.Errorf("Failed to create new notes database, %w", err)
46+
}
47+
48+
defer notes_db.Close(ctx)
49+
50+
accounts_db, err := database.NewAccountsDatabase(ctx, opts.AccountsDatabaseURI)
51+
52+
if err != nil {
53+
return fmt.Errorf("Failed to create new accounts database, %w", err)
54+
}
55+
56+
defer accounts_db.Close(ctx)
57+
58+
properties_db, err := database.NewPropertiesDatabase(ctx, opts.PropertiesDatabaseURI)
59+
60+
if err != nil {
61+
return fmt.Errorf("Failed to create new properties database, %w", err)
62+
}
63+
64+
defer properties_db.Close(ctx)
65+
66+
activities_db, err := database.NewActivitiesDatabase(ctx, opts.ActivitiesDatabaseURI)
67+
68+
if err != nil {
69+
return fmt.Errorf("Failed to create new activities database, %w", err)
70+
}
71+
72+
defer activities_db.Close(ctx)
73+
74+
posts_db, err := database.NewPostsDatabase(ctx, opts.PostsDatabaseURI)
75+
76+
if err != nil {
77+
return fmt.Errorf("Failed to create new posts database, %w", err)
78+
}
79+
80+
defer posts_db.Close(ctx)
81+
82+
post_tags_db, err := database.NewPostTagsDatabase(ctx, opts.PostTagsDatabaseURI)
83+
84+
if err != nil {
85+
return fmt.Errorf("Failed to create new post tags database, %w", err)
86+
}
87+
88+
defer post_tags_db.Close(ctx)
89+
90+
deliveries_db, err := database.NewDeliveriesDatabase(ctx, opts.DeliveriesDatabaseURI)
91+
92+
if err != nil {
93+
return fmt.Errorf("Failed to create new deliveries database, %w", err)
94+
}
95+
96+
defer deliveries_db.Close(ctx)
97+
98+
followers_db, err := database.NewFollowersDatabase(ctx, opts.FollowersDatabaseURI)
99+
100+
if err != nil {
101+
return fmt.Errorf("Failed to create new followers database, %w", err)
102+
}
103+
104+
defer followers_db.Close(ctx)
105+
106+
_, err = queue.NewDeliveryQueue(ctx, opts.DeliveryQueueURI)
107+
108+
if err != nil {
109+
return fmt.Errorf("Failed to create new delivery queue, %w", err)
110+
}
111+
112+
// Note: Don't close delivery_q in a Lambda context. This will trigger errors like this:
113+
// "Failed to send message, pubsub: Topic has been Shutdown (code=FailedPrecondition)"
114+
115+
process_follower := func(ctx context.Context, follower_id int64) error {
116+
117+
logger := slog.Default()
118+
logger = logger.With("method", "process_follow")
119+
logger = logger.With("follower id", follower_id)
120+
121+
logger.Info("Process follow")
122+
123+
// Message is the thing which was dispatched by the process message queue (in www/inbox_post.go)
124+
125+
f, err := followers_db.GetFollowerWithId(ctx, follower_id)
126+
127+
if err != nil {
128+
logger.Error("Failed to retrieve message", "error", err)
129+
return fmt.Errorf("Failed to retrieve message, %w", err)
130+
}
131+
132+
// Account is the account that the note (message) was delivered to
133+
134+
msg_acct, err := accounts_db.GetAccountWithId(ctx, f.AccountId)
135+
136+
if err != nil {
137+
logger.Error("Failed to retrieve account for follow", "account id", f.AccountId, "error", err)
138+
return fmt.Errorf("Failed to retrieve account for message, %w", err)
139+
}
140+
141+
logger = logger.With("follow account id", msg_acct.Id)
142+
logger = logger.With("follow follower address", f.FollowerAddress)
143+
144+
logger.Info("Your code goes here (parse note, etc.)")
145+
return nil
146+
}
147+
148+
// Process multiple follower IDs
149+
// Maybe try to do this concurrently?
150+
151+
process_followers := func(ctx context.Context, follower_ids ...int64) error {
152+
153+
for _, id := range follower_ids {
154+
155+
err := process_follower(ctx, id)
156+
157+
if err != nil {
158+
slog.Error("Failed to process follower", "id", id, "error", err)
159+
}
160+
}
161+
162+
return nil
163+
}
164+
165+
// Actually start the application
166+
167+
switch opts.Mode {
168+
case "cli":
169+
170+
return process_followers(ctx, opts.FollowerIds...)
171+
case "lambda":
172+
173+
handler := func(ctx context.Context, sqsEvent events.SQSEvent) error {
174+
175+
follower_ids := make([]int64, len(sqsEvent.Records))
176+
177+
for idx, message := range sqsEvent.Records {
178+
179+
logger := slog.Default()
180+
logger = logger.With("message id", message.MessageId)
181+
182+
// logger.Debug("SQS", "message", message.Body)
183+
184+
var follower_id int64
185+
186+
err := json.Unmarshal([]byte(message.Body), &follower_id)
187+
188+
if err != nil {
189+
logger.Error("Failed to unmarshal message", "error", err)
190+
return fmt.Errorf("Failed to unmarshal message ID, %w", err)
191+
}
192+
193+
follower_ids[idx] = follower_id
194+
}
195+
196+
return process_followers(ctx, follower_ids...)
197+
}
198+
199+
lambda.Start(handler)
200+
201+
default:
202+
return fmt.Errorf("Invalid or unsupported mode")
203+
}
204+
205+
return nil
206+
}

app/server/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var likes_database_uri string
2222
var boosts_database_uri string
2323

2424
var process_message_queue_uri string
25+
var process_follower_queue_uri string
2526

2627
var server_uri string
2728
var hostname string
@@ -68,6 +69,7 @@ func DefaultFlagSet() *flag.FlagSet {
6869
fs.BoolVar(&insecure, "insecure", false, "A boolean flag indicating the ActivityPub server delivering activities is insecure (not using TLS).")
6970

7071
fs.StringVar(&process_message_queue_uri, "process-message-queue-uri", "null://", "A registered go-activitypub/queue.ProcessMessageQueue URI.")
72+
fs.StringVar(&process_follower_queue_uri, "process-follower-queue-uri", "null://", "A registered go-activitypub/queue.ProcessFollowerQueue URI.")
7173

7274
fs.BoolVar(&allow_remote_icon_uri, "allow-remote-icon-uri", false, "Allow account icons hosted on a remote host.")
7375
fs.BoolVar(&verbose, "verbose", false, "Enable verbose (debug) logging.")

0 commit comments

Comments
 (0)