Break Your Data Silos

ETLやBIを中心に興味のあるトピックを扱います。

XplentyとApache Airflow

今回はXplentyとApache Airflowの連携に関する以下の記事について紹介したいと思います。

www.xplenty.com

 

前提条件と初期セットアップ

Airflowのセットアップの詳細はここでは省略します。また、今回は諸々の考慮が不要なDockerコンテナを使用します。(Dockerの知識がある程度ある方向け)

DockerをWindowsもしくはMacで使用するにはDocker Desktopが必要です。Windowsの場合、OSはWindows Pro/EnterpriseエディションでHyper-Vが必要となります。Homeエディションの方はDocker Toolboxをお使いください。(Docker Desktopと比べると少し機能が限定されますが)

Docker Toolboxのインストール手順はこちらです。

 

 

なぜApache Airflowが必要?

 Astronomer社のCo-Founder 兼 CEOであるRy Walker氏によるブログ(こちら)を見るとなぜAirflowがジョブスケジューラに対する素晴らしい選択肢になるかが分かります。

 

Apache Airflowセットアップ

Airflow Dockerの読み込み

docker pull puckel/docker-airflow

これにより、Python(3.7-slim-stretch)公式イメージに基づいたAirflowでコンテナが読み込まれます。詳細については、Puckel docker-airflow Docker Hubウェブサイトをご覧ください。

Dockerコンテナ内のホスト上に作成されたDAGにアクセスするには、VirtualBoxでフォルダ共有を有効にします。

  1. Go to Settings -> Shared Folders
  2. Choose host folder
  3. Name the resource for Docker Containers
  4. Check Auto-mount and Make Permanent options.

f:id:datapipeline:20200116153825p:plain

作成されたフォルダには、DAG用のdagサブフォルダが含まれると仮定します。私の場合はc:\GoogleDrive\docker\airflow\dags\です。

 

ここで、コンテナ内で実行されるアプリケーションにアクセスするために、いくつかのポートをマップする必要があります。この例では、ポート8080でAirflowを実行すると想定しています。

  1. Go to Settings -> Network, expand the Advanced node and choose Port Forwarding

f:id:datapipeline:20200116153849p:plain

 

  1. Create a rule for port 8080 - we’re assuming that host and guest ports will be the same:

f:id:datapipeline:20200116154009p:plain 

共有フォルダへの変更が適用されていることを確認するために、Dockerを再起動します。

docker-machine.exe restart

追加のpythonパッケージについては、すべてのパッケージをリストするrequirements.txtファイルを作成しました。私の場合は

c:\GoogleDrive\docker\airflow\requirements.txt

そこに置いたものは何でもentrypoint.shスクリプトによってインストールされます。

entrypoint.shはpip installコマンド(--userオプション)を実行します。

 
これで、Airflowを実行する準備が整いました!Airflowフォルダ(私の場合は、/c/GoogleDrive/docker/airflow )から実行して、それを試してみましょう。:

docker run -d -p 8080:8080 --env-file=env -v 
/airflow/dags/:/usr/local/airflow/dags -v 
/airflow/requirements.txt:/requirements.txt puckel/docker-airflow webserver

ちなみにオプションのリストは以下です。

  • -d - Run container in background and print container ID
  • -p 8080:8080 - Publish a container’s port(s) to the host. Note we’re mapping ports from container to the virtual host. We do have already mapped these ports from virtual host to our os in VirtualBox
  • -env-file=env - this uses the env file created in airflow folder. We will use it later to hold the environment variables, including the  Xplenty API key.
  • -v /airflow/dags/:/usr/local/airflow/dags - maps the dag folder from host to container. Now  any files we create or edit on the host will be accessible inside the container
  • -v /airflow/requirements.txt:/requirements.txt - mounts the above mentioned requirements.txt file as ./requirements.txt for the entrypoint.sh to fetch.
  • puckel/docker-airflow - the name of the image we want to create a container with
  • webserver - the command to be executed

 

もし、全てのセットアップが上手くいったらAirflowにアクセスします。

http://localhost:8080/admin/ 

f:id:datapipeline:20200116154050p:plain

