Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 80 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,51 @@ bot's token. You must also depend on `rustls`, `tokio`, `twilight-cache-inmemory
`twilight-gateway`, `twilight-http`, and `twilight-model` in your `Cargo.toml`.

```rust,no_run
use std::{env, error::Error, sync::Arc};
mod context {
use std::{ops::Deref, sync::OnceLock};
use twilight_cache_inmemory::DefaultInMemoryCache;
use twilight_http::Client;
pub static CONTEXT: Handle = Handle(OnceLock::new());
#[derive(Debug)]
pub struct Context {
pub cache: DefaultInMemoryCache,
pub http: Client,
}
pub fn initialize(cache: DefaultInMemoryCache, http: Client) {
let context = Context { cache, http };
assert!(CONTEXT.0.set(context).is_ok());
}
pub struct Handle(OnceLock<Context>);
impl Deref for Handle {
type Target = Context;
fn deref(&self) -> &Self::Target {
self.0.get().unwrap()
}
}
}
use context::CONTEXT;
use std::env;
use twilight_cache_inmemory::{DefaultInMemoryCache, ResourceType};
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, ShardId, StreamExt as _};
use twilight_http::Client as HttpClient;
use twilight_http::Client;
use twilight_model::gateway::payload::incoming::MessageCreate;
// Use event type flags to only deserialize message create events.
const EVENT_TYPES: EventTypeFlags = EventTypeFlags::MESSAGE_CREATE;
#[tokio::main]
// Use intents to only receive guild message events.
const INTENTS: Intents = Intents::GUILD_MESSAGES.union(Intents::MESSAGE_CONTENT);
// Cache only new messages.
const RESOURCE_TYPES: ResourceType = ResourceType::MESSAGE;
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// Initialize the tracing subscriber.
tracing_subscriber::fmt::init();
Expand All @@ -134,50 +173,58 @@ async fn main() -> anyhow::Result<()> {
let token = env::var("DISCORD_TOKEN")?;
// Use intents to only receive guild message events.
let mut shard = Shard::new(
ShardId::ONE,
token.clone(),
Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT,
);
// HTTP is separate from the gateway, so create a new client.
let http = Arc::new(HttpClient::new(token));
// Since we only care about new messages, make the cache only
// cache new messages.
let cache = DefaultInMemoryCache::builder()
.resource_types(ResourceType::MESSAGE)
.resource_types(RESOURCE_TYPES)
.build();
let http = Client::new(token.clone());
// Initialize the bot context.
context::initialize(cache, http);
// Process each event as they come in.
while let Some(item) = shard.next_event(EventTypeFlags::all()).await {
let Ok(event) = item else {
tracing::warn!(source = ?item.unwrap_err(), "error receiving event");
let shard = Shard::new(ShardId::ONE, token, INTENTS);
dispatcher(shard).await;
continue;
Ok(())
}
#[tracing::instrument(fields(shard = %shard.id()), skip_all)]
async fn dispatcher(mut shard: Shard) {
loop {
let event = match shard.next_event(EVENT_TYPES).await {
Some(Ok(event)) => event,
Some(Err(source)) => {
tracing::warn!(?source, "error receiving event");
continue;
}
None => break,
};
// Update the cache with the event.
cache.update(&event);
CONTEXT.cache.update(&event);
tokio::spawn(handle_event(event, Arc::clone(&http)));
}
// Route the event to a handler.
let handler = match event {
Event::MessageCreate(event) => message(event),
_ => continue,
};
Ok(())
tokio::spawn(async move {
if let Err(source) = handler.await {
tracing::warn!(?source, "error handling event");
}
});
}
}
async fn handle_event(
event: Event,
http: Arc<HttpClient>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
match event {
Event::MessageCreate(msg) if msg.content == "!ping" => {
http.create_message(msg.channel_id)
#[tracing::instrument(fields(id = %event.id), skip_all)]
async fn message(event: Box<MessageCreate>) -> anyhow::Result<()> {
match event.content.as_ref() {
"!ping" => {
CONTEXT
.http
.create_message(event.channel_id)
.content("Pong!")
.await?;
}
// Other events here...
_ => {}
}
Expand Down
1 change: 1 addition & 0 deletions twilight-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_test = { default-features = false, version = "1.0.136" }
static_assertions = { default-features = false, version = "1" }
tokio = { default-features = false, features = ["macros", "rt-multi-thread", "signal", "test-util"], version = "1.12" }
tokio-stream = { default-features = false, version = "0.1" }
tokio-util = { default-features = false, features = ["rt"], version = "0.7" }
tracing-subscriber = { default-features = false, features = ["fmt", "tracing-log"], version = "0.3" }

[features]
Expand Down
108 changes: 78 additions & 30 deletions twilight-gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,85 @@ from a `Fn(ShardId, ConfigBuilder) -> Config` closure, with the help of the
Create the recommended number of shards and loop over their guild messages:

```rust,no_run
use std::{env, sync::Arc};
use tokio::{signal, sync::watch};
mod context {
use std::{ops::Deref, sync::OnceLock};
use twilight_http::Client;
pub static CONTEXT: Handle = Handle(OnceLock::new());
#[derive(Debug)]
pub struct Context {
pub http: Client,
}
pub fn initialize(http: Client) {
let context = Context { http };
assert!(CONTEXT.0.set(context).is_ok());
}
pub struct Handle(OnceLock<Context>);
impl Deref for Handle {
type Target = Context;
fn deref(&self) -> &Self::Target {
self.0.get().unwrap()
}
}
}
use context::CONTEXT;
use std::{env, pin::pin};
use tokio::signal;
use tokio_util::task::TaskTracker;
use twilight_gateway::{
CloseFrame, Config, Event, EventTypeFlags, Intents, MessageSender, Shard, StreamExt as _,
};
use twilight_http::Client;
use twilight_model::gateway::payload::{incoming::MessageCreate, outgoing::UpdateVoiceState};
const EVENT_TYPES: EventTypeFlags = EventTypeFlags::MESSAGE_CREATE;
const INTENTS: Intents = Intents::GUILD_MESSAGES.union(Intents::MESSAGE_CONTENT);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize the tracing subscriber.
tracing_subscriber::fmt::init();
// Select rustls backend
rustls::crypto::ring::default_provider().install_default().unwrap();
let token = env::var("DISCORD_TOKEN")?;
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let client = Arc::new(Client::new(token.clone()));
let config = Config::new(token, Intents::GUILD_MESSAGES | Intents::MESSAGE_CONTENT);
let tasks = twilight_gateway::create_recommended(&client, config, |_, builder| builder.build())
.await?
.map(|shard| tokio::spawn(dispatcher(Arc::clone(&client), shard, shutdown_rx.clone())))
.collect::<Vec<_>>();
let config = Config::new(token.clone(), INTENTS);
let http = Client::new(token);
let shards =
twilight_gateway::create_recommended(&http, config, |_, builder| builder.build()).await?;
context::initialize(http);
signal::ctrl_c().await?;
_ = shutdown_tx.send(true);
for task in tasks {
_ = task.await;
let tracker = TaskTracker::new();
for shard in shards {
tracker.spawn(dispatcher(shard));
}
tracker.close();
tracker.wait().await;
Ok(())
}
#[tracing::instrument(fields(shard = %shard.id()), skip_all)]
async fn dispatcher(client: Arc<Client>, mut shard: Shard, mut shutdown: watch::Receiver<bool>) {
async fn dispatcher(mut shard: Shard) {
let mut ctrl_c = pin!(signal::ctrl_c());
let mut shutdown = false;
let tracker = TaskTracker::new();
loop {
tokio::select! {
_ = shutdown.changed() => shard.close(CloseFrame::NORMAL),
Some(item) = shard.next_event(EventTypeFlags::all()) => {
// Do not poll ctrl_c after it's completed.
_ = &mut ctrl_c, if !shutdown => {
// Cleanly shut down once we receive the echo close frame.
shard.close(CloseFrame::NORMAL);
shutdown = true;
},
Some(item) = shard.next_event(EVENT_TYPES) => {
let event = match item {
Ok(event) => event,
Err(source) => {
Expand All @@ -88,37 +125,48 @@ async fn dispatcher(client: Arc<Client>, mut shard: Shard, mut shutdown: watch::
}
};
match event {
Event::GatewayClose(_) if *shutdown.borrow() => break,
Event::MessageCreate(e) => {
tokio::spawn(msg_handler(Arc::clone(&client), e, shard.sender()));
let handler = match event {
// Clean shutdown exit condition.
Event::GatewayClose(_) if shutdown => break,
Event::MessageCreate(e) => message(e, shard.sender()),
_ => continue,
};
tracker.spawn(async move {
if let Err(source) = handler.await {
tracing::warn!(?source, "error handling event");
}
_ => {}
}
});
}
}
}
tracker.close();
tracker.wait().await;
}
#[tracing::instrument(fields(id = %event.id), skip_all)]
async fn msg_handler(client: Arc<Client>, event: Box<MessageCreate>, sender: MessageSender) {
async fn message(event: Box<MessageCreate>, sender: MessageSender) -> anyhow::Result<()> {
match event.content.as_ref() {
"!join" if event.guild_id.is_some() => {
let _result = sender.command(&UpdateVoiceState::new(
sender.command(&UpdateVoiceState::new(
event.guild_id.unwrap(),
Some(event.channel_id),
false,
false,
));
))?;
}
"!ping" => {
let _result = client
CONTEXT
.http
.create_message(event.channel_id)
.content("pong!")
.await;
.content("Pong!")
.await?;
}
_ => {}
}
Ok(())
}
```

Expand Down