Java & Kotlin

Kotlin으로 TTL 기반 Map 구현하기: Java 21 가상 쓰레드와 ConcurrentHashMap 최적화

백엔드 유성 2024. 10. 4. 01:11

Redis를 사용할 때 유용한 기능인 시간이 지나면 데이터가 지워지는 기능이 있습니다.

 

이것을 Kotlin에서 사용해보면 좋을 것 같아 구현해보겠습니다.

 

Redis의 TTL(Time To Live) 기능은 데이터가 일정 시간이 지나면 자동으로 삭제되는 매우 유용한 기능입니다. 이 기능을 활용하면 일정 시간 동안만 유효한 데이터를 관리할 수 있어 리소스 효율성을 극대화할 수 있습니다. Redis 대신 Kotlin에서 직접 이러한 TTL 기능을 구현해보면, 기존 Map처럼 동작하면서도 시간 기반 데이터 관리가 필요한 상황에서 유용하게 사용할 수 있습니다.

 

요구사항

  • 기능은 기존 Map과 동일하게 동작해야 합니다.
  • TTL 시간을 생성자로 주입합니다.
  • 시간이 지나면 key, Value가 제거됩니다.

 

구글이 제작한 라이브러리인 Guava에 기능이 있지만 직접 코드로 만들어보겠습니다.

 

우선 단순하게 작성하기 위해서 그리고 기존 Map의 인스턴스 메서드(함수) 사용을 위해 AbstractMap을 상속해서 작성해줍니다.

 

코드는 아래 페이지를 열어 확인할 수 있습니다.

더보기
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.TimeUnit

class TtlMap<K, V>(private val ttl: Long, private val unit: TimeUnit) : AbstractMap<K, V>() {

    private val map = mutableMapOf<K, Pair<LocalDateTime, V>>()

    // 만료 시간 계산
    private fun calculateExpiration(): LocalDateTime {
        return LocalDateTime.now().plusNanos(unit.toNanos(ttl))
    }

    // 만료된 데이터 삭제
    private fun cleanupExpiredEntries() {
        val now = LocalDateTime.now()
        map.entries.removeIf { it.value.first.isBefore(now) }
    }

    override fun get(key: K): V? {
        cleanupExpiredEntries()
        return map[key]?.second
    }

    override fun containsKey(key: K): Boolean {
        cleanupExpiredEntries()
        return map.containsKey(key)
    }


    override fun put(key: K, value: V): V? {
        // 만료 시간을 가지고 데이터 삽입
        val expirationTime = calculateExpiration()
        return map.put(key, Pair(expirationTime, value))?.second
    }

    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() {
            cleanupExpiredEntries()
            return map.entries.map { (key, pair) ->
                object : MutableMap.MutableEntry<K, V> {
                    override val key: K = key
                    override var value: V = pair.second

                    override fun setValue(newValue: V): V {
                        val oldValue = value
                        map[key] = Pair(calculateExpiration(), newValue)
                        return oldValue
                    }
                }
            }.toMutableSet()
        }
}
  • AbstractMap을 사용해서 구현을 단순화 했습니다.
  • map을 만들고 value를 Pair로 설정하여 값을 2개 갖을 수 있게 만듭니다.
  • calculateExpiration 함수 을 만들어 만료시간 계산을 합니다.
    • 데이터 put() 시점에 만료 시간을 삽입합니다.
  • cleanupExpiredEntries 함수를 만들어 만료된 데이터를 삭제하도록 합니다.
  • 데이터 호출 시에만 cleanupExpiredEntries가 동작합니다.

 

테스트부터 해보겠습니다.

 

테스트 코드는 아래 페이지를 열어 확인할 수 있습니다.

더보기

 

import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import java.util.concurrent.TimeUnit
import kotlin.test.Test

class TtlMapTest {

    @Test
    @DisplayName("put과 get 동작 테스트")
    fun testPutAndGet() {
        //given
        val map = TtlMap<String, String>(1, TimeUnit.SECONDS)
        map["key1"] = "value1"
        assertEquals("value1", map["key1"], "Initial value should be retrievable")

        //when
        Thread.sleep(1000)

        //then
        assertNull(map["key1"], "Expired value should be null")
    }

    @Test
    @DisplayName("containsKey 동작 테스트")
    fun testContainsKey() {
        //given
        val map = TtlMap<String, String>(1, TimeUnit.SECONDS)
        map["key2"] = "value2"
        assertTrue(map.containsKey("key2"), "Key should exist after putting a value")

        //when
        Thread.sleep(1000)

        //then
        assertFalse(map.containsKey("key2"), "Key should be expired and removed")
    }

