비동기 처리

비동기 처리란?

먼저 처리는 동기처리, 비동기 처리가 있는데, 프로그래밍에서 작업을 어떻게 처리하고 제어하는 방식을 나타낸다.

동기부터 살펴보면, 데이터의 요청과 결과가 한 자리에서 동시에 일어나는 것을 말한다. 즉, 요청하면 시간이 얼마나 걸리든 요청한 자리에서 결과가 주어져야 한다.

→ 사용자가 데이터를 서버한테 요청하면 그 서버가 응답을 사용자에게 다시 리턴 해주기 전까지 사용자는 기다려야한다.

ex) 브라우저를 실행 시키는데에 10분 걸린다고 치면, 그 10분동안 컴퓨터의 다른 프로그램을 동작시키지 못하고 브라우저가 켜질때 까지 기다려야 한다는 것이다.

비동기란 동시에 일어나지 않는다는 의미이며, 작업이 동시에 실행되지 않고, 한 작업의 완료 여부와 상관없이 다른 작업을 수행해도 되고 서버에 다른 요청을 보내도 상관없다.

장단점

동기

  • 장점: 설계가 매우 간단하고 직관적이다.

  • 단점: 결과가 나올때까지 기다려야 한다.

비동기

  • 장점: 요청에 따른 결과가 나올동안 다른 작업을 할 수 있다. → 자원을 효율적으로 사용할 수 있다.

  • 단점: 동기식보다는 설계가 복잡하다.

순수 Java 멀티 스레드

먼저 Java로 구현한 코드를 보면,

public class MessageService {

    public void print(String message) {
        new Thread(() -> System.out.println(message))
                .start();
    }
}

public class Main {

    public static void main(String[] args) {
        MessageService messageService = new MessageService();

        for (int i = 1; i <= 100; i++) {
            messageService.print(i + "");
        }
    }
}

이 방법은 관리할 수 없어서 매우 위험하다.

만약 동시에 1000개의 호출이 있으면 짧은 시간에 Thread를 1000개 생성해야 하기 때문에, 프로그램의 성능에 악영향을 미치고, 에러가 날 수 있다.

그래서 Thread를 관리하기 위해서는 Thread Pool을 구현해야하고, Java에서는 ExecutorService 클래스가 있다.

public class MessageService {

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);//10개 제한

    public void print(String message) {
        executorService.submit(() -> System.out.println(message));
    }
}

public class Main {

    public static void main(String[] args) {
        MessageService messageService = new MessageService();

        for (int i = 1; i <= 100; i++) {
            messageService.print(i + "");
        }
    }
}

전체 Thread의 갯수를 10개로 제한하고, 우리가 원하는 멀티 스레드 방식의 비동기 처리를 할 수 있다.

하지만, 비동기 방식으로 처리하고 싶은 메소드 마다 messageService.submit() 메소드안에 다 넣어야하기 때문에 기존 코드를 수정하지 않고, 기능을 추가해야하는 OCP 원칙을 위반하기에 좋지 않다.

@Async

주의사항

  • public 메서드에서만 사용 가능하다.

  • 자가 호출(self-invocation) 불가능

    → 같은 객체(클래스) 내의 메소드 호출시 불가능하다.

    무조건 @Async가 붙은 메서드를 호출 시에 다른 클래스에서 호출해야 한다.

이 두가지는 @Async의 동작을 별도로 설정하지 않으며 프록시 모드가 적용되면서 스프링의 AOP를 가져가는데, 그로 인해 AOP와 관련된 제약사항을 다 가지게 된다.

AOP는 프록시 패턴이 사용하고, 프록시 패턴은 실제 기능을 수행하는 객체 대신 가상의 객체를 사용하게 되는 것이므로, private으로 접근이 불가능하다던가 자가호출을 하게 되면 프록시를 거치지 않기 때문에 사용이 불가능해진다.

ThreadLocal 사용시 내용 복사

@Async 를 사용하게 되면, 새로운 스레드를 생성하여 작동하는 것이므로 기존 스레드의 스택에 저장되는 ThreadLocal의 데이터는 사용하지 못하게 되므로, 복사해서 전달해야 한다.

비동기 스레드에서 터진 Exception 처리

비동기 스레드에서 터진 Error는 메인까지 반환하지 못하므로, 별도의 처리 또는 @Async 를 Return 값이 있는 형태로 줘서 별도 처리가 필요하다.

프로젝트 내에 스레드 갯수 제한이 걸려 있는지 확인

빈의 메서드에 @Async 를 추가하면, 해당 메서드는 별도의 스레드에서 실행된다. 즉, 호출자는 메소드가 완료될때까지 기다리지 않는다.

간단하게 작동시키려면 애플리케이션 클래스 위에 @EnableAsync 어노테이션을 붙여주면 된다.

@EnableAsync@Async 를 감지한다.

@EnableAsync
@SpringBootApplication
public class SpringBootApplication {
    ...
}

그다음 작동시킬 메서드 위에 @Async 를 붙여주면 사용할 수 있다.

@Service
@RequiredArgsConstructor
public class TestService {

