-
Notifications
You must be signed in to change notification settings - Fork 56
feat: upgrade logger #1265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tckeong
wants to merge
10
commits into
source-academy:feat/upgrade-logger
Choose a base branch
from
tckeong:feat/upgrade-logger
base: feat/upgrade-logger
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
feat: upgrade logger #1265
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
41b1cda
Create CloudWatch Logger Backend
tckeong 045a55e
Modified prod config
tckeong 31c54f3
Fix credo issue
tckeong 6411d20
Merge branch 'master' into feat/upgrade-logger
tckeong d6c92e1
Add docs
tckeong d271b44
Refactor the CloudWatch Logger
tckeong 7e62a59
Refactor CloudWatch Logger, add buffer to the logs
tckeong 8da8e2d
Enhance aws validation check.
tckeong ec0e120
Merge branch 'master' into feat/upgrade-logger
tckeong 7e55e87
Prevent timer_ref leak
tckeong File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
defmodule Cadet.Logger.CloudWatchLogger do | ||
@moduledoc """ | ||
A custom Logger backend that sends logs to AWS CloudWatch. | ||
This backend can be configured to log at different levels and formats, | ||
and can include specific metadata in the logs. | ||
""" | ||
|
||
@behaviour :gen_event | ||
require Logger | ||
|
||
defstruct [:level, :format, :metadata, :log_group, :log_stream, :buffer, :timer_ref] | ||
|
||
@max_buffer_size 1000 | ||
@max_retries 3 | ||
@retry_delay 500 | ||
@flush_interval 5000 | ||
|
||
@impl true | ||
def init({__MODULE__, opts}) when is_list(opts) do | ||
config = configure_merge(read_env(), opts) | ||
{:ok, init(config, %__MODULE__{})} | ||
end | ||
|
||
@impl true | ||
def init({__MODULE__, name}) when is_atom(name) do | ||
config = read_env() | ||
tckeong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{:ok, init(config, %__MODULE__{})} | ||
end | ||
|
||
@impl true | ||
def handle_call({:configure, options}, state) do | ||
{:ok, :ok, configure(options, state)} | ||
end | ||
|
||
@impl true | ||
def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do | ||
%{ | ||
format: format, | ||
metadata: metadata, | ||
buffer: buffer, | ||
log_stream: log_stream, | ||
log_group: log_group | ||
} = state | ||
|
||
if meet_level?(level, state.level) and not meet_cloudwatch_error?(msg) do | ||
formatted_msg = Logger.Formatter.format(format, level, msg, ts, take_metadata(md, metadata)) | ||
timestamp = timestamp_from_logger_ts(ts) | ||
|
||
log_event = %{ | ||
"timestamp" => timestamp, | ||
"message" => IO.chardata_to_string(formatted_msg) | ||
} | ||
|
||
new_buffer = [log_event | buffer] | ||
|
||
new_buffer = | ||
if length(new_buffer) >= @max_buffer_size do | ||
flush_buffer_async(log_stream, log_group, new_buffer) | ||
[] | ||
else | ||
new_buffer | ||
end | ||
|
||
{:ok, %{state | buffer: new_buffer}} | ||
tckeong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else | ||
{:ok, state} | ||
end | ||
end | ||
|
||
@impl true | ||
def handle_info(:flush_buffer, state) do | ||
%{buffer: buffer, timer_ref: timer_ref, log_stream: log_stream, log_group: log_group} = state | ||
|
||
if timer_ref, do: Process.cancel_timer(timer_ref) | ||
|
||
new_state = | ||
if length(buffer) > 0 do | ||
flush_buffer_sync(log_stream, log_group, buffer) | ||
%{state | buffer: []} | ||
else | ||
state | ||
end | ||
|
||
new_timer_ref = schedule_flush(@flush_interval) | ||
{:ok, %{new_state | timer_ref: new_timer_ref}} | ||
end | ||
|
||
@impl true | ||
def terminate(_reason, state) do | ||
%{log_stream: log_stream, log_group: log_group, buffer: buffer, timer_ref: timer_ref} = state | ||
|
||
if timer_ref, do: Process.cancel_timer(timer_ref) | ||
flush_buffer_sync(log_stream, log_group, buffer) | ||
:ok | ||
end | ||
|
||
def handle_event(_, state), do: {:ok, state} | ||
def handle_call(_, state), do: {:ok, :ok, state} | ||
def handle_info(_, state), do: {:ok, state} | ||
|
||
# Helpers | ||
defp configure(options, state) do | ||
config = configure_merge(read_env(), options) | ||
Application.put_env(:logger, __MODULE__, config) | ||
init(config, state) | ||
end | ||
|
||
defp meet_level?(_lvl, nil), do: true | ||
|
||
defp meet_level?(lvl, min) do | ||
Logger.compare_levels(lvl, min) != :lt | ||
end | ||
|
||
defp meet_cloudwatch_error?(msg) when is_binary(msg) do | ||
String.starts_with?(msg, "Failed to send log to CloudWatch") | ||
end | ||
|
||
defp meet_cloudwatch_error?(_) do | ||
false | ||
end | ||
|
||
defp flush_buffer_async(log_stream, log_group, buffer) do | ||
if length(buffer) > 0 do | ||
Task.start(fn -> send_to_cloudwatch(log_stream, log_group, buffer) end) | ||
end | ||
end | ||
|
||
defp flush_buffer_sync(log_stream, log_group, buffer) do | ||
if length(buffer) > 0 do | ||
send_to_cloudwatch(log_stream, log_group, buffer) | ||
end | ||
end | ||
|
||
defp schedule_flush(interval) do | ||
Process.send_after(self(), :flush_buffer, interval) | ||
end | ||
|
||
defp send_to_cloudwatch(log_stream, log_group, buffer) do | ||
# Ensure that the already have ExAws authentication configured | ||
with :ok <- check_exaws_config() do | ||
operation = build_log_operation(log_stream, log_group, buffer) | ||
|
||
operation | ||
|> send_with_retry() | ||
end | ||
end | ||
|
||
defp build_log_operation(log_stream, log_group, buffer) do | ||
# The headers and body structure can be found in the AWS API documentation: | ||
# https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html | ||
%ExAws.Operation.JSON{ | ||
http_method: :post, | ||
service: :logs, | ||
headers: [ | ||
{"x-amz-target", "Logs_20140328.PutLogEvents"}, | ||
{"content-type", "application/x-amz-json-1.1"} | ||
], | ||
data: %{ | ||
"logGroupName" => log_group, | ||
"logStreamName" => log_stream, | ||
"logEvents" => Enum.reverse(buffer) | ||
tckeong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
end | ||
|
||
defp check_exaws_config do | ||
id = System.get_env("AWS_ACCESS_KEY_ID") | ||
secret = System.get_env("AWS_SECRET_ACCESS_KEY") | ||
region = Application.get_env(:ex_aws, :region) || System.get_env("AWS_REGION") | ||
|
||
cond do | ||
is_nil(id) or id == "" or is_nil(secret) or secret == "" -> | ||
Logger.error( | ||
"Failed to send log to CloudWatch. AWS credentials missing. Ensure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set. | ||
" | ||
) | ||
|
||
:error | ||
|
||
region in [nil, ""] -> | ||
Logger.error( | ||
"Failed to send log to CloudWatch. AWS region not configured. Set AWS_REGION or :region under :ex_aws in config. | ||
" | ||
) | ||
|
||
:error | ||
|
||
true -> | ||
:ok | ||
end | ||
end | ||
|
||
defp send_with_retry(operation, retries \\ @max_retries) | ||
|
||
defp send_with_retry(operation, retries) when retries > 0 do | ||
case ExAws.request(operation) do | ||
{:ok, _response} -> | ||
:ok | ||
|
||
{:error, reason} -> | ||
Logger.error("Failed to send log to CloudWatch: #{inspect(reason)}. Retrying...") | ||
# Wait before retrying | ||
:timer.sleep(@retry_delay) | ||
send_with_retry(operation, retries - 1) | ||
end | ||
end | ||
|
||
defp send_with_retry(_, 0) do | ||
Logger.error("Failed to send log to CloudWatch after multiple retries.") | ||
end | ||
|
||
defp init(config, state) do | ||
level = Keyword.get(config, :level) | ||
format = Logger.Formatter.compile(Keyword.get(config, :format)) | ||
raw_metadata = Keyword.get(config, :metadata, []) | ||
metadata = configure_metadata(raw_metadata) | ||
log_group = Keyword.get(config, :log_group, "cadet-logs") | ||
log_stream = Keyword.get(config, :log_stream, "#{node()}-#{:os.system_time(:second)}") | ||
timer_ref = schedule_flush(@flush_interval) | ||
|
||
%{ | ||
state | ||
| level: level, | ||
format: format, | ||
metadata: metadata, | ||
log_group: log_group, | ||
log_stream: log_stream, | ||
buffer: [], | ||
timer_ref: timer_ref | ||
} | ||
end | ||
|
||
defp configure_metadata(:all), do: :all | ||
defp configure_metadata(metadata), do: Enum.reverse(metadata) | ||
|
||
defp take_metadata(metadata, :all) do | ||
metadata | ||
end | ||
|
||
defp take_metadata(metadata, keys) do | ||
Enum.reduce(keys, [], fn key, acc -> | ||
case Keyword.fetch(metadata, key) do | ||
{:ok, val} -> [{key, val} | acc] | ||
:error -> acc | ||
end | ||
end) | ||
end | ||
|
||
defp timestamp_from_logger_ts({{year, month, day}, {hour, minute, second, microsecond}}) do | ||
datetime = %DateTime{ | ||
year: year, | ||
month: month, | ||
day: day, | ||
hour: hour, | ||
minute: minute, | ||
second: second, | ||
microsecond: {microsecond, 6}, | ||
time_zone: "Etc/UTC", | ||
zone_abbr: "UTC", | ||
utc_offset: 0, | ||
std_offset: 0 | ||
} | ||
|
||
DateTime.to_unix(datetime, :millisecond) | ||
end | ||
|
||
defp read_env do | ||
Application.get_env(:logger, __MODULE__, Application.get_env(:logger, :cloudwatch_logger, [])) | ||
end | ||
|
||
""" | ||
Merges the given options with the existing environment configuration. | ||
If a key exists in both, the value from `options` will take precedence. | ||
""" | ||
|
||
defp configure_merge(env, options) do | ||
Keyword.merge(env, options, fn | ||
_, _v1, v2 -> v2 | ||
end) | ||
end | ||
end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.