    @Test
    @DisplayName("여러 엔트리 처리 및 TTL 만료 테스트")
    fun testMultipleEntries() {
        //given
        val map = TtlMap<String, String>(2, TimeUnit.SECONDS)

        map["key1"] = "value1"
        map["key2"] = "value2"

        assertEquals("value1", map["key1"], "key1 should return value1")
        assertEquals("value2", map["key2"], "key2 should return value2")

        //when
        Thread.sleep(1000)

        //then
        assertEquals("value1", map["key1"], "key1 should still exist before TTL expires")
        assertEquals("value2", map["key2"], "key2 should still exist before TTL expires")

        //when
        Thread.sleep(2000)

        //then
        assertNull(map["key1"], "key1 should be expired after TTL")
        assertNull(map["key2"], "key2 should be expired after TTL")
    }

    @Test
    @DisplayName("Map 크기 및 만료 후 크기 테스트")
    fun testSize() {
        //given
        val map = TtlMap<String, String>(1, TimeUnit.SECONDS)
        map["key1"] = "value1"
        map["key2"] = "value2"
        assertEquals(2, map.size, "Size should be 2 after adding two elements")

        //when
        Thread.sleep(1000)

        //then
        assertEquals(0, map.size, "Size should be 0 after TTL expires and entries are removed")
    }

    @Test
    @DisplayName("setValue 동작 테스트")
    fun testSetValue() {
        //given
        val map = TtlMap<String, String>(2, TimeUnit.SECONDS)
        map["key1"] = "value1"
        assertEquals("value1", map["key1"], "Initial value should be 'value1'")
        map["key1"] = "new_value1"
        assertEquals("new_value1", map["key1"], "Updated value should be 'new_value1'")

        //when
        Thread.sleep(2000)

        //then
        assertNull(map["key1"], "Value should be null after TTL expires")
    }
}

 

 

기존 데이터 100만건을 기준으로 HashMap과 TtlMap을 비교해보겠습니다.

성능 테스트 결과


## 성능 테스트 결과

HashMap - Put Time: 205 ms
HashMap - Get Time: 65 ms
TtlMap - Put Time: 343 ms
TtlMap - Get Time: 5095 ms (HashMap 대비 7800%)

 

테스트 코드는 아래 페이지를 열어 확인할 수 있습니다.

더보기
import java.util.concurrent.TimeUnit
import kotlin.system.measureTimeMillis
import kotlin.test.Test

class MapPerformanceTest {

    @Test
    fun testPerformanceWithHashMap() {
        // given
        val hashMap = mutableMapOf<String, String>()

        // when
        val putTime = measureTimeMillis {
            for (i in 1..1_000_000) {
                hashMap["key$i"] = "value$i"
            }
        }

        val getTime = measureTimeMillis {
            for (i in 1..1_000_000) {
                hashMap["key$i"]
            }
        }

        // then
        println("HashMap - Put Time: $putTime ms")
        println("HashMap - Get Time: $getTime ms")
    }

    @Test
    fun testPerformanceWithTtlMap() {
        // given
        val ttlMap = TtlMap<String, String>(5, TimeUnit.SECONDS)

        // when
        val putTime = measureTimeMillis {
            for (i in 1..1_000_000) {
                ttlMap["key$i"] = "value$i"
            }
        }

        val getTime = measureTimeMillis {
            for (i in 1..1_000_000) {
                ttlMap["key$i"]
            }
        }

        // then
        println("TtlMap - Put Time: $putTime ms")
        println("TtlMap - Get Time: $getTime ms")
    }
}

 

 

성능을 개선시킬 방법으로 아래와 같은 방법이 있습니다.

  1. 백그라운드 scheduled cleanup
  2. LocalDateTime 이 아닌 System nano 사용
  3. get() 메서드 사용 시 키가 존재할때만 cleanup 실행

1번이 가장 좋은 방법일 것 같고 3번은 작성하는 코드 대비 성능 개선이 크지 않을 것 같으므로

2번 -> 테스트 -> 1번 -> 테스트 를 진행하겠습니다.

LocalDateTime 이 아닌 System nano 사용

// 만료 시간 계산
private fun calculateExpiration(): Long {
    return System.nanoTime().plus(unit.toNanos(ttl))
}

// 만료된 데이터 삭제
private fun cleanupExpiredEntries() {
    val now = System.nanoTime()
    map.entries.removeIf { it.value.first.compareTo(now) < 0 }
}

 

## 성능 테스트 결과