    @Async("sampleExecutor")
    public void testAsync(String message){

        for(int i = 1; i <= 3; i++){
            System.out.println(message + "비동기 : " + i);
        }
        
    }
}

하지만 이 방식은 스레드 관리하지 않는다.

@Async 의 기본 설정은 SimpleAsynTaskExecutor를 사용하도록 되어 있는데, 이것은 Thread Pool이 아니고 단순히 스레드를 만들어내는 역할이기 때문이다.

Thread Pool 사용 방법

위에 애플리케이션 클래스에서 @EnableAsync 를 제거한 뒤 AsyncConfig를 생성한다.

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(3);// 기본 스레드 수
        taskExecutor.setMaxPoolSize(30);// 최대 쓰레드 수
        taskExecutor.setQueueCapacity(100); //Queue 사이즈
        taskExecutor.setThreadNamePrefix("Executor-");

        return taskExecutor;
    }
}

setCorePoolSize: 최초 동작 시에 CorePoolSize 만큼 스레드가 생성되서 사용된다.(디폴트 값: 1)

setMaxPoolSize: Queue 사이즈 이상 요청이 들어올 경우 스레드 갯수를 MaxPoolSize 만큼 늘린다.(디폴트: Integer.MAX_VALUE)

setQueueCapacity: CorePoolSize 이상의 요청이 들어오는 경우, LinkedBlockingQueue에서 대기하는데, 그 Queue의 사이즈를 지정 해준다.(디폴트: integer.MAX_VALUE)

setThreadNamePrefix: 스레드 명 설정

설정을 정리하면, 요청 3개 까진 CorePoolSize 범위 내에서 작업하고, 요청이 더 들어 와서 100개까지는 QueueCapacity의 크기내에서 대기하고, Queue 사이즈도 넘는 요청이 들어오는 경우 integer.MAX_VALUE 만큼 스레드 갯수를 늘려서 작업한다.

Thread Pool 종류 여러개 사용시에 @Async 설정 시에 위의 Bean 이름을 설정 해주면 된다.

    @Async("sampleExecutor")//ThreadPoolTaskExecutor Bean명과 동일하게
    public void print(String message) {
        System.out.println(message);
    }

리턴 타입 별 반환

없는 경우

처리 결과를 전달할 필요가 없는 경우

    @Async("sampleExecutor")//ThreadPoolTaskExecutor Bean명과 동일하게
    public void print(String message) {
        System.out.println(message);
    }

void로 설정하면 된다.

리턴이 값 있는 경우

Future, ListenableFuture, CompletableFuture 타입을 리턴 타입으로 사용할 수 있는데, 메서드 반환 형태를 new AsyncResult()로 묶으면 된다.

Future

@Service
public class MessageService {

    @Async
    public Future<String> print(String message) throws InterruptedException {
        System.out.println("Task Start - " + message);
        Thread.sleep(3000);
        return new AsyncResult<>("jayon-" + message);
    }
}

@RequiredArgsConstructor
@RestController
public class MessageController {

    private final MessageService messageService;

    @GetMapping("/messages")
    @ResponseStatus(code = HttpStatus.OK)
    public void printMessage() throws ExecutionException, InterruptedException {
        for (int i = 1; i <= 5; i++) {
            Future<String> future = messageService.print(i + "");
            System.out.println(future.get());
        }
    }
}

future.get() 은 블로킹을 통해 요청 결과가 올때까지 기다린다. 블로킹 방식은 성능이 안좋아서 Future는 잘 쓰지 않는다고 한다.

ListenableFuture

@Service
public class MessageService {

    @Async
    public ListenableFuture<String> print(String message) throws InterruptedException {
        System.out.println("Task Start - " + message);
        Thread.sleep(3000);
        return new AsyncResult<>("jayon-" + message);
    }
}

@RequiredArgsConstructor
@RestController
public class MessageController {

    private final MessageService messageService;

    @GetMapping("/messages")
    @ResponseStatus(code = HttpStatus.OK)
    public void printMessage() throws InterruptedException {
        for (int i = 1; i <= 5; i++) {
            ListenableFuture<String> listenableFuture = messageService.print(i + "");
            listenableFuture.addCallback(System.out::println, error -> System.out.println(error.getMessage()));
        }
    }
}

콜백을 통해 논 블로킹 방식으로 작업을 처리할 수 있다.

addCallback() 메서드의 첫번째 파라미터는 작업 완료 콜백 메서드,

두번째 파라미터는 작업 실패 콜백 메서드를 정의하면 된다.

CompletableFuture

Java 8에 추가된 것으로, 비동기 작업 이후의 다양한 메서드를 제공 해준다.

@Service
public class MessageService {

    @Async
    public CompletableFuture<String> print(String message) throws InterruptedException {
        System.out.println("Task Start - " + message);
        Thread.sleep(3000);
        return new AsyncResult<>("jayon-" + message).completable();
    }
}

@RequiredArgsConstructor
@RestController
public class MessageController {

    private final MessageService messageService;

