hisamounaのブログ

アウトプットを習慣化するためのブログ

cluster autoscalerがNodeGroup情報をキャッシュしていないか、コードを読んで確認

前提条件

クラスタ : EKS(v1.21.1)

NodeGroup : ASG

なぜ、キャッシュしているのではという仮定にいたったのか

  • 特定条件の時、ASGがスケールアウトしない

ASGの起動台数が0の時、ephemeral-storageをrequestsで要求しているpodのためにcluster autoscalerがASGをスケールアップしてくれない事象に遭遇しました。

ASGの起動台数が1以上の時はスケールできていました。起動台数を1から0にした場合も、スケールできました。

issueも出ていたので、自分の環境が原因というわけではなさそうです。

reason: Insufficient ephemeral-storageというエラーログが出力されます。

  • cluster autoscalerがNodeGroupをキャッシュしている?

一度起動したnodeからNodeGroup情報を取得していて、次回以降どのNodeGroupをスケールさせるかの条件に 利用しているのではと推測しました。

実際に、推測通りの挙動をしているかコードを読んで確認してみました。

今回は、キャッシュしているかどうかの部分に焦点を当てているので、その他の処理については詳しく追っていません。

コードリーディング

実際にスケールアップを実行している関数

nodeGroupの情報は nodeInfosForGroupsで関数に渡していそう

scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)

コード

nodeInfosForGroups変数を作成している箇所

nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints)

コード

Processメソッドを見てみましょう。

ちなみに、作成したあとに別のProcessメソッドを呼び出していますが、ここでは特に何もしていませんでした。

nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups)

                                |
                                V

// processors/nodeinfos/node_info_processor.go
func (p *NoOpNodeInfoProcessor) Process(ctx *context.AutoscalingContext, nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo) (map[string]*schedulerframework.NodeInfo, error) {
        return nodeInfosForNodeGroups, nil
    }

コード

Processの処理

変数nodesはkubectl get nodesで出力されたnodeのうち、正常に起動しているnode一覧と考えていただいて問題ないです。

resultを初期化しています。こちらは、nodeInfoのmapです。

nodesの要素ごとに、条件を満たした際に、nodeInfoCacheにnodeInfoをセットしています。

added == true は、processNode関数を確認すると、nodeが所属するASGがまだresultに追加されていないときに満たされることがわかります。

p.nodeInfoCache != nilは、NewMixedTemplateNodeInfoProvider関数を実行して、MixedTemplateNodeInfoProviderを作成している場合は条件にマッチします。

func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
    ...
    result := make(map[string]*schedulerframework.NodeInfo)
    ...
    for _, node := range nodes {
        ...
        added, id, typedErr := processNode(node)
        ...
        if added && p.nodeInfoCache != nil {
            if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil {
                p.nodeInfoCache[id] = nodeInfoCopy
            }
        }
    }

コード

nodeInfoCacheにnodeInfoをセットしたあとに、NodeGroupsごとに処理が実行されます。

nodeInfoCacheのキーにnodeGroupのidがあれば、nodeInfoCacheの値をresultにセットします。

nodeInfoCacheになければ、NodeGroupの情報を元にnodeInfoが作成され、resultにセットされるようです。

なので、providerとしてAWSを利用している場合、 kubectl get nodeが取得されたnode情報のキャッシュ → (キャッシュがないASGの場合は、)ASGのデータ(タグなど)が

nodeInfoとして利用される認識で間違いないと思います。

for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
        id := nodeGroup.Id()
        ...

        // No good template, check cache of previously running nodes.
        if p.nodeInfoCache != nil {
            if nodeInfo, found := p.nodeInfoCache[id]; found {
                if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil {
                    result[id] = nodeInfoCopy
                    continue
                }
            }
        }

        // No good template, trying to generate one. This is called only if there are no
        // working nodes in the node groups. By default CA tries to use a real-world example.
        nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonsets, ctx.PredicateChecker, ignoredTaints)
        ...
        result[id] = nodeInfo
    }

MixedTemplateNodeInfoProviderはcluster autoscalerの起動のときに初期化されるので(intervalのたびに作成されない)、podが起動している限りはキャッシュを持ち続けるようです。

感想

実際にキャッシュとしてnode情報を保持していることがわかりました。

