マイクロサービス向けのNATSストリーミングを使用したデータストリーミング - パート2

このチュートリアルへようこそ。この記事では、Nats Streamingを使用したマイクロサービス向けのデータストリーミング - パート1で始めた内容を完結させます。簡単におさらいすると、下図のようなアプリケーションを完成させます。

バスケットボールのダンクコンテストアプリケーションの画像

このアプリケーションは、NATSストリーミングを使用してお互いに通信する2つのサービスから構成されているバスケットボールのダンクコンテストアプリです。サービス1、「Dunk Service」はExpressアプリケーションで、コンテストのプレーヤーを登録し、プレーヤーがダンクショットを試み、NATSストリーミングサーバー上でイベントを介してショット情報を公開します。これはパート1で成功裏に完了しましたので、次にサービス2に進みます。

1. 統計サービス

これは、コンテストアプリの統計をリアルタイムで表示するExpressアプリケーションです。下の表に示されているように、プレーヤーを順位付けします。

統計表の画像

プロジェクトディレクトリdunk-contest/内のターミナルで下記のコマンドを実行してください。

$ mkdir statistic-service
$ cd statistic-service/
$ npm init -y
$ npm install ts-node-dev typescript express @types/express node-nats-streaming mongodb

パッケージ.jsonファイルを開き、実際のスクリプトセクションを下記のものに変更してください。

  "scripts": {
    "listen": "ts-node-dev --rs --notify false src/listener.ts"
  },

ファイルを保存し、同じディレクトリ内にsrc/という名前のフォルダを作成し、そこにlistener.tsという名前のtypescriptファイルを追加してください。
そしてlistener.tsに次のコードを記述してください。

import nats, { Message } from "node-nats-streaming";
import { MongoClient } from "mongodb";

const start = async () => {
  const stan = nats.connect("dunk-contest", "321", {
    url: "http://localhost:4222",
  });

  stan.on("connect", () => {
    console.log(
      "Statistic Service is connected to NATS Streaming Server \nWaiting for Events ..."
    );

    stan.on("close", () => {
      console.log("Nats connection closed!");
      process.exit();
    });

    const options = stan
      .subscriptionOptions()
      .setManualAckMode(true)
      .setDeliverAllAvailable()
      .setDurableName("Dunk-Shot-Queue-Group");

    const subscription = stan.subscribe(
      "Dunk-Shot",
      "Dunk-Shot-Queue-Group",
      options
    );

    subscription.on("message", async (msg: Message) => {
      const parsedData = JSON.parse(msg.getData().toString("utf-8"));
      console.log("EVENT RECEIVED WITH THE DATA BELOW :");
      console.table(parsedData);

      const mongoClient = await MongoClient.connect(
        "mongodb://localhost:27017/statistic-service"
      );

      const db = mongoClient.db();
      const dunkCollection = db.collection("dunks");
      await dunkCollection.insertOne(parsedData);

      const dunkStatistic = await dunkCollection
        .aggregate([
          {
            $group: {
              _id: "$PLAYER_NAME",
              TOTAL_DUNK: { $count: {} },
              TOTAL_POINT: { $sum: "$DUNK_POINT" },
            },
          },
          { $sort: { TOTAL_POINT: -1 } },
        ])
        .toArray();

      console.log("\x1b[36m%s\x1b[0m", "DUNK CONTEST STATISTIC :");
      console.table(dunkStatistic);
      mongoClient.close();

      msg.ack();
    });
  });
};

start();

上記のコードでは、natsMessageをnode-nats-streamingライブラリから、MongoClientをmongodbライブラリからインポートし、start関数内で:

  1. パート1で開始したNATS-ストリーミング-サーバーへの接続を定義しました。
  2. 接続時、アプリはDunk-Shotチャネル(ダンク-サービスがイベントを通じてイベントを公開するチャネル)へのサブスクリプションを開始します。そして、チャネルにはサブスクリプションオプションとキューグループを追加しました。
  3. メッセージイベントが発生した際(ダンクサービスがダンクショット情報を公開した際)に、統計サービスはmongodbライブラリを使用してMongoDBサーバーに接続し、メッセージデータ(ダンクショット情報)を取得し、dunksコレクションに挿入します。
  4. dunksコレクションに対して、ダンクショットの合計数とダンクポイントの合計数で各プレーヤーを選択し、グループ化するクエリを実行します。

次に、TypeScriptコードをJavaScriptコードにコンパイルするためのtsconfig.jsonファイルを生成します。ターミナルを開き、statistic-service/ディレクトリに戻って次のコマンドを実行してください。

$ tsc --init

