Skip to content

Commit 8782bd5

Browse files
[Chrony Receiver] Part 2 - Internal chrony client (#12102)
* [ Chrony Receiver ] Adding internal components Adding in the client with rewrites to allow usage with open telemetry collector * Allowing sequence to be based of time
1 parent 9739eb3 commit 8782bd5

File tree

6 files changed

+512
-2
lines changed

6 files changed

+512
-2
lines changed

receiver/chronyreceiver/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/chrony
33
go 1.17
44

55
require (
6+
github.com/facebook/time v0.0.0-20220713225404-f7a0d7702d50
67
github.com/stretchr/testify v1.8.0
78
github.com/tilinna/clock v1.1.0
89
go.opentelemetry.io/collector v0.56.0
@@ -22,8 +23,8 @@ require (
2223
github.com/mitchellh/reflectwalk v1.0.2 // indirect
2324
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
2425
github.com/modern-go/reflect2 v1.0.2 // indirect
25-
github.com/pkg/errors v0.9.1 // indirect
2626
github.com/pmezard/go-difflib v1.0.0 // indirect
27+
github.com/sirupsen/logrus v1.8.1 // indirect
2728
go.opencensus.io v0.23.0 // indirect
2829
go.opentelemetry.io/otel v1.8.0 // indirect
2930
go.opentelemetry.io/otel/metric v0.31.0 // indirect

receiver/chronyreceiver/go.sum

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package chrony // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/chronyreceiver/internal/chrony"
16+
17+
import (
18+
"context"
19+
"encoding/binary"
20+
"errors"
21+
"net"
22+
"time"
23+
24+
"github.com/facebook/time/ntp/chrony"
25+
"github.com/tilinna/clock"
26+
"go.uber.org/multierr"
27+
)
28+
29+
var (
30+
errBadRequest = errors.New("bad request")
31+
)
32+
33+
type Client interface {
34+
// GetTrackingData will connection the configured chronyd endpoint
35+
// and will read that instance tracking information relatively to the configured
36+
// upstream NTP server(s).
37+
GetTrackingData(ctx context.Context) (*Tracking, error)
38+
}
39+
40+
type clientOption func(c *client)
41+
42+
// client is a partial rewrite of the client provided by
43+
// github.com/facebook/time/ntp/chrony
44+
//
45+
// The reason for the partial rewrite is that the original
46+
// client uses logrus' global instance within the main code path.
47+
type client struct {
48+
proto, addr string
49+
timeout time.Duration
50+
dialer func(network, addr string) (net.Conn, error)
51+
}
52+
53+
// New creates a client ready to use with chronyd
54+
func New(addr string, timeout time.Duration, opts ...clientOption) (Client, error) {
55+
if timeout < 1 {
56+
return nil, errors.New("timeout must be positive")
57+
}
58+
59+
network, endpoint, err := SplitNetworkEndpoint(addr)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
c := &client{
65+
proto: network,
66+
addr: endpoint,
67+
timeout: timeout,
68+
dialer: net.Dial,
69+
}
70+
for _, opt := range opts {
71+
opt(c)
72+
}
73+
74+
return c, nil
75+
}
76+
77+
func (c *client) GetTrackingData(ctx context.Context) (*Tracking, error) {
78+
sock, err := c.dialer(c.proto, c.addr)
79+
if err != nil {
80+
return nil, err
81+
}
82+
clk := clock.FromContext(ctx)
83+
84+
if err = sock.SetDeadline(clk.Now().Add(c.timeout)); err != nil {
85+
return nil, err
86+
}
87+
88+
packet := chrony.NewTrackingPacket()
89+
packet.SetSequence(uint32(clk.Now().UnixNano()))
90+
91+
if err := binary.Write(sock, binary.BigEndian, packet); err != nil {
92+
return nil, multierr.Combine(err, sock.Close())
93+
}
94+
data := make([]uint8, 1024)
95+
if _, err := sock.Read(data); err != nil {
96+
return nil, multierr.Combine(err, sock.Close())
97+
}
98+
99+
if err := sock.Close(); err != nil {
100+
return nil, err
101+
}
102+
103+
return newTrackingData(data)
104+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package chrony
16+
17+
import (
18+
"context"
19+
"encoding/binary"
20+
"fmt"
21+
"net"
22+
"os"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func newMockConn(tb testing.TB, handler func(net.Conn) error) net.Conn {
31+
client, server := net.Pipe()
32+
tb.Cleanup(func() {
33+
assert.NoError(tb, server.Close(), "Must not error when closing server connection")
34+
})
35+
assert.NoError(tb, server.SetDeadline(time.Now().Add(time.Second)), "Must not error when assigning deadline")
36+
go func() {
37+
assert.NoError(tb, binary.Read(server, binary.BigEndian, &requestTrackingContent{}), "Must not error when reading binary data")
38+
assert.NoError(tb, handler(server), "Must not error when processing request")
39+
}()
40+
return client
41+
}
42+
43+
func TestNew(t *testing.T) {
44+
t.Parallel()
45+
46+
tests := []struct {
47+
scenario string
48+
addr string
49+
toError bool
50+
}{
51+
{scenario: "valid host", addr: "udp://localhost:323", toError: false},
52+
{scenario: "missing port", addr: "udp://localhost", toError: true},
53+
{scenario: "missing protocol", addr: "localhost:323", toError: true},
54+
{scenario: "existing socket to potentially connect to", addr: fmt.Sprint("unix://", t.TempDir()), toError: false},
55+
{scenario: "invalid path", addr: "unix:///no/socket", toError: true},
56+
{scenario: "invalid protocol", addr: "http:/localhost:323", toError: true},
57+
}
58+
59+
for _, tc := range tests {
60+
tc := tc
61+
t.Run(tc.scenario, func(t *testing.T) {
62+
t.Parallel()
63+
cl, err := New(tc.addr, time.Second)
64+
if tc.toError {
65+
assert.Error(t, err, "Must error")
66+
assert.Nil(t, cl, "Must have a nil client")
67+
} else {
68+
assert.NoError(t, err, "Must not error")
69+
assert.NotNil(t, cl, "Must have a client")
70+
}
71+
})
72+
}
73+
}
74+
75+
func TestGettingTrackingData(t *testing.T) {
76+
t.Parallel()
77+
78+
tests := []struct {
79+
scenario string
80+
handler func(conn net.Conn) error
81+
timeout time.Duration
82+
data *Tracking
83+
err error
84+
}{
85+
{
86+
scenario: "Successful read binary from socket",
87+
timeout: 10 * time.Second,
88+
handler: func(conn net.Conn) error {
89+
type response struct {
90+
ReplyHead
91+
replyTrackingContent
92+
}
93+
resp := &response{
94+
ReplyHead: ReplyHead{
95+
Version: 6,
96+
Status: successfulRequest,
97+
Reply: replyTrackingCode,
98+
},
99+
replyTrackingContent: replyTrackingContent{
100+
RefID: 100,
101+
IPAddr: ipAddr{
102+
IP: [16]uint8{127, 0, 0, 1},
103+
Family: ipAddrInet4,
104+
},
105+
Stratum: 10,
106+
LeapStatus: 0,
107+
RefTime: timeSpec{
108+
100, 10, 0,
109+
},
110+
CurrentCorrection: binaryFloat(1300),
111+
LastOffset: binaryFloat(10000),
112+
RMSOffset: binaryFloat(12000),
113+
FreqPPM: binaryFloat(3300),
114+
ResidFreqPPM: binaryFloat(123456),
115+
SkewPPM: binaryFloat(9943),
116+
RootDelay: binaryFloat(-1220),
117+
RootDispersion: binaryFloat(-1100000),
118+
LastUpdateInterval: binaryFloat(120),
119+
},
120+
}
121+
122+
return binary.Write(conn, binary.BigEndian, resp)
123+
},
124+
data: &Tracking{
125+
RefID: 100,
126+
IPAddr: net.IP([]byte{127, 0, 0, 1}),
127+
Stratum: 10,
128+
LeapStatus: 0,
129+
RefTime: (&timeSpec{100, 10, 0}).Time(),
130+
CurrentCorrection: binaryFloat(1300).Float(),
131+
LastOffset: binaryFloat(10000).Float(),
132+
RMSOffset: binaryFloat(12000).Float(),
133+
FreqPPM: binaryFloat(3300).Float(),
134+
ResidFreqPPM: binaryFloat(123456).Float(),
135+
SkewPPM: binaryFloat(9943).Float(),
136+
RootDelay: binaryFloat(-1220).Float(),
137+
RootDispersion: binaryFloat(-1100000).Float(),
138+
LastUpdateInterval: binaryFloat(120).Float(),
139+
},
140+
err: nil,
141+
},
142+
{
143+
scenario: "Timeout waiting for response",
144+
timeout: 10 * time.Millisecond,
145+
handler: func(conn net.Conn) error {
146+
time.Sleep(100 * time.Millisecond)
147+
return nil
148+
},
149+
err: os.ErrDeadlineExceeded,
150+
},
151+
{
152+
scenario: "invalid status returned",
153+
timeout: 5 * time.Second,
154+
handler: func(conn net.Conn) error {
155+
resp := &ReplyHead{
156+
Version: 6,
157+
Status: 1,
158+
Reply: replyTrackingCode,
159+
}
160+
return binary.Write(conn, binary.BigEndian, resp)
161+
},
162+
err: errBadRequest,
163+
},
164+
{
165+
scenario: "invalid status command",
166+
timeout: 5 * time.Second,
167+
handler: func(conn net.Conn) error {
168+
resp := &ReplyHead{
169+
Version: 6,
170+
Status: successfulRequest,
171+
Reply: 0,
172+
}
173+
return binary.Write(conn, binary.BigEndian, resp)
174+
},
175+
err: errBadRequest,
176+
},
177+
}
178+
179+
for _, tc := range tests {
180+
tc := tc
181+
t.Run(tc.scenario, func(t *testing.T) {
182+
t.Parallel()
183+
conn := newMockConn(t, tc.handler)
184+
185+
client, err := New(fmt.Sprintf("unix://%s", t.TempDir()), tc.timeout, func(c *client) {
186+
c.dialer = func(_, _ string) (net.Conn, error) {
187+
return conn, nil
188+
}
189+
})
190+
require.NoError(t, err, "Must not error when creating client")
191+
192+
data, err := client.GetTrackingData(context.Background())
193+
assert.EqualValues(t, tc.data, data, "Must match the expected data")
194+
assert.ErrorIs(t, err, tc.err, "Must match the expected error")
195+
})
196+
}
197+
}

0 commit comments

Comments
 (0)