리액티브 프로그래밍의 옵저버 패턴

리액티브 프로그래밍의 옵저버 패턴

이전 장에서 우리는 동작 목록의 마지막 네 가지를 다루었습니다.

이 장은 또한 4인조 그녀의 책에서.

지금까지 논의한 패턴 중 하나는 이제 이 새로운 장에서 특히 흥미로운 것입니다.

관찰자 패턴 (덮어 11장, * 관찰자 패턴), 유용하다

주어진 객체의 상태가 변경될 때 객체 또는 객체 그룹에 알리기 위해. 이러한 종류의 전통적인 관찰자는 게시-구독 원칙을 적용하여 일부 개체 변경 이벤트에 반응할 수 있습니다.

이것은 많은 경우에 좋은 솔루션을 제공하지만, 일부는 서로 의존하는 많은 이벤트를 처리해야 하는 상황에서 전통적인 방법은 복잡하고 유지 관리하기 어려운 코드로 이어질 수 있습니다.

여기에 또 다른 패러다임 리액티브 프로그래밍 흥미로운 옵션을 제공합니다.

간단히 말해서, 리액티브 프로그래밍의 개념은 많은 이벤트에 반응하는 것입니다.

사건의 흐름코드를 깨끗하게 유지하면서.

이 장에서는 다음과 같은 프레임워크에 초점을 맞출 것입니다.

