Initial commit

This commit is contained in:
haochengkuo
2021-08-31 10:18:35 +08:00
parent 2ad6fb7b44
commit dc05a795b7
44 changed files with 5197 additions and 1 deletions

View File

@@ -0,0 +1,493 @@
/*
Copyright 2021 Synology Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"fmt"
"time"
"strconv"
"github.com/golang/protobuf/ptypes"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/SynologyOpenSource/synology-csi/pkg/dsm/webapi"
"github.com/SynologyOpenSource/synology-csi/pkg/interfaces"
"github.com/SynologyOpenSource/synology-csi/pkg/models"
"github.com/SynologyOpenSource/synology-csi/pkg/utils"
)
type controllerServer struct {
Driver *Driver
dsmService interfaces.IDsmService
Initiator *initiatorDriver
}
func getSizeByCapacityRange(capRange *csi.CapacityRange) (int64, error) {
if capRange == nil {
return 1 * utils.UNIT_GB, nil
}
minSize := capRange.GetRequiredBytes()
maxSize := capRange.GetLimitBytes()
if 0 < maxSize && maxSize < minSize {
return 0, status.Error(codes.InvalidArgument, "Invalid input: limitBytes is smaller than requiredBytes")
}
if minSize < utils.UNIT_GB {
return 0, status.Error(codes.InvalidArgument, "Invalid input: required bytes is smaller than 1G")
}
return int64(minSize), nil
}
func (cs *controllerServer) isVolumeAccessModeSupport(mode csi.VolumeCapability_AccessMode_Mode) bool {
for _, accessMode := range cs.Driver.getVolumeCapabilityAccessModes() {
if mode == accessMode.Mode {
return true
}
}
return false
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
sizeInByte, err := getSizeByCapacityRange(req.GetCapacityRange())
volName, volCap := req.GetName(), req.GetVolumeCapabilities()
volContentSrc := req.GetVolumeContentSource()
var srcSnapshotId string = ""
var srcVolumeId string = ""
var multiSession bool = false
if err != nil {
return nil, err
}
if volName == "" {
return nil, status.Errorf(codes.InvalidArgument, "No name is provided")
}
if volCap == nil {
return nil, status.Errorf(codes.InvalidArgument, "No volume capabilities are provided")
}
for _, cap := range volCap {
accessMode := cap.GetAccessMode().GetMode()
if !cs.isVolumeAccessModeSupport(accessMode) {
return nil, status.Errorf(codes.InvalidArgument, "Invalid volume capability access mode")
}
if accessMode == csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
multiSession = false
} else if accessMode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
multiSession = true
}
}
if volContentSrc != nil {
if srcSnapshot := volContentSrc.GetSnapshot(); srcSnapshot != nil {
srcSnapshotId = srcSnapshot.SnapshotId
} else if srcVolume := volContentSrc.GetVolume(); srcVolume != nil {
srcVolumeId = srcVolume.VolumeId
} else {
return nil, status.Errorf(codes.InvalidArgument, "Invalid volume content source")
}
}
params := req.GetParameters()
isThin := true
if params["thin_provisioning"] != "" {
isThin = utils.StringToBoolean(params["thin_provisioning"])
}
spec := &models.CreateK8sVolumeSpec{
DsmIp: params["dsm"],
K8sVolumeName: volName,
LunName: fmt.Sprintf("%s-%s", models.LunPrefix, volName),
Location: params["location"],
Size: sizeInByte,
Type: params["type"],
ThinProvisioning: isThin,
TargetName: fmt.Sprintf("%s-%s", models.LunPrefix, volName),
MultipleSession: multiSession,
SourceSnapshotId: srcSnapshotId,
SourceVolumeId: srcVolumeId,
}
lunInfo, dsmIp, err := cs.dsmService.CreateVolume(spec)
if err != nil {
return nil, err
}
if int64(lunInfo.Size) != sizeInByte {
return nil , status.Errorf(codes.AlreadyExists, "Already existing volume name with different capacity")
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: lunInfo.Uuid,
CapacityBytes: int64(lunInfo.Size),
ContentSource: volContentSrc,
VolumeContext: map[string]string{
"dsm": dsmIp,
},
},
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeId := req.GetVolumeId()
if volumeId == "" {
return nil, status.Errorf(codes.InvalidArgument, "No volume id is provided")
}
if err := cs.dsmService.DeleteVolume(volumeId); err != nil {
return nil, status.Errorf(codes.Internal,
fmt.Sprintf("Failed to DeleteVolume(%s), err: %v", volumeId, err))
}
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
volumeId, volCap := req.GetVolumeId(), req.GetVolumeCapabilities()
if volumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "No volume capabilities are provided")
}
if cs.dsmService.GetVolume(volumeId) == nil {
return nil, status.Errorf(codes.NotFound, "Volume[%s] does not exist", volumeId)
}
for _, cap := range volCap {
if !cs.isVolumeAccessModeSupport(cap.GetAccessMode().GetMode()) {
return nil, status.Errorf(codes.NotFound, "Driver does not support volume capabilities:%v", volCap)
}
}
return &csi.ValidateVolumeCapabilitiesResponse{}, nil
}
func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
maxEntries := req.GetMaxEntries()
startingToken := req.GetStartingToken()
var entries []*csi.ListVolumesResponse_Entry
var nextToken string = ""
if 0 > maxEntries {
return nil, status.Error(codes.InvalidArgument, "Max entries can not be negative.")
}
pagingSkip := ("" != startingToken)
infos := cs.dsmService.ListVolumes()
var count int32 = 0
for _, info := range infos {
if info.Lun.Uuid == startingToken {
pagingSkip = false
}
if pagingSkip {
continue
}
if maxEntries > 0 && count >= maxEntries {
nextToken = info.Lun.Uuid
break
}
entries = append(entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: info.Lun.Uuid,
CapacityBytes: int64(info.Lun.Size),
VolumeContext: map[string]string{
"dsm": info.DsmIp,
"lunName": info.Lun.Name,
"targetIqn": info.Target.Iqn,
},
},
})
count++
}
if pagingSkip {
return nil, status.Errorf(codes.Aborted, fmt.Sprintf("Invalid StartingToken(%s)", startingToken))
}
return &csi.ListVolumesResponse{
Entries: entries,
NextToken: nextToken,
}, nil
}
func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
params := req.GetParameters()
volInfos, err := cs.dsmService.ListDsmVolumes(params["dsm"])
if err != nil {
return nil, status.Error(codes.InvalidArgument, "Failed to list dsm volumes")
}
var availableCapacity int64 = 0
location := params["location"]
for _, info := range volInfos {
if location != "" && info.Path != location {
continue
}
freeSize, err := strconv.ParseInt(info.Free, 10, 64)
if err != nil {
continue
}
availableCapacity += freeSize
}
return &csi.GetCapacityResponse{
AvailableCapacity: availableCapacity,
}, nil
}
func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.csCap,
}, nil
}
func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
srcVolId := req.GetSourceVolumeId()
snapshotName := req.GetName()
params := req.GetParameters()
if srcVolId == "" {
return nil, status.Error(codes.InvalidArgument, "Source volume id is empty.")
}
if snapshotName == "" {
return nil, status.Error(codes.InvalidArgument, "Snapshot name is empty.")
}
snapshotInfos, err := cs.dsmService.ListAllSnapshots()
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to ListAllSnapshots(), err: %v", err))
}
// idempotency
for _, snapshotInfo := range snapshotInfos {
if snapshotInfo.Name == snapshotName {
if snapshotInfo.ParentUuid != srcVolId {
return nil, status.Errorf(codes.AlreadyExists, fmt.Sprintf("Snapshot [%s] already exists but volume id is incompatible", snapshotName))
}
createTime, err := ptypes.TimestampProto(time.Unix(snapshotInfo.CreateTime, 0))
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to convert create time, err: %v", err))
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: snapshotInfo.TotalSize,
SnapshotId: snapshotInfo.Uuid,
SourceVolumeId: snapshotInfo.ParentUuid,
CreationTime: createTime,
ReadyToUse: (snapshotInfo.Status == "Healthy"),
},
}, nil
}
}
spec := &models.CreateK8sVolumeSnapshotSpec{
K8sVolumeId: srcVolId,
SnapshotName: snapshotName,
Description: params["description"],
TakenBy: models.K8sCsiName,
IsLocked: utils.StringToBoolean(params["is_locked"]),
}
snapshotId, err := cs.dsmService.CreateSnapshot(spec)
if err != nil {
if err == utils.OutOfFreeSpaceError("") || err == utils.SnapshotReachMaxCountError("") {
return nil,status.Errorf(codes.ResourceExhausted, fmt.Sprintf("Failed to CreateSnapshot(%s), err: %v", srcVolId, err))
} else {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to CreateSnapshot(%s), err: %v", srcVolId, err))
}
}
snapshotInfo, err := cs.dsmService.GetSnapshot(snapshotId)
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to GetSnapshot(%s), err: %v", snapshotId, err))
}
createTime, err := ptypes.TimestampProto(time.Unix(snapshotInfo.CreateTime, 0))
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to convert create time, err: %v", err))
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: snapshotInfo.TotalSize,
SnapshotId: snapshotInfo.Uuid,
SourceVolumeId: snapshotInfo.ParentUuid,
CreationTime: createTime,
ReadyToUse: (snapshotInfo.Status == "Healthy"),
},
}, nil
}
func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
snapshotId := req.GetSnapshotId()
if snapshotId == "" {
return nil, status.Error(codes.InvalidArgument, "Snapshot id is empty.")
}
err := cs.dsmService.DeleteSnapshot(snapshotId)
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to DeleteSnapshot(%s), err: %v", snapshotId, err))
}
return &csi.DeleteSnapshotResponse{}, nil
}
func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
srcVolId := req.GetSourceVolumeId()
snapshotId := req.GetSnapshotId()
maxEntries := req.GetMaxEntries()
startingToken := req.GetStartingToken()
var entries []*csi.ListSnapshotsResponse_Entry
var nextToken string = ""
if 0 > maxEntries {
return nil, status.Error(codes.InvalidArgument, "Max entries can not be negative.")
}
pagingSkip := ("" != startingToken)
var snapshotInfos []webapi.SnapshotInfo
var err error
if (srcVolId != "") {
snapshotInfos, err = cs.dsmService.ListSnapshots(srcVolId)
} else {
snapshotInfos, err = cs.dsmService.ListAllSnapshots()
}
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to ListSnapshots(%s), err: %v", srcVolId, err))
}
var count int32 = 0
for _, snapshotInfo := range snapshotInfos {
if snapshotInfo.Uuid == startingToken {
pagingSkip = false
}
if pagingSkip {
continue
}
if snapshotId != "" && snapshotInfo.Uuid != snapshotId {
continue
}
if maxEntries > 0 && count >= maxEntries {
nextToken = snapshotInfo.Uuid
break
}
createTime, err := ptypes.TimestampProto(time.Unix(snapshotInfo.CreateTime, 0))
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("Failed to convert create time, err: %v", err))
}
entries = append(entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: snapshotInfo.TotalSize,
SnapshotId: snapshotInfo.Uuid,
SourceVolumeId: snapshotInfo.ParentUuid,
CreationTime: createTime,
ReadyToUse: (snapshotInfo.Status == "Healthy"),
},
})
count++
}
if pagingSkip {
return nil, status.Errorf(codes.Aborted, fmt.Sprintf("Invalid StartingToken(%s)", startingToken))
}
return &csi.ListSnapshotsResponse{
Entries: entries,
NextToken: nextToken,
}, nil
}
func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeId, capRange := req.GetVolumeId(), req.GetCapacityRange()
if volumeId == "" || capRange == nil {
return nil, status.Error(codes.InvalidArgument,
"InvalidArgument: Please check volume ID and capacity range.")
}
sizeInByte, err := getSizeByCapacityRange(capRange)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument,
"InvalidArgument: Please check CapacityRange[%v]", capRange)
}
if err := cs.dsmService.ExpandLun(volumeId, sizeInByte); err != nil {
return nil, status.Error(codes.Internal,
fmt.Sprintf("Failed to expand volume [%s], err: %v", volumeId, err))
}
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: sizeInByte,
NodeExpansionRequired: true,
}, nil
}
func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

129
pkg/driver/driver.go Normal file
View File

@@ -0,0 +1,129 @@
/*
Copyright 2021 Synology Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"github.com/container-storage-interface/spec/lib/go/csi"
log "github.com/sirupsen/logrus"
"github.com/SynologyOpenSource/synology-csi/pkg/interfaces"
)
const (
DriverName = "csi.san.synology.com" // CSI dirver name
DriverVersion = "1.0.0"
)
type IDriver interface {
Activate()
}
type Driver struct {
// *csicommon.CSIDriver
name string
nodeID string
version string
endpoint string
csCap []*csi.ControllerServiceCapability
vCap []*csi.VolumeCapability_AccessMode
nsCap []*csi.NodeServiceCapability
DsmService interfaces.IDsmService
}
func NewControllerAndNodeDriver(nodeID string, endpoint string, dsmService interfaces.IDsmService) (*Driver, error) {
log.Debugf("NewControllerAndNodeDriver: DriverName: %v, DriverVersion: %v", DriverName, DriverVersion)
// TODO version format and validation
d := &Driver{
name: DriverName,
version: DriverVersion,
nodeID: nodeID,
endpoint: endpoint,
DsmService: dsmService,
}
d.addControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
})
d.addVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
})
d.addNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
})
log.Infof("New driver created: name=%s, nodeID=%s, version=%s, endpoint=%s", d.name, d.nodeID, d.version, d.endpoint)
return d, nil
}
// TODO: func NewNodeDriver() {}
// TODO: func NewControllerDriver() {}
func (d *Driver) Activate() {
go func() {
RunControllerandNodePublishServer(d.endpoint, d, NewControllerServer(d), NewNodeServer(d))
}()
}
func (d *Driver) addControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
var csc []*csi.ControllerServiceCapability
for _, c := range cl {
log.Debugf("Enabling controller service capability: %v", c.String())
csc = append(csc, NewControllerServiceCapability(c))
}
d.csCap = csc
return
}
func (d *Driver) addVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) {
var vca []*csi.VolumeCapability_AccessMode
for _, c := range vc {
log.Debugf("Enabling volume access mode: %v", c.String())
vca = append(vca, NewVolumeCapabilityAccessMode(c))
}
d.vCap = vca
return
}
func (d *Driver) addNodeServiceCapabilities(nsc []csi.NodeServiceCapability_RPC_Type) {
var nca []*csi.NodeServiceCapability
for _, c := range nsc {
log.Debugf("Enabling node service capability: %v", c.String())
nca = append(nca, NewNodeServiceCapability(c))
}
d.nsCap = nca
return
}
func (d *Driver) getVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode { // for debugging
return d.vCap
}

114
pkg/driver/grpc.go Normal file
View File

@@ -0,0 +1,114 @@
/*
Copyright 2021 Synology Inc.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"net"
"os"
"sync"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// Defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}
// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
return
}
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
log.Fatal(err.Error())
}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
log.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
log.Infof("Listening for connections on address: %#v", listener.Addr())
if err := server.Serve(listener); err != nil {
log.Fatal(err.Error())
}
}

View File

@@ -0,0 +1,61 @@
/*
Copyright 2021 Synology Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
)
type identityServer struct {
Driver *Driver
}
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
if ids.Driver.name == "" {
return nil, status.Error(codes.Unavailable, "Driver name not configured")
}
if ids.Driver.version == "" {
return nil, status.Error(codes.Unavailable, "Driver is missing version")
}
return &csi.GetPluginInfoResponse{
Name: ids.Driver.name,
VendorVersion: ids.Driver.version,
}, nil
}
func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}

162
pkg/driver/initiator.go Normal file
View File

@@ -0,0 +1,162 @@
// Copyright 2021 Synology Inc.
package driver
import (
"errors"
"fmt"
"strings"
log "github.com/sirupsen/logrus"
utilexec "k8s.io/utils/exec"
)
type initiatorDriver struct {
chapUser string
chapPassword string
}
const (
ISCSIPort = 3260
)
func iscsiadm(cmdArgs ...string) utilexec.Cmd {
executor := utilexec.New()
return executor.Command("iscsiadm", cmdArgs...)
}
func iscsiadm_session() string {
cmd := iscsiadm("-m", "session")
out, err := cmd.CombinedOutput()
if err != nil {
exitErr, ok := err.(utilexec.ExitError)
if ok && exitErr.ExitStatus() == 21 { // iscsiadm: No active sessions
log.Info("No active iscsi session found.")
} else {
log.Errorf("Failed to run iscsiadm session: %v", err)
}
return ""
}
return string(out)
}
func iscsiadm_discovery(ip string) error {
cmd := iscsiadm(
"-m", "discoverydb",
"--type", "sendtargets",
"--portal", ip,
"--discover")
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s (%v)", string(out), err)
}
return nil
}
func iscsiadm_login(iqn, portal string) error {
cmd := iscsiadm(
"-m", "node",
"--targetname", iqn,
"--portal", portal,
"--login")
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s (%v)", string(out), err)
}
return nil
}
func iscsiadm_logout(iqn string) error {
cmd := iscsiadm(
"-m", "node",
"--targetname", iqn,
"--logout")
if _, err := cmd.CombinedOutput(); err != nil {
return err
}
return nil
}
func iscsiadm_rescan(iqn string) error {
cmd := iscsiadm(
"-m", "node",
"--targetname", iqn,
"-R")
if _, err := cmd.CombinedOutput(); err != nil {
return err
}
return nil
}
func hasSession(targetIqn string) bool{
sessions := iscsiadm_session();
for _, line := range strings.Split(sessions, "\n") {
if strings.Contains(line, targetIqn) {
return true
}
}
return false
}
func (d *initiatorDriver) login(targetIqn string, ip string) error{
portal := fmt.Sprintf("%s:%d", ip, ISCSIPort)
if (hasSession(targetIqn)) {
log.Infof("Session[%s] already exists.", targetIqn)
return nil
}
if err := iscsiadm_discovery(ip); err != nil {
log.Errorf("Failed in discovery of the target: %v", err)
return err
}
if err := iscsiadm_login(targetIqn, portal); err != nil {
log.Errorf("Failed in login of the target: %v", err)
return err
}
log.Infof("Login target portal [%s], iqn [%s].", portal, targetIqn)
return nil
}
func (d *initiatorDriver) logout(targetIqn string, ip string) error{
if (!hasSession(targetIqn)) {
log.Infof("Session[%s] doesn't exist.", targetIqn)
return nil
}
portal := fmt.Sprintf("%s:%d", ip, ISCSIPort)
if err := iscsiadm_logout(targetIqn); err != nil {
log.Errorf("Failed in logout of the target.\nTarget [%s], Portal [%s], Err[%v]",
targetIqn, portal, err)
return err
}
log.Infof("Logout target portal [%s], iqn [%s].", portal, targetIqn)
return nil
}
func (d *initiatorDriver) rescan(targetIqn string) error{
if (!hasSession(targetIqn)) {
msg := fmt.Sprintf("Session[%s] doesn't exist.", targetIqn)
log.Error(msg)
return errors.New(msg)
}
if err := iscsiadm_rescan(targetIqn); err != nil {
log.Errorf("Failed in rescan of the target.\nTarget [%s], Err[%v]",
targetIqn, err)
return err
}
log.Infof("Rescan target iqn [%s].", targetIqn)
return nil
}

403
pkg/driver/nodeserver.go Normal file
View File

@@ -0,0 +1,403 @@
/*
Copyright 2021 Synology Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/mount-utils"
"github.com/SynologyOpenSource/synology-csi/pkg/interfaces"
"github.com/SynologyOpenSource/synology-csi/pkg/utils"
)
type nodeServer struct {
Driver *Driver
Mounter *mount.SafeFormatAndMount
dsmService interfaces.IDsmService
Initiator *initiatorDriver
}
func getExistedDevicePath(paths []string) string {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
timer := time.NewTimer(60 * time.Second)
defer timer.Stop()
for {
select {
case <-ticker.C:
for _, path := range paths {
exists, err := mount.PathExists(path)
if err == nil && exists == true {
return path
} else {
log.Errorf("Can't find device path [%s], err: %v", path, err)
}
}
case <-timer.C:
return ""
}
}
}
func (ns *nodeServer) getVolumeMountPath(volumeId string) string {
paths := []string{}
k8sVolume := ns.dsmService.GetVolume(volumeId)
if k8sVolume == nil {
log.Errorf("Failed to get Volume id:%d.")
return ""
}
// Assume target and lun 1-1 mapping
mappingIndex := k8sVolume.Target.MappedLuns[0].MappingIndex
ips, err := utils.LookupIPv4(k8sVolume.DsmIp)
if err != nil {
log.Errorf("Failed to lookup ipv4 for host: %s", k8sVolume.DsmIp)
paths = append(paths, fmt.Sprintf("%sip-%s:3260-iscsi-%s-lun-%d", "/dev/disk/by-path/", k8sVolume.DsmIp, k8sVolume.Target.Iqn, mappingIndex))
} else {
for _, ipv4 := range ips {
paths = append(paths, fmt.Sprintf("%sip-%s:3260-iscsi-%s-lun-%d", "/dev/disk/by-path/", ipv4, k8sVolume.Target.Iqn, mappingIndex))
}
}
path := getExistedDevicePath(paths)
if path == "" {
log.Errorf("Volume mount path is not exist.")
return ""
}
return path
}
func createTargetMountPath(mounter mount.Interface, mountPath string, isBlock bool) (bool, error) {
notMount, err := mount.IsNotMountPoint(mounter, mountPath)
if err != nil {
if os.IsNotExist(err) {
if isBlock {
pathFile, err := os.OpenFile(mountPath, os.O_CREATE|os.O_RDWR, 0750)
if err != nil {
log.Errorf("Failed to create mountPath:%s with error: %v", mountPath, err)
return notMount, err
}
if err = pathFile.Close(); err != nil {
log.Errorf("Failed to close mountPath:%s with error: %v", mountPath, err)
return notMount, err
}
} else {
err = os.MkdirAll(mountPath, 0750)
if err != nil {
return notMount, err
}
}
notMount = true
} else {
return false, err
}
}
return notMount, nil
}
func (ns *nodeServer) loginTarget(volumeId string) error {
k8sVolume := ns.dsmService.GetVolume(volumeId);
if k8sVolume == nil {
return status.Error(codes.NotFound, fmt.Sprintf("Volume[%s] is not found", volumeId))
}
if err := ns.Initiator.login(k8sVolume.Target.Iqn, k8sVolume.DsmIp); err != nil {
return status.Errorf(codes.Internal,
fmt.Sprintf("Failed to login with target iqn [%s], err: %v", k8sVolume.Target.Iqn, err))
}
return nil
}
func (ns *nodeServer) logoutTarget(volumeId string) {
k8sVolume := ns.dsmService.GetVolume(volumeId)
if k8sVolume == nil {
return
}
ns.Initiator.logout(k8sVolume.Target.Iqn, k8sVolume.DsmIp)
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeId, stagingTargetPath, volumeCapability :=
req.GetVolumeId(), req.GetStagingTargetPath(), req.GetVolumeCapability()
if volumeId == "" || stagingTargetPath == "" || volumeCapability == nil {
return nil, status.Error(codes.InvalidArgument,
"InvalidArgument: Please check volume ID, staging target path and volume capability.")
}
// if block mode, skip mount
if volumeCapability.GetBlock() != nil {
return &csi.NodeStageVolumeResponse{}, nil
}
if err := ns.loginTarget(volumeId); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volumeMountPath := ns.getVolumeMountPath(volumeId)
if volumeMountPath == "" {
return nil, status.Error(codes.Internal, "Can't get volume mount path")
}
notMount, err := ns.Mounter.Interface.IsLikelyNotMountPoint(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMount {
return &csi.NodeStageVolumeResponse{}, nil
}
fsType := volumeCapability.GetMount().GetFsType()
mountFlags := volumeCapability.GetMount().GetMountFlags()
options := append([]string{"rw"}, mountFlags...)
if err = ns.Mounter.FormatAndMount(volumeMountPath, stagingTargetPath, fsType, options); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
if req.GetVolumeId() == "" { // Useless, just for sanity check
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
notMount, err := mount.IsNotMountPoint(ns.Mounter.Interface, stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMount {
err = ns.Mounter.Interface.Unmount(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeId, targetPath, stagingTargetPath := req.GetVolumeId(), req.GetTargetPath(), req.GetStagingTargetPath()
isBlock := req.GetVolumeCapability().GetBlock() != nil
if volumeId == "" || targetPath == "" || stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument,
"InvalidArgument: Please check volume ID, target path and staging target path.")
}
notMount, err := createTargetMountPath(ns.Mounter.Interface, targetPath, isBlock)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMount {
return &csi.NodePublishVolumeResponse{}, nil
}
if err := ns.loginTarget(volumeId); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volumeMountPath := ns.getVolumeMountPath(volumeId)
if volumeMountPath == "" {
return nil, status.Error(codes.Internal, "Can't get volume mount path")
}
options := []string{"bind"}
if req.GetReadonly() {
options = append(options, "ro")
}
if isBlock {
err = ns.Mounter.Interface.Mount(volumeMountPath, targetPath, "", options)
} else {
fsType := req.GetVolumeCapability().GetMount().GetFsType()
err = ns.Mounter.Interface.Mount(stagingTargetPath, targetPath, fsType, options)
}
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
volumeId, targetPath := req.GetVolumeId(), req.GetTargetPath()
if volumeId == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if _, err := os.Stat(targetPath); err != nil {
if os.IsNotExist(err){
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, status.Errorf(codes.Internal, err.Error())
}
notMount, err := mount.IsNotMountPoint(ns.Mounter.Interface, targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if notMount {
return &csi.NodeUnpublishVolumeResponse{}, nil
}
needToLogout := true
list, err := ns.Mounter.Interface.GetMountRefs(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
for _, path := range list {
filePrefix := "/var/lib/kubelet/pods/"
blkPrefix := "/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/"
if strings.HasPrefix(path, filePrefix) || strings.HasPrefix(path, blkPrefix) {
needToLogout = false
break
}
}
if err := ns.Mounter.Interface.Unmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if err := os.Remove(targetPath); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to remove target path.")
}
if needToLogout {
ns.logoutTarget(volumeId)
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
log.Debugf("Using default NodeGetInfo, ns.Driver.nodeID = [%s]", ns.Driver.nodeID)
return &csi.NodeGetInfoResponse{
NodeId: ns.Driver.nodeID,
}, nil
}
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: ns.Driver.nsCap,
}, nil
}
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
volumeId, volumePath := req.GetVolumeId(), req.GetVolumePath()
if volumeId == "" || volumePath == "" {
return nil, status.Error(codes.InvalidArgument, "Invalid Argument")
}
k8sVolume := ns.dsmService.GetVolume(volumeId)
if k8sVolume == nil {
return nil, status.Error(codes.NotFound,
fmt.Sprintf("Volume[%s] is not found", volumeId))
}
notMount, err := mount.IsNotMountPoint(ns.Mounter.Interface, volumePath)
if err != nil || notMount {
return nil, status.Error(codes.NotFound,
fmt.Sprintf("Volume[%s] does not exist on the %s", volumeId, volumePath))
}
lun := k8sVolume.Lun
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
&csi.VolumeUsage{
Available: int64(lun.Size - lun.Used),
Total: int64(lun.Size),
Used: int64(lun.Used),
Unit: csi.VolumeUsage_BYTES,
},
},
}, nil
}
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeId, volumePath := req.GetVolumeId(), req.GetVolumePath()
sizeInByte, err := getSizeByCapacityRange(req.GetCapacityRange())
if volumeId == "" || volumePath == "" {
return nil, status.Error(codes.InvalidArgument, "InvalidArgument: Please check volume ID and volume path.")
}
k8sVolume := ns.dsmService.GetVolume(volumeId)
if k8sVolume == nil {
return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume[%s] is not found", volumeId))
}
if err := ns.Initiator.rescan(k8sVolume.Target.Iqn); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to rescan. err: %v", err))
}
isBlock := req.GetVolumeCapability() != nil && req.GetVolumeCapability().GetBlock() != nil
if isBlock {
return &csi.NodeExpandVolumeResponse{
CapacityBytes: sizeInByte}, nil
}
volumeMountPath := ns.getVolumeMountPath(volumeId)
if volumeMountPath == "" {
return nil, status.Error(codes.Internal, "Can't get volume mount path")
}
ok, err := mount.NewResizeFs(ns.Mounter.Exec).Resize(volumeMountPath, volumePath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !ok {
return nil, status.Error(codes.Internal, "Failed to expand volume filesystem")
}
return &csi.NodeExpandVolumeResponse{
CapacityBytes: sizeInByte}, nil
}

113
pkg/driver/utils.go Normal file
View File

@@ -0,0 +1,113 @@
/*
Copyright 2021 Synology Inc.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
log "github.com/sirupsen/logrus"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
"k8s.io/utils/exec"
"k8s.io/mount-utils"
)
func ParseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
func NewControllerServer(d *Driver) *controllerServer {
return &controllerServer{
Driver: d,
dsmService: d.DsmService,
}
}
func NewNodeServer(d *Driver) *nodeServer {
return &nodeServer{
Driver: d,
dsmService: d.DsmService,
Mounter: &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: exec.New(),
},
Initiator: &initiatorDriver{
chapUser: "",
chapPassword: "",
},
}
}
func NewIdentityServer(d *Driver) *identityServer {
return &identityServer{
Driver: d,
}
}
func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
return &csi.VolumeCapability_AccessMode{Mode: mode}
}
func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
func NewNodeServiceCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
return &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: cap,
},
},
}
}
func RunControllerandNodePublishServer(endpoint string, d *Driver, cs csi.ControllerServer, ns csi.NodeServer) {
ids := NewIdentityServer(d)
s := NewNonBlockingGRPCServer()
s.Start(endpoint, ids, cs, ns)
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Infof("GRPC call: %s", info.FullMethod)
log.Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
log.Errorf("GRPC error: %v", err)
} else {
log.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
return resp, err
}