フォルダのマッピングやインストールされたパッケージのマッピングを確認したい場合、以下のコマンドを実行してください。

docker run -it --env-file=env -v /airflow/dags/:/usr/local/airflow/dags -v
/airflow/requirements.txt:/requirements.txt puckel/docker-airflow sh -c 
"bash" 

以下のような結果が返されます。

f:id:datapipeline:20200116154113p:plain

 

Xplentyと統合する

スケジューラーを配置したら、それを活用して、Xplentyで作成されたグラフの作成および統合ジョブのスケジューリングに使用します。

まずはじめにXplentyコンテナ用にシンプルなWrapperを用意します。パッケージを実行するためには以下の機能をサポートしていることが重要です。

  1. インプットパラメータとしてパッケージIDを取得
  2. 利用可能なコンテナを確認
  3. もし利用可能なコンテナがない場合は作成する
  4. クラスターが利用可能になるまでWaitする
  5. パッケージを実行する
  6. パッケージの実行が完了するまでWaitする
  7. 結果を返す

 

f:id:datapipeline:20200116154256p:plain



Xplenty Wrapper

 Xplenty API documentation を参考に上記のプロセスをハンドルするためのPythonスクリプトを簡単に作成できます。

変数:

まずImportと共にスクリプトで使用される以下の変数を設定するところからはじめましょう。


account_id には名前を設定します

api_keyはアカウントにアクセスするのに使います

api_urlは後ほど必要となる機能で使用される一般的なURLです。

import os
import requests, json
from time import sleep


account_id = "my-account-id"
api_key = os.getenv('xpl_api_key')
api_url = 'https://api.xplenty.com/{}/api/'.format(account_id) 

run_job

パッケージを実行するメインとなるFunctionの引数は「package_id」のみです。次にヘッダーはrun job documentation.にしたがってセットされます。UrlにJobsメソッドが追加されます。

get_available_clustersというFunctionはクラスターが利用可能かどうかをチェックする際に実行されます。

 headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
      'Content-Type': 'application/json',
  }
  url = api_url + 'jobs'

利用可能なクラスターが稼働中でない場合、create_clusterというFunctionがクラスターを作成するのに実行されます。

  # get available running clusters
  cluster_id = get_available_cluster()

スクリプトクラスターが利用可能になるまで待つ必要があります。

  # if there are no available clusters, create one
  if cluster_id < 0:
      create_cluster()

 

  # wait for the cluster to get created and available
  while cluster_id < 0:
      print('Waiting for new cluster to start...')
      sleep(30)
      cluster_id = get_available_cluster()
      print(cluster_id)

次にクラスターが選ばれたらdataという変数にクラスターIDとパッケージIDがJSON形式で割り当てられます

# prepare the json indicating cluster_id and package_id
  data = '{{"cluster_id": {cluster_id}, "package_id":{package_id}}}'.format(cluster_id=cluster_id, package_id=package_id)

全て完了したらPostリクエス

全て完了したら、POSTリクエストがヘッダ情報、データおよび認証キーと一緒に先ほどのURLに送信されます。ジョブリクエスト送信後、スクリプトは結果のモニタリングを開始します。ステータスが実行中かPendingの間はXplenty APIでジョブのステータスを30秒ごとに確認します。

  if cluster_id:
      print('Running package: {}'.format(package_id))
      print(url, str(data))

      # run the POST request to start the job
      r = requests.post(url,
                        headers=headers,
                        data=str(data),
                        auth=(api_key, ''))

      print(r.json())
      print('Started job id={}'.format(r.json()['id']))

 

  while job_status in ('pending', 'running'):
          sleep(30)
          job_status = get_job_status(r.json()['id'])
          print(job_status)

最後にジョブのステータスが返されます。

 

  # return the final status of the job
  return job_status

run_job -以下は全てを 以下は完全なFunctionです。

   # return the final status of the job
   return job_status

run_job - putting it all together
Heres a complete function:

def run_job(package_id):
   headers = {
       'Accept': 'application/vnd.xplenty+json; version=2',
       'Content-Type': 'application/json',
   }
   url = api_url + 'jobs'
   # get available running clusters
   cluster_id = get_available_cluster()
   # if there are no available clusters, create one
   if cluster_id < 0:
       create_cluster()
   # wait for the cluster to get created and available
   while cluster_id < 0:
       print('Waiting for new cluster to start...')
       sleep(30)
       cluster_id = get_available_cluster()
       print(cluster_id)
   # prepare the json indicating cluster_id and package_id
   data = '{{"cluster_id": {cluster_id}, "package_id": {package_id} }}'.format(cluster_id=cluster_id,
                                                                          
package_id=package_id)

   if cluster_id:
       print('Running package: {}'.format(package_id))
       print(url, str(data))

       # run the POST request to start the job
       r = requests.post(url,
                         headers=headers,
                         data=str(data),
                         auth=(api_key, ''))

       print(r.json())
       print('Started job id={}'.format(r.json()['id']))
       # get the status of the job
       job_status = get_job_status(r.json()['id'])
       print(job_status)
       # wait as long as the job is either 'pending' or 'running'
       while job_status in ('pending', 'running'):
           sleep(30)
           job_status = get_job_status(r.json()['id'])
           print(job_status)
   # return the final status of the job
   return job_status

get_available_clusters

前述したように、このFunctionは全てのクラスターをフェッチし、それらのステータスをチェックします。もし、クラスターが利用可能であることを確認した場合、 FunctionはクラスターIDを返します。また、利用可能なクラスターがない場合は-1を返します。APIの詳細はこちら(here)で確認できます。

def get_available_cluster():
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
  }
  url = api_url + 'clusters'

  r = requests.get(url,
                    headers=headers,
                    auth=(api_key, ''))

  for cluster in json.loads(r.text):
      print('Cluster: {}, status: {}'.format(cluster['name'], cluster['status']))
      if cluster['status'] == 'available':
          print('Using cluster: {}'.format(cluster['name']))
          return cluster['id']
  print('No available cluster!')
  return -1

このFunctionは全てのクラスターをリスト化し、data'{"status": "available"}'で利用可能な物だけをリストアップできます。以下のようにurlに追加されます。

  r = requests.get(url,
                    headers=headers,
                    data = '{"status": "available"}',
                    auth=(api_key, ''))

create_clusters

ちなみにXplentyのUI画面でクラスターをセットアップすることが可能です。

f:id:datapipeline:20200116154344p:plain

SandboxクラスターかProductionクラスターか、ノードの設定、さらにクラスターがInactiveになってからどれくらいの時間間隔でクラスターをTerminateするかについて数回クリックするだけで作成可能です。

 

今回のブログでは、以下のFunctionによりSandboxクラスターが自動的に作成されます。

def create_cluster():
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
      'Content-Type': 'application/json',
  }

  data = '{"nodes":1, "type":"Sandbox"}'

  print(data)
  print('Creating new cluster...')
  url = api_url + 'clusters'

  r = requests.post(url,
                    headers=headers,
                    data=data,
                    auth=(api_key, ''))
  print(r.status_code)
  print(r.text) 

ヘッダー情報と今回使用するクラスターのパラメーターに関する情報を保持するdataという変数(ドキュメント(こちら)に従い、サンドボックスはシングルノードのみを使用する必要があります)と共にPOSTリクエストが送信されます。

 

get_job_status

最後のFunctionは実行中のジョブのステータスをフェッチするのに使われます。job_idパラメータを引数とし、適切なヘッダ情報と共にGETリクエストを実行しジョブのステータスが取得されます。

def get_job_status(job_id):
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
  }
  url = '{api_url}/jobs/{job_id}'.format(api_url=api_url,
                                          job_id=job_id)
  r = requests.get(url,
                    headers=headers,
                    auth=(api_key, ''))
  return r.json()['status']

 

Xplenty DAGS

Wrapperが準備できたら、Xplentyパッケージを呼び出して実行するAirflow DAGを作成しましょう!今回は、3つのパッケージを順番に実行するシンプルなDAGを作成します。

undefined

それぞれジョブが完了するまで待機し、次のジョブをスタートします。