리액티브X (http://reactivex.io), 리액티브 프로그래밍의 일부. ReactiveX의 핵심 엔터티는 주목할 만한*.* 그리고 공식 웹사이트에서 볼 수 있듯이 ReactiveX는 다음을 위한 API로 정의되어 있습니다.

관찰 가능한 스트림을 사용한 비동기 프로그래밍. 또한 관찰자도 있습니다.

Observable은 관찰자에게 데이터를 보내거나 보낼 수 있는 스트림으로 생각할 수 있습니다.

그리고 이벤트를 출력할 수도 있습니다.

리액티브 프로그래밍의 옵저버 패턴 195장*

다음은 관찰 가능 항목의 정의를 제공하는 사용 가능한 문서의 두 인용문입니다.

*”Observable은 ReactiveX의 핵심 유형입니다.

배출로 알려진 요소를 일련의 연산자를 통해 직렬로 푸시하여 최종적으로 소비되는 관찰자에 도착합니다.

”*

*”푸시 기반(풀 기반이 아닌) 반복은 코드와 동시성을 훨씬 더 빠르게 표현하는 강력하고 새로운 방법을 열어줍니다.

관찰 가능 항목은 이벤트를 데이터로, 데이터를 이벤트로 취급하기 때문에 두 가지를 함께 구성하는 것이 간단해집니다.

”*

이 장에서는 다음에 대해 논의합니다.

  • 실용적인 예
    • 사용 사례
      • 구현

실용적인 예

실생활에서 어딘가에 모이는 물줄기는 관찰 가능한 것과 비슷합니다.

소프트웨어와 관련된 몇 가지 예가 있습니다.

  • 스프레드시트 애플리케이션은 내부 동작으로 인해 반응형 프로그래밍의 예로 간주될 수 있습니다.

    거의 모든 스프레드시트 응용 프로그램에서 시트의 셀을 대화식으로 변경하면 해당 셀에 직간접적으로 의존하는 모든 수식이 즉시 재평가되고 표시가 업데이트되어 이러한 재평가가 반영됩니다.

    • ReactiveX 개념은 Java(RxJava), Python(RxPY) 및 JavaScript(RxJS)를 비롯한 다양한 언어로 구현됩니다.

      • Angular Framework는 RxJS를 사용하여 Observable 패턴을 구현합니다.

사용 사례

하나의 사용 사례는 수집 파이프라인 Martin Fowler가 자신의 블로그(https://martinfowler.com/articles/collection-pipeline):

*”컬렉션 파이프라인은 하나의 컬렉션을 한 작업의 출력으로 가져와 다음 작업에 공급함으로써 자체적으로 구축되는 일련의 작업으로 일부 계산을 구성하는 프로그래밍 패턴입니다.

”*

Observable을 사용하여 다음과 같은 작업을 수행할 수 있습니다.

매핑 및 축소 또는 그룹화 데이터를 처리할 때 개체 시퀀스에 대해.

버튼 이벤트, 요청, RSS 또는 Twitter 피드와 같은 다양한 기능에 대해 Observable을 생성할 수 있습니다.

구현

여기에서 전체 구현을 생성하는 대신 다양한 방법을 살펴보고 이를 사용할 수 있는 방법을 살펴보겠습니다.

먼저 pip install rx 명령을 사용하여 Python 환경에 RxPY를 설치합니다.

첫 번째 예

먼저 RxPY 문서에서 예제를 가져와 더 재미있는 변형을 작성해 보겠습니다.

우리는 만들어진 개울을 관찰할 것입니다.

파이썬의 선 팀 피터스의 인용문(https://www.python.org/dev/peps/pep-0020/).

일반적으로 가능한 한 python 콘솔에서 import this를 사용하여 따옴표를 볼 수 있습니다.

콘솔의 다음 스크린샷을 참조하십시오.

>>> 가져오기

그런 다음 Python 프로그램에서 이 인용 목록을 가져오는 방법에 대한 질문이 생깁니다.

빠른 검색을 통해 Stack Overflow에 대한 답을 얻을 수 있습니다.

기본적으로 이 명령문을 io.StringIO 인스턴스로 가져온 결과를 리디렉션한 다음 액세스하여 다음을 사용하여 인쇄할 수 있습니다.

print()는 다음과 같습니다.

컨텍스트 라이브러리 가져오기, 확인

선 = io.StringIO()

contextlib.redirect_stdout(zen) 사용: 가져오기

인쇄(zen.getvalue())

샘플 코드(rx_example1.py 파일에서)를 실제로 사용하려면 다음과 같이 rx 모듈에서 Observable 및 Observer 클래스를 가져와야 합니다.

from rx import Observable, 옵저버

그런 다음 contextlib.redirect_stdout() 트릭을 사용하여 get_quotes() 함수를 생성하고 인용 부호를 가져오고 반환하기 위해 이전에 보여준 코드를 약간 수정합니다.

get_quotes() 정의:

컨텍스트 라이브러리 가져오기, 확인

선 = io.StringIO()

contextlib.redirect_stdout(zen) 사용: 가져오기

quotes = zen.getvalue().split(‘\n’)(1:) 따옴표를 반환합니다.

이건 끝났어!
이제 우리는 우리가 얻은 견적 목록에서 관찰 가능 항목을 만들고 싶습니다.

우리가 할 수 있는 방법은 다음과 같습니다.

  1. 데이터 항목을 관찰자에게 전달하는 함수 정의
  2. Observable.create() 팩토리를 사용하고 이 함수에 전달하여 데이터 소스 또는 스트림을 설정합니다.

  3. 관찰자가 소스를 구독하도록 만들기

Observer 클래스 자체에는 이러한 유형의 통신에 사용되는 세 가지 방법이 있습니다.

  • on_next()는 요소를 전달하는 데 사용됩니다.

    • on_completed()는 더 이상 기사가 없다는 신호를 보냅니다.

      • on_error()는 오류 신호를 보냅니다.

따라서 옵저버 객체 obs를 입력으로 사용하고 인용 문자열을 사용하여 on_next()로 각 인용을 보내고 on_completed()로 끝(마지막 인용이 전송된 후) 신호를 보내는 push_quotes() 함수를 만들어 봅시다.

기능은 다음과 같습니다.

def push_quotes(obs):

따옴표 = get_quotes()

따옴표 안의 q:

if q: # 비어 있음 건너뛰기 obs.on_next(q) obs.on_completed()

다음과 같이 Observer 기본 클래스의 하위 클래스를 사용하여 사용할 Observer를 구현합니다.

클래스 ZenQuotesObserver(관찰자):

def on_next(자신, 값):

print(f”수신: {value}”)

def on_completed(self): print(“완료!
”)

def on_error(자신, 오류):

print(f”오류 발생: {error}”)

다음으로 정의합니다.

고려해야 할 소스다음과 같이:

소스 = Observable.create(push_quotes)

마지막으로 observable에 대한 구독을 정의해 보겠습니다.

구독 없이는 아무 일도 일어나지 않습니다.

source.subscribe(ZenQuotesObserver())

이제 코드의 결과를 볼 수 있습니다.

다음은 python rx_example1.py를 실행할 때 얻는 결과입니다.

두 번째 예

(rx_example2.py** 파일에서) 코드를 작성하고 첫 번째 예제와 유사한 결과를 얻는 다른 방법을 살펴보겠습니다.

get_quotes() 함수를 사용하여 다음과 같이 Python의 내장 enumerate() 함수를 사용하여 시퀀스의 열거형을 반환합니다.

get_quotes() 정의:

컨텍스트 라이브러리 가져오기, 확인

선 = io.StringIO()

contextlib.redirect_stdout(zen) 사용: 가져오기

따옴표 = zen.getvalue().split(‘\n’)(1:) 열거형(따옴표) 반환

이 함수를 호출하고 그 결과를 zen_quotes 변수에 저장할 수 있습니다.

zen_quotes = get_quotes()

특수 함수 Observable.from_()을 사용하여 observable을 만들고 시퀀스에서 filter()와 같은 연쇄 작업을 수행하고 마지막으로 scribe()를 사용하여 observable을 구독합니다.

최종 코드 조각은 다음과 같습니다.

Observable.from_(zen_quotes) \

.filter(lambda q: len(q(1)) > 0) \

.subscribe(lambda 값: print(f”수신: {value(0)} – {value(1)}”))

다음은 python rx_example2.py를 실행할 때 얻는 결과입니다.

세 번째 예

observable에 액세스하기 위해 flat_map(), filter() 및 map 체인을 사용하는 유사한 예(rx_example3.py** 파일에서)를 살펴보겠습니다(동일한 get_quotes() 함수로 생성된 인용 스트림은 react( ) 운영.

이전 예제와의 주요 차이점은 5초마다 새 항목을 내보내도록 항목 스트리밍을 예약한다는 것입니다(간격), Observable.interval() 함수를 사용합니다.

또한 매핑을 위해 flat_map() 메서드를 사용합니다.

각 방출을 관찰 가능 항목(예: Observable.from_(zen_quotes))으로 내보내고 해당 방출을 단일 관찰 가능 항목으로 병합합니다.

코드의 주요 부분은 다음과 같습니다.

관찰 가능 간격(5000)\

.flat_map(lambda seq: Observable. from_(zen_quotes)) \

.flat_map(lambda q: Observable.from_(q(1).split())) \

.filter(lambda s: len(s) > 2) \

.map(lambda s: s.replace(‘.’, ”).replace(‘,’, ”).replace(‘!
’, ”).replace(‘-‘, ”)) \

.map(lambda s: s.lower()) \

.subscribe(람다 값: print(f”수신: {값}”))

또한 사용자가 원할 경우 실행을 중지할 수 있도록 마지막에 input() 함수와 함께 다음 줄을 추가합니다.

input(“시작 중… 종료하려면 아무 키나 누르십시오\n”)

python rx_example3.py 명령을 사용하여 예제를 실행해 보겠습니다.

5초마다 콘솔에 새 항목이 표시됩니다(이 경우 구두점을 제거하고 소문자로 변환한 최소 3개의 문자로 구성된 인용 부호의 단어). 결과는 다음과 같습니다.

또한 다른 출력 스크린샷은 다음과 같습니다.

이 경우 단어가 많지 않을 수 있으므로 비교적 빠르지만 아무 문자나 입력하여 프로그램을 종료할 수 있습니다.

Ctrl 키 버튼) 다시 실행하여 무슨 일이 일어나고 있는지 더 잘 이해할 수 있습니다.

네 번째 예

사람 목록의 스트림과 이를 기반으로 Observable을 만들어 봅시다.

여기 또 다른 요령이 있습니다.

데이터 소스를 처리하는 데 도움이 되도록 다음이라는 타사 모듈을 사용하겠습니다.

사기꾼 (https://pypi.org/project/Faker). 사람들의 이름.

pip install faker 명령으로 faker 모듈을 설치할 수 있습니다.

peoplelist.py 파일에는 가상 인물의 이름과 성을 생성하기 위해 Faker 인스턴스를 사용하는 populate() 함수가 포함된 다음 코드가 있습니다.

from faker import Faker 가짜 = Faker()

데프 채우기():

사람들 = ()

(0, 20) 범위의 _에 대해:

p = {‘FirstName’: Fake.FirstName(), ‘LastName’: Fake.LastName()} persons.append(p)

반환 iter(명)

프로그램의 주요 부분에서는 생성된 목록에 있는 사람들의 이름을 텍스트 파일 people.txt에 씁니다.

__name__ == ‘__main__’인 경우:

new_people = 채우기()

new_data = (f”{p(‘firstname’)} {p(‘lastname’)}” for p in new_persons) new_data = “, “.join(new_data) + “, “

open(‘people.txt’, ‘a’) as f: f.write(new_data)

계속하기 전에 멈추고 전략을 세우자!
많은 새로운 개념과 기술이 떠돌고 있기 때문에 점진적으로 일을 가져가는 것이 좋은 생각처럼 보입니다.

그래서 우리는 observable에 대한 첫 번째 구현 작업을 하고 나중에 확장할 것입니다.

fx_peoplelist_1.py** 파일에서 코드의 첫 번째 버전을 작성해 보겠습니다.

먼저 firstnames_from_db()라는 함수를 정의합니다.

이 함수는 flat_map(), filter() 및 map을 사용하여 변환(이미 본 것처럼)과 함께 이름이 포함된 텍스트 파일(파일의 내용을 읽음)에서 관찰 가능 항목을 반환합니다.

() 메소드와 새로운 메소드

다른 시퀀스에서 요소를 출력하는 작업, group_by() – 해당 빈도와 함께 파일에서 찾은 첫 번째 이름:

rx 가져오기로 관찰 가능

def firstnames_from_db(파일명): file = open(파일명)

  • 사람들의 저장된 이름 수집 및 푸시

Observable.from_(파일) 반환 \

.flat_map(Lambda 콘텐츠: content.split(‘, ‘)) \

.filter(람다 이름: 이름!
=”) \

.map(람다 이름: name.split()(0)) \

.group_by(람다 이름: 이름) \

.flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct)))

그런 다음 이전 예제와 같이 5초마다 데이터를 내보내고 반환된 데이터와 해당 데이터를 병합하는 관찰 가능 항목을 정의합니다.

다음과 같이 db_file을 people.txt 파일로 설정한 후 firstnames_from_db(db_file):

db_file = “사람.txt”

  • 5초마다 데이터 전송

관찰 가능 간격(5000)\

.flat_map(lambda i: firstnames_from_db(db_file)) \ .subscribe(람다 값: print(str(value)))

input(“시작 중… 종료하려면 아무 키나 누르십시오\n”)

이제 두 프로그램(peoplelist.py 및 rx_peoplelist_1.py)을 실행하면 어떻게 되는지 살펴보겠습니다.

명령줄 창이나 터미널에서 python peoplelist.py 를 실행하여 사람들의 이름을 생성할 수 있습니다.

people.txt 파일은 쉼표로 구분된 이름으로 생성됩니다.

이 명령을 다시 실행할 때마다 새 이름 세트가 파일에 추가됩니다.

두 번째 명령줄 창에서는 python rx_peoplelist_1.py 명령을 통해 Observable을 구현하는 프로그램의 첫 번째 버전을 실행할 수 있습니다.

다음과 유사한 출력이 표시됩니다.

첫 번째 명령을 여러 번 다시 실행하고 두 번째 창에서 어떤 일이 발생하는지 관찰하면 people.txt 파일을 계속 읽어 이름을 추출하고 전체 이름에서 첫 번째 이름을 가져오고 필요한 변환 슬라이드를 수행하는 것을 볼 수 있습니다.

각각 빈도가 있는 이름으로 구성된 요소.

방금 달성한 것을 개선하기 위해 최소 4번 나타나는 첫 번째 이름만 방출하도록 노력할 것입니다.

rx_peoplelist_2.py 파일에서 observable을 반환하고 그런 식으로 필터링하는 또 다른 함수가 필요합니다.

이 함수를frequent_firstnames_from_db()라고 부르자. 첫 번째 버전에서 사용한 것과 비교할 때 Filter() 연산자를 사용하여 개수가 적용되는 이름 그룹만 유지해야 합니다.

발생 값(ct)이 3보다 큽니다.

얻은 그룹을 기반으로 코드를 다시 확인하면 람다 함수 lambda grp를 사용하여 그룹의 키를 첫 번째 요소로, 개수를 두 번째 요소로 포함하는 튜플을 얻습니다.

grp.count().map(lambda ct: (grp.key, ct)), .flat_map() 연산자 덕분에 반환됩니다.

다음으로 .filter(lambda name_and_ct: name_and_ct(1) > 3)를 사용하여 함수에서 추가로 필터링하여 첫 번째 항목만 가져옵니다.

현재 4회 이상 등장하는 이름.

다음은 이 새 함수의 코드입니다.

def common_first_names_from_db(파일명): file = open(파일명)

  • 일반적인 이름만 수집 및 푸시

Observable.from_(파일) 반환 \

.flat_map(Lambda 콘텐츠: content.split(‘, ‘)) \

.filter(람다 이름: 이름!
=”) \

.map(람다 이름: name.split()(0)) \

.group_by(람다 이름: 이름) \

.flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct))) \ .filter(lambda name_and_ct: name_and_ct(1) > 3)

