Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pubsub-lib/JetStreamUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ type NatsTopic struct {
}
type ConfigJson struct {
// StreamConfigJson is a json string of map[string]NatsStreamConfig
StreamConfigJson string `env:"STREAM_CONFIG_JSON"`
StreamConfigJson string `env:"STREAM_CONFIG_JSON" description:"This is json map of nats stream configurations per stream. eg: {"ORCHESTRATOR":{"max_age":3600,"replicas":1}}"`

// ConsumerConfigJson is a json string of map[string]NatsConsumerConfig
// eg: "{\"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1\" : \"{\"natsMsgProcessingBatchSize\" : 3, \"natsMsgBufferSize\" : 3, \"ackWaitInSecs\": 300}\"}"
ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"`
ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON" description:"ConsumerConfigJson is a json string of map[string]NatsConsumerConfig" `

}

var natsTopicMapping = map[string]NatsTopic{
Expand Down
11 changes: 6 additions & 5 deletions pubsub-lib/NatsClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type NatsClientConfig struct {

// consumer wise
// NatsMsgProcessingBatchSize is the number of messages that will be processed in one go
NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"`
NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1" description:"NatsMsgProcessingBatchSize is the number of messages that will be processed in one go"`


// NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size)
// it is recommended to set this value equal to NatsMsgProcessingBatchSize as we want to process maximum messages in the buffer in one go.
Expand All @@ -45,10 +46,10 @@ type NatsClientConfig struct {
// NatsMsgBufferSize can be configured independently of NatsMsgProcessingBatchSize if needed by setting its value to positive value in env.
// if NatsMsgBufferSize set to a non-positive value then it will take the value of NatsMsgProcessingBatchSize.
// Note: always get this value by calling GetNatsMsgBufferSize method
NatsMsgBufferSize int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1"`
NatsMsgMaxAge int `env:"NATS_MSG_MAX_AGE" envDefault:"86400"`
NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"120"`
NatsMsgReplicas int `env:"NATS_MSG_REPLICAS" envDefault:"0"`
NatsMsgBufferSize int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1" description:"NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size)"`
NatsMsgMaxAge int `env:"NATS_MSG_MAX_AGE" envDefault:"86400" description:"Age for the message to persist"`
NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"120" description:"Time to wait for acknowledging the message"`
NatsMsgReplicas int `env:"NATS_MSG_REPLICAS" envDefault:"0" description:"Replica count for runnings nats instance"`
}

func (ncc NatsClientConfig) GetNatsMsgBufferSize() int {
Expand Down
4 changes: 2 additions & 2 deletions utils/grpc/GrpcConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package grpc
import "github.com/caarlos0/env"

type Configuration struct {
KubelinkMaxRecvMsgSize int `env:"KUBELINK_GRPC_MAX_RECEIVE_MSG_SIZE" envDefault:"20"` // In mb
KubelinkMaxSendMsgSize int `env:"KUBELINK_GRPC_MAX_SEND_MSG_SIZE" envDefault:"4"` // In mb
KubelinkMaxRecvMsgSize int `env:"KUBELINK_GRPC_MAX_RECEIVE_MSG_SIZE" envDefault:"20" description:"Message size to receive from kubelink"` // In mb
KubelinkMaxSendMsgSize int `env:"KUBELINK_GRPC_MAX_SEND_MSG_SIZE" envDefault:"4" description:"Message size to send to kubelink"` // In mb
}

func GetConfiguration() (*Configuration, error) {
Expand Down
15 changes: 8 additions & 7 deletions utils/k8s/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,19 @@ func GetResourceKey(obj *unstructured.Unstructured) ResourceKey {
}

type CustomK8sHttpTransportConfig struct {
UseCustomTransport bool `env:"USE_CUSTOM_HTTP_TRANSPORT" envDefault:"false"`
TimeOut int `env:"K8s_TCP_TIMEOUT" envDefault:"30"`
KeepAlive int `env:"K8s_TCP_KEEPALIVE" envDefault:"30"`
TLSHandshakeTimeout int `env:"K8s_TLS_HANDSHAKE_TIMEOUT" envDefault:"10"`
MaxIdleConnsPerHost int `env:"K8s_CLIENT_MAX_IDLE_CONNS_PER_HOST" envDefault:"25"`
IdleConnTimeout int `env:"K8s_TCP_IDLE_CONN_TIMEOUT" envDefault:"300"`
UseCustomTransport bool `env:"USE_CUSTOM_HTTP_TRANSPORT" envDefault:"false" description:"There is an issue in updating cluster bearer token with same cluster url due to transport layer token caching in k8s client lib. so we added a custom transport while calling k8s api server. this flag controls the usage of this transport.(custom or k8s)"`
TimeOut int `env:"K8s_TCP_TIMEOUT" envDefault:"30" description:"Is the maximum amount of time a dial will wait for a connect to complete, required only if USE_CUSTOM_HTTP_TRANSPORT is true."`
KeepAlive int `env:"K8s_TCP_KEEPALIVE" envDefault:"30" description:"Specifies the interval between keep-alive probes for an active, required only if USE_CUSTOM_HTTP_TRANSPORT is true."`
TLSHandshakeTimeout int `env:"K8s_TLS_HANDSHAKE_TIMEOUT" envDefault:"10" description:"Specifies the maximum amount of time to wait for a TLS handshake. Zero means no timeout, required only if USE_CUSTOM_HTTP_TRANSPORT is true."`
MaxIdleConnsPerHost int `env:"K8s_CLIENT_MAX_IDLE_CONNS_PER_HOST" envDefault:"25" description:"k8s client max idle connections per host"`
IdleConnTimeout int `env:"K8s_TCP_IDLE_CONN_TIMEOUT" envDefault:"300" description:"Maximum amount of time an idle (keep-alive) connection will remain idle before closing itself, required only if USE_CUSTOM_HTTP_TRANSPORT is true."`
}

type LocalDevMode bool

type RuntimeConfig struct {
LocalDevMode LocalDevMode `env:"RUNTIME_CONFIG_LOCAL_DEV" envDefault:"false"`
LocalDevMode LocalDevMode `env:"RUNTIME_CONFIG_LOCAL_DEV" envDefault:"false" description: "Used in local development process. Allows to read from local kube config file path for default cluster operations."`

}

func GetRuntimeConfig() (*RuntimeConfig, error) {
Expand Down