必要となるAirflowコンポーネントをImportし、それから上記のXplenty WrapperとDatet imライブラリをインポートします。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow import AirflowException
from wrapper import xplenty
from datetime import datetime

次に、1日1回のスケジュールと、開始タスクとして使用される「dummy_operator」を含むDAG定義を作成します。

 

dag = DAG('Xplenty_Jobs_Schedule', description='DAG for Xplenty packages',
        schedule_interval='15 15 * * *',

オペレータ:今回は非常にシンプルです。各パッケージについてそれぞれ1つのオペレータを用意し、PackageIDの箇所が異なるだけで同一のFunctionを使用します。毎回基本的なPythonオペレータが使用されています。

rest_s3_operator = PythonOperator(task_id='rest_s3_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114089},
                                dag=dag)

s3_mysql_operator = PythonOperator(task_id='s3_mysql_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114090},
                                dag=dag)

salesforce_mysql_upsert_operator = PythonOperator(task_id='salesforce_mysql_upsert_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114091},
                                dag=dag)

最後に依存関係を設定します。前述したとおり、ジョブは順次実行されるように設定します。

dummy_operator >> rest_s3_operator
rest_s3_operator >> s3_mysql_operator
s3_mysql_operator >> salesforce_mysql_upsert_operator

Flowを実行

セットアップ全体を機能させるには、Airflow Docker Containerを起動し、DAGをチェックして実行し、Xplenty側で検証する必要があります。 

 

Airflow Containerを実行する

以下のコマンドでコンテナを起動します。

docker run -d -p 8080:8080 --env-file=env -v 
/airflow/dags/:/usr/local/airflow/dags -v 
/airflow/requirements.txt:/requirements.txt puckel/docker-airflow webserver

その後コンテナが起動していることをログで確認します。私の環境では、optimistic_paniniというのがコンテナの名称なので以下のコマンドで確認します。

docker logs optimistic_panini

ログを確認すると、DAGが読み取られ、「SequentialExecutor」が使用されていることがわかります。

[2019-12-02 19:01:42,475] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-02 19:01:42,477] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags

http://localhost:8080/admin/へアクセスし、AirflowのUI画面が表示されることを確認します。

f:id:datapipeline:20200116154519p:plain

 

DAGが有効であることかつスケジュールを確認します。

f:id:datapipeline:20200116154609p:plain



またLinksメニューの左端のボタンで直接DAGを実行することができます。

f:id:datapipeline:20200116154627p:plain

DAGの名前をクリックすると、ジョブの実行結果などを確認できるようになります。複数の実行が列に表示され、統計が表示されます。以下の例では、成功、失敗、前処理の失敗などのさまざまなステータスを表示しています。

f:id:datapipeline:20200116154659p:plain

 

Xplentyではどのように反映されているかを確認してみましょう!!

f:id:datapipeline:20200116154723p:plain

見ての通り、XplentyのジョブステータスはAirflowに反映されます-最新の実行(Airflowツリービューの右端の列)は次のことを示しています。

  • salesfore_mysql_upsert_operatorは実行されませんでした。そしてそれはXplentyダッシュボード画面ではリストに表示されていません。
  • s2_mysql_operatorは失敗しました。そしてXplentyダッシュボード画面の一番上でにFailedのステータスと共に表示されています。
  • rest_s3_operatorは最初に実行され、成功しています。そしてXplentyダッシュボードでもリストの二番目で100%完了と表示されています。

各実行ごとにGraphビューで以下のように確認することができます。

f:id:datapipeline:20200116154810p:plain 

ガントチャートでも確認できます。

f:id:datapipeline:20200116154827p:plain

ツリー、グラフビュー、またはガントチャートでジョブをクリックすると、そのほかのメニューがいくつかの追加オプションと共にポップアップで表示され、ジョブの成功、失敗、実行中と言った設定やログの確認を行うことができます。

f:id:datapipeline:20200116154853p:plain



Take off!