なので、すでに存在しているASGに ephemeral-storageタグを付与したとしても、cluster autoscalerのpodを再作成しないと、podのresource.requestsにephemeral-storage: ~と書いても期待通りNodeGroupがスケールしてくれなそうです。

誤った解釈がありましたら、ご指摘いただけると非常に助かります。

DNSがよくわかる教科書を読んだついでに、DNSについて色々と調べてみました

www.amazon.co.jp

年末年始でゆっくり本を読むことができたので、DNSについて改めて勉強しました。

こちらの本はDNSの基礎や運用していく上での知識などを体系的に学ぶことができ大変おすすめです。

以下、学んだことを書きつつ、DNSについて色々と調べてみました。

DNSが作られた背景

IPアドレスは「通信したい相手の指定」と「通信してきた相手の判別」に使われます。

通信相手の指定をIPアドレスで行うと、数字の羅列のため覚えづらく使いにくいです。

その問題を解消させるために、ホストの指定を名前(ドメイン名)で行えるようにするのに開発された システムが DNS です。

IPアドレスドメイン名があることで、システムの移行においてもメリットがあると感じました。

ドメイン名に紐づくIPアドレスを変更することで、システムの移行をクライアント側の手続きなしで行えるようになります。

IPアドレスでアクセスされている場合は、新しいシステムのIPアドレスを参照するようにクライアントに対応を依頼する必要があります。

リソースレコード

ドメイン名にIPアドレスの情報を設定する際は、IPv4IPv6の2種類それぞれ必要に応じてAリソースレコードとAAAAリソースレコードを設定します。

また、CNAMEリソースレコードを使うことで、ドメイン名に別のドメイン名を設定できます。

クラウドサービスではさらに便利な仕組みがあります。

例えば、AWSではalias recordがあります。

これを使い、Route53のドメインにA/AAAAリソースレコードにELBのドメイン名をセットします。

Route53ドメインの名前解決を試みると、ELBのIPアドレスが複数返却されます。

この仕組みにより、Route53を設定する人がELBのIPアドレスを気にする必要がなくなるため非常に便利です。

また、ELBはスケールしIPアドレスが変わることがよくありますが、都度Route53の設定を変更する必要がないので助かります。

Route53ドメイン名と紐づくホスト名の設定だけ見ると、まるでCNAMEリソースレコードのように見えますが、A/AAAAリソースレコードでこの仕組みを実現しているのが面白いです。

スタブリゾルバー

RFC1034ではスタブリゾルバーが必要とするのはネームサーバのリストであると書かれています。

All that the remaining stub needs is a list of name server addresses that will perform the recursive requests.

linuxmacでは /etc/resolv.confにネームサーバのリストが記載されているので、スタブリゾルバーはおそらくこのファイルを参照しにいくことが推測されます。

試しに、自分のMac/etc/resolv.confを見てみました。

$ cat /etc/resolv.conf
# This file is automatically generated.
#
search flets-east.jp iptvf.jp
nameserver 2404:1a8:7f01:b::3
nameserver 2404:1a8:7f01:a::3
nameserver 192.168.0.1

nameserverの上2つはNTT東日本DNSサーバのアドレスです。西日本用のアドレスもあります。参考

192.168.0.1はデフォルトゲートウェイで、ルータやモデムがadmin画面にログインするために利用されます。参考

ネットワークの設定を切ると、自動的に/etc/resolv.confの書き込みも消されます。

$ cat /etc/resolv.conf
# This file is automatically generated.
#

Goのスタブリゾルバーの実装

Goでスタブリゾルバーの設定を定義している構造体です。

Go実行時にresolvConf変数が作成され、 lookupメソッドが実行された際に/etc/resolv.confを読み込みサーバ情報などを取得しているようです。

// A resolverConfig represents a DNS stub resolver configuration.
type resolverConfig struct {
    initOnce sync.Once // guards init of resolverConfig

    // ch is used as a semaphore that only allows one lookup at a
    // time to recheck resolv.conf.
    ch          chan struct{} // guards lastChecked and modTime
    lastChecked time.Time     // last time resolv.conf was checked

    mu        sync.RWMutex // protects dnsConfig
    dnsConfig *dnsConfig   // parsed resolv.conf structure used in lookups
}

var resolvConf resolverConfig

コード

10月にfacebookで起こった障害

2021/10/05にfacebookのサービス(InstagramやWhatsApp)のDNS名を解決出来なくなっていました。(障害の詳細レポート)

