Kubernetes × envoy × gRPC環境下でのGraceful Shutdownの流れ
- はじめに
- 何について書くか・前提
- Graceful Shutdownの実装
- Kubernetesのライフサイクルについて
- envoyサイドカー導入時の注意点
- Readiness Probeについて
- おわりに
はじめに
ABEMAの広告プロダクト開発チームで内定者バイトをしていた林です!
この記事はCyberAgentメディア事業部の広告横軸組織PTAのアドベントカレンダー18日目の記事です!
何について書くか・前提
Kubernetes × envoy × gRPC 環境下でのGraceful Shutdownに関して書いていきます。
Istioが導入されている環境でのGraceful Shutdownについてはいくつか記事が見つかったのですが、Headless Servicesを利用してenvoyをサイドカーにしている環境でのGraceful Shutdownについては記事が少なく?Istioがあるかどうかで内容も変わりそうなのでまとめてみたかったというようなモチベーションです。
以下のようなイメージで、Kubernetes上でenvoyプロキシを使ってgRPCのアプリケーションが動いているマイクロサービスを考えます。
KubenetesのHeadless Servicesを利用してServiceが持つPodのIPアドレス一覧を返して、envoyでバランシングしているような状況です。
Graceful Shutdownの実装
一般的な話として、サーバーを停止する際に停止が指示された時点で新たなリクエストは受け入れず既に受け付けていた処理中のリクエストは処理・レスポンスしてからサーバーを停止することをGraceful Shutdownと言います。
これによって処理中のリクエストを捌ききってから安全にサーバーを停止することができます。
Graceful Shutdownの具体的なユースケースは、運用中のサービスだと例えばリリース時にPodをrolloutするときなどが該当します。 Podで処理中のリクエストがある状態でGracefulにPodが終了されない場合、クライアント側で500系エラーが頻発してしまいます。
GoのGraceful Shutdownの実装
Goのnet/httpやnet/httpをラップしているechoではShutdownメソッドがGracefulになるように実装されています。
https://pkg.go.dev/net/http#Server.Shutdown
https://pkg.go.dev/github.com/labstack/echo/v4#Echo.Shutdown
一方でGoのgrpcパッケージはGracefulではないShutdownのメソッドも提供しています。
https://pkg.go.dev/google.golang.org/grpc@v1.45.0#Server.Stop
https://pkg.go.dev/google.golang.org/grpc@v1.45.0#Server.GracefulStop
引数にコンテキストを取るかどうか、返り値にエラーが返るかどうかなどシグネチャも若干異なります。 gRPCサーバーの場合、例えば以下のようにGraceful Shutodownを実装できそうです。
func main() { ... ... ... // サーバーの初期化 server, err := newServer(cfg) if err != nil { logger.Fatalf("failed to setup server: %v", err) } // シグナルの受け取り stopChan := make(chan os.Signal, 1) signal.Notify(stopChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGTSTP, ) go func() { <-stopChan logger.Infof("start server shutdown") server.shutdown(cfg.GracefulPeriod) }() // サーバーの起動 if err := server.run(); err != nil { logger.Fatalf("failed to serve: %#v", err) } } func (s *server) shutdown(period time.Duration) { gracefulStopChan := make(chan bool, 1) go func() { logger.Infof("grpcserver: graceful shutdown is running") s.grpcServer.GracefulStop() gracefulStopChan <- true }() t := time.NewTimer(period) select { case <-gracefulStopChan: logger.Infof("grpcserver: graceful shutdown completed before timing out") case <-t.C: logger.Infof("graceful shutdown failed timeout, closing pending RPCs.") s.grpcServer.Stop() } }
ちなみにnet/httpやechoだと引数にContextを取るので以下リンク先のようにも実装できそうです。
Kubernetesのライフサイクルについて
アプリケーション側でSIGTERMなどのシグナルを受け取ってからGracefulにサーバーを停止する流れについて確認してきました。 次にシグナルの発行がどういう流れでされるかについて確認していきます。
kubectl rollout restart deployment名
の実行などでPodが終了するイベントが起きた場合を考えます。
いくつかの処理が走るのですがGraceful Shutdownに関わる部分を掻い摘んで書くと、kubeletによるプロセスのシャットダウンとサービスアウトが独立して実行されます。
kubeletによるプロセスのシャットダウンではpreStopフックを実行してからDockerデーモンにコンテナの終了(シグナルの発火)を依頼します。preStopフックはプロセス終了前に実行する事前処理のことでmanifestに記述します。
サービスアウトの処理ではServiceから終了対象のPodを除外してトラフィックの配送ルールを更新することによって、終了対象のPodに対して新規のTCPコネクションが貼られないようにします。
これらの処理は依存関係がなくそれぞれが独立して実行されるので、サービスアウトされる前にプロセスのシャットダウンがされてしまうと、リクエストがシャットダウン済みのプロセスにきてしまいエラーが発生してしまいます。 そこでpreStopフックには以下のようにsleepの処理を書くことで対処します。
apiVersion: apps/v1 kind: Deployment metadata: name: test-component spec: ... ... ... template: metadata: labels: app: test-component spec: ... ... ... containers: - args: - --config-path /etc/envoy/envoy.json name: vega-linear-deliver-sidecar-envoy command: - /usr/local/bin/envoy image: envoyproxy/envoy:v1.7.0 volumeMounts: - mountPath: /etc/envoy name: envoy ... ... ... name: test-component image: asia.gcr.io/test-project/test-component:latest lifecycle: preStop: exec: command: - sh - -c - sleep 10 ... ... ... volumes: - configMap: name: test-component-sidecar-envoy name: envoy
どれくらいの時間sleepする必要があるかは諸説ありそうですが下記記事によると10秒程度?で十分そうです。
It depends on the latency of your network and the nodes. You may need to perform some tests to find out this value. However, multiple sources suggest a value between 5-10 seconds should be enough for most cases.
envoyサイドカー導入時の注意点
Kubernetesのライフサイクルとアプリケーション側でのGraceful Shutdownの流れを確認してきました。
一見するとこれでGracefulにサーバーを停止できそうな気がします。
が、envoyをサイドカーとして導入している場合にはもう1つ処理が必要です。というのもKubernetesのライフサイクルで述べたPodの終了処理の中のkubeletによるプロセスのシャットダウンはPod内のコンテナごとに行われます。
そのためenvoyコンテナの方にもpreStopでsleep処理を書かないと、アプリ本体のコンテナより先にenvoyコンテナが終了してしまいます。 サービスアウトが完了する前にenvoyコンテナが終了してしまうので、新規リクエストがアプリケーション側に到達せずにエラーが発生してしまいます。
この問題に対処するためには、manifestのenvoyコンテナにもpreStopフックでSleep処理を書くと良さそうです。
Readiness Probeについて
Kubernetesにはコンテナがトラフィックを受け入れられる状態かどうかを認識する仕組みとしてReadiness Probeがあります。
Readiess Probeによって一定間隔でコンテナのヘルスチェックを行って、ヘルスチェックが失敗したPodはServiceのロードバランシングから切り離されてトラフィックを受信しないようになります。
これは自分が勘違いしていたポイントなのですが、Graceful Shutdownに関して言えば、Kubernetesのライフサイクルとして先ほども説明したようなサービスアウトが実行されるので、アプリケーション側でGraceful Shutdownを実装する際にわざわざヘルスチェックを失敗させるようにするような処理は必要ありません。
おわりに
Kubernetesのライフサイクルとアプリケーション側での処理とサービスメッシュの理解が深まり良い経験になりました!
ヒープと優先度付きキュー
競技プログラミングで度々必要になるヒープをまとめてみる。
ヒープとは
ヒープは木構造の1つで、二分木として表現される。
各ノードがその子ノードより小さい(or 大きい)か等しくなるように配置される。
そのため根ノードが最小(or 最大)になる。
このノードの関係によって、最小ヒープと最大ヒープの2種類がある。
index=1のノードnode(i)
と各ノードの関係は以下のようになる。
根ノード|
i = 1
(i = 0 の実装もあるのでその場合はj = i - 1
として ノードj について考える)親ノード|
parent(i) = i / 2
(小数は切り捨て)左の子ノード|
left(i) = 2i
右の子ノード|
right(i) = 2i + 1
例えば0008
は配列のindex=5に位置(node(5)
)している。
この0008
のノードの親ノードは0003
のノードでparent(5) = 5 /2 = 2
である。
左側の子ノードは0017
のノードでleft(5) = 2*5 = 10
である。
右側の子ノードは0019
のノードでright(5) = 2*5 + 1 = 11
である。
競技プログラミングでのヒープ
競技プログラミングでもヒープは度々必要になる。
特に最大値や最小値を取り出して都度何らかの操作をするパターンの問題ではヒープを便利に使うことができる。
それ以外にも、単純にスライスのソートをsort
パッケージのsort.Ints()
でやっていてTLEしてしまい、初めからスライスに格納するのではなくヒープに格納して最大値や最小値を求める方が早い場合がある。
数字以外にも属性が存在する場合は優先度付きキューを使う。
Goではcontainer/heap
パッケージでヒープや優先度付きキューが提供されている。
ヒープで実装する問題
以下の問題を考える。
この問題では最大値を求めて取り出し、割引をするという操作をクーポンの数だけ繰り返す必要がある。
この問題を単純に考えて、配列をfor文で回して最大値を求めるというやり方でやると、計算量はO(NM)かかる(1≤N,M≤105)のでTLEしてしまう。
そこでヒープを使って以下のように実装する。
package main import ( "bufio" "container/heap" "fmt" "os" "strconv" ) // An IntHeap is a min-heap of ints. type IntHeap []int func (h IntHeap) Len() int { return len(h) } func (h IntHeap) Less(i, j int) bool { return h[i] > h[j] } func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *IntHeap) Push(x interface{}) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. *h = append(*h, x.(int)) } func (h *IntHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x } var scanner = bufio.NewScanner(os.Stdin) func nextInt() int { scanner.Scan() // トークンの読み込み i, e := strconv.Atoi(scanner.Text()) // 読み込んだトークンをTextメソッドで取り出し if e != nil { panic(e) } return i } func main() { scanner.Split(bufio.ScanWords) // 空白区切りで受け取るので引数にbufio.ScanwWordsを渡す N := nextInt() M := nextInt() h := &IntHeap{} heap.Init(h) for i := 0; i < N; i++ { heap.Push(h, nextInt()) } for i := 0; i < M; i++ { maxValue := heap.Pop(h).(int) discountedValue := maxValue / 2 heap.Push(h, discountedValue) } var sum int for h.Len() > 0 { sum += heap.Pop(h).(int) } fmt.Println(sum) }
最大公約数と最小公倍数の求め方
競プロでは最大公約数や最小公倍数が度々出題される。
今回は最大公約数や最小公倍数についてまとめてみる。
最大公約数はGreatest Common Divisor
を略してgcd
やg
と表記されることがある。
最小公倍数はLeast Common Multiple
を略してlcm
やl
と表記されることがある。
ユークリッドの互除法と最小公倍数
最大公約数や最小公倍数関連の問題ではユークリッドの互除法をほぼ使うので軽く説明しておく。
ユークリッドの互除法は2つの整数a, bの最小公約数を求めるアルゴリズム。
数IAでやったよね、懐かしい。
以下の手順で最小公約数を求める。
b = 0
の場合、a
を最小公約数とするb = 0
でない場合、再帰でb
とa をbで割った余り
の最小公約数を求める
コードで表現すると以下のようになる。
func gcd(a, b int) int { if b == 0 { return a } return gcd(b, a%b) }
具体例で考える
a = 6, b = 4
で考えてみる
この場合、a = 6, b = 4
→ a = 4, b = 2
→ a = 2, b = 0
となり、最小公約数として2
が返される。
a = 4, b = 6
で考えてみる
この場合、a = 4, b = 6
→ a = 6, b = 4
→ a = 4, b = 2
→ a = 2, b = 0
となり、最小公約数として2
が返される。
このように a と b の大小関係は気にしなくてOK
a = 2, b = 3
で考えてみる
この場合、a = 2, b = 3
→ a = 3, b = 1
→ a = 1, b = 0
となり、最小公約数として1
が返される。
最小公約数が1
のとき、「a
とb
は互いに素」と呼ぶ。
a = 2, b = 0
で考えてみる
この場合、最小公約数として2
が返される。
※ 0 = 2 × 0
となるように 2
は 0
の約数の 1つ。
a × b = g × l
最小公倍数は最大公約数を用いて求めることができる。
a, b の最大公約数を g、最小公倍数を l とおくと、
a × b = g × l
が成り立つ。
よって
最小公倍数 l = a / gcd(a, b) * b
と表される。
最大公約数を求める問題
以下の最大公約数を求める問題を考えてみる。
この問題ではN個の整数からN-1個の整数を選んで最大公約数の最大値を求める。
最大公約数の計算から除く1つの整数の選び方はN通りある。 最大公約数の計算から除く1つの整数を選んで、他の整数の最大公約数を求めると計算量はO(N2)になる。
2≤N≤105なのでこれではTLEしてしまう。
O(N)でこの問題を解くには累積和と似たような考えを使うことできる。
除く整数の左側の整数群の最大公約数を保存しておく配列と右側の整数群の最大公約数を保存しておく配列を作り、それらを利用して最大公約数を求めることで整数を1つ除いて最大公約数を求めることができる。
コードで表現すると以下のようになる。
package main import ( "bufio" "fmt" "os" "strconv" ) var scanner = bufio.NewScanner(os.Stdin) func nextInt() int { scanner.Scan() // トークンの読み込み i, e := strconv.Atoi(scanner.Text()) // 読み込んだトークンをTextメソッドで取り出し if e != nil { panic(e) } return i } func main() { scanner.Split(bufio.ScanWords) // 空白区切りで受け取るので引数にbufio.ScanwWordsを渡す N := nextInt() A := make([]int, N) for i := 0; i < N; i++ { A[i] = nextInt() } left := make([]int, N) for i := 1; i < N; i++ { left[i] = gcd(A[i-1], left[i-1]) } right := make([]int, N) for i := N - 2; i >= 0; i-- { right[i] = gcd(A[i+1], right[i+1]) } var max int for i := 0; i < N; i++ { g := gcd(left[i], right[i]) if max < g { max = g } } fmt.Println(max) } func gcd(a, b int) int { if b == 0 { return a } return gcd(b, a%b) }
こうすることである整数を除いて最大公約数を求める処理は、その数の左側の整数群の最大公約数と右側の整数群の最大公約数の最小公約数を求めれば良い。
このように最小公約数の計算は計算順序などは問われないので計算順序も柔軟に変えて計算できる。
最小公倍数を求める問題
以下の最小公倍数を求める問題を考えてみる。
3つ以上の値の最小公倍数を求める問題。
単純に2つの値の最小公倍数を求める際は先ほど説明したa × b = g × l
を使って求めることができる。
3つ以上の値の最小公倍数は1
と1 つ目の値 の最小公倍数
, 1と1つ目の値の最小公倍数
と2つ目の値
, ...という風に初めに1を使って求めていく。
コードで表現すると以下のようになる。
package main import ( "fmt" ) func main() { var N int fmt.Scan(&N) T := make([]int, N) for i := 0; i < N; i++ { fmt.Scan(&T[i]) } ans := 1 for i := 0; i < N; i++ { ans = ans / GCD(ans, T[i]) * T[i] } fmt.Println(ans) } func GCD(a, b int) int { if b == 0 { return a } return GCD(b, a%b) }
このようにユークリッドの互除法とa × b = g × l
の性質を使って最大公約数と最小公倍数関連の問題は解いていく。
競プロで度々使う累積和の使い方
1次元の累積和
以下の問題を考える。
連続するk個の総和の最大値を求める問題。
シンプルに考えると、連続するk個の値はn-k個のパターンがあり、それぞれを求めて最大値を求めていく。
これだと、連続するk個の総和を求める処理が計算量にするとO(N2)かかる。
コードで表現すると以下のような感じ。
package main import ( "bufio" "fmt" "os" "strconv" ) var scanner = bufio.NewScanner(os.Stdin) func nextInt() int { scanner.Scan() // トークンの読み込み i, e := strconv.Atoi(scanner.Text()) // 読み込んだトークンをTextメソッドで取り出し if e != nil { panic(e) } return i } func main() { scanner.Split(bufio.ScanWords) // 空白区切りで受け取るので引数にbufio.ScanwWordsを渡す N := nextInt() A := make([]int, N) for i := 0; i < N; i++ { A[i] = nextInt() } for k := 0; k < N; k++ { var max int for i := 0; i < N-k; i++ { a := A[i : i+k+1] var sum int for j := range a { sum += a[j] } if max < sum { max = sum } } fmt.Println(max) } }
これだと全体の計算量はO(N3)になってしまい、TLEする。
こういう時は累積和を考える。
累積和Sは、S[n+1] = S[n] + a[n+1]
と定義される。
つまり、累積和S[n+1]
は 配列a[0]
から a[n+1]
までの累積の和である。(数IIBでやったよね懐かしい)
この累積和をあらかじめ求めておくと、連続するk個の和は S[i+k] - S[i]
で求められるので、連続するk個の総和を求める処理がO(N)、全体でO(N2)となりACできる。
コードで表現すると以下になる。
package main import ( "fmt" ) func main() { var N int fmt.Scan(&N) A := make([]int, N) for i := 0; i < N; i++ { fmt.Scan(&A[i]) } s := make([]int, N+1) // 累積和 for i := 0; i < N; i++ { s[i+1] = s[i] + A[i] } for k := 1; k < N+1; k++ { // 連続するk個の区画について考える var max int for i := 0; i < N+1; i++ { if i+k > N { continue } if max < s[i+k]-s[i] { // 連続するk個の和 max = s[i+k] - s[i] } } fmt.Println(max) } }
このように特定の範囲での総和を求める問題などでは、あらかじめ累積和を求めておくことで計算量を下げられることがよくある。
2次元の累積和
次に以下の問題を考える。
この問題では長方形になるようなブロックを列挙して面積の最大値を求める。
普通に長方形になるようなブロックを選ぼうとすると、長方形かどうかの判定も書かねばならないので煩雑になりそう。
長方形になるようなブロックの選び方を列挙しなければいけないときは2次元の累積和を使うと比較的楽に実装できる。
2次元累積和の考え方
2次元の累積和では、(0, 0)
から配列の座標までの累積和を記録する。
つまりこの問題で言うと、2次元累積和S[i][j]
では (0, 0)
から (i, j)
までのブロックの価値の累積和を記録する。
この際、2次元累積和は S[i+1][j+1] = S[i+1][j] + S[i][j+1] - S[i][j] + A[i+1][j+1]
と定義される(ブロックの足し引き考えればイメージできる)
このように、あらかじめ2次元累積和で長方形の形で価値の累積和を記録しておくと、任意の長方形の価値を抽出する際も長方形から任意の長方形を足し引きすることで求められる。
(k, l)
から (i, j)
からなる長方形は S[i][j] - S[k][j] - S[i][l] + S[k][l]
と表せる。
コードで表現すると、以下のようになる。
package main import ( "fmt" ) func main() { var H, W, K, V int fmt.Scan(&H, &W, &K, &V) A := make([][]int, H) sum := make([][]int, H+1) // 累積和: (0,0)と(i,j)を頂点とする長方形の地価の合計値 for i := range sum { sum[i] = make([]int, W+1) } for i := 0; i < H; i++ { A[i] = make([]int, W) for j := 0; j < W; j++ { fmt.Scan(&A[i][j]) sum[i+1][j+1] = sum[i][j+1] + sum[i+1][j] - sum[i][j] + A[i][j] // sum[i][j]はsum[i][j+1]とsum[i+1][j]の重複部分 } } var max int for i := 0; i < W+1; i++ { // i, jで大元の長方形を決定 for j := 0; j < H+1; j++ { for k := 0; k < i; k++ { // k, lで切り出す長方形を決定 for l := 0; l < j; l++ { area := (i - k) * (j - l) // 面積 areaCost := sum[j][i] - sum[j][k] - sum[l][i] + sum[l][k] buildingCost := area * K cost := areaCost + buildingCost if V < cost { continue } if max < area { max = area } } } } } fmt.Println(max) }
このように2次元累積和は長方形で物事を考えなくてはいけない場合に、計算量を下げたり計算が容易になったりする。
高速な素数判定
競技プログラミングで度々出題される素数判定についてまとめる。
単純な素因数分解問題
以下のような単純に素因数分解する問題を考える。
https://judge.u-aizu.ac.jp/onlinejudge/description.jsp?id=NTL_1_A&lang=ja
2から√nまでの整数で割っていき、割り切れれば割った整数を素数として追加し、最終的に残った数字が1より大きければ最後の素数になるのでその数も素数として追加する。
コードで表現すると以下のようになる。
package main import "fmt" func main() { var n int fmt.Scan(&n) fmt.Printf("%d:", n) primeFactor := make([]int, 0) for i := 2; i*i < n+1; i++ { // 2~√nまでの約数を考える for { if n%i == 0 { primeFactor = append(primeFactor, i) n /= i } else { break } } } if n > 2 { // 1以上であれば最後の約数(素数) primeFactor = append(primeFactor, n) } for i := range primeFactor { fmt.Printf(" %d", primeFactor[i]) } fmt.Println() }
通常の素因数分解はこのように解ける。
エラトステネスの篩を使った素数判定
以下の問題を考える。
以下のコードのように、問題文通りにl
からr
までの奇数x
について素数かどうかを判定するとTLEしてしまう。
package main import "fmt" func main() { var Q int fmt.Scan(&Q) l := make([]int, Q) r := make([]int, Q) for i := 0; i < Q; i++ { fmt.Scan(&l[i], &r[i]) } for i := 0; i < Q; i++ { var count int for x := l[i]; x < r[i]+1; x += 2 { if x == 1 { // 1は素数ではないので除外 continue } if isPrime(x) && isPrime((x+1)/2) { count++ } } fmt.Println(count) } } func isPrime(n int) bool { isPrime := true for i := 2; i*i < n+1; i++ { // 2~√nまでの間で割れるものがなければ素数 if n%i == 0 { isPrime = false return isPrime } } return isPrime }
上のコードでのボトルネックは、明らかに2~√nまでの数字で割っている部分である。
このように素数判定をいくつもの数字でやる必要がある場合は「エラトステネスの篩」を使うと高速に素数判定ができる。
エラトステネスの篩
エラトステネスの篩は、配列にそのインデックスの数が素数かどうかを記録した表のこと。
配列の中身をtrue
で初期化して、素数でない数字を記録していく。
素数でない数字はfor文で整数を回してその倍数を素数でない数字として記録する。
コードで表現すると以下のようになる。
// エラトステネスの篩 prime := make([]bool, 100001) // 0 ~ 100000までの数字が素数かどうかを表で表す for i := 2; i < len(prime); i++ { // 0, 1以外を素数として初期化 prime[i] = true } for i := 2; i < len(prime); i++ { if !prime[i] { // 例えば4は既に2の倍数として素数じゃないと判定されているので考えなくてOK continue } for j := i * 2; j < len(prime); j += i { // iの倍数が素数でないことを記録 prime[j] = false } }
このエラトステネスの篩を使うことで高速に素数判定できる。
これを踏まえると次のようなコードでACできる。
package main import "fmt" func main() { var Q int fmt.Scan(&Q) l := make([]int, Q) r := make([]int, Q) for i := 0; i < Q; i++ { fmt.Scan(&l[i], &r[i]) } // エラトステネスの篩 prime := make([]bool, 100001) // 0 ~ 100000までの数字が素数かどうかを表で表す for i := 2; i < len(prime); i++ { // 0, 1以外を素数として初期化 prime[i] = true } for i := 2; i < len(prime); i++ { if !prime[i] { // 例えば4は既に2の倍数として素数じゃないと判定されているので考えなくてOK continue } for j := i * 2; j < len(prime); j += i { // iの倍数が素数でないことを記録 prime[j] = false } } similar := make([]bool, 100001) for x := 1; x < len(similar); x += 2 { // 奇数だけを見ていく if prime[x] && prime[(x+1)/2] { similar[x] = true } } s := make([]int, len(similar)+1) // 累積和 for i := 0; i < len(similar); i++ { if similar[i] { s[i+1] = s[i] + 1 } else { s[i+1] = s[i] } } for i := 0; i < Q; i++ { fmt.Println(s[r[i]+1] - s[l[i]]) } }
ナップザックDP
競技プログラミングで有名なアルゴリズムに動的計画法(DP)がある。
無駄に同じ計算をしないように、ある条件下での計算結果を配列などに保存しておき、計算を高速化するためのアルゴリズム。
言葉で言ってもぴんとこないので例題で考えてみる。
フェボナッチ数列の計算
まず以下の問題で考えてみる。
https://judge.u-aizu.ac.jp/onlinejudge/description.jsp?id=ALDS1_10_A&lang=ja
フェボナッチ数列のある項を求める問題。
フェボナッチ数列は以下のように
fib(n) = fib(n−1) + fib(n−2) ただし、fib(0) = 1, fib(1) = 1
と定義される。
単純に計算
これを普通に求めようとしてみる。
例えば n = 5 の項を求めようとすると、以下のように6回の計算をする必要がある。
fib(5) = fib(4) + fib(3)
fib(4) = fib(3) + fib(2)
fib(3) = fib(2) + fib(1)
- fib(2) = fib(1) + fib(0)
fib(3) = fib(2) + fib(1)
- fib(2) = fib(1) + fib(0)
しかし、fib(3)やfib(2)の計算が複数回されていて無駄がある。
fib(3)やfib(2)の計算は1度だけ行い、再利用すれば無駄な処理が減る。
配列に計算結果をメモする
無駄な計算を減らすために、配列に計算結果をメモする。
配列のindex=1にfib(1)を、index=2にfib(2)を...というように配列に計算結果を格納する。
コードで表現すると以下のようになる。
package main import "fmt" func main() { var n int fmt.Scan(&n) dp := make([]int, n+1) dp[0] = 1 dp[1] = 1 for i := 2; i < n+1; i++ { dp[i] = dp[i-1] + dp[i-2] } fmt.Println(dp[n]) }
こうすると、計算は fib(3), fib(4), fib(5)の3回しか行われず無駄な重複した計算が省ける。
これは厳密的には、メモ化再帰といい、DPの入口になっている。
ナップザックDP
次に典型問題のナップザックDPについて考える。
https://judge.u-aizu.ac.jp/onlinejudge/description.jsp?id=DPL_1_B&lang=ja
重さ制限があるナップザックに価値が最大になるようにアイテムを入れていく。
このような問題ではアイテムの個数を1個ずつ増やしていき、各状況で重さ制限を満たす価値の最大値を記録していく。
つまり考慮するアイテムの個数と重さ制限の重量をインデックスに取る2重配列に価値の最大値を記録する。
コードで表現すると以下のようになる。
package main import ( "fmt" ) type item struct { v int w int } func main() { var N, W int fmt.Scan(&N, &W) items := make([]item, N+1) // indexと番目が一致するように標準入力を受け取る for i := 1; i < N+1; i++ { fmt.Scan(&items[i].v, &items[i].w) } dp := make([][]int, N+1) // 品物を0からN個使う場合それぞれでの価値の最大値を記録する二重配列 for i := 0; i < N+1; i++ { dp[i] = make([]int, W+1) } for i := 1; i < N+1; i++ { // 品物を1個からN個使う場合それぞれを考える. 品物が0個のときは価値は0なのでスキップしてOK. item := items[i] for w := 0; w < W+1; w++ { // 制限が0からWまでの場合での価値の最大値を考える if item.w <= w { surplus := dp[i-1][w-item.w] // 余剰分の重さ制限での価値の最大値 if item.v+surplus > dp[i-1][w] { dp[i][w] = item.v + surplus } else { dp[i][w] = dp[i-1][w] } } else { dp[i][w] = dp[i-1][w] } } } var max int for i := range dp[N] { // 全てのアイテムを考慮した場合の最大値を求める if max < dp[N][i] { max = dp[N][i] } } fmt.Println(max) }
アイテムを1つずつ増やして、まずアイテムの重さが重さ制限を満たしていることが第1条件になる。
その上で、重さ制限が空いていれば新アイテムをただ追加する、容量が空いていなくても既存のアイテムを除いて新アイテムを追加した方が価値が最大になる可能性もある。
このように幾つかのパターンが考えられるが、新アイテムの重さから重さ制限までの重量で余剰している重さでの価値の最大値を考えて、新アイテムの価値と余剰分の最大値の和が既存の価値の最大値を上回れば更新することにすれば、アイテム追加のパターンを考える必要はない。
このようにして、DPでは配列にメモした計算結果を再利用して無駄な重複した計算を省いて処理を進めていく。
Linuxのプロセス管理
Linuxのプロセス周りについて自分なりにまとめてみる。
プロセス
プロセスとはコンピューターシステムのプログラムの実行単位。 プロセスはOS上で実行中のプログラムを指し、プログラムの実行単位である。 OSがファイルを実行する際にメモリやCPUなどのリソースを確保・消費してプロセスを生成する。
プロセスの生成方法
- システムコール
execve()
実行ファイルを読み出してプロセスのメモリに必要な情報を読み出す通常のプロセス生成で利用される。
- システムコール
fork()
1つのプロセスをもとに新たにプロセスを生成する際に利用される。 子プロセス用のメモリに元になる親プロセスのメモリをコピーすることによって実現する。
Apacheでは複数リクエストを捌くのにfork()
を使っており、リクエストが増えたときに処理できない場合はある(C10K問題(クライアント1万台問題))
ファイルディスクリプタ
カーネルは各プロセスでどういった入出力が行われるかを管理する必要があり、各プロセスが関与しているファイル情報のリストを持っている。 ファイルディスクリプタはそのリストのインデックス値であり、識別子として利用される。ファイルディスクリプタ(インデックス値)を指定することで対応するファイルにアクセスできる。
OSはプロセスが生成されるとまず3つの擬似ファイルを作成し、それぞれにファイルディスクリプタを割り当てる。0が標準入力、1が標準出力、2が標準エラー出力。 そのプロセスでファイルやソケットをオープンするたびに対応するファイルディスクリプタはインクリメントされる。
まずbashのプロセスのIDを確認する。
$ ps aux | grep bash root 9091 0.0 0.0 3864 0 pts/0 Ss+ Jun01 0:00 bash
ファイルディスクリプタの情報は/proc/${PID}/fd/
に含まれている。
$ sudo ls -al /proc/9091/fd/ total 0 dr-x------ 2 root root 0 Jun 4 14:17 . dr-xr-xr-x 9 root root 0 Jun 4 14:17 .. lrwx------ 1 root root 64 Jun 4 14:17 0 -> /dev/pts/0 lrwx------ 1 root root 64 Jun 4 14:17 1 -> /dev/pts/0 lrwx------ 1 root root 64 Jun 4 14:17 2 -> /dev/pts/0 lrwx------ 1 root root 64 Jun 4 14:17 255 -> /dev/pts/0
これを見るとbashの場合は、0 1 2
は全て/dev/pts/0
へのリンクになっている。
デーモンプロセス
デーモンとはバックグラウンドプロセスを作るための仕組みのこと。 一般的なプロセスはシェルの子プロセスとして生成されるのでログアウトしたりシェルを閉じてしまうと終了してしまう。
デーモン化は、プロセスの作業ディレクトリをルートに移動させて、プロセスをforkしてから親プロセスをsystemd(初期プロセス)に変更し、元々の親プロセスは終了させることで作成する。標準入出力もプロセス生成時のものから通常は/dev/null
に変更して捨てる。
$ ps -ef | grep daemon UID PID PPID C STIME TTY TIME CMD message+ 718 1 0 2021 ? 00:00:27 /usr/bin/dbus-daemon --system --address=systemd: --nofork --nopidfile --systemd-activation --syslog-only daemon 745 1 0 2021 ? 00:00:00 /usr/sbin/atd -f hysrtr 1396238 1396202 0 23:57 pts/1 00:00:00 grep --color=auto daemon root 2626495 1 0 2021 ? 00:02:55 /usr/lib/accountsservice/accounts-daemon
※systemdはサービスを起動するための初期プロセスで、カーネル(PID=0)から起動され、PIDは1になっている。
$ ps aux | head -n 5 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND root 1 0.0 1.5 169084 7388 ? Ss 2021 6:57 /lib/systemd/systemd --system --deserialize 57 root 2 0.0 0.0 0 0 ? S 2021 0:01 [kthreadd] root 3 0.0 0.0 0 0 ? I< 2021 0:00 [rcu_gp] root 4 0.0 0.0 0 0 ? I< 2021 0:00 [rcu_par_gp]
スレッド
スレッドは1つのプロセスから生成され、スレッドを使うことで並行処理を実現できる。
親プロセスと同じメモリ空間を使用し、同じプロセスのスレッド同士はお互いのメモリ領域にアクセスできる。 一方でプロセスは固有のメモリ領域を持っているので別プロセスのメモリ領域にはアクセスできない。
プロセススケジューラ
複数プロセスの実行はプロセススケジューラによって管理されている。
一般に1つのCPU上で同時に処理できるプロセスは1つだけで、プロセススケジューラによって複数プロセスが同時に動作しているように見える。
※マルチコアCPUでは1つのコアが1つのCPUとして認識される。
つまり、CPU上ではプロセスを順番に1つずつ動かして1周したらまた最初のプロセスから動かすラウンドロビン方式でプロセスは動作している。
ラウンドロビン方式とはCPUを使用できる時間を定め、その時間内に処理が終わらない場合は次のタスクにCPUの使用権が与えられる方式。
CPU上で動作するプロセスが変わることをコンテキストスイッチという。