Skip to content

Commit aa22026

Browse files
committed
Alt lookup speedup
Roughly makes looking up alts ~10x faster
1 parent 7e3c5ce commit aa22026

File tree

12 files changed

+259
-17
lines changed

12 files changed

+259
-17
lines changed

src/database/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub async fn get_redis_pool(redis_host: &Option<String>) -> anyhow::Result<Redis
111111
None => Err(ConfigMissingFieldError {field_name: String::from("redis-host") }.into()),
112112
Some(redis_host) => {
113113
let redis_uri = format!("redis://{}", redis_host);
114-
println!("Connecting to redis at {}", &redis_uri);
114+
info!("Connecting to redis at {}", &redis_uri);
115115
let client = redis::Client::open(redis_uri)?;
116116
let manager = RedisConnectionManager::new(client);
117117
let pool = Pool::builder()
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::collections::HashMap;
2+
use std::ffi::c_int;
3+
use futures::{Stream, StreamExt, TryStreamExt};
4+
use futures::stream::FuturesUnordered;
5+
use mongodb::bson::{doc, Document};
6+
use mongodb::error::Error;
7+
use mongodb::options::FindOptions;
8+
use crate::database::Database;
9+
use crate::database::migrations::DatabaseMigration;
10+
use crate::database::models::ip_identity::IpIdentity;
11+
use crate::database::models::player::Player;
12+
13+
pub struct DenormalizeIpIdentitiesMigration {}
14+
15+
#[async_trait]
16+
impl DatabaseMigration for DenormalizeIpIdentitiesMigration {
17+
fn get_id(&self) -> String {
18+
String::from("denormalize_ip_identities")
19+
}
20+
21+
async fn perform(&self, database: &Database) {
22+
let mut find_options = FindOptions::default();
23+
// batch-read into memory 50k records at a time
24+
find_options.batch_size = Some(50_000);
25+
26+
let count = database.players.count_documents(doc! {}, None).await.unwrap_or(0);
27+
info!("{} document(s) in the player collection to migrate", count);
28+
29+
let mut cursor = database.players.find(doc! {}, Some(find_options)).await.expect("find all players to succeed");
30+
// aggregate 25k players' worth of IPs into memory at a time before flushing
31+
let step_size = 25_000u32;
32+
let mut total_accumulated = 0u32;
33+
let mut accumulated = 0u32;
34+
let mut error_count = 0u32;
35+
let mut ip_map : HashMap<String, Vec<String>> = HashMap::new();
36+
while let Ok(more) = cursor.advance().await {
37+
if !more {
38+
break
39+
}
40+
if accumulated >= step_size {
41+
info!("Flushing batch of IPs to ip identities, accumulation progress: {}/{}", total_accumulated, count);
42+
Self::flush_ips(database, ip_map).await;
43+
ip_map = HashMap::new();
44+
accumulated = 0;
45+
}
46+
let player = match cursor.deserialize_current() {
47+
Ok(p) => p,
48+
Err(e) => {
49+
let doc_result : mongodb::bson::raw::Result<Document> = cursor.current().try_into();
50+
let doc = match doc_result {
51+
Ok(doc) => doc,
52+
Err(e) => {
53+
warn!("Error to parse doc: {}", e);
54+
return
55+
}
56+
};
57+
warn!("document in question: {:?}", doc);
58+
warn!("Deserialization error: {}", e);
59+
error_count += 1;
60+
continue
61+
}
62+
};
63+
for ip in &player.ips {
64+
match ip_map.get_mut(ip) {
65+
Some(players_for_ip) => {
66+
players_for_ip.push(player.id.to_owned());
67+
}
68+
None => {
69+
ip_map.insert(ip.to_owned(), vec![player.id.to_owned()]);
70+
}
71+
}
72+
}
73+
accumulated += 1;
74+
total_accumulated += 1;
75+
}
76+
info!("Flushing any remaining IPs...");
77+
Self::flush_ips(database, ip_map).await;
78+
info!("Total flushed: {}", total_accumulated);
79+
info!("Error count: {}", error_count);
80+
}
81+
}
82+
83+
impl DenormalizeIpIdentitiesMigration {
84+
async fn flush_ips(database: &Database, ip_map : HashMap<String, Vec<String>>) {
85+
let unordered_futures = FuturesUnordered::new();
86+
for (ip, players) in ip_map.into_iter() {
87+
let task = async {
88+
let ip_identity_optional = IpIdentity::get_ip_identity_by_ip(&database.ip_identities, &ip).await;
89+
match ip_identity_optional {
90+
Some(mut ip_identity) => {
91+
ip_identity.players.extend(players);
92+
database.save(&ip_identity).await;
93+
}
94+
None => {
95+
let ip_identity = IpIdentity {
96+
ip, players
97+
};
98+
database.save(&ip_identity).await
99+
}
100+
}
101+
};
102+
unordered_futures.push(task);
103+
}
104+
unordered_futures.collect::<Vec<_>>().await;
105+
}
106+
}

src/database/migrations/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use crate::database::Database;
2+
use crate::database::migrations::denormalize_ip_identities::DenormalizeIpIdentitiesMigration;
3+
4+
pub mod denormalize_ip_identities;
5+
6+
#[async_trait]
7+
pub trait DatabaseMigration {
8+
fn get_id(&self) -> String;
9+
async fn perform(&self, database: &Database);
10+
}
11+
12+
pub struct MigrationExecutor {
13+
migrations: Vec<Box<dyn DatabaseMigration>>
14+
}
15+
16+
impl MigrationExecutor {
17+
pub fn new() -> Self {
18+
let denormalize_ip_identities_migration =
19+
Box::new(DenormalizeIpIdentitiesMigration {});
20+
Self {
21+
migrations: vec![denormalize_ip_identities_migration]
22+
}
23+
}
24+
25+
pub async fn execute_migration_by_name(&self, database: &Database, name: String) {
26+
match self.migrations.iter().find(|migration| migration.get_id() == name) {
27+
Some(migration) => {
28+
migration.perform(database).await
29+
}
30+
None => {}
31+
}
32+
}
33+
}

src/database/mod.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{str::FromStr, time::Duration};
2+
use std::collections::HashSet;
23

34
use mars_api_rs_macro::IdentifiableDocument;
45
use mongodb::{options::{ClientOptions, FindOneOptions, UpdateOptions}, Client, Collection, bson::{doc, oid::ObjectId}, Cursor, results::DeleteResult};
@@ -8,13 +9,17 @@ use rocket::serde::DeserializeOwned;
89
use futures::StreamExt;
910
use serde::Serialize;
1011
use anyhow::anyhow;
12+
use futures::stream::FuturesUnordered;
13+
use rocket::form::validate::Contains;
1114

1215
use crate::{database::models::player::Player, util::r#macro::unwrap_helper};
16+
use crate::database::models::ip_identity::IpIdentity;
1317
use crate::util::validation::verbose_result_ok;
1418

1519
use self::models::{achievement::Achievement, death::Death, level::Level, r#match::Match, punishment::Punishment, rank::Rank, session::Session};
1620

1721
pub mod models;
22+
pub mod migrations;
1823
pub mod cache;
1924

2025
pub trait CollectionOwner<T> {
@@ -32,7 +37,8 @@ pub struct Database {
3237
pub ranks: Collection<Rank>,
3338
pub matches: Collection<Match>,
3439
pub deaths: Collection<Death>,
35-
pub levels: Collection<Level>
40+
pub levels: Collection<Level>,
41+
pub ip_identities: Collection<IpIdentity>
3642
}
3743

3844
impl Database {
@@ -148,10 +154,26 @@ impl Database {
148154
}
149155

150156
pub async fn get_alts_for_player(&self, player: &Player) -> Vec<Player> {
151-
let cursor = unwrap_helper::result_return_default!(self.players.find(doc! {
152-
"ips": {"$in": &player.ips}, "_id": {"$ne": &player.id}
153-
}, None).await, Vec::new());
154-
Database::consume_cursor_into_owning_vec(cursor).await
157+
let unordered_futures = FuturesUnordered::new();
158+
for ip in &player.ips {
159+
unordered_futures.push(IpIdentity::find_players_for_ip(self, ip));
160+
}
161+
let players = {
162+
let players_with_duplicates = unordered_futures.collect::<Vec<_>>().await
163+
.into_iter().flatten().collect::<Vec<_>>();
164+
let mut player_set : Vec<Player> = Vec::new();
165+
for player in players_with_duplicates {
166+
if !player_set.iter().map(|p| p.id.clone()).collect::<Vec<_>>().contains(&player.id) {
167+
player_set.push(player);
168+
}
169+
}
170+
player_set
171+
};
172+
players
173+
// let cursor = unwrap_helper::result_return_default!(self.players.find(doc! {
174+
// "ips": {"$in": &player.ips}, "_id": {"$ne": &player.id}
175+
// }, None).await, Vec::new());
176+
// Database::consume_cursor_into_owning_vec(cursor).await
155177
}
156178

157179
pub async fn save<R>(&self, record: &R) where R: CollectionOwner<R> + Serialize + IdentifiableDocument {
@@ -211,10 +233,11 @@ pub async fn connect(db_url: &String, min_pool_size: Option<u32>, max_pool_size:
211233
let matches = db.collection::<Match>(Match::get_collection_name());
212234
let levels = db.collection::<Level>(Level::get_collection_name());
213235
let deaths = db.collection::<Death>(Death::get_collection_name());
236+
let ip_identities = db.collection::<IpIdentity>(IpIdentity::get_collection_name());
214237

215238
info!("Connected to database successfully.");
216239
Ok(Database {
217240
mongo: db, tags, achievements, players, sessions,
218-
punishments, ranks, matches, levels, deaths
241+
punishments, ranks, matches, levels, deaths, ip_identities
219242
})
220243
}

src/database/models/ip_identity.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use futures::stream::FuturesUnordered;
2+
use futures::StreamExt;
3+
use mars_api_rs_derive::IdentifiableDocument;
4+
use mars_api_rs_macro::IdentifiableDocument;
5+
use mongodb::Collection;
6+
use rocket::serde::{Deserialize, Serialize};
7+
use crate::database::{CollectionOwner, Database};
8+
use crate::database::models::player::Player;
9+
10+
#[derive(Deserialize, Serialize, IdentifiableDocument, Clone)]
11+
pub struct IpIdentity {
12+
#[id]
13+
#[serde(rename = "_id")]
14+
pub ip: String,
15+
pub players: Vec<String>
16+
}
17+
18+
impl IpIdentity {
19+
pub async fn add_player_ip(database: &Database, ip: &String, player: &String) {
20+
let collection = &database.ip_identities;
21+
let ip_identity = match Self::get_ip_identity_by_ip(collection, ip).await {
22+
Some(mut ip_identity) => {
23+
if !ip_identity.players.contains(&player) {
24+
ip_identity.players.push(player.clone());
25+
}
26+
ip_identity
27+
},
28+
None => IpIdentity { ip: ip.clone(), players: vec![player.clone()] }
29+
};
30+
database.save(&ip_identity).await;
31+
}
32+
33+
pub async fn find_players_for_ip(database: &Database, ip: &String) -> Vec<Player> {
34+
let collection = &database.ip_identities;
35+
let record = Self::get_ip_identity_by_ip(collection, ip).await;
36+
match record {
37+
Some(ip_identity) => {
38+
let unordered_futures = FuturesUnordered::new();
39+
for player in ip_identity.players.iter() {
40+
unordered_futures.push(
41+
Database::find_by_id(&database.players, player)
42+
);
43+
}
44+
let results : Vec<_> = unordered_futures.collect().await;
45+
let players : Vec<_> = results.into_iter().filter_map(|r| r).collect();
46+
players
47+
}
48+
None => Vec::new()
49+
}
50+
}
51+
52+
pub async fn get_ip_identity_by_ip(collection: &Collection<IpIdentity>, ip: &String) -> Option<IpIdentity> {
53+
Database::find_by_id(collection, ip.as_str()).await
54+
}
55+
}
56+
57+
impl CollectionOwner<IpIdentity> for IpIdentity {
58+
fn get_collection(database: &Database) -> &Collection<IpIdentity> {
59+
&database.ip_identities
60+
}
61+
62+
fn get_collection_name() -> &'static str {
63+
"ip_identity"
64+
}
65+
}

src/database/models/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ pub mod death;
1212
pub mod join_sound;
1313
pub mod server;
1414
pub mod achievement;
15+
pub mod ip_identity;

src/database/models/player.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct PlayerStats {
8585
#[serde(default)]
8686
pub xp: u32,
8787
#[serde(default)]
88-
pub server_playtime: u64,
88+
pub server_playtime: i64,
8989
#[serde(default)]
9090
pub game_playtime: u64,
9191
#[serde(default)]

src/http/player/mod.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use sha2::{Sha256, Digest};
1010

1111
use self::payloads::{PlayerPreLoginResponse, PlayerPreLoginResponder, PlayerLoginResponse, PlayerLogoutRequest, PlayerProfileResponder, PlayerProfileResponse, PlayerAltResponse};
1212
use std::{time::{SystemTime, UNIX_EPOCH}, collections::HashMap};
13+
use crate::database::models::ip_identity::IpIdentity;
1314

1415
use super::punishment::payloads::PunishmentIssueRequest;
1516

@@ -31,7 +32,8 @@ pub async fn prelogin(
3132
if let Some(mut returning_player) = player_optional {
3233
returning_player.name = data.player.name.clone();
3334
returning_player.name_lower = returning_player.name.to_lowercase();
34-
if !returning_player.ips.contains(&ip) {
35+
let new_ip = !returning_player.ips.contains(&ip);
36+
if new_ip {
3537
returning_player.ips.push(ip.clone());
3638
};
3739

@@ -60,6 +62,10 @@ pub async fn prelogin(
6062
};
6163
state.player_cache.set(&state.database, &returning_player.name, &returning_player, true).await;
6264
state.database.ensure_player_name_uniqueness(&data.player.name, &data.player.id).await;
65+
// denormalize ip player relationship
66+
if new_ip {
67+
IpIdentity::add_player_ip(&state.database, &ip, &returning_player.id).await;
68+
}
6369

6470
Ok(PlayerPreLoginResponder {
6571
response: PlayerPreLoginResponse {
@@ -70,13 +76,13 @@ pub async fn prelogin(
7076
}
7177
})
7278
} else {
73-
println!("Could not find player {} in database!", player_id);
79+
debug!("Could not find player {} in database!", player_id);
7480
let time_millis : f64 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as f64;
7581
let player = Player {
7682
id: data.player.id.clone(),
7783
name: data.player.name.clone(),
7884
name_lower: data.player.name.to_lowercase(),
79-
ips: vec![ip],
85+
ips: vec![ip.clone()],
8086
first_joined_at: time_millis,
8187
last_joined_at: time_millis,
8288
rank_ids: Vec::new(),
@@ -91,6 +97,7 @@ pub async fn prelogin(
9197

9298
state.player_cache.set(&state.database, &player.name, &player, true).await;
9399
state.database.ensure_player_name_uniqueness(&data.player.name, &data.player.id).await;
100+
IpIdentity::add_player_ip(&state.database, &ip, &data.player.id).await;
94101

95102
Ok(PlayerPreLoginResponder {
96103
response: PlayerPreLoginResponse {
@@ -184,7 +191,7 @@ pub async fn logout(
184191

185192
let time_millis : u64 = u64::try_from(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()).unwrap_or(u64::MAX);
186193
session.ended_at = Some(time_millis);
187-
player.stats.server_playtime += data.playtime;
194+
player.stats.server_playtime += (data.playtime as i64);
188195

189196
state.leaderboards.server_playtime.increment(&player.id_name(), Some(u32::try_from(data.playtime).unwrap_or(u32::MAX))).await; // Will break in 2106
190197

@@ -339,7 +346,10 @@ pub async fn lookup_player(
339346
let player_alts : Vec<PlayerAltResponse> = {
340347
let mut player_alts : Vec<PlayerAltResponse> = Vec::new();
341348
if alts {
349+
let t1 = get_u64_time_millis();
342350
let fetched_alts = state.database.get_alts_for_player(&player).await;
351+
let t2 = get_u64_time_millis();
352+
debug!("Alt lookup for {} took {}ms", &player.name, (t2 - t1));
343353
let pun_tasks : Vec<_> = fetched_alts.iter().map(|alt| {
344354
state.database.get_player_punishments(alt)
345355
}).collect();

src/http/server/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async fn server_startup(
4747
sessions_to_write.push(hanging_session.to_owned());
4848

4949
let mut cached_player = unwrap_helper::continue_default!(state.player_cache.get(&state.database, &hanging_session.player.name).await);
50-
cached_player.stats.server_playtime += hanging_session.length().unwrap_or(0);
50+
cached_player.stats.server_playtime += (hanging_session.length().unwrap_or(0) as i64);
5151
players_to_write.push(cached_player);
5252
}
5353

0 commit comments

Comments
 (0)