☁️ AWS Cloud Infrastructure Hands-On Guide - 2025 Comprehensive Edition

A definitive guide that brings together the latest technologies and practical know-how for building AWS cloud infrastructure in 2025. It systematically covers the knowledge modern cloud infrastructure requires, from SageMaker machine learning development, EC2 and M4 Pro Mac operations, and security best practices to cost optimization and operational automation.

🚀 Latest AWS Updates - September 2025

Feature Enhancements in Major Services

Amazon EC2 M4 / M4 Pro Mac Instances announced: A new generation of Mac instances powered by Apple Silicon M4 chips is now available.

Key specifications:

  • M4: 8-core CPU, 8-core GPU, 16 GB unified memory
  • M4 Pro: 10-core CPU, 16-core GPU, 24 GB unified memory
  • Storage: up to 8 TB SSD
  • Network: up to 25 Gbps
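
Note that EC2 Mac instances always run on Dedicated Hosts (with a 24-hour minimum allocation), so a host must be allocated before an instance can be launched. The following is a minimal boto3 sketch; the mac2-m4.metal type name simply follows this article's examples and should be treated as an assumption.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Allocate a Dedicated Host for Mac instances (a 24-hour minimum allocation applies).
# The instance type name follows this article's examples and is an assumption.
response = ec2.allocate_hosts(
    InstanceType="mac2-m4.metal",
    AvailabilityZone="us-east-1a",
    Quantity=1,
    AutoPlacement="off",
)
print("Allocated dedicated host:", response["HostIds"][0])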

Usage scenarios:

# Example CloudFormation template
Resources:
  MacDevInstance:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: mac2-m4.metal  # M4 Mac instance
      ImageId: ami-0123456789abcdef0  # macOS AMI (placeholder ID)
      KeyName: !Ref KeyPairName
      SecurityGroupIds:
        - !Ref MacSecurityGroup
      SubnetId: !Ref PrivateSubnet
      
      # Mac instances must run on a Dedicated Host
      Tenancy: host
      
      # Mac-specific setup
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          # Install Xcode command line tools
          sudo xcode-select --install
          
          # Set up Homebrew
          /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
          
          # Install development tooling
          brew install node python go docker
          
          # Download the GitHub Actions runner (CI/CD agent)
          curl -o actions-runner.tar.gz -L https://github.com/actions/runner/releases/download/v2.300.0/actions-runner-osx-arm64-2.300.0.tar.gz

Example use in iOS development:

#!/bin/bash
# iOS CI/CD pipeline

# Build and archive the project
xcodebuild -workspace MyApp.xcworkspace \
           -scheme MyApp \
           -configuration Release \
           -archivePath MyApp.xcarchive \
           archive

# Export the archive for App Store Connect upload
xcodebuild -exportArchive \
           -archivePath MyApp.xcarchive \
           -exportPath export \
           -exportOptionsPlist ExportOptions.plist

# Run automated tests
xcodebuild test -workspace MyApp.xcworkspace \
                -scheme MyAppTests \
                -destination 'platform=iOS Simulator,name=iPhone 14 Pro'

🤖 Hands-On Development with Amazon SageMaker

Setting Up a Local Mode Development Environment

Efficient development with SageMaker Local Mode:

import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.local import LocalSession
import boto3

class SageMakerLocalDeveloper:
    def __init__(self, local_mode=True):
        if local_mode:
            # Local development environment
            self.session = LocalSession()
            self.session.config = {'local': {'local_code': True}}
            self.instance_type = 'local'
        else:
            # Cloud environment
            self.session = sagemaker.Session()
            self.instance_type = 'ml.m5.large'
            self.bucket = self.session.default_bucket()  # default S3 bucket (used for checkpoints below)
        
        self.role = self.get_execution_role()
    
    def get_execution_role(self):
        """実行ロールの取得"""
        if isinstance(self.session, LocalSession):
            # Dummy role for local development
            return 'arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole'
        else:
            return sagemaker.get_execution_role()
    
    def create_pytorch_estimator(self, script_path, requirements_file=None):
        """PyTorch Estimator の作成"""
        
        estimator_config = {
            'entry_point': script_path,
            'source_dir': 'src',
            'role': self.role,
            'instance_type': self.instance_type,
            'instance_count': 1,
            'framework_version': '2.0.0',
            'py_version': 'py310',
            'sagemaker_session': self.session
        }
        
        # Specify dependencies
        if requirements_file:
            estimator_config['dependencies'] = [requirements_file]
        
        # Local-mode-specific settings
        if isinstance(self.session, LocalSession):
            estimator_config.update({
                'volume_size': 1,  # GB (keep small for local runs)
                'max_run': 60 * 60,  # 1 hour
            })
        else:
            estimator_config.update({
                'volume_size': 30,  # GB
                'max_run': 60 * 60 * 24,  # 24 hours
                'checkpoint_s3_uri': f's3://{self.bucket}/checkpoints',
                'use_spot_instances': True,
                'max_wait': 60 * 60 * 25  # 25 hours
            })
        
        return PyTorch(**estimator_config)
    
    def local_training_pipeline(self, train_data, validation_data):
        """ローカル訓練パイプライン"""
        
        # 1. Preprocess the data (preprocess_local_data is assumed to be defined elsewhere on this class)
        preprocessed_data = self.preprocess_local_data(train_data)
        
        # 2. Train the model
        estimator = self.create_pytorch_estimator('train.py')
        
        # Point the training channels at the local data paths
        train_input = sagemaker.inputs.TrainingInput(
            preprocessed_data['train'],
            content_type='application/x-parquet'
        )
        
        val_input = sagemaker.inputs.TrainingInput(
            preprocessed_data['validation'],
            content_type='application/x-parquet'
        )
        
        # Run training
        estimator.fit({
            'train': train_input,
            'validation': val_input
        })
        
        return estimator
    
    def deploy_local_endpoint(self, estimator):
        """ローカルエンドポイントのデプロイ"""
        
        # Local inference endpoint
        predictor = estimator.deploy(
            initial_instance_count=1,
            instance_type='local',
            endpoint_name='local-model-endpoint'
        )
        
        return predictor

# Example: moving from local development to the cloud
def ml_development_workflow():
    """ML開発ワークフロー"""
    
    # Phase 1: local development and experimentation
    local_dev = SageMakerLocalDeveloper(local_mode=True)
    
    # Experiment on a small sample dataset (load_sample_dataset is assumed to be defined elsewhere)
    sample_data = load_sample_dataset()
    local_estimator = local_dev.local_training_pipeline(
        sample_data['train'], 
        sample_data['validation']
    )
    
    # Local inference test
    local_predictor = local_dev.deploy_local_endpoint(local_estimator)
    test_prediction = local_predictor.predict(sample_data['test'][:10])
    
    print(f"Local prediction results: {test_prediction}")
    
    # Phase 2: full-scale training in the cloud
    cloud_dev = SageMakerLocalDeveloper(local_mode=False)
    
    # Train on the full dataset stored in S3
    full_data = load_full_dataset_from_s3()
    cloud_estimator = cloud_dev.create_pytorch_estimator('train.py')
    
    cloud_estimator.fit({
        'train': 's3://my-bucket/train/',
        'validation': 's3://my-bucket/validation/'
    })
    
    # Deploy the production endpoint
    production_predictor = cloud_estimator.deploy(
        initial_instance_count=2,
        instance_type='ml.m5.xlarge',
        endpoint_name='production-model-endpoint'
    )
    
    return {
        'local_model': local_predictor,
        'production_model': production_predictor
    }

Implementing MLOps with SageMaker Pipelines

import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch import PyTorch
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile

class MLOpsPipeline:
    def __init__(self, session, role):
        self.session = session
        self.role = role
        
        # Pipeline parameters
        self.model_name = ParameterString(name="ModelName", default_value="customer-churn-model")
        self.train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.m5.xlarge")
        self.learning_rate = ParameterFloat(name="LearningRate", default_value=0.001)
    
    def create_preprocessing_step(self):
        """データ前処理ステップ"""
        
        from sagemaker.sklearn.processing import SKLearnProcessor
        
        processor = SKLearnProcessor(
            framework_version='1.0-1',
            instance_type='ml.m5.large',
            instance_count=1,
            base_job_name='preprocessing',
            role=self.role
        )
        
        processing_step = ProcessingStep(
            name="PreprocessingStep",
            processor=processor,
            code='preprocessing.py',
            inputs=[
                sagemaker.processing.ProcessingInput(
                    source='s3://my-bucket/raw-data/',
                    destination='/opt/ml/processing/input'
                )
            ],
            outputs=[
                sagemaker.processing.ProcessingOutput(
                    output_name='train_data',
                    source='/opt/ml/processing/train',
                    destination='s3://my-bucket/processed/train'
                ),
                sagemaker.processing.ProcessingOutput(
                    output_name='validation_data',
                    source='/opt/ml/processing/validation',
                    destination='s3://my-bucket/processed/validation'
                )
            ]
        )
        
        return processing_step
    
    def create_training_step(self, preprocessing_step):
        """モデル訓練ステップ"""
        
        estimator = PyTorch(
            entry_point='train.py',
            source_dir='src',
            role=self.role,
            instance_type=self.train_instance_type,
            instance_count=1,
            framework_version='2.0.0',
            py_version='py310',
            hyperparameters={
                'learning_rate': self.learning_rate,
                'epochs': 50,
                'batch_size': 32
            }
        )
        
        training_step = TrainingStep(
            name="TrainingStep",
            estimator=estimator,
            inputs={
                'train': TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri
                ),
                'validation': TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs['validation_data'].S3Output.S3Uri
                )
            }
        )
        
        return training_step
    
    def create_evaluation_step(self, training_step):
        """モデル評価ステップ"""
        
        from sagemaker.workflow.steps import ProcessingStep
        from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
        from sagemaker.workflow.condition_step import ConditionStep
        from sagemaker.workflow.properties import PropertyFile
        
        # Evaluation processing job
        evaluation_processor = SKLearnProcessor(
            framework_version='1.0-1',
            instance_type='ml.m5.large',
            instance_count=1,
            role=self.role
        )
        
        evaluation_step = ProcessingStep(
            name="EvaluationStep",
            processor=evaluation_processor,
            code='evaluate.py',
            inputs=[
                ProcessingInput(
                    source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                    destination='/opt/ml/processing/model'
                )
            ],
            outputs=[
                ProcessingOutput(
                    output_name='evaluation_results',
                    source='/opt/ml/processing/evaluation',
                    destination='s3://my-bucket/evaluation'
                )
            ],
            property_files=[
                PropertyFile(
                    name="EvaluationReport",
                    output_name="evaluation_results",
                    path="evaluation.json"
                )
            ]
        )
        
        return evaluation_step
    
    def build_pipeline(self):
        """パイプライン構築"""
        
        # Define the steps
        preprocessing_step = self.create_preprocessing_step()
        training_step = self.create_training_step(preprocessing_step)
        evaluation_step = self.create_evaluation_step(training_step)
        
        # Conditional branch (model quality check)
        model_quality_condition = ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=evaluation_step.name,
                property_file="EvaluationReport",
                json_path="metrics.accuracy"
            ),
            right=0.8  # accuracy of at least 80%
        )
        
        # Register the model only when the condition passes
        condition_step = ConditionStep(
            name="ModelQualityCheck",
            conditions=[model_quality_condition],
            if_steps=[self.create_model_registration_step(training_step)],  # registration step assumed to be defined elsewhere
            else_steps=[]
        )
        
        # Create the pipeline
        pipeline = Pipeline(
            name=f"MLPipeline-{self.model_name.default_value}",
            parameters=[
                self.model_name,
                self.train_instance_type,
                self.learning_rate
            ],
            steps=[
                preprocessing_step,
                training_step,
                evaluation_step,
                condition_step
            ]
        )
        
        return pipeline
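
After the pipeline object is built it can be registered and executed with the standard SageMaker Pipelines calls. A minimal usage sketch, assuming an existing SageMaker session and execution role:

# Minimal usage sketch (assumes an existing SageMaker session and execution role)
session = sagemaker.Session()
role = sagemaker.get_execution_role()

mlops = MLOpsPipeline(session=session, role=role)
pipeline = mlops.build_pipeline()

# Create or update the pipeline definition, then start and wait for an execution
pipeline.upsert(role_arn=role)
execution = pipeline.start(parameters={"LearningRate": 0.0005})
execution.wait()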

🏗️ EC2 and Infrastructure Operations

Strategies for Using New-Generation Instances

Example use of M4 Pro Mac instances:

# Example Terraform configuration
resource "aws_dedicated_host" "mac_host" {
  instance_type = "mac2.metal"
  availability_zone = "us-east-1a"
  
  tags = {
    Name = "mac-development-host"
    Purpose = "iOS-CI-CD"
  }
}

resource "aws_instance" "mac_dev" {
  ami           = data.aws_ami.macos.id
  instance_type = "mac2-m4.metal"
  
  # Pin the instance to the dedicated host
  host_id = aws_dedicated_host.mac_host.id
  
  # Network settings
  subnet_id              = aws_subnet.private.id
  vpc_security_group_ids = [aws_security_group.mac_dev.id]
  
  # EBS optimization
  ebs_optimized = true
  
  root_block_device {
    volume_type = "gp3"
    volume_size = 500
    iops        = 3000
    throughput  = 125
    encrypted   = true
  }
  
  # Additional storage
  ebs_block_device {
    device_name = "/dev/sdf"
    volume_type = "gp3"
    volume_size = 1000
    iops        = 3000
    encrypted   = true
  }
  
  user_data = base64encode(templatefile("mac_userdata.sh", {
    github_token = var.github_token
    xcode_version = var.xcode_version
  }))
  
  tags = {
    Name = "Mac-Development-Instance"
    OS   = "macOS"
  }
}

# Security group for the Mac instance
resource "aws_security_group" "mac_dev" {
  name_prefix = "mac-dev-"
  vpc_id      = aws_vpc.main.id
  
  # SSH access
  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = [var.admin_cidr]
  }
  
  # VNC (remote desktop)
  ingress {
    from_port   = 5900
    to_port     = 5900
    protocol    = "tcp"
    cidr_blocks = [var.admin_cidr]
  }
  
  # Xcode Server
  ingress {
    from_port   = 20300
    to_port     = 20300
    protocol    = "tcp"
    cidr_blocks = [aws_vpc.main.cidr_block]
  }
  
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

Auto Scaling Optimization

Demand-forecast-based Auto Scaling:

import boto3
import json
from datetime import datetime, timedelta

class PredictiveAutoScaling:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.autoscaling = boto3.client('autoscaling')
        self.forecast = boto3.client('forecast')
    
    def setup_predictive_scaling(self, asg_name):
        """予測的スケーリング設定"""
        
        # CloudWatch metrics to monitor
        custom_metrics = [
            {
                'MetricName': 'RequestLatency',
                'Namespace': 'AWS/ApplicationELB',
                'Statistic': 'Average'
            },
            {
                'MetricName': 'CPUUtilization',
                'Namespace': 'AWS/EC2',
                'Statistic': 'Average'
            },
            {
                'MetricName': 'NetworkIn',
                'Namespace': 'AWS/EC2',
                'Statistic': 'Sum'
            }
        ]
        
        # Forecast model configuration
        forecast_config = {
            'ForecastName': f'{asg_name}-demand-forecast',
            'PredictorName': f'{asg_name}-predictor',
            'ForecastHorizon': 24,  # forecast 24 hours ahead
            'PerformAutoML': True,
            'InputDataConfig': {
                'DatasetGroupArn': self.create_dataset_group(asg_name),  # helper assumed to be defined elsewhere
                'SupplementaryFeatures': [
                    {
                        'Name': 'holiday',
                        'Value': 'US'
                    }
                ]
            }
        }
        
        return forecast_config
    
    def create_scaling_policy(self, asg_name, forecast_data):
        """動的スケーリングポリシー作成"""
        
        # Scaling plan based on the forecast data
        scaling_schedule = []
        
        for hour, predicted_load in enumerate(forecast_data):
            target_capacity = self.calculate_target_capacity(predicted_load)
            
            # Scale out ahead of demand (15 minutes early)
            schedule_time = datetime.now() + timedelta(hours=hour, minutes=-15)
            
            scaling_schedule.append({
                'ScheduledActionName': f'predictive-scale-{hour}',
                'Schedule': schedule_time.strftime('%M %H %d %m ? %Y'),
                'DesiredCapacity': target_capacity,
                'MinSize': max(1, target_capacity - 2),
                'MaxSize': target_capacity + 5
            })
        
        return scaling_schedule
    
    def calculate_target_capacity(self, predicted_load):
        """予測負荷から必要キャパシティを算出"""
        
        # Baseline capacity
        base_capacity = 2
        
        # Additional capacity by predicted load level
        if predicted_load < 50:
            additional = 0
        elif predicted_load < 80:
            additional = 2
        elif predicted_load < 90:
            additional = 5
        else:
            additional = 10
        
        return base_capacity + additional
    
    def implement_multi_metric_scaling(self, asg_name):
        """複数メトリクスによるスケーリング"""
        
        # Target Tracking Scaling Policies
        policies = [
            {
                'PolicyName': f'{asg_name}-cpu-scaling',
                'PolicyType': 'TargetTrackingScaling',
                'TargetTrackingConfiguration': {
                    'TargetValue': 70.0,
                    'PredefinedMetricSpecification': {
                        'PredefinedMetricType': 'ASGAverageCPUUtilization'
                    },
                    'ScaleOutCooldown': 300,
                    'ScaleInCooldown': 300
                }
            },
            {
                'PolicyName': f'{asg_name}-alb-request-scaling',
                'PolicyType': 'TargetTrackingScaling',
                'TargetTrackingConfiguration': {
                    'TargetValue': 1000.0,  # 1000 requests per target
                    'PredefinedMetricSpecification': {
                        'PredefinedMetricType': 'ALBRequestCountPerTarget',
                        'ResourceLabel': 'app/my-alb/50dc6c495c0c9188/targetgroup/my-targets/73e2d6bc24d8a067'
                    }
                }
            }
        ]
        
        for policy in policies:
            response = self.autoscaling.put_scaling_policy(
                AutoScalingGroupName=asg_name,
                **policy
            )
            
            print(f"Created scaling policy: {response['PolicyARN']}")

💰 Cost Optimization and FinOps

Cost Visibility via New Relic Application Monitoring

A system for finding wasted resources:

import boto3
import pandas as pd
from datetime import datetime, timedelta

class AWSCostOptimizer:
    def __init__(self):
        self.ce = boto3.client('ce')  # Cost Explorer
        self.ec2 = boto3.client('ec2')
        self.cloudwatch = boto3.client('cloudwatch')
        self.rds = boto3.client('rds')
    
    def analyze_unused_resources(self, days=30):
        """未使用リソースの分析"""
        
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        
        # Only the EC2 and EBS checks are shown in full below; the remaining helpers follow the same pattern
        unused_resources = {
            'ec2_instances': self.find_unused_ec2_instances(start_date, end_date),
            'ebs_volumes': self.find_unused_ebs_volumes(),
            'elastic_ips': self.find_unused_elastic_ips(),
            'load_balancers': self.find_unused_load_balancers(start_date, end_date),
            'rds_instances': self.find_unused_rds_instances(start_date, end_date)
        }
        
        return unused_resources
    
    def find_unused_ec2_instances(self, start_date, end_date):
        """CPU使用率が低いEC2インスタンスを特定"""
        
        instances = self.ec2.describe_instances()
        unused_instances = []
        
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] != 'running':
                    continue
                
                instance_id = instance['InstanceId']
                
                # Fetch CloudWatch metrics
                cpu_metrics = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='CPUUtilization',
                    Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                    StartTime=start_date,
                    EndTime=end_date,
                    Period=3600,  # 1 hour
                    Statistics=['Average']
                )
                
                if cpu_metrics['Datapoints']:
                    avg_cpu = sum(dp['Average'] for dp in cpu_metrics['Datapoints']) / len(cpu_metrics['Datapoints'])
                    
                    # Instances averaging below 5% CPU utilization
                    if avg_cpu < 5.0:
                        # Estimate the monthly cost (pricing helper assumed to be defined elsewhere)
                        monthly_cost = self.calculate_ec2_monthly_cost(instance['InstanceType'])
                        
                        unused_instances.append({
                            'InstanceId': instance_id,
                            'InstanceType': instance['InstanceType'],
                            'AvgCPUUtilization': avg_cpu,
                            'MonthlyCost': monthly_cost,
                            'LaunchTime': instance['LaunchTime'],
                            'Recommendation': 'Consider termination or downsizing'
                        })
        
        return unused_instances
    
    def find_unused_ebs_volumes(self):
        """未アタッチのEBSボリューム"""
        
        volumes = self.ec2.describe_volumes()
        unused_volumes = []
        
        for volume in volumes['Volumes']:
            if volume['State'] == 'available':  # unattached
                monthly_cost = self.calculate_ebs_monthly_cost(
                    volume['Size'], 
                    volume['VolumeType']
                )
                
                unused_volumes.append({
                    'VolumeId': volume['VolumeId'],
                    'Size': volume['Size'],
                    'VolumeType': volume['VolumeType'],
                    'MonthlyCost': monthly_cost,
                    'CreateTime': volume['CreateTime'],
                    'Recommendation': 'Delete if no longer needed'
                })
        
        return unused_volumes
    
    def generate_cost_optimization_report(self, unused_resources):
        """コスト最適化レポート生成"""
        
        total_potential_savings = 0
        
        report = {
            'analysis_date': datetime.now().isoformat(),
            'potential_savings': {},
            'recommendations': []
        }
        
        # EC2 instances
        if unused_resources['ec2_instances']:
            ec2_savings = sum(inst['MonthlyCost'] for inst in unused_resources['ec2_instances'])
            total_potential_savings += ec2_savings
            
            report['potential_savings']['ec2'] = {
                'monthly_savings': ec2_savings,
                'affected_instances': len(unused_resources['ec2_instances'])
            }
            
            report['recommendations'].append({
                'service': 'EC2',
                'action': 'Terminate or downsize low-utilization instances',
                'potential_savings': ec2_savings,
                'instances': unused_resources['ec2_instances']
            })
        
        # EBS volumes
        if unused_resources['ebs_volumes']:
            ebs_savings = sum(vol['MonthlyCost'] for vol in unused_resources['ebs_volumes'])
            total_potential_savings += ebs_savings
            
            report['potential_savings']['ebs'] = {
                'monthly_savings': ebs_savings,
                'affected_volumes': len(unused_resources['ebs_volumes'])
            }
        
        report['total_monthly_savings'] = total_potential_savings
        report['annual_savings'] = total_potential_savings * 12
        
        return report
    
    def implement_automated_cost_controls(self):
        """自動コスト制御の実装"""
        
        # Periodic checks via a Lambda function
        lambda_function = {
            'FunctionName': 'cost-optimizer',
            'Runtime': 'python3.9',
            'Role': 'arn:aws:iam::account:role/lambda-cost-optimizer',
            'Handler': 'lambda_function.lambda_handler',
            'Code': {
                'ZipFile': self.get_lambda_code()
            },
            'Environment': {
                'Variables': {
                    'SNS_TOPIC_ARN': 'arn:aws:sns:region:account:cost-alerts'
                }
            }
        }
        
        # Scheduled execution via EventBridge
        schedule_rule = {
            'Name': 'daily-cost-check',
            'ScheduleExpression': 'rate(1 day)',
            'State': 'ENABLED',
            'Targets': [{
                'Id': '1',
                'Arn': 'arn:aws:lambda:region:account:function:cost-optimizer'
            }]
        }
        
        return {
            'lambda_function': lambda_function,
            'schedule_rule': schedule_rule
        }

# Usage example
optimizer = AWSCostOptimizer()
unused_resources = optimizer.analyze_unused_resources(days=30)
report = optimizer.generate_cost_optimization_report(unused_resources)

print(f"Potential monthly savings: ${report['total_monthly_savings']:.2f}")
print(f"Potential annual savings: ${report['annual_savings']:.2f}")

🔒 Security and Governance

Implementing IAM Best Practices

IAM design based on the principle of least privilege:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EC2DeveloperAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeInstances",
        "ec2:DescribeImages",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs"
      ],
      "Resource": "*"
    },
    {
      "Sid": "EC2InstanceManagement",
      "Effect": "Allow",
      "Action": [
        "ec2:StartInstances",
        "ec2:StopInstances",
        "ec2:RebootInstances"
      ],
      "Resource": "arn:aws:ec2:*:*:instance/*",
      "Condition": {
        "StringEquals": {
          "ec2:ResourceTag/Environment": ["development", "staging"]
        }
      }
    },
    {
      "Sid": "S3DeveloperBucketAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::dev-bucket/*",
        "arn:aws:s3:::staging-bucket/*"
      ]
    },
    {
      "Sid": "CloudWatchLogsAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams"
      ],
      "Resource": "arn:aws:logs:*:*:log-group:/aws/lambda/dev-*"
    }
  ]
}
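
A policy document like the one above can be created and attached with a couple of IAM API calls. A minimal boto3 sketch, in which the file name, policy name, and role name are assumptions:

import boto3

iam = boto3.client('iam')

# Load the policy document shown above (the file name is an assumption)
with open('developer-policy.json') as f:
    policy_document = f.read()

# Create a customer-managed policy and attach it to a developer role (names are assumptions)
policy = iam.create_policy(
    PolicyName='DeveloperLeastPrivilegePolicy',
    PolicyDocument=policy_document
)
iam.attach_role_policy(
    RoleName='developer-role',
    PolicyArn=policy['Policy']['Arn']
)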

Continuous Compliance with AWS Config

import boto3
import json

class AWSComplianceMonitor:
    def __init__(self):
        self.config = boto3.client('config')
        self.iam = boto3.client('iam')
        self.ec2 = boto3.client('ec2')
    
    def setup_config_rules(self):
        """AWS Config ルールの設定"""
        
        config_rules = [
            {
                'ConfigRuleName': 'ec2-security-groups-restricted-ssh',
                'Description': 'SSH access should be restricted',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'INCOMING_SSH_DISABLED'  # this managed rule takes no input parameters
                }
            },
            {
                'ConfigRuleName': 'iam-password-policy-check',
                'Description': 'IAM password policy should meet requirements',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'IAM_PASSWORD_POLICY'
                },
                'InputParameters': json.dumps({
                    'RequireUppercaseCharacters': 'true',
                    'RequireLowercaseCharacters': 'true',
                    'RequireNumbers': 'true',
                    'MinimumPasswordLength': '12'
                })
            },
            {
                'ConfigRuleName': 's3-bucket-ssl-requests-only',
                'Description': 'S3 buckets should require SSL requests only',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'S3_BUCKET_SSL_REQUESTS_ONLY'
                }
            }
        ]
        
        for rule in config_rules:
            try:
                self.config.put_config_rule(ConfigRule=rule)
                print(f"Created Config rule: {rule['ConfigRuleName']}")
            except Exception as e:
                print(f"Failed to create rule {rule['ConfigRuleName']}: {e}")
    
    def create_remediation_configurations(self):
        """自動修復設定"""
        
        remediation_configs = [
            {
                'ConfigRuleName': 'ec2-security-groups-restricted-ssh',
                'TargetType': 'SSM_DOCUMENT',
                'TargetId': 'AWSConfigRemediation-DeleteUnrestrictedSourceInSecurityGroup',
                'TargetVersion': '1',
                'Parameters': {
                    'AutomationAssumeRole': {
                        'StaticValue': {
                            'Values': ['arn:aws:iam::account:role/aws-config-remediation-role']
                        }
                    },
                    'GroupId': {
                        'ResourceValue': {
                            'Value': 'RESOURCE_ID'
                        }
                    }
                },
                'Automatic': True,
                'MaximumAutomaticAttempts': 3
            }
        ]
        
        for config in remediation_configs:
            try:
                self.config.put_remediation_configurations(
                    RemediationConfigurations=[config]
                )
                print(f"Created remediation for: {config['ConfigRuleName']}")
            except Exception as e:
                print(f"Failed to create remediation: {e}")

📊 Operational Monitoring and Log Management

Using CloudWatch Logs Insights

Advanced analysis of application logs:

import json
import time
from datetime import datetime, timedelta

import boto3

class CloudWatchInsightsAnalyzer:
    def __init__(self):
        self.logs_client = boto3.client('logs')
    
    def analyze_application_performance(self, log_group_name, hours=24):
        """アプリケーションパフォーマンス分析"""
        
        start_time = datetime.now() - timedelta(hours=hours)
        end_time = datetime.now()
        
        # Response time analysis
        response_time_query = """
        fields @timestamp, @message
        | filter @message like /response_time/
        | parse @message "response_time: * ms" as response_time
        | stats avg(response_time) as avg_response_time, 
                max(response_time) as max_response_time,
                count() as request_count by bin(5m)
        | sort @timestamp desc
        """
        
        # Error rate analysis
        error_rate_query = """
        fields @timestamp, @message
        | filter @message like /ERROR/ or @message like /error/
        | stats count() as error_count by bin(1h)
        | sort @timestamp desc
        """
        
        # Run the queries
        queries = [
            ('response_time_analysis', response_time_query),
            ('error_rate_analysis', error_rate_query)
        ]
        
        results = {}
        for query_name, query in queries:
            response = self.logs_client.start_query(
                logGroupName=log_group_name,
                startTime=int(start_time.timestamp()),
                endTime=int(end_time.timestamp()),
                queryString=query
            )
            
            query_id = response['queryId']
            
            # Wait for the query to complete
            while True:
                result = self.logs_client.get_query_results(queryId=query_id)
                if result['status'] == 'Complete':
                    results[query_name] = result['results']
                    break
                elif result['status'] == 'Failed':
                    print(f"Query {query_name} failed")
                    break
                
                time.sleep(1)
        
        return results
    
    def create_custom_dashboards(self):
        """カスタムダッシュボード作成"""
        
        cloudwatch = boto3.client('cloudwatch')
        
        dashboard_body = {
            "widgets": [
                {
                    "type": "log",
                    "x": 0,
                    "y": 0,
                    "width": 12,
                    "height": 6,
                    "properties": {
                        "query": "SOURCE '/aws/lambda/my-function'\n| fields @timestamp, @message\n| filter @message like /ERROR/\n| sort @timestamp desc\n| limit 100",
                        "region": "us-east-1",
                        "title": "Recent Errors",
                        "view": "table"
                    }
                },
                {
                    "type": "metric",
                    "x": 0,
                    "y": 6,
                    "width": 12,
                    "height": 6,
                    "properties": {
                        "metrics": [
                            ["AWS/Lambda", "Duration", "FunctionName", "my-function"],
                            ["AWS/Lambda", "Errors", "FunctionName", "my-function"],
                            ["AWS/Lambda", "Invocations", "FunctionName", "my-function"]
                        ],
                        "period": 300,
                        "stat": "Average",
                        "region": "us-east-1",
                        "title": "Lambda Metrics"
                    }
                }
            ]
        }
        
        cloudwatch.put_dashboard(
            DashboardName='ApplicationMonitoring',
            DashboardBody=json.dumps(dashboard_body)
        )
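
A minimal usage sketch, assuming a Lambda log group (the log group name below is an assumption):

# Minimal usage sketch (the log group name is an assumption)
analyzer = CloudWatchInsightsAnalyzer()
results = analyzer.analyze_application_performance('/aws/lambda/my-function', hours=24)
print(list(results.keys()))
analyzer.create_custom_dashboards()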

🎯 Summary: AWS Strategy for 2025

Success Factors in the Cloud-First Era

Technical success factors:

  1. Cost optimization: continuous usage monitoring and automated optimization
  2. Security first: building security in from the design stage
  3. Operational automation: Infrastructure as Code and CI/CD integration
  4. Performance monitoring: real-time monitoring and alerting

Business success factors:

  1. Scalability: flexible response to fluctuations in demand
  2. Availability: implementing highly available architectures
  3. Governance: organization-wide rules for cloud usage
  4. Skill development: raising AWS skills across the whole team

AWS Technology Trends to Watch in 2025

New services and features:

  • Graviton4: next-generation ARM processors
  • Bedrock Agents: generative AI workflows
  • CodeWhisperer for Infrastructure: automatic generation of IaC
  • Sustainability Dashboard: visualizing environmental impact

Evolving best practices:

  • FinOps: integrating finance and engineering
  • Platform Engineering: improving the developer experience
  • Observability: organization-wide observability
  • Sustainability: sustainable cloud usage

Learning and Career Paths

Essential skills for 2025:

  1. SageMaker MLOps: automating machine learning operations
  2. EKS/Fargate: expertise in container operations
  3. EventBridge: event-driven architecture
  4. CDK/Terraform: Infrastructure as Code

Certification strategy:

  • Solutions Architect Professional: architecture design
  • DevOps Engineer Professional: operational automation
  • Machine Learning Specialty: AI/ML implementation
  • Security Specialty: security expertise

This guide is based on AWS technical information as of September 2025. Please check the official AWS documentation for the latest information.
