본문 바로가기

Spring boot

동시성 관리하기 1탄) Application 레벨 - synchronized, ConcurrentHashMap

* 이 글은 정보제공을 우선으로 하여 구어체를 사용합니다. 🥹


Spring은 멀티 스레드 방식이기 때문에 성능은 좋지만

한 자원에 대해 여러 스레드가 접근할 시, 동시성 문제가 발생할 수 있다. 

 

예를 들어, 제한 인원이 100명으로 정해져 있는 한 그룹에 참여하고 싶을 때, 

동시에 100명이 참여 신청을 한다면 최종적으로 참여 인원은 100명 이하 일 것이다.

이 문제의 원인은 Race condition이 발생한 것이다. 

* Race condition이란? 

  두 개 이상의 프로세스(혹은 스레드)들이 하나의 자원에 접근하여 변경하는 경우, 접근 순서에 따라 실행 결과가 달라지는 현상

 

참여 인원 뿐만 아니라 예금 인출이나 재고 처리 등, 동시성 처리는 매우 중요한 과정이다.

지금부터 자원의 신뢰성과 안정성을 보장하는 동시성 제어 방법 몇 가지를 소개하겠다. 

 

(1탄에서는 Application 레벨에서 처리할 수 있는 synchronized, ConcurrentHashMap에 대해 소개하겠다.)

Synchronized

자바에서 제공해주는 synchronized는 메서드나 객체 변수에 사용된다. 

synchronized가 붙은 자원(메서드나 객체 변수)에는 오직 한 개의 스레드에서만 접근이 가능하고, 다른 스레드들은 blocked 상태로 대기한다. 따라서 synchronized를 남발하면 오히려 성능저하 문제가 일어날 수 있다. 

 

아래 상황은 한 그룹에 참여신청을 보냈을 때 service에서 일어나는 로직이다. 

그룹의 제한인원(personnel)을 검사하고 현재 참여 인원이 제한 인원을 넘는지 검사하고 있다. 

@Synchronized 어노테이션 확인

 

* synchronized는 java 문법이므로 java에서는 public synchronized void methodName( parameterType parameterName ) 이렇게 사용되지만 Kotlin은 @Synchronized 어노테이션을 사용해야 한다. 

 

이렇게 synchronized로 join method를 감싸줬지만 동시에 100명이 참여 신청을 보낸다면,

참여 신청이 완료된 최종 인원은 100명보다 적은 수 일 것이다. 

 

 

원인

원인은 @Transacitonal에 있다. @Transacitonal을 사용하면 첫번째 스레드가 완료되기 전에 두번째 스레드가 실행될 수 있다.

 

이유가 무엇일까?

 

현재 join 메소드는 @Transacitonal도 감싸진 메소드이다. AOP를 통해 구현된 @Transacitonal 의 동작원리를 살펴보면, 해당 메소드 앞 뒤로 트랜잭션 시작, 트랜잭션 커밋/롤백을 수행한다. 즉, 다음과 같은 형태이다.

Spring Begins Transaction -> join Method -> Spring commits/rollback Transaction

 

우선, 트랜잭션 프록시가 호출되면 데이터 소스를 찾아 사용하면서 이 때 커넥션 풀에서 커넥션을 획득하게 된다. 이렇게 동시에 온 요청들이 모두 각각 커넥션을 소유할 수 있기 때문에 트랜잭션의 시작은 모두가 동시에도 가능하게 된다. 이 후, synchronized를 걸었던 method에 대해서는 이전에 말했듯 한 스레드씩 순차적으로 실행된다. 

 

하지만 첫 스레드가 synchronized method를 처리하고 commits 또는 rollback을 시도하려는 순간, 다른 스레드가 synchronized method를 획득하게 된다. 하지만 앞 스레드가 아직 DB에 commits 또는 rollback을 하지 않았고 따라서 두번째 스레드는 첫번째 스레드의 결과를 반영하지 않은 자원을 조회하게 된다. 또 다시 race condition이 발생한 것이다. 

T1: B--S--C
T2: B-----S--C

여기서 B는 Spring Begins Transaction  /  S는 Synchronized Method  /  C는 Spring commits Transaction  를  나타낸다.

 

 