statistic-service/ディレクトリは下記のような構造になっているはずです。

.
├── package.json
├── package-lock.json
├── src
│   └── listener.ts
└── tsconfig.json

1 directory, 4 files

すべてのファイルを保存し、次のテストを行います。

テスト1: NATS Streaming server Podへのアクセス

ターミナルでローカルのKubernetesクラスタを起動します。

$ minikube start

次に、クラスタ内で実行されているすべてのポッドのリストを取得するために実行します。

$ kubectl get pods

NATSサーバーのイメージ

ポッドの名前をコピーし、これから来るコマンドで使用します。

ここでは、Kubernetesクラスタ内で動作しているNATS Streaming serverポッドをローカルマシンでアクセス可能にします。ターミナルを開き、次のコマンドを実行して、ローカルマシン上のポートをポッド上のポートにフォワードします。

$ kubectl port-forward <あなたのポッドの名前> 4222:4222

注意:実際のポッド名に置き換えてください。

ポートフォワードのイメージ

テスト2: ダンクサービス専用のMongoDBポッドへのアクセス

ターミナルでkubectl get podsを実行し、ポッドの名前を取得します。

MongoDBのイメージ

このポッドの名前を使用して、クラスタ内のMongoDBポッドに27017ポートをフォワードします。

$ kubectl port-forward <あなたのポッドの名前> 27017:27017

注意:実際のポッド名に置き換えてください。

Mongo実行中のイメージ

テスト3: 統計サービス専用のMongoDBポッドへのアクセス

27016ポートをローカルマシンからクラスタ内の統計サービス専用のMongoDBポッドにフォワードします。

MongoDBポッドのイメージ

$ kubectl port-forward <あなたのポッドの名前> 27016:27017

注意:実際のポッド名に置き換えてください。

ポートフォワードのイメージ

テスト4: ダンクサービスと統計サービス(Expressアプリケーション)の起動

注意:上記の3つのターミナルはそのまま実行中でなければなりません。

プロジェクトディレクトリdunk-contest/にいる必要があります。タブを開き、dunk-service/ディレクトリ内で次のコマンドを実行します。

$ npm start

ダンクサービスのイメージ

同じくstatistic-service/ディレクトリ内で以下のコマンドを実行します。

$ npm run listen

npm実行中のイメージ

今の時点で、ダンクサービスと統計サービスはNATSストリーミングサーバーポッドおよびそれぞれのMongoDBポッドに接続されているはずです。

テスト5: APIクライアントを開いて、以下のリクエストを実行します

リクエスト1http://localhost:4001/dunk-contest/registerに対するPOSTリクエスト

ヘッダーContent-Type: application/jsonを持つ2つのPOSTリクエストを実行してください。
POSTリクエスト1

{
  "name": "LeBron",
  "height": 2.06,
  "weight": 113,
  "experience": 19
}

POSTリクエスト2

{
  "name": "Stephen",
  "height": 1.88,
  "weight": 84,
  "experience": 13
}

注記:ここでは、2人のプレーヤーをコンテストに登録しました。次のリクエストで彼らにショットを試みてもらいます。

リクエスト2

注記:2つのPOSTリクエストで、2人のプレーヤーがそれぞれ1回のダンクショットを試みました。統計サービスがリアルタイムでコンテスト統計をコンソールに表示するのを見るために、これらの2つの最後のリクエストをたくさん行うことができます。

これでテストは終わりです。我々の2つのサービスは意図した通りに動作しています。ダンクサービスはプレーヤーを登録し、ダンクショットを試みさせています。一方、統計サービスは各ダンクショットの後でコンテストの統計を作成し、更新しています。

今までのテストはKubernetesクラスタの外で行われましたが、これは単にデモンストレーションのためでした。次のセクションでは、すべてのサービスがクラスタ内で動作するようにしましょう。

2. サービスのDocker化

今回のチュートリアルのこの段階では、Dockerを使用してサービスをコンテナ化します。これにより、サービスのDockerイメージがDocker Hubにアップロードされ、後でKubernetesクラスタ内でポッドを作成するために使用されます。

2.1 ダンクサービスのDockerイメージ

dunk-service/ディレクトリ内にDockerfileという名前のファイルを作成し、下記のコードを入力してください。

FROM node:alpine

WORKDIR /app
<br><br>こちらの記事はdev.toの良い記事を日本人向けに翻訳しています。<br>[https://dev.to/musolemasu/data-streaming-for-microservices-using-nats-streaming-part-2-2jfc](https://dev.to/musolemasu/data-streaming-for-microservices-using-nats-streaming-part-2-2jfc)