그리고 Observable 간격에 대해 거의 동일한 코드를 추가합니다.

그에 따라 참조된 함수의 이름을 변경하기만 하면 됩니다.

마지막 비트에 대한 새 코드(에서 볼 수 있음)

rx_peoplelist_2.py**) 파일은 다음과 같습니다.

  • 5초마다 데이터 전송

관찰 가능 간격(5000)\

.flat_map(lambda i: frequency_firstnames_from_db(db_file)) \ .subscribe(람다 값: print(str(값)))

  • 사용자가 아무 키나 입력할 때까지 활성 상태 유지(“시작 중… 종료하려면 아무 키나 누르십시오\n”)

동일한 프로토콜을 사용하여 예제를 실행하는 경우 두 번째 창 또는 터미널에서 python rx_peoplelist_2.py 명령을 실행할 때 다음과 유사한 출력이 표시되어야 합니다.

첫 번째 이름과 숫자 쌍 출력을 볼 수 있으며, 5초마다 새로운 출력이 있고 이 값은 첫 번째 셸 창에서 다시 peoplelist 프로그램(python peoplelist.py 사용)을 실행하면 변경됩니다.

좋은 결과!

마지막으로 rx_peoplelist_3.py 파일에서 대부분의 코드를 재사용하여 이전 예제의 변형만 보여주고 한 가지 작은 변경 사항을 적용했습니다.

