1
0
Fork 0
forked from barak/tarpoon

Add glide.yaml and vendor deps

This commit is contained in:
Dalton Hubble 2016-12-03 22:43:32 -08:00
parent db918f12ad
commit 5b3d5e81bd
18880 changed files with 5166045 additions and 1 deletions

View file

@ -0,0 +1,49 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["attach_detach_controller.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/populator:go_default_library",
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = ["attach_detach_controller_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/controller/informers:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
],
)

View file

@ -0,0 +1,2 @@
assignees:
- saad-ali

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package attachdetach
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/controller/informers"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
)
func Test_NewAttachDetachController_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()
resyncPeriod := 5 * time.Minute
podInformer := informers.NewPodInformer(fakeKubeClient, resyncPeriod)
nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod)
pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod)
pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod)
// Act
_, err := NewAttachDetachController(
fakeKubeClient,
podInformer,
nodeInformer,
pvcInformer,
pvInformer,
nil, /* cloud */
nil /* plugins */)
// Assert
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
}

View file

@ -0,0 +1,46 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = [
"actual_state_of_world.go",
"desired_state_of_world.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/types:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = [
"actual_state_of_world_test.go",
"desired_state_of_world_test.go",
],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/types:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/types:go_default_library",
],
)

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,363 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package cache implements data structures used by the attach/detach controller
to keep track of volumes, the nodes they are attached to, and the pods that
reference them.
*/
package cache
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/api/v1"
k8stypes "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorld defines a set of thread-safe operations supported on
// the attach/detach controller's desired state of the world cache.
// This cache contains nodes->volumes->pods where nodes are all the nodes
// managed by the attach/detach controller, volumes are all the volumes that
// should be attached to the specified node, and pods are the pods that
// reference the volume and are scheduled to that node.
// Note: This is distinct from the DesiredStateOfWorld implemented by the
// kubelet volume manager. The both keep track of different objects. This
// contains attach/detach controller specific state.
type DesiredStateOfWorld interface {
// AddNode adds the given node to the list of nodes managed by the attach/
// detach controller.
// If the node already exists this is a no-op.
AddNode(nodeName k8stypes.NodeName)
// AddPod adds the given pod to the list of pods that reference the
// specified volume and is scheduled to the specified node.
// A unique volumeName is generated from the volumeSpec and returned on
// success.
// If the pod already exists under the specified volume, this is a no-op.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the list of volumes that
// should be attached to the specified node, the volume is implicitly added.
// If no node with the name nodeName exists in list of nodes managed by the
// attach/detach attached controller, an error is returned.
AddPod(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, nodeName k8stypes.NodeName) (v1.UniqueVolumeName, error)
// DeleteNode removes the given node from the list of nodes managed by the
// attach/detach controller.
// If the node does not exist this is a no-op.
// If the node exists but has 1 or more child volumes, an error is returned.
DeleteNode(nodeName k8stypes.NodeName) error
// DeletePod removes the given pod from the list of pods that reference the
// specified volume and are scheduled to the specified node.
// If no pod exists in the list of pods that reference the specified volume
// and are scheduled to the specified node, this is a no-op.
// If a node with the name nodeName does not exist in the list of nodes
// managed by the attach/detach attached controller, this is a no-op.
// If no volume with the name volumeName exists in the list of managed
// volumes under the specified node, this is a no-op.
// If after deleting the pod, the specified volume contains no other child
// pods, the volume is also deleted.
DeletePod(podName types.UniquePodName, volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName)
// NodeExists returns true if the node with the specified name exists in
// the list of nodes managed by the attach/detach controller.
NodeExists(nodeName k8stypes.NodeName) bool
// VolumeExists returns true if the volume with the specified name exists
// in the list of volumes that should be attached to the specified node by
// the attach detach controller.
VolumeExists(volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName) bool
// GetVolumesToAttach generates and returns a list of volumes to attach
// and the nodes they should be attached to based on the current desired
// state of the world.
GetVolumesToAttach() []VolumeToAttach
// GetPodToAdd generates and returns a map of pods based on the current desired
// state of world
GetPodToAdd() map[types.UniquePodName]PodToAdd
}
// VolumeToAttach represents a volume that should be attached to a node.
type VolumeToAttach struct {
operationexecutor.VolumeToAttach
}
// PodToAdd represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type PodToAdd struct {
// pod contains the api object of pod
Pod *v1.Pod
// volumeName contains the unique identifier for this volume.
VolumeName v1.UniqueVolumeName
// nodeName contains the name of this node.
NodeName k8stypes.NodeName
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
return &desiredStateOfWorld{
nodesManaged: make(map[k8stypes.NodeName]nodeManaged),
volumePluginMgr: volumePluginMgr,
}
}
type desiredStateOfWorld struct {
// nodesManaged is a map containing the set of nodes managed by the attach/
// detach controller. The key in this map is the name of the node and the
// value is a node object containing more information about the node.
nodesManaged map[k8stypes.NodeName]nodeManaged
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}
// nodeManaged represents a node that is being managed by the attach/detach
// controller.
type nodeManaged struct {
// nodeName contains the name of this node.
nodeName k8stypes.NodeName
// volumesToAttach is a map containing the set of volumes that should be
// attached to this node. The key in the map is the name of the volume and
// the value is a pod object containing more information about the volume.
volumesToAttach map[v1.UniqueVolumeName]volumeToAttach
}
// The volume object represents a volume that should be attached to a node.
type volumeToAttach struct {
// volumeName contains the unique identifier for this volume.
volumeName v1.UniqueVolumeName
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to attach/detach
// methods.
spec *volume.Spec
// scheduledPods is a map containing the set of pods that reference this
// volume and are scheduled to the underlying node. The key in the map is
// the name of the pod and the value is a pod object containing more
// information about the pod.
scheduledPods map[types.UniquePodName]pod
}
// The pod represents a pod that references the underlying volume and is
// scheduled to the underlying node.
type pod struct {
// podName contains the unique identifier for this pod
podName types.UniquePodName
// pod object contains the api object of pod
podObj *v1.Pod
}
func (dsw *desiredStateOfWorld) AddNode(nodeName k8stypes.NodeName) {
dsw.Lock()
defer dsw.Unlock()
if _, nodeExists := dsw.nodesManaged[nodeName]; !nodeExists {
dsw.nodesManaged[nodeName] = nodeManaged{
nodeName: nodeName,
volumesToAttach: make(map[v1.UniqueVolumeName]volumeToAttach),
}
}
}
func (dsw *desiredStateOfWorld) AddPod(
podName types.UniquePodName,
podToAdd *v1.Pod,
volumeSpec *volume.Spec,
nodeName k8stypes.NodeName) (v1.UniqueVolumeName, error) {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return "", fmt.Errorf(
"no node with the name %q exists in the list of managed nodes",
nodeName)
}
attachableVolumePlugin, err := dsw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
return "", fmt.Errorf(
"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]
if !volumeExists {
volumeObj = volumeToAttach{
volumeName: volumeName,
spec: volumeSpec,
scheduledPods: make(map[types.UniquePodName]pod),
}
dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
}
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods[podName] =
pod{
podName: podName,
podObj: podToAdd,
}
}
return volumeName, nil
}
func (dsw *desiredStateOfWorld) DeleteNode(nodeName k8stypes.NodeName) error {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return nil
}
if len(nodeObj.volumesToAttach) > 0 {
return fmt.Errorf(
"failed to delete node %q from list of nodes managed by attach/detach controller--the node still contains %v volumes in its list of volumes to attach",
nodeName,
len(nodeObj.volumesToAttach))
}
delete(
dsw.nodesManaged,
nodeName)
return nil
}
func (dsw *desiredStateOfWorld) DeletePod(
podName types.UniquePodName,
volumeName v1.UniqueVolumeName,
nodeName k8stypes.NodeName) {
dsw.Lock()
defer dsw.Unlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if !nodeExists {
return
}
volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]
if !volumeExists {
return
}
if _, podExists := volumeObj.scheduledPods[podName]; !podExists {
return
}
delete(
dsw.nodesManaged[nodeName].volumesToAttach[volumeName].scheduledPods,
podName)
if len(volumeObj.scheduledPods) == 0 {
delete(
dsw.nodesManaged[nodeName].volumesToAttach,
volumeName)
}
}
func (dsw *desiredStateOfWorld) NodeExists(nodeName k8stypes.NodeName) bool {
dsw.RLock()
defer dsw.RUnlock()
_, nodeExists := dsw.nodesManaged[nodeName]
return nodeExists
}
func (dsw *desiredStateOfWorld) VolumeExists(
volumeName v1.UniqueVolumeName, nodeName k8stypes.NodeName) bool {
dsw.RLock()
defer dsw.RUnlock()
nodeObj, nodeExists := dsw.nodesManaged[nodeName]
if nodeExists {
if _, volumeExists := nodeObj.volumesToAttach[volumeName]; volumeExists {
return true
}
}
return false
}
func (dsw *desiredStateOfWorld) GetVolumesToAttach() []VolumeToAttach {
dsw.RLock()
defer dsw.RUnlock()
volumesToAttach := make([]VolumeToAttach, 0 /* len */, len(dsw.nodesManaged) /* cap */)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
volumesToAttach = append(volumesToAttach,
VolumeToAttach{
VolumeToAttach: operationexecutor.VolumeToAttach{
VolumeName: volumeName,
VolumeSpec: volumeObj.spec,
NodeName: nodeName,
ScheduledPods: getPodsFromMap(volumeObj.scheduledPods),
}})
}
}
return volumesToAttach
}
// Construct a list of v1.Pod objects from the given pod map
func getPodsFromMap(podMap map[types.UniquePodName]pod) []*v1.Pod {
pods := make([]*v1.Pod, 0, len(podMap))
for _, pod := range podMap {
pods = append(pods, pod.podObj)
}
return pods
}
func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
dsw.RLock()
defer dsw.RUnlock()
pods := make(map[types.UniquePodName]PodToAdd)
for nodeName, nodeObj := range dsw.nodesManaged {
for volumeName, volumeObj := range nodeObj.volumesToAttach {
for podUID, pod := range volumeObj.scheduledPods {
pods[podUID] = PodToAdd{
Pod: pod.podObj,
VolumeName: volumeName,
NodeName: nodeName,
}
}
}
}
return pods
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,25 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["desired_state_of_world_populator.go"],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View file

