@@ -14,16 +14,39 @@ limitations under the License.
1414package datamover
1515
1616import (
17+ "context"
1718 "fmt"
19+ "os"
1820 "strings"
1921 "time"
2022
23+ "github.com/pkg/errors"
2124 "github.com/sirupsen/logrus"
2225 "github.com/spf13/cobra"
26+ v1 "k8s.io/api/core/v1"
27+ "k8s.io/apimachinery/pkg/fields"
28+ "k8s.io/apimachinery/pkg/runtime"
29+ "k8s.io/client-go/kubernetes"
30+ "sigs.k8s.io/controller-runtime/pkg/log/zap"
2331
32+ "github.com/vmware-tanzu/velero/internal/credentials"
2433 "github.com/vmware-tanzu/velero/pkg/buildinfo"
2534 "github.com/vmware-tanzu/velero/pkg/client"
35+ "github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
36+ "github.com/vmware-tanzu/velero/pkg/datamover"
37+ "github.com/vmware-tanzu/velero/pkg/datapath"
38+ "github.com/vmware-tanzu/velero/pkg/repository"
39+ "github.com/vmware-tanzu/velero/pkg/uploader"
40+ "github.com/vmware-tanzu/velero/pkg/util/filesystem"
2641 "github.com/vmware-tanzu/velero/pkg/util/logging"
42+
43+ ctrl "sigs.k8s.io/controller-runtime"
44+
45+ velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
46+ velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
47+
48+ ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
49+ ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
2750)
2851
2952type dataMoverBackupConfig struct {
@@ -52,7 +75,10 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
5275 logger .Infof ("Starting Velero data-mover backup %s (%s)" , buildinfo .Version , buildinfo .FormattedGitSHA ())
5376
5477 f .SetBasename (fmt .Sprintf ("%s-%s" , c .Parent ().Name (), c .Name ()))
55- s := newdataMoverBackup (logger , config )
78+ s , err := newdataMoverBackup (logger , f , config )
79+ if err != nil {
80+ exitWithMessage (logger , false , "Failed to create data mover backup, %v" , err )
81+ }
5682
5783 s .run ()
5884 },
@@ -73,20 +99,186 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
7399 return command
74100}
75101
102+ const (
103+ // defaultCredentialsDirectory is the path on disk where credential
104+ // files will be written to
105+ defaultCredentialsDirectory = "/tmp/credentials"
106+ )
107+
76108type dataMoverBackup struct {
77- logger logrus.FieldLogger
78- config dataMoverBackupConfig
109+ logger logrus.FieldLogger
110+ ctx context.Context
111+ cancelFunc context.CancelFunc
112+ client ctlclient.Client
113+ cache ctlcache.Cache
114+ namespace string
115+ nodeName string
116+ config dataMoverBackupConfig
117+ kubeClient kubernetes.Interface
118+ dataPathMgr * datapath.Manager
79119}
80120
81- func newdataMoverBackup (logger logrus.FieldLogger , config dataMoverBackupConfig ) * dataMoverBackup {
121+ func newdataMoverBackup (logger logrus.FieldLogger , factory client.Factory , config dataMoverBackupConfig ) (* dataMoverBackup , error ) {
122+ ctx , cancelFunc := context .WithCancel (context .Background ())
123+
124+ clientConfig , err := factory .ClientConfig ()
125+ if err != nil {
126+ cancelFunc ()
127+ return nil , errors .Wrap (err , "error to create client config" )
128+ }
129+
130+ ctrl .SetLogger (zap .New (zap .UseDevMode (true )))
131+
132+ scheme := runtime .NewScheme ()
133+ if err := velerov1api .AddToScheme (scheme ); err != nil {
134+ cancelFunc ()
135+ return nil , errors .Wrap (err , "error to add velero v1 scheme" )
136+ }
137+
138+ if err := velerov2alpha1api .AddToScheme (scheme ); err != nil {
139+ cancelFunc ()
140+ return nil , errors .Wrap (err , "error to add velero v2alpha1 scheme" )
141+ }
142+
143+ if err := v1 .AddToScheme (scheme ); err != nil {
144+ cancelFunc ()
145+ return nil , errors .Wrap (err , "error to add core v1 scheme" )
146+ }
147+
148+ nodeName := os .Getenv ("NODE_NAME" )
149+
150+ // use a field selector to filter to only pods scheduled on this node.
151+ cacheOption := ctlcache.Options {
152+ Scheme : scheme ,
153+ ByObject : map [ctlclient.Object ]ctlcache.ByObject {
154+ & v1.Pod {}: {
155+ Field : fields.Set {"spec.nodeName" : nodeName }.AsSelector (),
156+ },
157+ & velerov2alpha1api.DataUpload {}: {
158+ Field : fields.Set {"metadata.namespace" : factory .Namespace ()}.AsSelector (),
159+ },
160+ },
161+ }
162+
163+ cli , err := ctlclient .New (clientConfig , ctlclient.Options {
164+ Scheme : scheme ,
165+ })
166+ if err != nil {
167+ cancelFunc ()
168+ return nil , errors .Wrap (err , "error to create client" )
169+ }
170+
171+ cache , err := ctlcache .New (clientConfig , cacheOption )
172+ if err != nil {
173+ cancelFunc ()
174+ return nil , errors .Wrap (err , "error to create client cache" )
175+ }
176+
82177 s := & dataMoverBackup {
83- logger : logger ,
84- config : config ,
178+ logger : logger ,
179+ ctx : ctx ,
180+ cancelFunc : cancelFunc ,
181+ client : cli ,
182+ cache : cache ,
183+ config : config ,
184+ namespace : factory .Namespace (),
185+ nodeName : nodeName ,
186+ }
187+
188+ s .kubeClient , err = factory .KubeClient ()
189+ if err != nil {
190+ cancelFunc ()
191+ return nil , errors .Wrap (err , "error to create kube client" )
85192 }
86193
87- return s
194+ s .dataPathMgr = datapath .NewManager (1 )
195+
196+ return s , nil
88197}
89198
199+ var funcExitWithMessage = exitWithMessage
200+ var funcCreateDataPathService = (* dataMoverBackup ).createDataPathService
201+
90202func (s * dataMoverBackup ) run () {
91- time .Sleep (time .Duration (1 << 63 - 1 ))
203+ signals .CancelOnShutdown (s .cancelFunc , s .logger )
204+ go func () {
205+ if err := s .cache .Start (s .ctx ); err != nil {
206+ s .logger .WithError (err ).Warn ("error starting cache" )
207+ }
208+ }()
209+
210+ s .runDataPath ()
211+ }
212+
213+ func (s * dataMoverBackup ) runDataPath () {
214+ s .logger .Infof ("Starting micro service in node %s for du %s" , s .nodeName , s .config .duName )
215+
216+ dpService , err := funcCreateDataPathService (s )
217+ if err != nil {
218+ s .cancelFunc ()
219+ funcExitWithMessage (s .logger , false , "Failed to create data path service for DataUpload %s: %v" , s .config .duName , err )
220+ return
221+ }
222+
223+ s .logger .Infof ("Starting data path service %s" , s .config .duName )
224+
225+ err = dpService .Init ()
226+ if err != nil {
227+ s .cancelFunc ()
228+ funcExitWithMessage (s .logger , false , "Failed to init data path service for DataUpload %s: %v" , s .config .duName , err )
229+ return
230+ }
231+
232+ s .logger .Infof ("Running data path service %s" , s .config .duName )
233+
234+ result , err := dpService .RunCancelableDataPath (s .ctx )
235+ if err != nil {
236+ s .cancelFunc ()
237+ funcExitWithMessage (s .logger , false , "Failed to run data path service for DataUpload %s: %v" , s .config .duName , err )
238+ return
239+ }
240+
241+ s .logger .WithField ("du" , s .config .duName ).Info ("Data path service completed" )
242+
243+ dpService .Shutdown ()
244+
245+ s .logger .WithField ("du" , s .config .duName ).Info ("Data path service is shut down" )
246+
247+ s .cancelFunc ()
248+
249+ funcExitWithMessage (s .logger , true , result )
250+ }
251+
252+ var funcNewCredentialFileStore = credentials .NewNamespacedFileStore
253+ var funcNewCredentialSecretStore = credentials .NewNamespacedSecretStore
254+
255+ func (s * dataMoverBackup ) createDataPathService () (dataPathService , error ) {
256+ credentialFileStore , err := funcNewCredentialFileStore (
257+ s .client ,
258+ s .namespace ,
259+ defaultCredentialsDirectory ,
260+ filesystem .NewFileSystem (),
261+ )
262+ if err != nil {
263+ return nil , errors .Wrapf (err , "error to create credential file store" )
264+ }
265+
266+ credSecretStore , err := funcNewCredentialSecretStore (s .client , s .namespace )
267+ if err != nil {
268+ return nil , errors .Wrapf (err , "error to create credential secret store" )
269+ }
270+
271+ credGetter := & credentials.CredentialGetter {FromFile : credentialFileStore , FromSecret : credSecretStore }
272+
273+ duInformer , err := s .cache .GetInformer (s .ctx , & velerov2alpha1api.DataUpload {})
274+ if err != nil {
275+ return nil , errors .Wrap (err , "error to get controller-runtime informer from manager" )
276+ }
277+
278+ repoEnsurer := repository .NewEnsurer (s .client , s .logger , s .config .resourceTimeout )
279+
280+ return datamover .NewBackupMicroService (s .ctx , s .client , s .kubeClient , s .config .duName , s .namespace , s .nodeName , datapath.AccessPoint {
281+ ByPath : s .config .volumePath ,
282+ VolMode : uploader .PersistentVolumeMode (s .config .volumeMode ),
283+ }, s .dataPathMgr , repoEnsurer , credGetter , duInformer , s .logger ), nil
92284}
0 commit comments