관찰자가 구독하기 직전의 Observable 간격으로. 이 코드 조각의 새 버전은 다음과 같습니다.

  • Observable Interval(5000)이 변경된 경우에만 5초마다 데이터 덤프\

.flat_map(lambda i:frequent_firstnames_from_db(db_file)) \ .distinct() \

.subscribe(람다 값: print(str(값)))

이전과 마찬가지로 python rx_peoplelist_3.py를 실행하면 다음과 유사한 출력이 표시되어야 합니다.

차이점을 말할 수 있습니까

다시 우리는 이름과 번호 쌍의 방출을 볼 수 있습니다.

하지만 이번에는 변경 사항이 적고 경우에 따라 데이터가 10초 이상 변경되지 않는다는 인상을 받을 수도 있습니다.

더 많은 변경 사항을 보려면 peoplelist.py 프로그램을 다시 실행해야 합니다.

몇 가지 일반적인 이름의 수가 증가할 수 있도록 여러 번 실행할 수 있습니다.

이 동작에 대한 설명은 다음과 같습니다.

관찰 가능한 간격에 대한 .distinct() 연산, 요소의 값은 출력만 됩니다.

변경된 경우. 이로 인해 더 적은 데이터가 방출되고 각 이름에 대해 동일한 숫자가 두 번 표시되지 않습니다.