    @GetMapping("/messages")
    @ResponseStatus(code = HttpStatus.OK)
    public void printMessage() throws InterruptedException {
        for (int i = 1; i <= 5; i++) {
            CompletableFuture<String> completableFuture = messageService.print(i + "");
            completableFuture
                    .thenAccept(System.out::println)
                    .exceptionally(error -> {
                        System.out.println(error.getMessage());
                        return null;
                    });
        }
    }
}

이전보다 가독성이 좋아지고, 논 블로킹 기능까지 완벽하다.

@Async를 사용할 때 리턴값이 필요하면 해당 방법을 사용하는 것을 권장한다.

장점

개발자는 메서드를 동기 방식으로 작성하다가, 비동기 방식을 원할때 @Async 어노테이션을 메서드 위에 붙여주면 된다. (유지보수에 좋다.)

WebClient

요청 시 프로그램에서 가장 흔하게 사용하는게 Http Client이다.

웹으로 API를 호출하기 위해 사용되는 Http Client 모듈 중 하나이며, Java에서 가장 많이 사용하는 Http Client는 RestTemplate이다.

RestTemplate와 WebClient의 공통점과 차이점

  • 공통점: 둘다 HttpClient 모듈이다.

  • 차이점: 통신 방법이 RestTemplate는 블로킹, WebClient는 논 블로킹 방식이다.

동작 원리

먼저 RestTemplate의 동작 원리를 알아야한다.

RestTemplate은 멀티 스레드와 블로킹 방식을 사용한다.

image
  • Thread Pool은 클라이언트 애플리케이션 구동시에 미리 만들어 놓는다.

  • 요청은 먼저 Queue에 쌓이고, 가능한 스레드가 있으면 그 스레드에 할당 되어 처리된다. → 1요청: 1스레드

  • 각 스레드는 블로킹 방식, 응답이 올때까지 해당 스레드는 사용하지 못한다.

RestTemplate을 Connection Pool에 Spring Bean으로 등록하기 위한 예제코드이다.

@Configuration
public class RestTemplateConfig {
	public RestTemplate getRestTemplate(int defaultMaxPerRoute, int maxTotal) {
		PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
		connManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
		connManager.setMaxTotal(maxTotal);

		HttpClient client = HttpClientBuilder.create().setConnectionManager(connManager).build();

		HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(client);
		factory.setConnectTimeout(3000);
		factory.setReadTimeout(3000);

		return new RestTemplate(factory);

	}

	@Bean
	public RestTemplate coffeeRestTemplate() {
		return getRestTemplate(20, 50);
	}
}

스레드가 다 차는 경우 이후의 요청은 Queue에 대기한다.

대부분 문제는 네트워크나 DB 통신에서 생기는데, 이런 문제가 여러 스레드에서 발생하면 사용 가능한 스레드 수가 많이 줄고 이런 과정이 전체 서비스가 매우 느려지게 된다.

Spring WebClient

Spring WebClient는 싱글 스레드와 논 블로킹 방식을 사용한다.

JSON, XML을 쉽게 응답 받는다. → Core 당 1개 스레드

image
  • 각 요청은 Event Loop 내에 Job으로 등록된다.

  • Event Loop는 각 Job(처리)을 제공자(Workers)에게 요청한 후 결과를 기다리지 않고, 다른 Job(처리)을 처리한다.

  • webClient는 이렇게 이벤트에 반응형으로 동작하게 설계되었다.

    그래서 반응성, 탄력성, 가용성, 비동기성을 보장하는 Spring React 프레임워크를 사용한다.

    또한, React Web 프레임워크인 Spring WebFlux에서 Http Client로 사용된다.

성능 비교

image

동시 사용자가 늘수록 RestTemplate는 급격하게 느려진다.

요즘 Spring 커뮤니티는 WebClient를 사용할 것을 강력하게 권고하고 있다.

간단한 사용법

간단하게 몇가지만 알아보겠다.

application.yml 작성 및 의존성 추가

server.port: 5011
spring:
  application:
    name: webserver
dependencies {
    compile 'org.springframework.boot:spring-boot-starter-webflux'
}

생성(create)

  • create()

WebClient client = WebClient.create();
  • 기본 url(base url)

WebClient client = WebClient.create("http://localhost:8080");
  • builder(): 상세하게 옵션

WebClient webClient = WebClient.builder()
                .baseUrl("localhost:5011")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();

Get 2가지 (Flux, Mono)

@Autowired
WebClient webClient;

public Flux<Employee> findAll() {
	return webClient.get()
		.uri("/employees")
		.retrieve()
		.bodyToFlux(Employee.class);
}

@Autowired
WebClient webClient;

public Mono<Employee> findById(Integer id) {
	return webClient.get()
		.uri("/employees/" + id)
		.retrieve()
		.bodyToMono(Employee.class);
}

POST

@Autowired
WebClient webClient;

public Mono<Employee> create(Employee empl) {
	return webClient.post()
		.uri("/employees")
		.body(Mono.just(empl), Employee.class)
		.retrieve()
		.bodyToMono(Employee.class);
}

참고 자료

Last updated