@ -0,0 +1,125 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package populator implements interfaces that monitor and keep the states of the
// desired_state_of_word in sync with the "ground truth" from informer.
package populator
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
// desired state of th world still exist, if not, it removes them.
// TODO: it also loops through the list of active pods and ensures that
// each one exists in the desired state of the world cache
// if it has volumes.
type DesiredStateOfWorldPopulator interface {
Run(stopCh <-chan struct{})
}
// NewDesiredStateOfWorldPopulator returns a new instance of DesiredStateOfWorldPopulator.
// loopSleepDuration - the amount of time the populator loop sleeps between
// successive executions
// podManager - the kubelet podManager that is the source of truth for the pods
// that exist on this host
// desiredStateOfWorld - the cache to populate
func NewDesiredStateOfWorldPopulator(
loopSleepDuration time.Duration,
podInformer kcache.SharedInformer,
desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
return &desiredStateOfWorldPopulator{
loopSleepDuration: loopSleepDuration,
podInformer: podInformer,
desiredStateOfWorld: desiredStateOfWorld,
}
}
type desiredStateOfWorldPopulator struct {
loopSleepDuration time.Duration
podInformer kcache.SharedInformer
desiredStateOfWorld cache.DesiredStateOfWorld
}
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndRemoveDeletedPods()
}
}
// Iterate through all pods in desired state of world, and remove if they no
// longer exist in the informer
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
for dswPodUID, dswPodToAdd := range dswp.desiredStateOfWorld.GetPodToAdd() {
dswPodKey, err := kcache.MetaNamespaceKeyFunc(dswPodToAdd.Pod)
if err != nil {
glog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err)
continue
}
// Retrieve the pod object from pod informer with the namespace key
informerPodObj, exists, err :=
dswp.podInformer.GetStore().GetByKey(dswPodKey)
if err != nil {
glog.Errorf(
"podInformer GetByKey failed for pod %q (UID %q) with %v",
dswPodKey,
dswPodUID,
err)
continue
}
if exists && informerPodObj == nil {
glog.Info(
"podInformer GetByKey found pod, but informerPodObj is nil for pod %q (UID %q)",
dswPodKey,
dswPodUID)
continue
}
if exists {
informerPod, ok := informerPodObj.(*v1.Pod)
if !ok {
glog.Errorf("Failed to cast obj %#v to pod object for pod %q (UID %q)", informerPod, dswPodKey, dswPodUID)
continue
}
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof(
"Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
continue
}
}
// the pod from dsw does not exist in pod informer, or it does not match the unique identifer retrieved
// from the informer, delete it from dsw
glog.V(1).Infof(
"Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
}
}