定期メンテナンス時に発行されたコマンドに誤りがあったことによって、Facebook内部のbackbone(internetからサーバーへの通信の中継点)からサーバーへの通信が切れてしまったようです。

その通信の断絶を検知し、Facebookが所有するDNSサーバがBGPによってFacebookへのルーティングをストップさせたことで、 インターネットからFacebookのサービスのドメインの名前解決ができなくなったようです。

障害の調査に利用する内部ツール自体も利用できなくなってしまい、障害解決に時間がかかったしまったみたいです。

CloudFlareの記事で、外部サービス(CloudFlare)かの障害状況レポートが報告されており、何が起こったかを知ることができるので一読をおすすめします。

DNS関連の障害の時は常ですがユーザやアプリケーションからリトライ通信が多発していたようですが、今回のケースではCloudFlareのDNSサーバに通常時の30倍のクエリがあったようで、DNSサーバを自前運用することの大変さを感じました。

感想

色々と調べていくうちにまとまりがない記事になってしまいました、、、本を読み色々と調べていけば行くほど、DNSの奥深さを感じていきます。

Facebookの障害からもDNSが障害の起因になりやすく、その影響範囲も甚大であることがわかります。

DNSを運用していくことの責任の重大さをひしひしと感じます。AWSがRoute53のSLAを100%に定めていることから、ベンダーも同じように認識していそうです。(Route53 SLA)

今後、新たに学んだことを追記していきたいと思います。

cluster-autoscalerのコードを読む (ASG一覧をどうやって取得しているか)

EKSでcluster-autoscalerを利用してASGをスケールさせているのですが、詳しい挙動を把握していなかったので調べてみました。EKSなので、今回はAWS環境に特化しています。

Karpenterについても気になっているので近々触れてみたいと考えています。

この記事では、ASG一覧をどうやって取得しているかを見ていきます。

podを割り当てるためにどのASGをスケールさせるかについても記事にしたいと思います。

cluster-autoscalerはscalInterval秒ごとに処理( RunOnce )を実行している

コード