해결

이 문제를 해결하기 위해 method에 Synchronized를 붙여줄 facade 계층을 만들고 그 아래 service 계층에 @Transactional을 붙여주었다. 

Facade -> syncronized

Service -> @Transactional

따라서 아래와 같이 동작한다. 

T1: B--S--C 
T2: -------B--S--C

* 조금 더 복잡하거나 Lock 거는 것을 '더보기' 속 메서드로 만들고 싶다면 ReentrantLock에 대해 알아보면 된다. (read와 write에 대해 각각 lock을 걸고 싶다면 더 효과 좋을 ReentrantReadWriteLock을 알아보면 된다.)

더보기

Synchronized를 아래처럼 사용하면 ReentrantLock과 동일하다.

fun syncStatements() {
	synchronized(this) {
    	doSomething()
    }
    doSomething2()
}

 

사용법

private val lock: ReentrantLock = ReentrantLock(true)

fun foo(){
	try {
    	lock.lock()
        doSomething()
    }finally {
    	lock.unlock()
    }
}

 

 

추가 문제

하지만 이 조차도 문제가 발생한다.

1) 그룹에 참여하는 join 메서드와 그룹에서 탈퇴하는 out 메서드에 모두 synchronized가 붙어있다고 가정해보자. 두 메서드가 동시에 호출될 때, 각각 다른 스레드로 동시에 실행 될 수 있다. (synchronized는 한 메서드에 대해 Lock을 걸기 때문이다.) 제한 인원 값을 바꾼다면 또다시 race condition이 일어날 수 있다.

2) synchronized 는 한개의 프로세스에서만 동시성이 보장된다. 서버가 한 대일 경우는 문제 없지만 Scale Out을 통해 서버가 2대 이상 늘어나면 여러 서버에서 데이터에 접근하며 여전히 동시성 문제가 발생하게 된다. 

 

 

 

ConcurrentHashMap

다음 방법 또한 java에서 제공하는 동시성 처리 방법인 ConcurrentHashMap이다.

ConcurrentHashMap 클래스는 Hashtable 클래스, HashMap 클래스와 유사한 특징을 갖고 있다. 

이들의 차이를 먼저 알아보자.

 

먼저 Hashtable 클래스

public class Hashtable<K,V>
    extends Dictionary<K,V>
    implements Map<K,V>, Cloneable, java.io.Serializable {

    public synchronized int size() { }

    @SuppressWarnings("unchecked")
    public synchronized V get(Object key) { }

    public synchronized V put(K key, V value) { }
}

Hashtable 클래스는 거의 모든 method에 synchronized 가 붙어있는 것을 확인할 수 있다. 따라서 synchronized 의 장점과 같이 Multi-thread 에서 동시성 문제를 해결할 수 있지만 synchronized 가 남발되어 병목 현상이 발생할 수 있고 여러 개의 서버에서는 동시성 문제를 해결할 수 없다. 

또한 Collection Framework가 나오기 이전부터 존재하는 클래스이기 때문에 최근에는 잘 사용하지 않는 클래스이다.

 

HashMap 클래스

public class HashMap<K,V> extends AbstractMap<K,V>
    implements Map<K,V>, Cloneable, Serializable {

    public V get(Object key) {}
    public V put(K key, V value) {}
}

HashMap 클래스는 synchronized 가 붙어있지 않다. 따라서 Java의 Map 인터페이스 구현체 중에는 성능이 가장 좋다. 하지만 Multi-thread 환경에서 사용할 수 있도록 하는 장치(Lock)가 없기 때문에 멀티 스레드 환경에서 사용할 수 없다. 

 

ConcurrentHashMap 클래스

Hashtable의 단점을 해결하면서 Multi-thread 환경에서 사용할 수 있도록 나온 클래스가 ConcurrentHashMap 이다.

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    // 버킷의 수
    private static final int DEFAULT_CAPACITY = 16;

    // 동시에 업데이트를 수행하는 쓰레드 수
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    
    public V get(Object key) {}

    public boolean containsKey(Object key) { }

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
}

위는 ConcurrentHashMap 클래스의 일부 코드이다. 