View file

@ -0,0 +1,46 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["reconciler.go"],
tags = ["automanaged"],
deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/volume/util/nestedpendingoperations:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = ["reconciler_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
],
)

View file

@ -0,0 +1,224 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package reconciler implements interfaces that attempt to reconcile the
// desired state of the with the actual state of the world by triggering
// actions.
package reconciler
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// Reconciler runs a periodic loop to reconcile the desired state of the with
// the actual state of the world by triggering attach detach operations.
// Note: This is distinct from the Reconciler implemented by the kubelet volume
// manager. This reconciles state for the attach/detach controller. That
// reconciles state for the kubelet volume manager.
type Reconciler interface {
// Starts running the reconciliation loop which executes periodically, checks
// if volumes that should be attached are attached and volumes that should
// be detached are detached. If not, it will trigger attach/detach
// operations to rectify.
Run(stopCh <-chan struct{})
}
// NewReconciler returns a new instance of Reconciler that waits loopPeriod
// between successive executions.
// loopPeriod is the amount of time the reconciler loop waits between
// successive executions.
// maxWaitForUnmountDuration is the max amount of time the reconciler will wait
// for the volume to be safely unmounted, after this it will detach the volume
// anyway (to handle crashed/unavailable nodes). If during this time the volume
// becomes used by a new pod, the detach request will be aborted and the timer
// cleared.
func NewReconciler(
loopPeriod time.Duration,
maxWaitForUnmountDuration time.Duration,
syncDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher operationexecutor.OperationExecutor,
nodeStatusUpdater statusupdater.NodeStatusUpdater) Reconciler {
return &reconciler{
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
syncDuration: syncDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
timeOfLastSync: time.Now(),
}
}
type reconciler struct {
loopPeriod time.Duration
maxWaitForUnmountDuration time.Duration
syncDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
timeOfLastSync time.Time
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// reconciler periodically checks whether the attached volumes from actual state
// are still attached to the node and udpate the status if they are not.
if time.Since(rc.timeOfLastSync) > rc.syncDuration {
rc.sync()
}
}
}
func (rc *reconciler) sync() {
defer rc.updateSyncTime()
rc.syncStates()
}
func (rc *reconciler) updateSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) syncStates() {
volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
for nodeName, volumes := range volumesPerNode {
err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
if err != nil {
glog.Errorf("Error in syncing states for volumes: %v", err)
}
}
}
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
// Trigger detach volume which requires verifing safe to detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.NodeName,
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
err)
}
}
}
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
glog.Infof("UpdateNodeStatuses failed with: %v", err)
}
}

