Skip to content

WIP - do not merge - Grpc archive storage rebase #2505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f0a5cd8
Grpc plugin archive storage support
m8rge Jun 29, 2020
eaf0cb6
Introduce service PluginCapabilities
m8rge Jul 20, 2020
5f39282
Add comments
m8rge Jul 20, 2020
a1063fa
Introduce PluginServices
m8rge Jul 21, 2020
12d044d
Format imports, insert copyright
m8rge Jul 21, 2020
37305f1
Bubble up error on Capabilities() call
m8rge Jul 21, 2020
97a783e
Add empty_test.go
m8rge Jul 21, 2020
3ceb751
Remove ArchiveReader, ArchiveWriter
m8rge Aug 7, 2020
5fa56cf
Pass config.PluginServices to grpc.Serve
m8rge Aug 7, 2020
0501faa
Introduce shared.ArchiveReader/ArchiveWriter
m8rge Aug 10, 2020
12b3f53
Improve ArchiveReader/ArchiveWriter according PR comments
m8rge Aug 31, 2020
4dbfc75
Test fixes
m8rge Aug 31, 2020
0ce165f
Validate plugin type
m8rge Sep 4, 2020
724105a
Add context to WriteSpan method
m8rge Sep 4, 2020
b3a893e
Return plugin capabilities according ArchiveImpl property
m8rge Sep 10, 2020
cdf3ec5
Apply changes from master
m8rge Sep 10, 2020
2c976ba
Extract PluginServices to shared package, introduce ClientPluginServi…
m8rge Sep 10, 2020
6490ecc
Improve error text
m8rge Sep 11, 2020
055c3a4
Rebase-related updates
m8rge Sep 14, 2020
5b120f7
Rename memoryStore to memoryStorePlugin
m8rge Sep 14, 2020
3c9ae7f
minor clean-up
yurishkuro Sep 15, 2020
0db7752
Return codes.NotFound in grpc plugin on missing trace
m8rge Sep 21, 2020
1762c3c
make fmt
yurishkuro Sep 21, 2020
b5e1db7
Handle spanstore.ErrTraceNotFound in grpcServer.GetArchiveTrace
m8rge Sep 22, 2020
980aa03
Undo unnecessary change
Sep 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -45,21 +46,37 @@ func main() {
opts := memory.Options{}
opts.InitFromViper(v)

grpc.Serve(&memoryStore{store: memory.NewStore()})
plugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}
grpc.Serve(&shared.PluginServices{
Store: plugin,
ArchiveStore: plugin,
})
}

type memoryStore struct {
store *memory.Store
type memoryStorePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func (ns *memoryStore) DependencyReader() dependencystore.Reader {
func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *memoryStore) SpanReader() spanstore.Reader {
func (ns *memoryStorePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *memoryStore) SpanWriter() spanstore.Writer {
func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}
38 changes: 31 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ type Configuration struct {
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
type PluginBuilder interface {
Build() (shared.StoragePlugin, error)
Build() (*ClientPluginServices, error)
}

// Build instantiates a StoragePlugin
func (c *Configuration) Build() (shared.StoragePlugin, error) {
// Build instantiates a PluginServices
func (c *Configuration) Build() (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand All @@ -60,18 +66,36 @@ func (c *Configuration) Build() (shared.StoragePlugin, error) {

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err)
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err)
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
}

// in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier)
return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}

return storagePlugin, nil
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
},
Capabilities: capabilities,
}, nil
}
41 changes: 38 additions & 3 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -36,7 +37,9 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -65,12 +68,14 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

store, err := f.builder.Build()
services, err := f.builder.Build()
if err != nil {
return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err)
}

f.store = store
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -89,3 +94,33 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store.DependencyReader(), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanReader {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanReader(), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanWriter {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanWriter(), nil
}
121 changes: 118 additions & 3 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
grpcConfig "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/mocks"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -41,19 +42,41 @@ type mockPluginBuilder struct {
err error
}

func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) {
func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) {
if b.err != nil {
return nil, b.err
}
return b.plugin, nil

return &grpcConfig.ClientPluginServices{
PluginServices: shared.PluginServices{
Store: b.plugin,
ArchiveStore: b.plugin,
},
Capabilities: b.plugin,
}, nil
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
return mp.capabilities.Capabilities()
}

func (mp *mockPlugin) ArchiveSpanReader() spanstore.Reader {
return mp.archiveReader
}

func (mp *mockPlugin) ArchiveSpanWriter() spanstore.Writer {
return mp.archiveWriter
}

func (mp *mockPlugin) SpanReader() spanstore.Reader {
return mp.spanReader
}
Expand Down Expand Up @@ -84,6 +107,9 @@ func TestGRPCStorageFactory(t *testing.T) {
plugin: &mockPlugin{
spanWriter: new(spanStoreMocks.Writer),
spanReader: new(spanStoreMocks.Reader),
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: new(mocks.PluginCapabilities),
dependencyReader: new(dependencyStoreMocks.Reader),
},
}
Expand All @@ -101,14 +127,103 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.NoError(t, err)
assert.NotNil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
}

func TestWithConfiguration(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
err := command.ParseFlags([]string{
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
})
assert.NoError(t, err)
f.InitFromViper(v)
assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json")
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import (
)

// Serve creates a plugin configuration using the implementation of StoragePlugin and then serves it.
func Serve(implementation shared.StoragePlugin) {
ServeWithGRPCServer(implementation, plugin.DefaultGRPCServer)
func Serve(services *shared.PluginServices) {
ServeWithGRPCServer(services, plugin.DefaultGRPCServer)
}

// ServeWithGRPCServer creates a plugin configuration using the implementation of StoragePlugin and
// function to create grpcServer, and then serves it.
func ServeWithGRPCServer(implementation shared.StoragePlugin, grpcServer func([]grpc.ServerOption) *grpc.Server) {
func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc.ServerOption) *grpc.Server,
) {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: map[string]plugin.Plugin{
shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{
Impl: implementation,
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
},
},
},
Expand Down
Loading