-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Add kafka topics observer implementation #38060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add kafka topics observer implementation #38060
Conversation
f0507ad
to
269ca4a
Compare
config *Config | ||
cancel func() | ||
once *sync.Once | ||
ctx context.Context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would strongly advise against storing context since it can be invalid at any point, and it can also encourage some non go idiomatic behaviour.
once *sync.Once | ||
ctx context.Context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I suggest instead of storing the context and the once operation that you start up a background routine using https://pkg.go.dev/golang.org/x/sync/errgroup#Group ?
The reason being is that this can manage the routine for you and similar to a threadpool in other languages.
You can store the cancel in the type so you can shutdown the group at any point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for noticing that. Btw, I've seen that context is stored in dockerObserver
type, so maybe it would be good to change it in that observer too.
For this kafka topics observer instead of errorgroup I adopted similar approach as in cfGardenObserver
, which is storing doneChan
inside the observer struct and closing it in the Shutdown
method. What do you think about it?
return nil | ||
} | ||
|
||
var createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.ClusterAdmin, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this is stored as a global far instead of typed function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaa, I saw it got used in tests.
269ca4a
to
f93cfe6
Compare
update according to golint; update metadata.yaml add changelog return all config errors at once; don't store context in kafkaTopicsObserver struct update go.mod fix race detected during execution of test error make gendistributions
f93cfe6
to
8a7044b
Compare
* main: (55 commits) [chore] Update core dependencies (open-telemetry#38124) Add kafka topics observer implementation (open-telemetry#38060) [exporter/splunk_hec] Mute errors from draining the response body (open-telemetry#38118) [chore] [exporter/splunk_hec] Remove dead code (open-telemetry#38113) Add support for JUnit test results (open-telemetry#37941) [chore] amend changelog for prometheus receiver change (open-telemetry#38109) [chore] Fix dead links in issue-triaging.md (open-telemetry#38105) [chore] fix deprecation (open-telemetry#38107) [exporter/coralogix] Add new batch options to Coralogix exporter (open-telemetry#38082) [chore][exporter/datadog] fix integration test (open-telemetry#38091) [chore] Update otel to unblock contrib test in core repo (open-telemetry#38100) [chore] Bump go-version match to 1.23 (open-telemetry#38099) [exporter/elasticsearch] Add _metric_names_hash to avoid metric rejections (open-telemetry#37511) elasticsearchexporter: refactor encoding; drop metrics support from raw/none/bodymap mapping modes (open-telemetry#37928) [exporter/stefexporter] Fix incorrectly implemented STEF exporter zstd compression option (open-telemetry#38089) [exporter/clickhouse] Add client info for identifying exporter in `system.query_log` (open-telemetry#37146) [chore] Prepare release 0.120.1 (open-telemetry#38055) [extension/httpforwarder] Shutdown should wait server exit (open-telemetry#37735) receiver/prometheusremotewrite: Add two fields timestamp and value. (open-telemetry#37895) [reciver/sqlqueryreceiver] Add support for SapASE (sybase) (open-telemetry#37773) ...
require.Equal(t, want, kEndpoints) | ||
} | ||
|
||
func TestCollectEndpointsAllConfigSettings(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is leading to CI issues, see #38192
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I can see what causes the problem, I'll create a PR with this test fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My PR with a fix #38218
Description
Add implementation and tests for kafka topics observer
Link to tracking issue
New component
#37665
Testing
Unit tests