Skip to content

Commit b8baa6c

Browse files
authored
added stream aggregation support for vmagent and vmsingle operator resources (#605)
* added stream aggregation support for vmagent and vmsingle operator resources * added stream aggregation support for vmagent and vmsingle operator resources * fix review comments
1 parent d2c5ddb commit b8baa6c

14 files changed

+1187
-6
lines changed

api/v1beta1/additional.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,81 @@ type ConfigMapKeyReference struct {
372372
// The ConfigMap key to refer to.
373373
Key string `json:"key"`
374374
}
375+
376+
// StreamAggrConfig defines the stream aggregation config
377+
// +k8s:openapi-gen=true
378+
type StreamAggrConfig struct {
379+
// Stream aggregation rules
380+
Rules []StreamAggrRule `json:"rules"`
381+
// Allows writing both raw and aggregate data
382+
// +optional
383+
KeepInput bool `json:"keepInput,omitempty"`
384+
// Allows setting different de-duplication intervals per each configured remote storage
385+
// +optional
386+
DedupInterval string `json:"dedupInterval,omitempty"`
387+
}
388+
389+
// StreamAggrRule defines the rule in stream aggregation config
390+
// +k8s:openapi-gen=true
391+
type StreamAggrRule struct {
392+
// Match is a label selector for filtering time series for the given selector.
393+
//
394+
// If the match isn't set, then all the input time series are processed.
395+
// +optional
396+
Match string `json:"match,omitempty" yaml:"match,omitempty"`
397+
398+
// Interval is the interval between aggregations.
399+
Interval string `json:"interval" yaml:"interval"`
400+
401+
// Outputs is a list of output aggregate functions to produce.
402+
//
403+
// The following names are allowed:
404+
//
405+
// - total - aggregates input counters
406+
// - increase - counts the increase over input counters
407+
// - count_series - counts the input series
408+
// - count_samples - counts the input samples
409+
// - sum_samples - sums the input samples
410+
// - last - the last biggest sample value
411+
// - min - the minimum sample value
412+
// - max - the maximum sample value
413+
// - avg - the average value across all the samples
414+
// - stddev - standard deviation across all the samples
415+
// - stdvar - standard variance across all the samples
416+
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
417+
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
418+
//
419+
// The output time series will have the following names:
420+
//
421+
// input_name:aggr_<interval>_<output>
422+
//
423+
Outputs []string `json:"outputs"`
424+
425+
// By is an optional list of labels for grouping input series.
426+
//
427+
// See also Without.
428+
//
429+
// If neither By nor Without are set, then the Outputs are calculated
430+
// individually per each input time series.
431+
// +optional
432+
By []string `json:"by,omitempty" yaml:"by,omitempty""`
433+
434+
// Without is an optional list of labels, which must be excluded when grouping input series.
435+
//
436+
// See also By.
437+
//
438+
// If neither By nor Without are set, then the Outputs are calculated
439+
// individually per each input time series.
440+
// +optional
441+
Without []string `json:"without,omitempty" yaml:"without,omitempty"`
442+
443+
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
444+
// before aggregation.
445+
// +optional
446+
InputRelabelConfigs []RelabelConfig `json:"input_relabel_configs,omitempty" yaml:"input_relabel_configs,omitempty"`
447+
448+
// OutputRelabelConfigs is an optional relabeling rules, which are applied
449+
// on the aggregated output before being sent to remote storage.
450+
// +optional
451+
OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"`
452+
}

api/v1beta1/vmagent_types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ type VMAgentRemoteWriteSpec struct {
458458
// vmagent supports since 1.79.0 version
459459
// +optional
460460
Headers []string `json:"headers,omitempty"`
461+
// StreamAggrConfig defines stream aggregation configuration for VMAgent for -remoteWrite.url
462+
// +optional
463+
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
461464
}
462465

463466
// AsMapKey key for internal cache map
@@ -470,6 +473,16 @@ func (rw *VMAgentRemoteWriteSpec) AsSecretKey(idx int, suffix string) string {
470473
return fmt.Sprintf("RWS_%d-SECRET-%s", idx, strings.ToUpper(suffix))
471474
}
472475

476+
// AsConfigMapKey key for kubernetes configmap
477+
func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string {
478+
return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix))
479+
}
480+
481+
// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite
482+
func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool {
483+
return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0
484+
}
485+
473486
// VmAgentStatus defines the observed state of VmAgent
474487
// +k8s:openapi-gen=true
475488
type VMAgentStatus struct {
@@ -588,6 +601,10 @@ func (cr VMAgent) RelabelingAssetName() string {
588601
return fmt.Sprintf("relabelings-assets-vmagent-%s", cr.Name)
589602
}
590603

604+
func (cr VMAgent) StreamAggrConfigName() string {
605+
return fmt.Sprintf("stream-aggr-vmagent-%s", cr.Name)
606+
}
607+
591608
func (cr VMAgent) HealthPath() string {
592609
return buildPathWithPrefixFlag(cr.Spec.ExtraArgs, healthPath)
593610
}

api/v1beta1/vmsingle_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ type VMSingleSpec struct {
187187
TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty"`
188188
// ReadinessGates defines pod readiness gates
189189
ReadinessGates []v1.PodReadinessGate `json:"readinessGates,omitempty"`
190+
// StreamAggrConfig defines stream aggregation configuration for VMSingle
191+
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
190192
}
191193

192194
// UnmarshalJSON implements json.Unmarshaler interface
@@ -319,6 +321,10 @@ func (cr VMSingle) PrefixedName() string {
319321
return fmt.Sprintf("vmsingle-%s", cr.Name)
320322
}
321323

324+
func (cr VMSingle) StreamAggrConfigName() string {
325+
return fmt.Sprintf("stream-aggr-vmsingle-%s", cr.Name)
326+
}
327+
322328
func (cr VMSingle) MetricPath() string {
323329
return buildPathWithPrefixFlag(cr.Spec.ExtraArgs, metricPath)
324330
}

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 76 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/victoriametrics/v1beta1/additional.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,81 @@ type ConfigMapKeyReference struct {
372372
// The ConfigMap key to refer to.
373373
Key string `json:"key"`
374374
}
375+
376+
// StreamAggrConfig defines the stream aggregation config
377+
// +k8s:openapi-gen=true
378+
type StreamAggrConfig struct {
379+
// Stream aggregation rules
380+
Rules []StreamAggrRule `json:"rules"`
381+
// Allows writing both raw and aggregate data
382+
// +optional
383+
KeepInput bool `json:"keepInput,omitempty"`
384+
// Allows setting different de-duplication intervals per each configured remote storage
385+
// +optional
386+
DedupInterval string `json:"dedupInterval,omitempty"`
387+
}
388+
389+
// StreamAggrRule defines the rule in stream aggregation config
390+
// +k8s:openapi-gen=true
391+
type StreamAggrRule struct {
392+
// Match is a label selector for filtering time series for the given selector.
393+
//
394+
// If the match isn't set, then all the input time series are processed.
395+
// +optional
396+
Match string `json:"match,omitempty" yaml:"match,omitempty"`
397+
398+
// Interval is the interval between aggregations.
399+
Interval string `json:"interval" yaml:"interval"`
400+
401+
// Outputs is a list of output aggregate functions to produce.
402+
//
403+
// The following names are allowed:
404+
//
405+
// - total - aggregates input counters
406+
// - increase - counts the increase over input counters
407+
// - count_series - counts the input series
408+
// - count_samples - counts the input samples
409+
// - sum_samples - sums the input samples
410+
// - last - the last biggest sample value
411+
// - min - the minimum sample value
412+
// - max - the maximum sample value
413+
// - avg - the average value across all the samples
414+
// - stddev - standard deviation across all the samples
415+
// - stdvar - standard variance across all the samples
416+
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
417+
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
418+
//
419+
// The output time series will have the following names:
420+
//
421+
// input_name:aggr_<interval>_<output>
422+
//
423+
Outputs []string `json:"outputs"`
424+
425+
// By is an optional list of labels for grouping input series.
426+
//
427+
// See also Without.
428+
//
429+
// If neither By nor Without are set, then the Outputs are calculated
430+
// individually per each input time series.
431+
// +optional
432+
By []string `json:"by,omitempty" yaml:"by,omitempty""`
433+
434+
// Without is an optional list of labels, which must be excluded when grouping input series.
435+
//
436+
// See also By.
437+
//
438+
// If neither By nor Without are set, then the Outputs are calculated
439+
// individually per each input time series.
440+
// +optional
441+
Without []string `json:"without,omitempty" yaml:"without,omitempty"`
442+
443+
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
444+
// before aggregation.
445+
// +optional
446+
InputRelabelConfigs []RelabelConfig `json:"input_relabel_configs,omitempty" yaml:"input_relabel_configs,omitempty"`
447+
448+
// OutputRelabelConfigs is an optional relabeling rules, which are applied
449+
// on the aggregated output before being sent to remote storage.
450+
// +optional
451+
OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"`
452+
}

api/victoriametrics/v1beta1/vmagent_types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ type VMAgentRemoteWriteSpec struct {
458458
// vmagent supports since 1.79.0 version
459459
// +optional
460460
Headers []string `json:"headers,omitempty"`
461+
// StreamAggrConfig defines stream aggregation configuration for VMAgent for -remoteWrite.url
462+
// +optional
463+
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
461464
}
462465

463466
// AsMapKey key for internal cache map
@@ -470,6 +473,16 @@ func (rw *VMAgentRemoteWriteSpec) AsSecretKey(idx int, suffix string) string {
470473
return fmt.Sprintf("RWS_%d-SECRET-%s", idx, strings.ToUpper(suffix))
471474
}
472475

476+
// AsConfigMapKey key for kubernetes configmap
477+
func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string {
478+
return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix))
479+
}
480+
481+
// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite
482+
func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool {
483+
return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0
484+
}
485+
473486
// VmAgentStatus defines the observed state of VmAgent
474487
// +k8s:openapi-gen=true
475488
type VMAgentStatus struct {
@@ -588,6 +601,10 @@ func (cr VMAgent) RelabelingAssetName() string {
588601
return fmt.Sprintf("relabelings-assets-vmagent-%s", cr.Name)
589602
}
590603

604+
func (cr VMAgent) StreamAggrConfigName() string {
605+
return fmt.Sprintf("stream-aggr-vmagent-%s", cr.Name)
606+
}
607+
591608
func (cr VMAgent) HealthPath() string {
592609
return buildPathWithPrefixFlag(cr.Spec.ExtraArgs, healthPath)
593610
}

api/victoriametrics/v1beta1/vmsingle_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ type VMSingleSpec struct {
187187
TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty"`
188188
// ReadinessGates defines pod readiness gates
189189
ReadinessGates []v1.PodReadinessGate `json:"readinessGates,omitempty"`
190+
// StreamAggrConfig defines stream aggregation configuration for VMSingle
191+
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
190192
}
191193

192194
// UnmarshalJSON implements json.Unmarshaler interface
@@ -319,6 +321,10 @@ func (cr VMSingle) PrefixedName() string {
319321
return fmt.Sprintf("vmsingle-%s", cr.Name)
320322
}
321323

324+
func (cr VMSingle) StreamAggrConfigName() string {
325+
return fmt.Sprintf("stream-aggr-vmsingle-%s", cr.Name)
326+
}
327+
322328
func (cr VMSingle) MetricPath() string {
323329
return buildPathWithPrefixFlag(cr.Spec.ExtraArgs, metricPath)
324330
}

0 commit comments

Comments
 (0)