View file

@ -0,0 +1,478 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
k8stypes "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
syncLoopPeriod time.Duration = 100 * time.Minute
maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
resyncPeriod time.Duration = 5 * time.Minute
)
// Calls Run()
// Verifies there are no calls to attach or detach.
func Test_Run_Positive_DoNothing(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(
fakeKubeClient, volumePluginMgr, fakeRecorder, false)
nodeInformer := informers.NewNodeInformer(
fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, true /* expectZeroNewAttacherCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 0 /* expectedAttachCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := k8stypes.NodeName("node-name")
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName,
nodeName)
}
_, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Marks the node/volume as unmounted.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
// Verifies there is one detach call and no (new) attach calls.
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := k8stypes.NodeName("node-name")
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName,
nodeName)
}
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podName,
generatedVolumeName,
nodeName)
}
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies there is one detach call and no (new) attach calls.
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := k8stypes.NodeName("node-name")
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName,
nodeName)
}
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podName,
generatedVolumeName,
nodeName)
}
// Assert -- Timer will triger detach
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Has node update fail
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Marks the node/volume as unmounted.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
// Verifies there are NO detach call and no (new) attach calls.
func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false)
nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := k8stypes.NodeName("node-name")
dsw.AddNode(nodeName)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName,
nodeName)
}
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Act
go reconciler.Run(wait.NeverStop)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podName,
generatedVolumeName,
nodeName)
}
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
// Assert
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
func waitForNewAttacherCallCount(
t *testing.T,
expectedCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
actualCallCount := fakePlugin.GetNewAttacherCallCount()
if actualCallCount >= expectedCallCount {
return true, nil
}
t.Logf(
"Warning: Wrong NewAttacherCallCount. Expected: <%v> Actual: <%v>. Will retry.",
expectedCallCount,
actualCallCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"Timed out waiting for NewAttacherCallCount. Expected: <%v> Actual: <%v>",
expectedCallCount,
fakePlugin.GetNewAttacherCallCount())
}
}
func waitForNewDetacherCallCount(
t *testing.T,
expectedCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
actualCallCount := fakePlugin.GetNewDetacherCallCount()
if actualCallCount >= expectedCallCount {
return true, nil
}
t.Logf(
"Warning: Wrong NewDetacherCallCount. Expected: <%v> Actual: <%v>. Will retry.",
expectedCallCount,
actualCallCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"Timed out waiting for NewDetacherCallCount. Expected: <%v> Actual: <%v>",
expectedCallCount,
fakePlugin.GetNewDetacherCallCount())
}
}
func waitForAttachCallCount(
t *testing.T,
expectedAttachCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
return
}
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
for i, attacher := range fakePlugin.GetAttachers() {
actualCallCount := attacher.GetAttachCallCount()
if actualCallCount == expectedAttachCallCount {
return true, nil
}
t.Logf(
"Warning: Wrong attacher[%v].GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will try next attacher.",
i,
expectedAttachCallCount,
actualCallCount)
}
t.Logf(
"Warning: No attachers have expected AttachCallCount. Expected: <%v>. Will retry.",
expectedAttachCallCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"No attachers have expected AttachCallCount. Expected: <%v>",
expectedAttachCallCount)
}
}
func waitForDetachCallCount(
t *testing.T,
expectedDetachCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
return
}
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
for i, detacher := range fakePlugin.GetDetachers() {
actualCallCount := detacher.GetDetachCallCount()
if actualCallCount == expectedDetachCallCount {
return true, nil
}
t.Logf(
"Wrong detacher[%v].GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will try next detacher.",
i,
expectedDetachCallCount,
actualCallCount)
}
t.Logf(
"Warning: No detachers have expected DetachCallCount. Expected: <%v>. Will retry.",
expectedDetachCallCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"No detachers have expected DetachCallCount. Expected: <%v>",
expectedDetachCallCount)
}
}
func verifyNewAttacherCallCount(
t *testing.T,
expectZeroNewAttacherCallCount bool,
fakePlugin *volumetesting.FakeVolumePlugin) {
if expectZeroNewAttacherCallCount &&
fakePlugin.GetNewAttacherCallCount() != 0 {
t.Fatalf(
"Wrong NewAttacherCallCount. Expected: <0> Actual: <%v>",
fakePlugin.GetNewAttacherCallCount())
}
}
func verifyNewDetacherCallCount(
t *testing.T,
expectZeroNewDetacherCallCount bool,
fakePlugin *volumetesting.FakeVolumePlugin) {
if expectZeroNewDetacherCallCount &&
fakePlugin.GetNewDetacherCallCount() != 0 {
t.Fatalf("Wrong NewDetacherCallCount. Expected: <0> Actual: <%v>",
fakePlugin.GetNewDetacherCallCount())
}
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 6,
}
return wait.ExponentialBackoff(backoff, fn)
}

View file

@ -0,0 +1,29 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = [
"fake_node_status_updater.go",
"node_status_updater.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View file

@ -0,0 +1,39 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statusupdater
import (
"fmt"
)
func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater {
return &fakeNodeStatusUpdater{
returnError: returnError,
}
}
type fakeNodeStatusUpdater struct {
returnError bool
}
func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error {
if fnsu.returnError {
return fmt.Errorf("fake error on update node status")
}
return nil
}

View file

@ -0,0 +1,139 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package statusupdater implements interfaces that enable updating the status
// of API objects.
package statusupdater
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
kcache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/util/strategicpatch"
)
// NodeStatusUpdater defines a set of operations for updating the
// VolumesAttached field in the Node Status.
type NodeStatusUpdater interface {
// Gets a list of node statuses that should be updated from the actual state
// of the world and updates them.
UpdateNodeStatuses() error
}
// NewNodeStatusUpdater returns a new instance of NodeStatusUpdater.
func NewNodeStatusUpdater(
kubeClient clientset.Interface,
nodeInformer kcache.SharedInformer,
actualStateOfWorld cache.ActualStateOfWorld) NodeStatusUpdater {
return &nodeStatusUpdater{
actualStateOfWorld: actualStateOfWorld,
nodeInformer: nodeInformer,
kubeClient: kubeClient,
}
}
type nodeStatusUpdater struct {
kubeClient clientset.Interface
nodeInformer kcache.SharedInformer
actualStateOfWorld cache.ActualStateOfWorld
}
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
// TODO: investigate right behavior if nodeName is empty
// kubernetes/kubernetes/issues/37777
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, exists, err := nsu.nodeInformer.GetStore().GetByKey(string(nodeName))
if nodeObj == nil || !exists || err != nil {
// If node does not exist, its status cannot be updated, log error and
// reset flag statusUpdateNeeded back to true to indicate this node status
// needs to be updated again
glog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. %v",
nodeName,
err)
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
continue
}
clonedNode, err := conversion.NewCloner().DeepCopy(nodeObj)
if err != nil {
return fmt.Errorf("error cloning node %q: %v",
nodeName,
err)
}
node, ok := clonedNode.(*v1.Node)
if !ok || node == nil {
return fmt.Errorf(
"failed to cast %q object %#v to Node",
nodeName,
clonedNode)
}
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes
newData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal newData for node %q. %v",
nodeName,
err)
}
patchBytes, err :=
strategicpatch.CreateStrategicMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateStrategicMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.Core().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
// If update node status fails, reset flag statusUpdateNeeded back to true
// to indicate this node status needs to be updated again
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
return fmt.Errorf(
"failed to kubeClient.Core().Nodes().Patch for node %q. %v",
nodeName,
err)
}
glog.V(2).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
}
return nil
}

View file

@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["testvolumespec.go"],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5/fake:go_default_library",
"//pkg/client/testing/core:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/watch:go_default_library",
],
)

View file

@ -0,0 +1,115 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
)
// GetTestVolumeSpec returns a test volume spec
func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.Spec {
return &volume.Spec{
Volume: &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: string(diskName),
FSType: "fake",
ReadOnly: false,
},
},
},
}
}
func CreateTestClient() *fake.Clientset {
fakeClient := &fake.Clientset{}
fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &v1.PodList{}
podNamePrefix := "mypod"
namespace := "mynamespace"
for i := 0; i < 5; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
ObjectMeta: v1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"name": podName,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "containerName",
Image: "containerImage",
VolumeMounts: []v1.VolumeMount{
{
Name: "volumeMountName",
ReadOnly: false,
MountPath: "/mnt",
},
},
},
},
Volumes: []v1.Volume{
{
Name: "volumeName",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "pdName",
FSType: "ext4",
ReadOnly: false,
},
},
},
},
},
}
obj.Items = append(obj.Items, pod)
}
return true, obj, nil
})
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
return fakeClient
}
// NewPod returns a test pod object
func NewPod(uid, name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: v1.ObjectMeta{
UID: types.UID(uid),
Name: name,
Namespace: name,
},
}
}