さて、すべてのセットアップが完了したら、実際に本番環境を作成し、企業全体に展開するのはどれくらい難しいのかと疑問に思うかもしれません。必要なすべてのインフラストラクチャ、サーバー構成、メンテナンスと可用性、ソフトウェアのインストールを考慮に入れると、スケジューラの信頼性を確保するために多くのことを実現する必要があります。もし誰かがこういった心配をすべて取り除いてくれて、ジョブのスケジュールに集中できるようにしたらどうでしょう?

 

f:id:datapipeline:20200116154949p:plain

 

ということで、ここからは、Astronomer.ioとXplentyをさらに詳しく見ていきます。実際に彼らが主張しているように簡単かどうかを見てみましょう。

f:id:datapipeline:20200116155020p:plain

手始めにHP(page)で公開されているガイドを見てみましょう。トライアルアカウントをセットアップしWorkspaceを作成します。

f:id:datapipeline:20200116155051p:plain



New Deploymentで名前など設定し、 Executorを選択します。

 

f:id:datapipeline:20200116155117p:plain

Astronomer.ioの説明をここで紹介します。

「Airflowは複数のexecutorプラグインをサポートしています。これらのプラグインは、タスクを実行する方法と場所を決定します。軽いワークロードまたはテストワークロードにはLocal Executorを、大規模な本番ワークロードにはCeleryおよびKubernetes Executorをサポートしています。 Celery Executorは分散タスクキューとスケーラブルなワーカープールを使用しますが、Kubernetes Executorは個別のKubernetes podですべてのタスクを起動します。

 

保存するとOverviewページにリダイレクトされ、Apache AirflowをOpenする画面が表示されます。

f:id:datapipeline:20200116155200p:plain

 

すでに皆さんも気づかれたかもしれませんが、サーバーが裏で作成されます。生成されたウェブアドレスをみて気付くかもしれません。私の場合は以下のようになっています。

f:id:datapipeline:20200116155214p:plain

環境全体がバックグラウンドで開始されるため、少し時間がかかる場合があります。起動したら(ブラウザウィンドウを更新して確認します)、以下のようにAirflowメイン画面が表示されます。

f:id:datapipeline:20200116155228p:plain

ただ、画面にDAGがなく、完全に空の状態です。また、DAGをアップロードするUIオプションもありません。そのため、CLIクイックスタートの手順( CLI quickstart instructions.)に従う必要があります。 

Astronomer.io上でDAGを設定する

Windows上でWSLを実行する

Hyper-Vを有効にしてWindowsバージョンを実行している場合にかぎり、WSLを使用して以下の手順を行ってください。

以下の手順に従ってCLIをインストールしましょう。

curl -sSL https://install.astronomer.io | sudo bash


これですべての設定が完了します。設定が正しく行われたかどうかを確認するには、astroコマンドを実行してヘルプが表示されるかどうかを確認してください。

f:id:datapipeline:20200116155244p:plain

プロジェクトを作成し、作成したディレクトリに移動します。

mkdir xplenty && cd xplenty

astro dev initコマンドを使用してプロジェクトを初期化すると、以下のようなメッセージが返されます

f:id:datapipeline:20200116155306p:plain

これで以下のコマンドによりAstronomer Cloudに接続することができます。

astro auth login gcp0001.us-east4.astronomer.io

画面の指示に従ってログインします。oAuthを使用するか、ユーザー名/パスワードを使用します。

f:id:datapipeline:20200116155320p:plain

完了したら確認メッセージが表示されます。

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io


Xplenty APIキーを.envに入れてください。あなたのDAGをプロジェクトにコピーできるはずです。

(私の環境では、/mnt/c/Astronomer/xplenty/dag)。次に、astro deployコマンドを実行します。

このコマンドは、最初にDeploymentとWorkspaceのオプションを提供します。今回の例では、リストに1つしかありません。最終的には果、セットアップ全体がAstronomer.ioにPublishされます。

¥Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon  26.62kB
Step 1/1 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> a5866d1769c4
Successfully built a5866d1769c4
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
d2a571c73db1: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-3: digest: sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-3
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b
Deploy succeeded!
root@270c02e5d9d5:/home/astronomer/xplenty#

これで、Astronomer Workspaceに新しいDAGが利用可能になります。