HashMap - Put Time: 203 ms
HashMap - Get Time: 80 ms
TtlMap - Put Time: 168 ms
TtlMap - Get Time: 5066 ms

 

LocalDateTime 대신 System.nanoTime을 사용하는 이유는 성능을 조금이라도 더 개선하기 위함입니다. System.nanoTime은 현재 시간보다 상대적인 시간 계산에 더 적합하며, 보다 가볍고 빠르게 실행됩니다. 성능 테스트에서 큰 차이는 없었지만, JVM이 LocalDateTime을 최적화했거나 시간 관련 연산이 생각보다 빠르게 이루어졌을 가능성도 있습니다

 

 

백그라운드 scheduled cleanup

백그라운드 작업을 위해서는 ScheduledThreadPool 을 사용하면 되는데

 

이번 Java21에서 새로 나온 가상 쓰레드를 사용해보겠습니다.

    •  인스턴스가 생성될 때 마다 쓰레드(또는 가상쓰레드)가 생성되고 쓰레드의 동작시간 중 99%가 "TIMED_WAITING" 상태이므로 가상 쓰레드가 더 효율적일 것 같습니다.
    • Java 21에서 도입된 가상 쓰레드는 기존의 커널 쓰레드와는 다르게 JVM에서 가볍게 생성되며, 작업이 필요할 때만 커널 쓰레드를 사용해 리소스를 효율적으로 관리합니다. 가상 쓰레드는 대규모의 동시성 작업을 처리할 때 특히 유리합니다. 예를 들어, 하나의 OS 커널 쓰레드에서 수천 개의 가상 쓰레드를 실행할 수 있기 때문에 리소스 사용을 최소화하면서도 높은 동시성을 제공할 수 있습니다.
private val cleanupExecutor = Executors.newVirtualThreadPerTaskExecutor()

    init {
        cleanupExecutor.submit {
            while (true) {
                cleanupExpiredEntries()
                Thread.sleep(unit.toMillis(ttl))
            }
        }
    }

 

그럼 이제 cleanupExpiredEntries 함수를 get 또는 containsKey 함수에서 사용하지 않아도 되니

override 된 함수를 AbstractMap에 넘겨주겠습니다.

 

AbstractMap에 구현되어있는 메서드들을 지워주겠습니다.

import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class TtlMap<K, V>(private val ttl: Long, private val unit: TimeUnit) : AbstractMap<K, V>() {

    private val map = mutableMapOf<K, Pair<Long, V>>()

    private val cleanupExecutor = Executors.newVirtualThreadPerTaskExecutor()

    init {
        cleanupExecutor.submit {
            while (true) {
                cleanupExpiredEntries()
                Thread.sleep(1000)
            }
        }
    }
    
    // 만료 시간 계산
    private fun calculateExpiration(): Long {
        return System.nanoTime().plus(unit.toNanos(ttl))
    }

    // 만료된 데이터 삭제
    private fun cleanupExpiredEntries() {
        val now = System.nanoTime()
        map.entries.removeIf { it.value.first.compareTo(now) < 0 }
    }

    override fun put(key: K, value: V): V? {
        // 만료 시간을 가지고 데이터 삽입
        val expirationTime = calculateExpiration()
        return map.put(key, Pair(expirationTime, value))?.second
    }

    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() {
//            cleanupExpiredEntries()
            return map.entries.map { (key, pair) ->
                object : MutableMap.MutableEntry<K, V> {
                    override val key: K = key
                    override var value: V = pair.second

                    override fun setValue(newValue: V): V {
                        val oldValue = value
                        map[key] = Pair(calculateExpiration(), newValue)
                        return oldValue
                    }
                }
            }.toMutableSet()
        }
}

 

get과 containsKey 함수의 경우 AbstractMap에서 entries라는 오버라이딩 된 함수를 사용하므로 제거해주었습니다.

 

기능 테스트를 해보면

 

테스트 결과는 성공입니다.

[주의] 단. 백그라운드로 실행 시 1초 차이로 삭제가 되어야 할 데이터가 다음 cleanup이 돌때까지 남아있을 수 있습니다.

 

## 성능 결과

HashMap - Put Time: 159 ms
HashMap - Get Time: 56 ms
TtlMap - Put Time: 172 ms
TtlMap - Get Time: 10093 ms

 

TtlMap의 성능이 초기에는 낮게 측정되었습니다. 그 이유는 entrySet()을 오버라이드할 때 발생하는 잦은 반복 작업으로 인해 성능이 저하되었기 때문입니다. 이를 해결하기 위해 필요한 메서드인 putget만을 오버라이드하고 나니 성능이 크게 개선되었습니다.

 