get() 메소드에는 synchronized가 없고, put()에는 중간에 synchronized가 한 번 들어간다. 

 

따라서 읽기 작업은 동시성 제어를 하지 않고 여러 스레드에서 읽을 수 있지만, 쓰기 작업은 특정 버킷(index)나 엔트리에 대해 Lock을 사용한다는 것을 알 수 있다.

 

DEFAULT_CAPACITY(버킷의 수) == DEFAULT_CONCURRENCY_LEVEL(동시작업 가능한 쓰레드 수)인 이유는 버킷 단위로 Lock을 사용하기 때문에 두 스레드가 같은 버킷을 접근하는게 아니라면 Lock을 기다릴 필요가 없기 때문이다. (= 버킷당 하나의 Lock을 가지고 있다고 할 수 있다.)

* DEFAULT는 16개지만 ConcurrentHashMap 생성자를 통해 동시에 몇 개의 쓰레드를 작동시킬 것인가에 대한 조정이 가능하다. 

 

즉, 여러 쓰레드에서 ConcurrentHashMap 객체에 동시에 데이터를 삽입, 참조하더라도 그 데이터가 다른 세그먼트에 위치하면 서로 락을 얻기 위해 경쟁하지 않습니다.

 

 

 

세가지 Map을 정리하자면, HashTable과, ConcurrentHashMap은 thread-safe 하기 때문에 멀티 스레드 환경에서도 사용할 수 있다. HashTable은 synchronized 를 이용해 스레드간 락을 걸어 멀티 스레드 환경에서도 안전하지만, synchronized 가 너무 남발되어 스레드간 동기화 락이 성능 측면에서 매우 느리다. 하지만, ConcurrentHashMap은 Bucket 또는 Entry 별로 락을 걸어 성능이 HashTable 보다 빠르다. 

 

 Bucket(= Entry) 이란?

더보기

아래 사진을 참고하면 HashTable은 method를 한개의 스레드씩 점유해 다른 스레드들은 접근을 대기하지만, ConcurrentHashMap은 스레드별로 한 Entry 씩 점유하기 때문에 HashTable보다 성능이 좋은 것이다. 

출처:&nbsp;https://parkmuhyeun.github.io/woowacourse/2023-09-09-Concurrent-Hashmap/
출처:&nbsp;https://parkmuhyeun.github.io/woowacourse/2023-09-09-Concurrent-Hashmap/

 

 

사용법

각 메서드 사용법

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>  
implements ConcurrentMap<K,V>, Serializable {
	// ...
    
	// 주어진 키에 해당하는 값이 없거나 null이면,
	// mappingFunction을 사용해서 새 값을 계산하고 맵에 저장한다.
	// 이미 값이 존재하면 현재 값을 반환한다.
	public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { /* ... */ }
	// 주어진 키에 해당하는 값이 존재하면,
	// mappingFunction을 사용해서 새 값을 계산하고 맵에 저장한다.
	// 계산된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	// 주어진 키에 값이 없으면 아무 일도 하지 않는다.
	public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { /* ... */ }
	// 키에 대한 현재 값을 사용하여 remappingFunction을 통해서
	// 새 값을 계산하고 맵에 저장한다.
	// 계산된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	public V compute(K key,  
	BiFunction<? super K, ? super V, ? extends V> remappingFunction) { /* ... */ }
	// 주어진 키에 해당하는 값이 없거나 null이면 주어진 값이 맵에 저장된다.
	// 이미 값이 존재하면 remappingFunction을 통해 주어진 값과 현재 값을 병합하고
	// 병합된 값을 맵에 저장한다.
	// 병합된 값이 null이면 키-값 쌍이 맵에서 제거된다.
	public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { /* ... */ }
}

 

computeIfAbsent 사용 예제

주어진 키에 해당하는 값이 없거나 null이면, mappingFunction을 사용해서 새 값을 계산하고 맵에 저장한다. 이미 값이 존재하면 현재 값을 반환한다.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
 