for {
        select {
        case <-time.After(*scanInterval):
            {
                loopStart := time.Now()
                metrics.UpdateLastTime(metrics.Main, loopStart)
                healthCheck.UpdateLastActivity(loopStart)

                err := autoscaler.RunOnce(loopStart)

client-goを使って、node一覧を取得しています。

allNodes, readyNodes, typedErr := a.obtainNodeLists(a.CloudProvider)

どうやってcluster-autoscalerはターゲットとなるASGsを取得するのか

refreshInterval(1分)ごとにASG一覧を取得 (コード)

refreshIntervalはハードコーディングされているので、起動引数で変更することはできなそうです。ASGを削除してから1分以内にスケールイベントが発生するときはエラーが発生する可能性がありそうです。

func (m *AwsManager) Refresh() error {
    if m.lastRefresh.Add(refreshInterval).After(time.Now()) {
        return nil
    }
    return m.forceRefresh()
}

指定したtagsを元にASG一覧を取得

tagsはASGの起動引数(--node-group-auto-discovery)で指定しています。

e.g. --node-group-auto-discovery=asg:tag=k8s.io/cluster-autoscaler/enabled,k8s.io/cluster-autoscaler/sample-eks-cluster

cluster-autoscalerが、引数の値をparseして、asg:tag=の右の値の箇所(k8s.io...,k8s.io...)をカンマ区切りでリスト化しています。 (コード)

aws-sdk-goを使って指定したTagを満たすASG一覧をフェッチ(コード)

tagごとにASG一覧を取得します。

err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool {
        tags = append(tags, out.Tags...)
        // We return true while we want to be called with the next page of
        // results, if any.
        return true
})

すべてのtagを満たすASGをasgNamesにセット

asgNames := []string{}
asgNameOccurrences := make(map[string]int)
for _, t := range tags {
    asgName := aws.StringValue(t.ResourceId)
    occurrences := asgNameOccurrences[asgName] + 1
    if occurrences >= len(kvs) {
        asgNames = append(asgNames, asgName)
    }
    asgNameOccurrences[asgName] = occurrences
}

return asgNames, nil

asgNamesをasgCacheにセットします。

ASGの情報を取得するために、再度aws-sdk-goを使って、describe autoscalingを叩いています。(getAutoscalingGroupsByNames)

func (m *asgCache) regenerate() error {
    ...
    refreshNames, err := m.buildAsgNames()
    ...
    groups, err := m.awsService.getAutoscalingGroupsByNames(refreshNames)

Rustを使ってコンテナの実装を学ぶ (UID/GID の設定)

Rustを使ってコンテナの実装を学ぶの続きをやっていきたいと思います。

hisamouna.hatenablog.com

実行環境(再掲)

GCPVMインスタンスを1台起動させて、実行環境として利用しました。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Debian
Description:    Debian GNU/Linux 10 (buster)
Release:    10
Codename:   buster

$ arch
x86_64

UID/GID の設定

設定する前のUID/GIDを確認すると、nobody/nogroupとなっています。

$ id
uid=26038 gid=65534(nogroup) groups=65534(nogroup)

これは CLONE_NEWUSERフラグを付与してclone(2)を実行したことで user namespaceが分離されたからです。

/proc/[pid]/uid_map と /proc/[pid]/gid_mapへ書き込みを行うことで、namespace内外のuser間でのmappingを設定できます。 (参考)

UID/GIDを0(root)に変更してみたいと思います。

実装

/proc/[pid]/uid_map と /proc/[pid]/gid_mapに書き込みを行う関数を作成しました。

write_id_mapping でmappingフォーマットを定義しています。

フォーマット: ns内のID ns外のID length

write_fileでファイル書き込みを行います。

fn write_id_mapping(
    container_id: u32,
    host_id: u32,
    length: u8,
    map_file: &str,
) -> Result<()> {
    let mapping = format!("{} {} {}", container_id,host_id,length);
    write_file(map_file,mapping)?;
    Ok(())
}

fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> Result<()> {
    let path = path.as_ref();
    fs::write(path,contents).with_context(|| format!("failed to write to {:?}",path))?;
    Ok(())
}

子プロセスを生成した後に、mapping処理を実装しました。

let child_pid = sched::clone(cb, stack, flags, Some(Signal::SIGCHLD as i32)).expect("failed to create child process");

let root_id = 0;

let uid_map_file : &str = &format!("/proc/{}/uid_map",child_pid);
let host_uid = unsafe { libc::getuid() };
if let Err(err) = write_id_mapping(root_id,host_uid, 1,uid_map_file) {
    eprintln!("UID mapping failed: {}",err)
}

let host_gid = unsafe { libc::getgid() };
let gid_map_file : &str = &format!("/proc/{}/gid_map",child_pid);
if let Err(err) = write_id_mapping(root_id,host_gid, 1,gid_map_file) {
    eprintln!("GID mapping failed: {}",err)
}

if let Err(err) = ... は Goでいう if err := func(); err != nil { ... } と同じようなことが出来ます。 より短くかけるので良きです。

ビルドして実行してみたところ、uid/gidが0となりました!

$ id
uid=0(root) gid=0(root) groups=0(root)

Rustを使ってコンテナの実装を学ぶ

普段なんとなくコンテナを利用しているのですが、内部でどういったことをしているのかあまり分かっていなかったので勉強しました。

(内部のロジックを知らずに扱えるDockerの抽象度の高さに改めて感心しました。)

クックパッドさんが、Linuxカーネルの機能を用いてコンテナの実装を学ぶための資料をアップしていました。詳細に説明がされており、大変勉強になりました。

GitHub - rrreeeyyy/container-internship

せっかくなので、Rustを使ってコンテナ機能の一部を実装してみました。

名前空間の分離

/bin/shを起動する際にclone(2)に渡すflagsの値によって、IPC,Network,Userなどの名前空間を分離することができます。

改めて、shとは

man shの実行結果を確認 (DistributionがDebianなので、dash)

shは、ファイルまたはターミナルからコマンドを読み込み、解釈し、実行するコマンド。

shellが実行 -> 完了までの流れはこちらの記事が大変参考になりました。

shell実行のために子プロセスが作られた後はどうなるのだろうと疑問に思っていましたが、子プロセスがexit(0)をcallしてプロセスをtermiateさせ、

親プロセスでcallしたwait()で子プロセスのstateの変更を検知して、子プロセスのために割り当てたリソースがリリースされるということを学びました。

Rustで名前空間の分離を実装

実行環境

GCPVMインスタンスを1台起動させて、実行環境として利用しました。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Debian
Description:    Debian GNU/Linux 10 (buster)
Release:    10
Codename:   buster

$ arch
x86_64

実装

syscallを呼び出すために、nixを利用しました。libc crateによるunsafeなAPIをsafeな形で提供してくれているようです。

まずは、flagsに CLONE_IOのみセットしました。

use std::process::Command ;
use nix::sys::signal::Signal;
use nix::sched;
use nix::sys::wait::waitpid;


fn run(cmd: &str) -> isize {
    let exit_status = Command::new(cmd)
        .spawn().expect("failed to execute container command")
        .wait().unwrap();
    match exit_status.code() {
        Some(code) => code as isize,
        None => -1
    }
}

fn main() {
    const STACK_SIZE: usize = 1024 * 1024;
    let stack: &mut [u8; STACK_SIZE] = &mut [0; STACK_SIZE];

    let cmd :&str = "/bin/sh";
    let cb = Box::new(|| run(cmd));

    let flags = sched::CloneFlags::CLONE_IO;

    let child_pid = sched::clone(cb, stack, flags, Some(Signal::SIGCHLD as i32)).expect("failed to create child process");
    waitpid(child_pid,None).expect("faile to wait pid");
}

cloneで子プロセスを生成しています。

そして、waitpidメソッドが引数でセットされた子プロセスのstatusを変更を観測しています。

子プロセスの中で、 ip addr show, id を実行してみました、

ネットワーキングの名前空間が分離されていないので、NetworkInterface(ens4)が子プロセスも確認できます。

UID, GIDも親プロセスと同じ値が確認できます。

$ ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens4: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1460 qdisc mq state UP group default qlen 1000
...

$ id
uid=1000(xxxxx) gid=1001(xxxxx) groups=1001(xxxxx),,,

flagsにセットするCloneFlagsを追加し、NetworkingとUser名前空間を分離

let flags = sched::CloneFlags::CLONE_NEWUSER
        | sched::CloneFlags::CLONE_NEWNET
        | sched::CloneFlags::CLONE_NEWIPC
        | sched::CloneFlags::CLONE_IO;

もう一度、ip addr show, id を実行してみました。

見事に名前空間が分離されていることを確認できました。

$ ip addr show
1: lo: <LOOPBACK> mtu 65536 qdisc noop state DOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
$ id
uid=65534(nobody) gid=65534(nogroup) groups=65534(nogroup)

残りのコンテナ実装についても引き続きRustで実装してみたいと思います。

argo-rolloutsのPromoteについて調べる

argocdのGUI上でargo-rolloutsリソースに対してできるオペレーションに、 PROMOTEPROMOTE-FULLがありますが、 ドキュメントをざっと調べた限り、詳細な挙動が分からなかったのでコードを見てみました。

画面のサンプル

利用できない操作は半透明にして非活性化されています。

f:id:hisamouna:20211121190106p:plain
argo-rollouts-dashboard

argo-rolloutsとは

Deploymentリソースに対してblue-greenやcanaryといった拡張機能を追加することができるCRDsとcontrollerを提供してくれています。

github.com

Deploymentの代わりとなるリソースのため、v1.0からは Workload Referencingと呼ばれる既存のDeploymentを参照できる機能も導入されているようです。

blog.argoproj.io

PromoteのAPIがcallされるまで

ブラウザで、 PROMOTEまたはPROMOTE-FULLボタンをクリック -> promoteのAPIをfetchしていました。

長くなるので、フロントエンドの詳細は今回は割愛します。

APIの定義

APIのインターフェースを確認してみました。 ブラウザから叩かれるAPIでもあるので、grpc-gatewayを利用して、RESTのインターフェースも定義しています。

rpc PromoteRollout(PromoteRolloutRequest) returns (github.com.argoproj.argo_rollouts.pkg.apis.rollouts.v1alpha1.Rollout) {
    option (google.api.http) = {
        put: "/api/v1/rollouts/{namespace}/{name}/promote"
        body: "*"
    };
}

https://github.com/argoproj/argo-rollouts/blob/master/pkg/apiclient/rollout/rollout.proto#L187-L192

APIの実装

ProtoBufから生成されたgoファイル(rollout.pb.go)で書かれているinterfaceを満たすstructを生成し、登録しています。

func (s *ArgoRolloutsServer) newGRPCServer() *grpc.Server {
    grpcS := grpc.NewServer()
    var rolloutsServer rollout.RolloutServiceServer = NewServer(s.Options)
    rollout.RegisterRolloutServiceServer(grpcS, rolloutsServer)
    return grpcS
}

https://github.com/argoproj/argo-rollouts/blob/master/server/server.go#L130-L135

interfaceで定義しているPromoteRolloutメソッドを実装している箇所。

rolloutIfがrolloutのデータや振る舞いが定義されています。

Patch処理もここで定義されていますが、内部ではclient-goを利用していました。

実際に、Promoteも行うメソッドが promote.PromoteRolloutです。

func (s *ArgoRolloutsServer) PromoteRollout(ctx context.Context, q *rollout.PromoteRolloutRequest) (*v1alpha1.Rollout, error) {
    rolloutIf := s.Options.RolloutsClientset.ArgoprojV1alpha1().Rollouts(q.GetNamespace())
    return promote.PromoteRollout(rolloutIf, q.GetName(), false, false, q.GetFull())
}

https://github.com/argoproj/argo-rollouts/blob/master/server/server.go#L384-L387

Promoteのロジック

https://github.com/argoproj/argo-rollouts/blob/master/pkg/kubectl-argo-rollouts/cmd/promote/promote.go

メソッド呼び出し元で、skipCurrentStepとskipAllStepsはfalseにしているので、ここのif処理は常にスキップされます。

trueをセットした呼び出しはtestコードしかなさそうでした。

if skipCurrentStep || skipAllSteps {
    if ro.Spec.Strategy.BlueGreen != nil {
        return nil, fmt.Errorf(skipFlagsWithBlueGreenError)
    }
    if ro.Spec.Strategy.Canary != nil && len(ro.Spec.Strategy.Canary.Steps) == 0 {
        return nil, fmt.Errorf(skipFlagWithNoStepCanaryError)
    }
}

v0.9とv0.10+で互換性を持たせるために getPatchesを実行しています。

status subresourcesと呼ばれるCRDがv0.10からのみ使用されているからのようです。

specPatch, statusPatch, unifiedPatch := getPatches(ro, skipCurrentStep, skipAllSteps, full)

PROMOTE-FULLボタンをクリックしたときはstatus.promoteFullフィールドにtrueをセット。

case full:
        if rollout.Status.CurrentPodHash != rollout.Status.StableRS {
            statusPatch = []byte(promoteFullPatch)
        }

PROMOTEボタンをクリックしたときはもう少し複雑です。

rolloutがpaused状態であったらpausedフィールドをfalseにし、clearPauseConditionsPatchフィールドに値があればnullをセットしています。 paused状態で処理がpendingとなっているのをクリアにするようにしています。

if rollout.Spec.Paused {
            specPatch = []byte(unpausePatch)
        }
        if len(rollout.Status.PauseConditions) > 0 {
            statusPatch = []byte(clearPauseConditionsPatch)

strategyをcanaryにしている場合はcanaryのstepを1つ進めるようにindexをインクリメントしています。

_, index := replicasetutil.GetCurrentCanaryStep(rollout)

if index != nil {
    if *index < int32(len(rollout.Spec.Strategy.Canary.Steps)) {
        *index++
    }
    statusPatch = []byte(fmt.Sprintf(clearPauseConditionsPatchWithStep, *index))
    unifiedPatch = []byte(fmt.Sprintf(unpauseAndClearPauseConditionsPatchWithStep, *index))
}

いざ、Patch実行

client-goを使って、Patchをapi-serverに対してリクエストしています。

if statusPatch != nil {
    ro, err = rolloutIf.Patch(ctx, name, types.MergePatchType,
    ...
}
if specPatch != nil {
    ro, err = rolloutIf.Patch(ctx, name, types.MergePatchType, 
    ...
}

どういったときに利用できるのか

B/Gを利用している場合であれば、 postPromotionAnalysisが一時的な何らかの問題で失敗してしまった時に手動で処理を進めたい際に利用できそうです。

promoteを使えば、rolloutを最初からやり直し(podの再作成)することなく済むかと思います。

Canaryであれば、例えば sleep 120sのstepをスキップしたいときなどに使えそうです。

Rust製WebサーバにCircuitBreakerを入れてみる

Rust製WebサーバにCircuitBreakerを導入してみました。

最近だと、proxy(e.g. envoy)にCircuitBreakerを入れるパターンもあるかと思いますが、今回はアプリケーションに入れるパターンです。

使用するライブラリ

Rocket: RustでWebFrameworkといえば定番になるかと思います。

github.com

failsafe-rs: Rust circuitbreaker で検索したときに一番最初に出てきました。

github.com

実装

コード

CircuitBreakerのstateに応じてレスポンスを分ける。

match state.circuit.call(|| hello(query.name)) {
    Err(Error::Inner(_)) => {
        eprintln!("fail");
        return Err(Status::InternalServerError)
    },
    Err(Error::Rejected) => {
        eprintln!("rejected");
        return Err(Status::ServiceUnavailable)
    },
    Ok(x) => {
        return Ok(x)
    }
}

CircuitBreakerの設定をどこで定義するかに悩みました。

Golangであれば、Globalの変数を定義してStatefulに管理しようとしましたが、Rustは変数のscopeを厳密にする必要があるため同じことができませんでした。

軽く調べた限り(issue)、 Heapへのallocationは開発者が直接操作できず、runtimeでしか行われない。

NGパターン

static mut bbo :Exponential = backoff::exponential(Duration::from_secs(3), Duration::from_secs(30));
static mut pl :ConsecutiveFailures<failsafe::backoff::Exponential> = failure_policy::consecutive_failures(3, bbo);
static mut cb :StateMachine<failsafe::failure_policy::ConsecutiveFailures<failsafe::backoff::Exponential>, ()>= Config::new()
    .failure_policy(pl)
    .build();

エラーメッセージ:

calls in statics are limited to constant functions, tuple structs and tuple variants

至極まっとうなメッセージでした。

rocketのStateを使って解決

struct RocketState {
    circuit : StateMachine<ConsecutiveFailures<Exponential>, ()>
}

#[launch]
fn rocket() -> _ {
    let back_off = backoff::exponential(Duration::from_secs(30), Duration::from_secs(60));
    let policy = failure_policy::consecutive_failures(3, back_off);
    let circuit_breaker = Config::new()
        .failure_policy(policy)
        .build();
    let hystrix_conf = RocketState{circuit: circuit_breaker};
    rocket::build()
        .manage(hystrix_conf)
    .
    .
    .

動作確認

│rocket_circuitbreaker_trial on  main
└─> cargo run
.
.
.

🛰  Routes:
   >> (api_hello) GET /hello?<query..>
📡 Fairings:

# 別ターミナルで
└─> curl -I -XGET 'http://localhost:8000/hello?name=hello'
HTTP/1.1 200 OK
content-type: text/plain; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 5
date: Tue, 26 Oct 2021 05:35:59 GMT

┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=error'
HTTP/1.1 500 Internal Server Error
content-type: text/html; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 436
date: Tue, 26 Oct 2021 05:36:06 GMT

┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=error'
HTTP/1.1 500 Internal Server Error
content-type: text/html; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 436
date: Tue, 26 Oct 2021 05:36:06 GMT

┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=error'
HTTP/1.1 500 Internal Server Error
content-type: text/html; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 436
date: Tue, 26 Oct 2021 05:36:09 GMT

# num_failures(3)を超えたので 503エラーに
┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=error'
HTTP/1.1 503 Service Unavailable
content-type: text/html; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 397
date: Tue, 26 Oct 2021 05:36:10 GMT

# stateがclosedとなっているので、503エラーに
┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=hello'
HTTP/1.1 503 Service Unavailable
content-type: text/html; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 397
date: Tue, 26 Oct 2021 05:36:17 GMT

# stateがhalf openとなったことで、200 OKに
┌───────────────────>
│~
└─> curl -I -XGET 'http://localhost:8000/hello?name=hello'
HTTP/1.1 200 OK
content-type: text/plain; charset=utf-8
server: Rocket
permissions-policy: interest-cohort=()
x-content-type-options: nosniff
x-frame-options: SAMEORIGIN
content-length: 5
date: Tue, 26 Oct 2021 05:36:24 GMT

今のままだと、全APIでひとつのCircuitBreakerのStateに左右されてしまうので、 RocketStateAPIごとのCircuitBreakerの設定フィールドを用意するなど工夫が必要かと思います。

ご指摘あれば、細かいところでも教えていただけると助かります。