f:id:datapipeline:20200116155335p:plain

Docker上で実行する

上記のように動くはずでしたが、実際には何も起こりませんでした。実際にはデプロイできておらず、WSLから「astro deploy」コマンドを実行すると以下のようにエラーにより実行できませんでした。

vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$ astro deploy
Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Cannot connect to the Docker daemon at tcp://localhost:2375. Is the docker daemon running?
Error: command 'docker build -t quasarian-antenna-4223/airflow:latest failed: failed to execute cmd: exit status 1

vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$

Hyper-Vがないため、私のWindowsでDockerデーモンが起動しません。同様の問題に直面しても心配なく!

まず、ubuntuのDocker Imageを読み込みます。

docker pull ubuntu

次に、コンテナ内にAstronomer CLIをインストールします-上記で実施したのと同じように。コンテナをインタラクティブモードで起動するには以下のコマンドを実行します。

docker run -it ubuntu sh -c "bash"

CLIをインストールします

curl -sSL https://install.astronomer.io | sudo bash

プロジェクトのディレクトリを作成し、カレントパスに設定します

mkdir xplenty && cd xplenty

「astro dev init」でプロジェクトを初期化し、確認メッセージを確認します。

root@270c02e5d9d5:/home/astronomer/xplenty# astro dev init
Initialized empty astronomer project in /home/astronomer/xplenty

以下のコマンドでAstronomer Cloudに接続することができます。

astro auth login gcp0001.us-east4.astronomer.io

画面の指示に従ってログインします。oAuthを使用するか、ユーザー名/パスワードを使用します。

root@270c02e5d9d5:/home/astronomer/xplenty# astro auth login gcp0001.us-east4.astronomer.io
CLUSTER                             WORKSPACE
gcp0001.us-east4.astronomer.io      ck3xaemty38yx0a383cmooskp

Switched cluster
Username (leave blank for oAuth):

Please visit the following URL, authenticate and paste token in next prompt

https://app.gcp0001.us-east4.astronomer.io/login?source=cli

oAuth Token:

Tokenを取得して貼り付けます(うまく機能しました)

もしくはユーザー名とパスワードを使用します。

 

完了したら確認メッセージが表示されます。

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io

手順が完了したら、Dockerコンテナの状態を新しいイメージに保存しておくと便利です。 Docker psでコンテナIDを確認するだけです

私の場合、270c02e5d9d5です。

CONTAINER ID        IMAGE   COMMAND CREATED             STATUS PORTS NAMES
270c02e5d9d5        ubuntu "sh -c bash"        48 minutes ago      Up 48 minutes                     charming_galileo

私は以下のコマンドでAstronomerがインストール済みのイメージを作ることができます。

docker commit 270c02e5d9d5 ubuntu:astro

これで新しいイメージが作成されました。docker imagesmコマンドで確認することができます。

$ docker images
REPOSITORY                       TAG IMAGE ID           CREATED SIZE
ubuntu                           astro 6f7e5bf1b01c        2 hours ago 139MB
ubuntu                           latest 775349758637        5 weeks ago 64.2MB

最後に。コンテナ内で作成したXplentyプロジェクトにコピーするDAGボリュームをマウントして、コンテナを再実行しました。さらに、私はdocker.sockをマウントすることで、astroがコンテナ内からdockerにアクセスできるようにしました。

docker run -it -v /airflow/dags/:/usr/local/Astronomer/dags/ -v /var/run/docker.sock:/var/run/docker.sock --env-file=env ubuntu:astro sh -c "bash"

さて、デプロイの前の最後の手順はAPIキーです。次のように、Dockerfileの環境変数として設定することをお勧めします。