## 성능 결과

HashMap - Put Time: 261 ms
HashMap - Get Time: 58 ms
TtlMap - Put Time: 227 ms
TtlMap - Get Time: 78 ms

 

드디어 원하는 속도가 나와줍니다~

 

 

 

그런데 여기서 두가지 문제가 있습니다.

  1. TtlMap은 기본적으로 Thread-Safe하지 않게 만들려고 했으나, 백그라운드용 쓰레드가 map의 값을 삭제하기에 인스턴스를 생성하면 동시성 문제가 발생합니다.
  2. TtlMap 인스턴스가 사용이 끝나도 cleanupExecutor가 TtlMap인스턴스를 (강한)참조하고 있으므로 GC의 대상이 되지 않습니다. 사용자로 하여금 Interupt 를 해주어야 합니다.

1번을 해결하기 위해 TtlMap에서 처리되는 map을 처음 생각했던 방식과는 다르게 Thread-Safe하게 만들어주겠습니다.

 -> ConcurrentHashMap을 사용하겠습니다.

 

2번을 해결하기 위해 TtlMap이 약한참조 또는 GC 대상인 상태가 되면 이벤트를 발생시켜 Interupt를 하기 위해서 아래와 같은 참조 타입을 사용해보았습니다.

  1. PhantomReference : GC의 Mark, Sweep 가 모두 끝나고 메모리 해제 직전에 Reference Queue에 들어가게 됩니다.(객체 연결이 끊어져 사용은 불가능하나 메모리에 남아있는 상태)
  2. WeakReference : 강한 참조가 없는 상태, 예를 들어 지역변수(객체)를 메서드 실행이 끝나고 스택 메모리에서 메서드가 빠져나갔을 때 (언제든 Mark, Sweep의 대상이 될 수 있는 상태)

그러나 cleanupExpiredEntries() 라는 함수를 사용하거나, map의 크기가 변경되었는지 확인하는 것 또한 강한 참조이므로 해당 방법들은 사용할 수 없고 파일 I/O 에서 많이 사용하는 AutoCloseable을 구현하는 것으로 만들었습니다.

 

kotlin에서는 use로 java에서는 try-with-resource로 사용해야 하며, Java로 해당 코드를 구현 시 InterruptedException 같은 checked Exception을 사용하고 try-with-resource 구문을 유도하는 것이 좋을 것 같습니다.

 

class TtlMap<K, V>(private val ttl: Long, private val unit: TimeUnit) : AbstractMap<K, V>(), AutoCloseable {

    private val map = ConcurrentHashMap<K, Pair<Long, V>>()

    private val cleanupExecutor = Executors.newVirtualThreadPerTaskExecutor()

    init {
        cleanupExecutor.submit {
            while (true) {
                cleanupExpiredEntries() // TtlMap 객체 참조 없이 바로 만료된 항목 정리
                Thread.sleep(1000)
            }
        }
    }

    override fun close() {
        cleanupExecutor.shutdown()
    }

 

쓰레드를 종료하는 시점에 대해서는 추가적인 검토가 필요할 것 같습니다..

 

감사합니다.

 


2024/10/06

Lazy Initialization 패턴을 활용하여 쓰레드 종료를 자동화했습니다.

 

가상 쓰레드를 실행하는 init 블럭을 삭제하고 청소를 위한 TtlMapCleaner Class를 생성했습니다.

package com.swiftpay.sample

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

class TtlMapCleaner {

    private val isRunning = AtomicBoolean(false)

    fun wakeup(ttlMap: TtlMap<*, *>) {
        if (isRunning.compareAndSet(false, true)) {
            val cleanupExecutor = Executors.newVirtualThreadPerTaskExecutor()
            cleanupExecutor.submit {
                try {
                    Thread.sleep(1000)
                    while (!ttlMap.isEmpty()) {
                        ttlMap.cleanupExpiredEntries()
                        Thread.sleep(1000)
                    }
                } finally {
                    isRunning.set(false)
                    cleanupExecutor.shutdown()
                }
            }
        }
    }
}

 

여기서 값이 있는 경우 1초마다 cleanup을 하고 만약 값이 없다면 Thread를 shutdown합니다.

 

ttlMap 인스턴스에 값이 없는 상태로 강한 참조 상태일때는 cleanup Thread가 생성되지 않습니다.

 

생성되는 시점은 데이터 put 시점으로 아래 코드가 실행될 때 쓰레드가 생성됩니다.

 