public class GroupingExample {
    public static void main(String[] args) {
        String[] names = {"앨리스", "밥", "찰리", "앨리스", "밥", "찰리", "앨리스"};
 
        ConcurrentHashMap<String, List<String>> nameGroups = new ConcurrentHashMap<>();
 
        for (String name : names) {
            // computeIfAbsent를 사용하여 nameGroups에 이름이 없으면 새 ArrayList를 생성하고 추가한다.
            nameGroups.computeIfAbsent(name, key -> new ArrayList<>()).add(name);
        }
 
        System.out.println(nameGroups);
    }
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@RequiredArgsConstructor
@Service
public class LockStockFacade {

    private ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();

    private final StockService stockService;

    public void decrease(Long id) throws InterruptedException {
        Lock lock = locks.computeIfAbsent(String.valueOf(id), key -> new ReentrantLock());
        boolean acquiredLock = lock.tryLock(3, TimeUnit.SECONDS);
        if (!acquiredLock) {
            throw new RuntimeException("Lock 획득 실패");
        }
        try {
            stockService.decrease(id);
            dosomething();
        } finally {
            lock.unlock();
        }
    }
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@RequiredArgsConstructor
@Service
public class LockStockFacade {
   // 입금 통장 id를 Map에 가둬놓기
    private val locks: ConcurrentMap<Long, AtomicBoolean> = ConcurrentHashMap()

    fun deposit(@PathVariable id: Long, @RequestBody amount: Long): Account {
        val lock = locks.computeIfAbsent(id) { AtomicBoolean(false) }

        if (lock.get()) {
            throw RuntimeException("ID: {id} | 입금이 진행되고 있는 아이디입니다.")
        }

        try {
            lock.set(true)

            return db.balance(id, db.balance(id).balance + amount)
        } finally {
            lock.set(false)
        }
    }
}

 

* 스프링부트 3.2 이상부터는 Spring Integration 의존성을 추가한 뒤, 더보기처럼 LockRegistry을 사용해 더 간편하게 Lock 이용가능

더보기
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;

@Configuration
public class LockConfig {

    @Bean
    public LockRegistry lockRegistry() {
        return new DefaultLockRegistry();
    }
}
@RequiredArgsConstructor
@Service
public class LockStockFacade {

    private final LockRegistry lockRegistry;
    private final StockService stockService;

    public void decrease(Long id) throws InterruptedException {
        lockRegistry.executeLocked(String.valueOf(id), () -> {
            stockService.decrease(id);
        });
    }
}

 

 

 

 

결론 

** synchronized ** 

- 스레드 별로 Lock을 걸기 때문에 동시성 처리는 가능하지만 남발하면 오히려 성능이 안 좋아진다.

- 한 메서드에 대해 Single server + Multi-thread 는 동시성 제어가 가능하지만 여러 대 server에서는 동시성 제어가 불가능하다.

- 한 값을 변경시키는 두 메서드가 동시에 실행되면 race condition이 일어날 수 있다. 

 

** ConcurrentHashMap **

Hashtable 클래스

- method 대부분에 synchronized 가 붙어 성능은 안 좋다.

- 스레드 별로 Lock을 걸어 Single server + Multi-thread 에서는 동시성 처리가 가능하다.

HashMap 클래스

- synchronized 등 Lock 처리가 없어 성능은 가장 좋지만, 동시성 처리가 불가능하다.

ConcurrentHashMap 클래스

- get은 Lock이 없어 여러 스레드가 자원에 접근 가능하고, put에는 Lock이 있어 한 스레드만 각 entry에 접근 가능하다.

- Single server + Multi-thread 는 동시성 제어가 가능하지만 여러 대 server에서는 동시성 제어가 불가능하다.

 

 

 

 

1편이 너무 길어져서 낙관적 Lock, 비관적 Lock은 2편으로 넘기겠다!

 

 

 

 

 

참고)

synchronized : https://ttl-blog.tistory.com/1567

ReentantLock : https://velog.io/@picbel/Synchronized%EC%99%80-ReentrantLock%EC%9D%98-%EC%B0%A8%EC%9D%B4

ConcurrentHashMap :

설명 : https://devlog-wjdrbs96.tistory.com/269

사용법 : https://blog.hexabrain.net/403#cascompare-and-swap-%EC%97%B0%EC%82%B0