root@270c02e5d9d5:/home/astronomer/xplenty# ll
total 60
drwxr-xr-x 1 root root 4096 Dec  9 14:08 ./
drwxr-xr-x 1 root root 4096 Dec  9 12:23 ../
drwxr-x--- 2 root root 4096 Dec  9 10:07 .astro/
-rw-r--r-- 1 root root   38 Dec 9 10:07 .dockerignore
-rw-r--r-- 1 root root   45 Dec 9 12:03 .env
-rw-r--r-- 1 root root   31 Dec 9 10:07 .gitignore
-rw-r--r-- 1 root root  101 Dec 9 14:00 Dockerfile
-rw-r--r-- 1 root root  556 Dec 9 10:07 airflow_settings.yaml
drwxr-xr-x 1 root root 4096 Dec  9 14:07 dags/
drwxr-xr-x 2 root root 4096 Dec  9 10:07 include/
-rw------- 1 root root   62 Dec 9 10:52 nohup.out
-rw-r--r-- 1 root root    0 Dec 9 10:07 packages.txt
drwxr-xr-x 2 root root 4096 Dec  9 10:07 plugins/
-rw-r--r-- 1 root root    0 Dec 9 10:07 requirements.txt
root@270c02e5d9d5:/home/astronomer/xplenty# more Dockerfile
FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
ENV xpl_api_key=<your-API-key-here>

Deploymentのための設定が全て完了しました。あとはastro deployを実行するだけです。

root@270c02e5d9d5:/home/astronomer/xplenty# astro deploy
Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon  26.62kB
Step 1/2 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> b4f4c9e5cb16
Step 2/2 : ENV xpl_api_key=Vf9ykgM3UCiBsDMUQpkpUyTYsp7uPQd2
---> Running in 0ec9edff34a5
Removing intermediate container 0ec9edff34a5
---> 24232535523f
Successfully built 24232535523f
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
caafa5dbf9af: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-11: digest: sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-11
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a
Deploy succeeded!

お気づきかもしれませんが、私の環境では既にいくつかのレイヤーが存在していました。一連のDeploymentの場合、一部の部品が再利用されていることがわかります。全ての設定が完了するのに数回の試行が必要でした。この例では問題が発生した場合に手順を複数回実施しました。実験することを恐れないでください!

 

完了したら、Astronomerワークスペースに移動してDAGを実行できます。

f:id:datapipeline:20200116155401p:plain

前述したXplenty Wrapperによってクラスターが作成されました。

f:id:datapipeline:20200116155418p:plain

ジョブが開始されました。

f:id:datapipeline:20200116155430p:plain

しばらくすると、Xplentyですべてのジョブが完了したことを確認できます。

f:id:datapipeline:20200116155446p:plain

そしてAstronomer グラフビューに反映されました。

f:id:datapipeline:20200116155503p:plain

ツリービューで見ると

f:id:datapipeline:20200116155514p:plain

おまけ

好奇心が強い人は、途中でいくつかの失敗に気付くでしょう。特定の実行にマウスを合わせるだけで、詳細を取得できます。

f:id:datapipeline:20200116155533p:plain



タスクの「State」は明らかに失敗しています。ボタンをクリックするだけで、ログを確認できるメニューが表示されます。

f:id:datapipeline:20200116155631p:plain



ログを確認するのは本当に簡単なのですが、あまり細かく表現されていないのでわかりません。

f:id:datapipeline:20200116155641p:plain

それでは、もう少し深く掘り下げて、Xplentyを調査してみましょう。失敗したジョブを見つけて一覧の右端にある「View details」をクリックするだけで簡単に確認できます。

f:id:datapipeline:20200116155657p:plain

エラーおよびジョブで使用している変数を確認できるパネルが開きます。

f:id:datapipeline:20200116155708p:plain

ジョブが失敗した要因が判明しました。

Input(s):
Failed to read data from "xplenty://XPLENTY_USER_S3_CONNECTION_9669@xplenty.dev/mako/breakfast.csv"

ソースファイルが確認できなかったのが原因のようです。Xplentyで障害調査とエラーの要因分析がどれくらい簡単にできるかやってみました。

 

Apache AirflowとXplentyの統合

AirflowとXplentyと一緒に使用することで、シームレスにジョブのスケジュールおよび監視を行うことができ、エンタープライズ規模のワークフローを実現することが可能になります。 XplentyはクラウドベースのコードフリーのETLソフトウェアであり、シンプルな視覚化されたデータパイプラインを提供して、幅広いソースとデスティネーションをカバーする自動化されたデータフローを実現します。