    override fun put(key: K, value: V): V? {
        ttlMapCleaner.wakeup(this)
        // 만료 시간을 가지고 데이터 삽입
        val expirationTime = calculateExpiration()
        return map.put(key!!, Pair(expirationTime, value))?.second
    }

 

put을 여러번 해도 하나의 cleanup Thread만 생성되고 map이 empty 상태가 되면 쓰레드가 shutdown되도록 만들었습니다.

 

전체 코드 :

더보기
package com.swiftpay.sample

import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

class TtlMap<K, V>(private val ttl: Long, private val unit: TimeUnit) : AbstractMap<K, V>() {

    private val map = ConcurrentHashMap<K, Pair<Long, V>>()

    private val ttlMapCleaner = TtlMapCleaner()

    // 만료 시간 계산
    private fun calculateExpiration(): Long {
        return System.nanoTime().plus(unit.toNanos(ttl))
    }

    // 만료된 데이터 삭제
    fun cleanupExpiredEntries() {
        val now = System.nanoTime()
        map.entries.removeIf { it.value.first < now }
    }

    override fun get(key: K): V? {
        return map[key]?.second
    }

    override fun put(key: K, value: V): V? {
        ttlMapCleaner.wakeup(this)
        // 만료 시간을 가지고 데이터 삽입
        val expirationTime = calculateExpiration()
        return map.put(key!!, Pair(expirationTime, value))?.second
    }


    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() {
            ttlMapCleaner.wakeup(this)
            return map.entries.map { (key, pair) ->
                object : MutableMap.MutableEntry<K, V> {
                    override val key: K = key
                    override var value: V = pair.second

                    override fun setValue(newValue: V): V {
                        val oldValue = value
                        map[key] = Pair(calculateExpiration(), newValue)
                        return oldValue
                    }
                }
            }.toMutableSet()
        }

}





package com.swiftpay.sample

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

class TtlMapCleaner {

    private val isRunning = AtomicBoolean(false)

    fun wakeup(ttlMap: TtlMap<*, *>) {
        if (isRunning.compareAndSet(false, true)) {
            val cleanupExecutor = Executors.newVirtualThreadPerTaskExecutor()
            cleanupExecutor.submit {
                try {
                    while (!ttlMap.isEmpty()) {
                        ttlMap.cleanupExpiredEntries()
                        Thread.sleep(1000)
                    }
                    cleanupExecutor.shutdown()
                } finally {
                    isRunning.set(false)
                }
            }
        }
    }
}

 

 

GC 테스트 코드 (Blocking 방식을 사용했습니다. OS 및 JVM에 따라서 테스트 코드가 실패할 수 있습니다.)

package com.swiftpay.sample

import java.lang.ref.PhantomReference
import java.lang.ref.ReferenceQueue
import java.util.concurrent.TimeUnit
import kotlin.test.Test

class TtlMapGCTest {

    @Test
    fun gcTest() {
        val referenceQueue = ReferenceQueue<TtlMap<String, String>>()
        val phantomReference = createAndNullifyTtlMap(referenceQueue)

        repeat(4) {
            System.gc()
            println("Waiting for GC...")
            Thread.sleep(3000)
        }

        val cleanedUpReference = referenceQueue.remove(30000)

        val cleanedUp = cleanedUpReference === phantomReference

        assert(cleanedUp) { "TtlMap instance was not garbage collected" }
    }

    private fun createAndNullifyTtlMap(queue: ReferenceQueue<TtlMap<String, String>>): PhantomReference<TtlMap<String, String>> {
        var ttlMap = TtlMap<String, String>(5, TimeUnit.SECONDS)
        ttlMap["key"] = "value"
        val phantomReference = PhantomReference(ttlMap, queue)


        return phantomReference
    }
}

 

 

결론

이번 구현을 통해 TtlMap에서 성능 문제와 동시성 문제를 모두 해결할 수 있었습니다. 초기에는 반복 작업으로 인해 성능 저하가 발생했지만, 필요한 메서드만 오버라이드하고 가상 쓰레드를 사용함으로써 효율성을 높였습니다. 또한, ConcurrentHashMap을 사용해 Thread-Safe한 구조로 개선하면서 동시성 문제도 해결했습니다.

가상 쓰레드를 사용하여 백그라운드 작업을 처리하고, 불필요한 쓰레드 리소스를 줄이는 방식으로 GC 문제를 해결할 수 있었습니다. 이러한 최적화를 통해, 원하는 성능을 달성하면서도 안정적인 동작을 보장하는 TTL 기반의 Map을 구현할 수 있었습니다.

 

감사합니다