이 예제 시리즈를 통해 우리는 observable이 전통적인 패턴으로는 수행하기 어려운 작업을 수행하는 영리한 방법을 제공하는 방법을 발견했습니다.

그러나 우리는 단지 표면을 긁었을 뿐이며 관심 있는 독자가 ReactiveX 및 반응형 프로그래밍을 계속 탐색할 수 있는 기회입니다.

요약

이 장에서는 리액티브 프로그래밍의 관찰자 패턴을 소개했습니다.

이러한 유형의 관찰자 패턴의 핵심 아이디어는 우리가 자연에서 보는 물줄기와 같은 데이터 및 이벤트의 흐름에 응답하는 것입니다.

프로그래밍 언어(RxJava, RxJS, RxPY 등)에 대한 확장을 통해 이 아이디어 또는 ReactiveX 기술의 컴퓨팅 세계에 많은 예가 있습니다.

Angular와 같은 최신 JavaScript 프레임워크는 우리가 언급한 다른 예입니다.

독자가 이 프로그래밍 패러다임에 접근하고 공식 RxPY 문서 및 예제에 대한 자체 연구를 수행하는 소개 역할을 하는 함수를 만드는 데 사용할 수 있는 RxPY의 예제와 찾을 수 있는 기존 코드에 대해 논의했습니다.

GitHub를 계속합니다.

다음 장에서는 마이크로서비스 패턴과 클라우드에 대한 기타 패턴을 다룹니다.

(195))

  • 5