Redis 캐시 전략 실전 — 단순 TTL을 넘어서

게시일: 2025년 10월 28일 · 14분 읽기

Redis를 단순 캐시로만 쓰는 건 진짜 아깝다

많은 개발자들이 Redis를 써본다. 하지만 대부분은 이렇게 쓴다: "데이터를 Redis에 넣고 5분 후 자동으로 삭제한다." 이게 캐싱의 전부는 아니다.

내가 현업에서 지켜본 Redis는 훨씬 더 강력한 도구다. 세션 관리, 실시간 순위판, 속도 제한, 메시지 큐, 분산 락 등 많은 것들을 할 수 있다. 하지만 패턴을 모르면 그냥 "빠른 저장소" 정도일 뿐이다.

캐싱 패턴 1: Cache-Aside (Lazy Loading)

가장 흔한 패턴이다. 데이터가 필요할 때 캐시를 확인하고, 없으면 DB에서 가져온다:

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379)

def get_user(user_id: int) -> dict:
    # 1. 캐시에서 확인
    cache_key = f'user:{user_id}'
    cached_data = r.get(cache_key)

    if cached_data:
        print(f'Cache HIT for user {user_id}')
        return json.loads(cached_data)

    # 2. 캐시 미스: DB에서 가져오기
    print(f'Cache MISS for user {user_id}')
    user = fetch_from_database(user_id)  # 느린 DB 쿼리

    # 3. 캐시에 저장 (1시간 TTL)
    r.setex(cache_key, 3600, json.dumps(user))

    return user

# 데이터베이스에서 가져오는 시뮬레이션
def fetch_from_database(user_id: int) -> dict:
    time.sleep(0.1)  # DB 쿼리 지연 시뮬레이션
    return {
        'id': user_id,
        'name': f'User {user_id}',
        'email': f'user{user_id}@example.com'
    }

# 첫 번째 호출: 캐시 미스 (느림)
user = get_user(1)  # 약 100ms

# 두 번째 호출: 캐시 히트 (빠름)
user = get_user(1)  # 약 1ms

장점: 간단하고, 필요한 것만 캐시한다

단점: 첫 요청은 느리고, 데이터 동기화가 복잡할 수 있다

캐싱 패턴 2: Cache-Through (Write-Through)

데이터를 쓸 때 캐시도 함께 업데이트한다:

def update_user(user_id: int, data: dict) -> None:
    # 1. DB 업데이트
    update_in_database(user_id, data)

    # 2. 캐시 업데이트
    cache_key = f'user:{user_id}'
    r.setex(cache_key, 3600, json.dumps(data))

def delete_user(user_id: int) -> None:
    # 1. DB 삭제
    delete_from_database(user_id)

    # 2. 캐시 삭제
    cache_key = f'user:{user_id}'
    r.delete(cache_key)

장점: 데이터 일관성이 높다

단점: 모든 쓰기에서 캐시도 업데이트해야 함

캐싱 패턴 3: Cache-Behind (Write-Behind)

데이터를 캐시에만 먼저 저장하고, 나중에 배치로 DB에 반영한다. 매우 높은 처리량이 필요할 때 유용하다:

from datetime import datetime, timedelta

def increment_user_score(user_id: int, points: int) -> None:
    # 1. 캐시에만 저장 (비동기)
    cache_key = f'user:score:{user_id}'
    r.incrby(cache_key, points)

    # 2. "쓰기 대기" 리스트에 추가
    r.sadd('pending_writes', user_id)

    # 3. 나중에 배치로 처리
    # 별도 워커 프로세스에서 1분마다 실행
    # flush_to_database()

def flush_to_database():
    # 대기 중인 모든 사용자 처리
    pending_users = r.smembers('pending_writes')

    for user_id in pending_users:
        cache_key = f'user:score:{user_id}'
        new_score = r.get(cache_key)

        if new_score:
            # DB에 반영
            update_user_score_in_database(int(user_id), int(new_score))

            # 캐시 정리
            r.delete(cache_key)
            r.srem('pending_writes', user_id)

# 백그라운드 워커에서 주기적으로 실행
import threading
import time

def background_flush_worker():
    while True:
        time.sleep(60)  # 1분마다
        flush_to_database()

# 시작
worker_thread = threading.Thread(target=background_flush_worker, daemon=True)
worker_thread.start()

장점: 매우 빠른 쓰기 성능

단점: 일시적 데이터 손실 위험, 복잡함

Redis Pub/Sub — 실시간 메시징

Redis의 Pub/Sub은 간단하지만 강력한 메시징 시스템이다:

import redis
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 퍼블리셔
def publish_notification():
    r.publish('notifications', json.dumps({
        'type': 'user_registered',
        'user_id': 123,
        'email': 'newuser@example.com',
        'timestamp': datetime.now().isoformat()
    }))

# 구독자
def subscribe_notifications():
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')

    print('Waiting for notifications...')
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f'Received: {data}')

            # 새 사용자 등록 이메일 발송
            if data['type'] == 'user_registered':
                send_welcome_email(data['email'])

# 백그라운드에서 구독 시작
subscriber_thread = threading.Thread(target=subscribe_notifications, daemon=True)
subscriber_thread.start()

# 메인 스레드에서 이벤트 발행
time.sleep(1)
publish_notification()

실제 사용 사례: 사용자가 실시간으로 알림을 받을 수 있다.

Redis Sorted Sets — 실시간 순위판

게임, 리더보드, 최신 데이터 추적에 완벽하다:

def record_game_score(player_id: str, score: int):
    # Sorted set에 플레이어 추가 (스코어가 정렬 기준)
    r.zadd('game_leaderboard', {player_id: score})

def get_leaderboard(limit: int = 10) -> list:
    # 상위 10명 조회 (스코어 내림차순)
    return r.zrange('game_leaderboard', 0, limit - 1, rev=True, withscores=True)

def get_player_rank(player_id: str) -> int:
    # 플레이어의 순위 (0이 1위)
    rank = r.zrevrank('game_leaderboard', player_id)
    return rank + 1 if rank is not None else 0

# 예제
record_game_score('alice', 1500)
record_game_score('bob', 1200)
record_game_score('charlie', 1800)

# 상위 3명
leaderboard = get_leaderboard(3)
print(leaderboard)
# [('charlie', 1800.0), ('alice', 1500.0), ('bob', 1200.0)]

# Alice의 순위
print(f"Alice's rank: {get_player_rank('alice')}")
# Alice's rank: 2

# 특정 범위의 스코어
top_scorers = r.zrangebyscore('game_leaderboard', 1500, float('inf'), withscores=True)
print(top_scorers)
# [('charlie', 1800.0), ('alice', 1500.0)]

Redis Lua 스크립트 — 원자성 보장

여러 명령을 하나의 원자적(atomic) 작업으로 실행한다:

# 자동차 충돌: 포인트 인출 (존재하는 경우만)
withdraw_script = r.register_script(

      
iL
ian.lab

실무 개발자입니다. 현장에서 겪은 문제와 해결 과정을 기록합니다. 오류 제보는 연락처로 보내주세요.