Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
KibanaSessionSecretName = "kibana-session-secret" //nolint:gosec

LegacySecureforward = "_LEGACY_SECUREFORWARD"
LegacySyslog = "_LEGACY_SYSLOG"
LegacySyslog = "_LEGACY_SYSLOG"
)

var ReconcileForGlobalProxyList = []string{FluentdTrustedCAName}
7 changes: 4 additions & 3 deletions pkg/generators/forwarding/fluentd/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

//ConfigGenerator is a config generator for fluentd
type ConfigGenerator struct {
*generators.Generator
Expand Down Expand Up @@ -67,12 +68,12 @@ func (engine *ConfigGenerator) Generate(clfSpec *logging.ClusterLogForwarderSpec
logging.InputNameApplication,
logging.InputNameAudit,
)
for _, logType := range inputs.List(){
for _, logType := range inputs.List() {
if engine.includeLegacySyslogConfig {
routeMap.Insert(logType,constants.LegacySyslog)
routeMap.Insert(logType, constants.LegacySyslog)
}
if engine.includeLegacyForwardConfig {
routeMap.Insert(logType,constants.LegacySecureforward)
routeMap.Insert(logType, constants.LegacySecureforward)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/k8shandler/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func (clusterRequest *ClusterLoggingRequest) NormalizeForwarder() (*logging.Clus
if clusterRequest.Cluster.Spec.LogStore != nil && clusterRequest.Cluster.Spec.LogStore.Type == logging.LogStoreTypeElasticsearch {
log.V(2).Info("ClusterLogForwarder forwarding to default store")
defaultPipeline := logging.PipelineSpec{
InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure},
OutputRefs: []string{logging.OutputNameDefault},
}
clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{ defaultPipeline }
InputRefs: []string{logging.InputNameApplication, logging.InputNameInfrastructure},
OutputRefs: []string{logging.OutputNameDefault},
}
clusterRequest.ForwarderSpec.Pipelines = []logging.PipelineSpec{defaultPipeline}
if clusterRequest.includeLegacySyslogConfig() {
defaultPipeline.OutputRefs = append(defaultPipeline.OutputRefs,constants.LegacySyslog)
defaultPipeline.OutputRefs = append(defaultPipeline.OutputRefs, constants.LegacySyslog)
}
if clusterRequest.includeLegacyForwardConfig() {
defaultPipeline.OutputRefs = append(defaultPipeline.OutputRefs,constants.LegacySecureforward)
defaultPipeline.OutputRefs = append(defaultPipeline.OutputRefs, constants.LegacySecureforward)
}
// Continue with normalization to fill out spec and status.
} else if clusterRequest.ForwarderRequest == nil {
Expand Down
1 change: 1 addition & 0 deletions test/functional/cluster_log_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (b *ClusterLogForwarderBuilder) FromInput(inputName string) *PipelineBuilde
}

func (p *PipelineBuilder) ToFluentForwardOutput() *ClusterLogForwarderBuilder {

return p.ToFluentForwardOutputWithVisitor(func(output *logging.OutputSpec) {})
}

Expand Down
83 changes: 52 additions & 31 deletions test/functional/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package functional

import (
"fmt"
"github.com/openshift/cluster-logging-operator/pkg/certificates"
"math/rand"
"os"
"path/filepath"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/ViaQ/logerr/log"
"github.com/openshift/cluster-logging-operator/internal/pkg/generator/forwarder"
logging "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/pkg/certificates"
"github.com/openshift/cluster-logging-operator/pkg/constants"
"github.com/openshift/cluster-logging-operator/pkg/utils"
"github.com/openshift/cluster-logging-operator/test/client"
Expand Down Expand Up @@ -47,6 +47,8 @@ var (
defaultRetryInterval time.Duration
)

type receiverBuilder func(f *FluentdFunctionalFramework, b *runtime.PodBuilder, output logging.OutputSpec) error

//FluentdFunctionalFramework deploys stand alone fluentd with the fluent.conf as generated by input ClusterLogForwarder CR
type FluentdFunctionalFramework struct {
Name string
Expand All @@ -55,9 +57,10 @@ type FluentdFunctionalFramework struct {
image string
labels map[string]string
Forwarder *logging.ClusterLogForwarder
test *client.Test
Test *client.Test
pod *corev1.Pod
fluentContainerId string
receiverBuilders []receiverBuilder
}

func init() {
Expand Down Expand Up @@ -85,75 +88,91 @@ func NewFluentdFunctionalFramework() *FluentdFunctionalFramework {
"testtype": "functional",
"testname": testName,
},
test: t,
Forwarder: runtime.NewClusterLogForwarder(),
Test: t,
Forwarder: runtime.NewClusterLogForwarder(),
receiverBuilders: []receiverBuilder{},
}
return framework
}

func NewCRIOLogMessage(timestamp, message string, partial bool) string {
fullOrPartial := "F"
if partial {
fullOrPartial = "P"
}
return fmt.Sprintf("%s stdout %s %s $n", timestamp, fullOrPartial, message)
}

func (f *FluentdFunctionalFramework) Cleanup() {
f.test.Close()
f.Test.Close()
}

func (f *FluentdFunctionalFramework) RunCommand(container string, cmd ...string) (string, error) {
log.V(2).Info("Running", "container", container, "cmd", cmd)
out, err := runtime.ExecOc(f.pod, container, cmd[0], cmd[1:]...)
out, err := runtime.ExecOc(f.pod, strings.ToLower(container), cmd[0], cmd[1:]...)
log.V(2).Info("Exec'd", "out", out, "err", err)
return out, err
}

//Deploy the objects needed to functional test
//Deploy the objects needed to functional Test
func (f *FluentdFunctionalFramework) Deploy() (err error) {
visiter := func(b *runtime.PodBuilder) error {
return f.addOutputContainers(b, f.Forwarder.Spec.Outputs)
visitors := []runtime.PodBuilderVisitor{
func(b *runtime.PodBuilder)error{
return f.addOutputContainers(b, f.Forwarder.Spec.Outputs)
},
}
return f.DeployWithVisitor(visiter)
return f.DeployWithVisitors(visitors)
}

//Deploy the objects needed to functional test
func (f *FluentdFunctionalFramework) DeployWithVisitor(visit runtime.PodBuilderVisitor) (err error) {
func (f *FluentdFunctionalFramework) DeployWithVisitor(visitor runtime.PodBuilderVisitor) (err error) {
visitors := []runtime.PodBuilderVisitor{
visitor,
}
return f.DeployWithVisitors(visitors)
}
//Deploy the objects needed to functional Test
func (f *FluentdFunctionalFramework) DeployWithVisitors(visitors []runtime.PodBuilderVisitor) (err error) {
log.V(2).Info("Generating config", "forwarder", f.Forwarder)
clfYaml, _ := yaml.Marshal(f.Forwarder)
if f.Conf, err = forwarder.Generate(string(clfYaml), false, false); err != nil {
return err
}
log.V(2).Info("Generating Certificates")
if err, _ = certificates.GenerateCertificates(f.test.NS.Name,
if err, _ = certificates.GenerateCertificates(f.Test.NS.Name,
utils.GetScriptsDir(), "elasticsearch",
utils.DefaultWorkingDir); err != nil {
return err
}
log.V(2).Info("Creating config configmap")
configmap := runtime.NewConfigMap(f.test.NS.Name, f.Name, map[string]string{})
configmap := runtime.NewConfigMap(f.Test.NS.Name, f.Name, map[string]string{})
runtime.NewConfigMapBuilder(configmap).
Add("fluent.conf", f.Conf).
Add("run.sh", string(utils.GetFileContents(utils.GetShareDir()+"/fluentd/run.sh")))
if err = f.test.Client.Create(configmap); err != nil {
if err = f.Test.Client.Create(configmap); err != nil {
return err
}

log.V(2).Info("Creating certs configmap")
certsName := "certs-" + f.Name
certs := runtime.NewConfigMap(f.test.NS.Name, certsName, map[string]string{})
certs := runtime.NewConfigMap(f.Test.NS.Name, certsName, map[string]string{})
runtime.NewConfigMapBuilder(certs).
Add("tls.key", string(utils.GetWorkingDirFileContents("system.logging.fluentd.key"))).
Add("tls.crt", string(utils.GetWorkingDirFileContents("system.logging.fluentd.crt")))
if err = f.test.Client.Create(certs); err != nil {
if err = f.Test.Client.Create(certs); err != nil {
return err
}

log.V(2).Info("Creating service")
service := runtime.NewService(f.test.NS.Name, f.Name)
service := runtime.NewService(f.Test.NS.Name, f.Name)
runtime.NewServiceBuilder(service).
AddServicePort(24231, 24231).
WithSelector(f.labels)
if err = f.test.Client.Create(service); err != nil {
if err = f.Test.Client.Create(service); err != nil {
return err
}

log.V(2).Info("Defining pod...")
var containers []corev1.Container
f.pod = runtime.NewPod(f.test.NS.Name, f.Name, containers...)
f.pod = runtime.NewPod(f.Test.NS.Name, f.Name, containers...)
b := runtime.NewPodBuilder(f.pod).
WithLabels(f.labels).
AddConfigMapVolume("config", f.Name).
Expand All @@ -166,24 +185,26 @@ func (f *FluentdFunctionalFramework) DeployWithVisitor(visit runtime.PodBuilderV
AddVolumeMount("entrypoint", "/opt/app-root/src/run.sh", "run.sh", true).
AddVolumeMount("certs", "/etc/fluent/metrics", "", true).
End()
if err = visit(b); err != nil {
return err
for _, visit := range visitors {
if err = visit(b); err != nil {
return err
}
}
log.V(2).Info("Creating pod", "pod", f.pod)
if err = f.test.Client.Create(f.pod); err != nil {
if err = f.Test.Client.Create(f.pod); err != nil {
return err
}

log.V(2).Info("waiting for pod to be ready")
if err = oc.Literal().From("oc wait -n %s pod/%s --timeout=120s --for=condition=Ready", f.test.NS.Name, f.Name).Output(); err != nil {
if err = oc.Literal().From("oc wait -n %s pod/%s --timeout=120s --for=condition=Ready", f.Test.NS.Name, f.Name).Output(); err != nil {
return err
}
if err = f.test.Client.Get(f.pod); err != nil {
if err = f.Test.Client.Get(f.pod); err != nil {
return err
}
log.V(2).Info("waiting for service endpoints to be ready")
err = wait.PollImmediate(time.Second*2, time.Second*10, func() (bool, error) {
ips, err := oc.Get().WithNamespace(f.test.NS.Name).Resource("endpoints", f.Name).OutputJsonpath("{.subsets[*].addresses[*].ip}").Run()
ips, err := oc.Get().WithNamespace(f.Test.NS.Name).Resource("endpoints", f.Name).OutputJsonpath("{.subsets[*].addresses[*].ip}").Run()
if err != nil {
return false, nil
}
Expand All @@ -198,7 +219,7 @@ func (f *FluentdFunctionalFramework) DeployWithVisitor(visit runtime.PodBuilderV
}
log.V(2).Info("waiting for fluentd to be ready")
err = wait.PollImmediate(time.Second*2, time.Second*30, func() (bool, error) {
output, err := oc.Literal().From("oc logs -n %s pod/%s %s", f.test.NS.Name, f.Name, constants.FluentdName).Run()
output, err := oc.Literal().From("oc logs -n %s pod/%s %s", f.Test.NS.Name, f.Name, constants.FluentdName).Run()
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -234,11 +255,11 @@ func (f *FluentdFunctionalFramework) addOutputContainers(b *runtime.PodBuilder,
}

func (f *FluentdFunctionalFramework) WaitForPodToBeReady() error {
return oc.Literal().From(fmt.Sprintf("oc wait -n %s pod/%s --timeout=60s --for=condition=Ready", f.test.NS.Name, f.Name)).Output()
return oc.Literal().From(fmt.Sprintf("oc wait -n %s pod/%s --timeout=60s --for=condition=Ready", f.Test.NS.Name, f.Name)).Output()
}

func (f *FluentdFunctionalFramework) WritesApplicationLogs(numOfLogs int) error {
msg := "2020-11-04T18:13:59.061892999+00:00 stdout F Functional test message $n"
msg := "2020-11-04T18:13:59.061892999+00:00 stdout F Functional Test message $n"
return f.WriteMessagesToApplicationLog(msg, numOfLogs)
}

Expand Down Expand Up @@ -284,7 +305,7 @@ func (f *FluentdFunctionalFramework) ReadLogsFrom(outputName string, outputLogTy
}
err = wait.PollImmediate(defaultRetryInterval, maxDuration, func() (done bool, err error) {
result, err = f.RunCommand(outputName, "cat", file)
if err == nil {
if result != "" && err == nil {
return true, nil
}
log.V(4).Error(err, "Polling logs")
Expand Down
40 changes: 40 additions & 0 deletions test/functional/message_templates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package functional

import (
"github.com/openshift/cluster-logging-operator/test/helpers/types"
"time"
)

var (
templateForAnyPipelineMetadata = types.PipelineMetadata{
Collector: types.Collector{
Ipaddr4: "*",
Inputname: "*",
Name: "*",
Version: "*",
ReceivedAt: time.Time{},
},
}
templateForAnyKubernetes = types.Kubernetes{
ContainerName: "*",
PodName: "*",
NamespaceName: "*",
NamespaceID: "*",
OrphanedNamespace: "*",
}
)

func NewApplicationLogTempate() types.ApplicationLog{
return types.ApplicationLog{
Timestamp: time.Time{},
Message: "*",
ViaqIndexName: "app-write",
Level: "unknown",
ViaqMsgID: "*",
PipelineMetadata: templateForAnyPipelineMetadata,
Docker: types.Docker{
ContainerID: "*",
},
Kubernetes: templateForAnyKubernetes,
}
}
8 changes: 4 additions & 4 deletions test/functional/normalization/eventrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("[Normalization] Fluentd normalization for EventRouter messages

const timestamp string = "1985-10-21T09:00:00.00000+00:00"
var (
framework *functional.FluentdFunctionalFramework
pod *corev1.Pod
framework *functional.FluentdFunctionalFramework
pod *corev1.Pod
nanoTime, _ = time.Parse(time.RFC3339Nano, timestamp)
)

Expand All @@ -55,7 +55,7 @@ var _ = Describe("[Normalization] Fluentd normalization for EventRouter messages
framework.Cleanup()
})

for _, verb := range []string{"ADDED","UPDATED"} {
for _, verb := range []string{"ADDED", "UPDATED"} {
It(fmt.Sprintf("Should parse EventRouter %s message and check values", verb), func() {
podRef, err := reference.GetReference(scheme.Scheme, pod)
Expect(err).To(BeNil())
Expand All @@ -82,7 +82,7 @@ var _ = Describe("[Normalization] Fluentd normalization for EventRouter messages
func NewEventDataBuilder(verb string, podRef *corev1.ObjectReference) types.EventData {
newEvent := types.NewMockEvent(podRef, corev1.EventTypeNormal, "reason", "message")
if verb == "UPDATED" {
oldEvent := types.NewMockEvent(podRef, corev1.EventTypeWarning,"old_reason", "old_message")
oldEvent := types.NewMockEvent(podRef, corev1.EventTypeWarning, "old_reason", "old_message")
return types.EventData{Verb: "UPDATED", Event: newEvent, OldEvent: oldEvent}
} else {
return types.EventData{Verb: "ADDED", Event: newEvent}
Expand Down
7 changes: 4 additions & 3 deletions test/functional/output_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

const (

unsecureFluentConf = `
<system>
log_level trace
Expand Down Expand Up @@ -56,8 +57,7 @@ const (
<format>
@type json
</format>
</match>
`
</match>`
)

func (f *FluentdFunctionalFramework) addForwardOutput(b *runtime.PodBuilder, output logging.OutputSpec) error {
Expand All @@ -68,7 +68,7 @@ func (f *FluentdFunctionalFramework) addForwardOutput(b *runtime.PodBuilder, out
config := runtime.NewConfigMap(b.Pod.Namespace, configName, map[string]string{
"fluent.conf": unsecureFluentConf,
})
if err := f.test.Client.Create(config); err != nil {
if err := f.Test.Client.Create(config); err != nil {
return err
}

Expand All @@ -80,3 +80,4 @@ func (f *FluentdFunctionalFramework) addForwardOutput(b *runtime.PodBuilder, out
AddConfigMapVolume(config.Name, config.Name)
return nil
}

Loading