Run Meltano, run...

February 04, 2024

Zooming Through Data: The Hilarious Odyssey of Installing, Authenticating, and Supercharging the Workflows with Meltano.

Once upon a time, I wrote about Prague, calling it “The City of 100 Towers.” Writing about Prague, from its magical streets to secrets such as its nickname, was a delightful experience. If you have not read it yet, I highly recommend doing so. Just click here.

With this text, I am going to take you on a different adventure, to something that does not have a nickname like Prague but is as fun as a city and as full of mysticism as its streets. Okay… Although this story starts like the tale of a tireless orientalist, I will not take you to the city where the Arabian Nights took place. I will not introduce you to the genie who suddenly appears from Aladdin’s wonder teapot (lamp) and tells you to wish for whatever you want. Or maybe you get three wishes. I am not sure what the genie actually said: one wish, one lamp. Caress the lamp, bring out the genie and make your wish; the numbers are just a detail.

Well, after all that, it is time to ask the question: are you ready for an adventure in the data world? I believe there is a silent but excited voice that just says: “yes”.

There are two words that will follow us throughout the article: Meltano and SDK (Software Development Kit). To avoid getting bogged down in technical and boring academic definitions, they can be explained as follows: Meltano is a comprehensive data integration platform that acts as a map and compass, allowing you to find your way in the data forest. The SDK, in turn, is the magic digging tool that helps you reach the mysterious and unknown data sources you meet in this adventure. Going back to Aladdin’s story: if Meltano is Aladdin’s wonder teapot (sorry, lamp), the SDK is the genie that comes out when you caress (or configure) it and grants you an untold number of wishes.

 

LET’S START!

So, what is a custom tap, or extractor? Simply put, we use one in a data integration process to extract data from special, complex, or niche data sources that standard extractors (taps) cannot reach or were not designed for. A custom tap is usually built to suit the specific structure, access protocol or data format of a particular source. For example, imagine an API that nobody has worked on before, so no tap exists for it. In this case, you can roll up your sleeves, create a custom tap, and pull the data from the source yourself.

BUT HOW?

Before starting, there are a few dependencies that need to be installed, or checked if they are already on your computer. For deeper knowledge, see the Meltano documentation.

1. Python3 – is the beginning of everything. If you have watched the movie Avatar, think of the tree to which all the avatars are connected: Python plays the same role for our custom tap.

python3 --version

If Python is not installed, go to the official Python website and download it.

2. Pip3 – of course, we need pip, the distant relative of pipx, to install pipx.

pip3 install pipx
pipx ensurepath
source ~/.bashrc

3. Meltano – now we install Meltano itself.

pipx install meltano

4. Poetry – gives us stable, deterministic dependency management for the custom tap.

pipx install poetry

5. Cookiecutter – gives us a project template that we can build on.

pipx install cookiecutter
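
If you would like to double-check the five installations in one go, a small script can do it. This is only a minimal sketch: it assumes the tools are exposed under the command names used above (python3, pipx, meltano, poetry, cookiecutter), which may differ on your system, and the helper name find_missing is my own invention.

```python
import shutil

# Command names as installed in the steps above; adjust them if your system differs.
REQUIRED_TOOLS = ["python3", "pipx", "meltano", "poetry", "cookiecutter"]


def find_missing(tools, which=shutil.which):
    """Return the tools that cannot be found on the PATH."""
    return [tool for tool in tools if which(tool) is None]


if __name__ == "__main__":
    missing = find_missing(REQUIRED_TOOLS)
    if missing:
        print("Still missing:", ", ".join(missing))
    else:
        print("All set, the lamp is ready.")
```

The lookup function is passed in as a parameter only so that the check can be tried out without touching the real PATH.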

Once everything is installed, only one step remains, and it comes straight from the Meltano documentation (docs.meltano.com): create a project for the custom tap by running the following command:

cookiecutter https://github.com/meltano/sdk --directory="cookiecutter/tap-template"

You will be asked a handful of questions: source_name, admin_name, tap_id, library_name and variant, followed by stream_type (1 – REST, 2 – GraphQL, 3 – SQL, 4 – Other); in this journey, we select the first one (1 – REST). The next question is auth_method (1 – API Key, 2 – Bearer Token, 3 – Basic Auth, 4 – OAuth2, 5 – JWT, 6 – Custom or N/A); in this journey, we select the fourth one (4 – OAuth2). The last question is include_cicd_sample_template (1 – GitHub, 2 – None (Skip)); here the default can be kept.

I named my project “dreamland”. Cookiecutter creates a directory called tap-dreamland, and inside it the remaining Python dependencies need to be installed with Poetry. Run the following command in the terminal from that directory:

poetry install

Before we get into hardcore coding, we should talk about the scenario. Imagine that we want to receive data from an API, and assume that this API is of the REST stream type and uses OAuth2 as its authentication method. Well, if we agree on this part, now is the time to get on the magic carpet or touch the magic lamp to force the genie to show up.

After you caress the lamp or jump on the magic carpet, you will find four files in your project directory (~/tap-dreamland/tap_dreamland/): auth.py, client.py, streams.py and tap.py. I add one more file for the case where the API is paginated: pagination.py.

Now let’s look at each file in detail: what it is for, and what we need to change in it. Let’s examine them one by one.

~/tap-dreamland/tap_dreamland/tap.py looks like this:

"""Dreamland tap class."""

from typing import List

from singer_sdk import Stream, Tap
from singer_sdk import typing as th  # JSON schema typing helpers

# Stream classes defined in streams.py
from tap_dreamland.streams import CharacterDetail, CharacterList

# Stream types this tap exposes
STREAM_TYPES = [CharacterList, CharacterDetail]


class TapDreamland(Tap):
    """Dreamland tap class for extracting data."""

    name = "tap-dreamland"

    # Settings the tap accepts, expressed as a JSON schema
    config_jsonschema = th.PropertiesList(
        th.Property(
            "char_id",
            th.StringType,
            required=True,
            description="The character list’s ID.",
        )
    ).to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return one instance of every stream type."""
        return [stream_class(tap=self) for stream_class in STREAM_TYPES]

Imports and Global Variables: The code begins by importing List from typing for type hints, the Tap and Stream base classes from singer_sdk, and the JSON schema typing helpers, renamed th for convenience. It also imports the custom stream classes for the character list and the character detail from the tap's own streams module; these are the data streams this tap handles.

TapDreamland Class Definition: A new class, TapDreamland, extends the Tap base class from singer_sdk, which makes it a custom tap for "dreamland." It sets the tap's name and declares its configuration as a JSON schema, in which the "char_id" property is required.

Stream Discovery Method: The discover_streams method is implemented to identify and instantiate available data streams defined by STREAM_TYPES, using list comprehension to create a list of stream instances, enabling the Tap to access and extract the specified data streams efficiently.
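
To see the discovery pattern in isolation, here is a minimal sketch that runs without the SDK. FakeStream and FakeTap are hypothetical stand-ins for singer_sdk.Stream and singer_sdk.Tap; only the body of discover_streams is taken from the listing above.

```python
class FakeStream:
    """Stand-in for singer_sdk.Stream: it only remembers which tap owns it."""

    name = "base"

    def __init__(self, tap):
        self.tap = tap


class CharacterList(FakeStream):
    name = "character-list"


class CharacterDetail(FakeStream):
    name = "character-detail"


STREAM_TYPES = [CharacterList, CharacterDetail]


class FakeTap:
    """Stand-in for singer_sdk.Tap with the same discover_streams body."""

    def discover_streams(self):
        # One instance per stream class, each holding a reference to this tap
        return [stream_class(tap=self) for stream_class in STREAM_TYPES]


tap = FakeTap()
streams = tap.discover_streams()
print([stream.name for stream in streams])
```

Adding a new stream to the tap then comes down to writing the class and appending it to STREAM_TYPES.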

~/tap-dreamland/tap_dreamland/streams.py looks like this:

"""Stream type classes for tap-dreamland."""

from __future__ import annotations  # allows "dict | None" hints on Python < 3.10

from pathlib import Path

from singer_sdk import typing as th  # JSON Schema typing helpers, handy for inline schemas

from tap_dreamland.client import DreamlandStream

SCHEMAS_DIR = Path(__file__).parent / "schemas"


class CharacterList(DreamlandStream):
    """Lists every character; parent of CharacterDetail."""

    name = "character-list"
    path = "/characters"
    primary_keys = ["charId"]
    schema_filepath = SCHEMAS_DIR / "character_list.json"
    records_jsonpath = "$.data[*]"

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        """Pass the character ID down to the child stream."""
        return {"charId": record["charId"]}


class CharacterDetail(DreamlandStream):
    """Fetches the details of one character per parent record."""

    name = "character-detail"
    path = "/character-detail/{charId}"
    primary_keys = ["charId"]
    schema_filepath = SCHEMAS_DIR / "character_detail.json"
    parent_stream_type = CharacterList

Imports and Global Variables: The code imports Path for file path handling, the JSON schema typing helpers, and the DreamlandStream base class from client.py. It also defines SCHEMAS_DIR, the directory that holds the JSON schema files, so that every stream loads its schema from the same place.

CharacterList Stream: This class is designed to list characters from the "dreamland" service, specifying the API endpoint, primary key, and JSON schema file for data validation. It includes a method to provide context for child streams, focusing on the unique character ID.

CharacterDetail Stream: This class fetches detailed information for individual characters. It declares CharacterList as its parent stream, so it runs once per parent record and fills the {charId} placeholder in its path from the context the parent passes down. It has its own JSON schema file, character_detail.json.
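
The parent and child relationship is easier to picture with a tiny simulation. This is a sketch in plain Python and not the SDK's own code: build_child_path is a hypothetical helper that imitates how a placeholder in the path gets filled from the context, and the two records are made up.

```python
def get_child_context(record):
    """Mirror of CharacterList.get_child_context: keep only the ID."""
    return {"charId": record["charId"]}


def build_child_path(path_template, context):
    """Fill each {placeholder} in the path with the matching context value."""
    path = path_template
    for key, value in context.items():
        path = path.replace("{" + key + "}", str(value))
    return path


# Hypothetical records, as the parent stream might emit them
parent_records = [{"charId": "aladdin"}, {"charId": "genie"}]

child_paths = [
    build_child_path("/character-detail/{charId}", get_child_context(record))
    for record in parent_records
]
print(child_paths)
```

Each parent record leads to exactly one request on the child stream, which is why the character list has to be fetched first.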

~/tap-dreamland/tap_dreamland/client.py looks like this:

"""REST client handling, including DreamlandStream base class."""

from pathlib import Path
from typing import Any, Dict, Iterable, Optional

import requests
from memoization import cached
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream

from tap_dreamland.auth import DreamlandAuthenticator
from tap_dreamland.pagination import DreamlandPaginator

SCHEMA_DIR = Path(__file__).parent / "schemas"
PAGE_SIZE = 10


class DreamlandStream(RESTStream):
    """Dreamland stream class."""

    records_jsonpath = "$[*]"
    next_page_token_jsonpath = "$.hasNextPage[*]"

    @property
    def url_base(self) -> str:
        """Return the API URL root."""
        return "api_url_here"  # placeholder: put the root URL of your API here

    @property
    @cached
    def authenticator(self) -> DreamlandAuthenticator:
        """Return the authenticator object, created once and then reused."""
        return DreamlandAuthenticator.create_for_stream(self)

    @property
    def http_headers(self) -> Dict[str, str]:
        """Return the HTTP headers needed."""
        headers = {}
        return headers

    def get_url_params(
        self, context: Optional[Dict] = None, next_page_token: Optional[Any] = None
    ) -> Dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization."""
        params: Dict[str, Any] = {}
        # The first token is 0 (falsy), so paging parameters are only sent from the second request on
        if next_page_token:
            params["limit"] = self.config.get("limit", PAGE_SIZE)
            params["page"] = next_page_token
        if self.replication_key:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key
        return params

    def get_new_paginator(self) -> DreamlandPaginator:
        """Get a fresh paginator for this API endpoint."""
        return DreamlandPaginator(
            start_value=0,
            page_size=PAGE_SIZE,
            next_page_token_jsonpath=self.next_page_token_jsonpath,
        )

    def parse_response(self, response: requests.Response) -> Iterable[Dict]:
        """Parse the response and return an iterator of result rows."""
        yield from extract_jsonpath(self.records_jsonpath, input=response.json())

 

Imports and Setup: The code imports what it needs for HTTP requests, file paths and type annotations, and uses a cached decorator so that the authenticator is built only once. It also brings in the custom classes DreamlandAuthenticator for authentication and DreamlandPaginator for pagination.

DreamlandStream Class: This class extends RESTStream, which makes it the base class for all REST API interactions of the tap. It sets the API's base URL, customizes HTTP headers, authenticates requests, manages pagination, and parses API responses to yield data records. Key functionalities include:

  • URL and Headers: Defines methods to set the API base URL and HTTP headers, essential for forming requests.
  • Authentication: Implements a cached authenticator property to efficiently manage authentication instances.
  • Pagination: Provides methods to construct URL parameters for pagination and instantiate a paginator object for managing pagination logic.
  • Response Parsing: Includes a parse_response method to extract and yield data records from API responses using JSONPath.
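
To make the URL parameter logic tangible, here is a standalone sketch of it. build_url_params is a hypothetical plain-function copy of get_url_params, and the replication key "updatedAt" is an invented example; neither comes from the SDK.

```python
PAGE_SIZE = 10


def build_url_params(config, next_page_token=None, replication_key=None):
    """Standalone copy of the get_url_params logic from client.py."""
    params = {}
    if next_page_token:
        params["limit"] = config.get("limit", PAGE_SIZE)
        params["page"] = next_page_token
    if replication_key:
        params["sort"] = "asc"
        params["order_by"] = replication_key
    return params


# The paginator starts at 0, which is falsy, so the first request carries no paging parameters
first_page = build_url_params({}, next_page_token=0)
later_page = build_url_params(
    {"limit": 25}, next_page_token=10, replication_key="updatedAt"
)
print(first_page)
print(later_page)
```

If your API expects a limit on the very first request as well, comparing the token against None instead of testing its truthiness would be one way to get there.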

~/tap-dreamland/tap_dreamland/pagination.py looks like this:

"""Pagination handling for tap-dreamland."""

from typing import Optional

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseOffsetPaginator


class DreamlandPaginator(BaseOffsetPaginator):
    def __init__(self, start_value: int, page_size: int, next_page_token_jsonpath: str) -> None:
        super().__init__(start_value, page_size)
        self.page_size = page_size
        self.next_page_token_jsonpath = next_page_token_jsonpath

    def has_more(self, response: requests.Response) -> bool:
        """ Check if the response has more items based on the next page token.
        Args:
            response: API response object.
        Returns:
            True if the next page token is present, indicating more items.
        """
        try:
            # extract_jsonpath returns a generator, so test its values, not the generator itself
            matches = extract_jsonpath(self.next_page_token_jsonpath, response.json())
            return any(matches)
        except Exception:
            return False

    def get_next(self, response: requests.Response) -> Optional[int]:
        """ Determine the next page's start value if there are more items.
        Args:
            response: API response object.
        Returns:
            The next start value if there are more items, None otherwise.
        """
        if self.has_more(response):
            # current_value holds the offset of the page that was just fetched
            return self.current_value + self.page_size
        return None

Class Definition and Initialization: The DreamlandPaginator class, inheriting from BaseOffsetPaginator, implements offset-based pagination for API data fetching. It's initialized with a starting offset, page size, and a JSONPath to identify the next page token in API responses.

Methods:

  • has_more: Checks if more data is available by looking for a next page indicator in the API response using the specified JSONPath. Returns True if more data is available, otherwise False.
  • get_next: Determines the starting value for the next page of data, incrementing the current start value if a next page is indicated. Returns None when no further data is available.
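
The offset walk can be tried out without any real API. The sketch below is a simulation under stated assumptions: fake_api is an invented endpoint that returns a hasNextPage flag, the data set has 25 made-up records, and has_more and get_next are simplified plain-function versions of the two methods described above.

```python
PAGE_SIZE = 10
TOTAL_RECORDS = 25  # hypothetical size of the data set


def fake_api(offset, limit):
    """Pretend endpoint: returns one page plus a hasNextPage flag."""
    rows = list(range(offset, min(offset + limit, TOTAL_RECORDS)))
    return {"data": rows, "hasNextPage": offset + limit < TOTAL_RECORDS}


def has_more(response_json):
    """True when the response says another page exists."""
    return bool(response_json.get("hasNextPage"))


def get_next(current_offset, response_json):
    """Next offset, or None when the last page has been reached."""
    if has_more(response_json):
        return current_offset + PAGE_SIZE
    return None


offset = 0
offsets_requested = []
records = []
while offset is not None:
    offsets_requested.append(offset)
    page = fake_api(offset, PAGE_SIZE)
    records.extend(page["data"])
    offset = get_next(offset, page)

print(offsets_requested)
print(len(records))
```

The loop stops as soon as get_next returns None, which is the same signal the paginator uses to tell the stream that it has finished.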

~/tap-dreamland/tap_dreamland/auth.py looks like this:

"""OAuth2 authentication handling for tap-dreamland."""

from datetime import datetime, timezone
from typing import Optional

import requests
from singer_sdk.authenticators import OAuthAuthenticator
from singer_sdk.streams.rest import RESTStream

AUDIENCE = "APIURL"  # placeholder: the audience value of your API
AUTH_ENDPOINT = "OAuth2 authenticator url"  # placeholder: the token endpoint URL


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class DreamlandAuthenticator(OAuthAuthenticator):
    """Authenticator class for Dreamland."""

    def __init__(self, stream: RESTStream, auth_endpoint: Optional[str] = None, oauth_scopes: Optional[str] = None,
                 default_expiration: Optional[int] = None, oauth_headers: Optional[dict] = None,
                 environment: Optional[bool] = None):
        super().__init__(stream=stream)
        self._auth_endpoint = auth_endpoint
        self.default_expiration = default_expiration
        self._oauth_scopes = oauth_scopes
        self._oauth_headers = oauth_headers or {}
        self.access_token: Optional[str] = None
        self.expires_in: Optional[int] = None
        self.last_refreshed: Optional[datetime] = None

    @property
    def auth_headers(self) -> dict:
        """Return the request headers, refreshing the token first if needed."""
        if not self.is_token_valid():
            self.update_access_token()
        result = super().auth_headers
        result["Authorization"] = f"Bearer {self.access_token}"
        return result

    @property
    def auth_endpoint(self) -> str:
        if not self._auth_endpoint:
            msg = "Authorization endpoint not set."
            raise ValueError(msg)
        return self._auth_endpoint

    @property
    def oauth_scopes(self) -> Optional[str]:
        return self._oauth_scopes

    @property
    def oauth_request_body(self) -> dict:
        """Build the payload sent to the token endpoint."""
        return {
            'username': self.config.get("username"),
            'password': self.config.get("password"),
            'audience': AUDIENCE,
            'grant_type': "user_grant_type"  # placeholder: the grant type your provider expects
        }

    @property
    def username(self) -> Optional[str]:
        return self.config.get("username") if self.config else None

    @property
    def password(self) -> Optional[str]:
        return self.config.get("password") if self.config else None

    def is_token_valid(self) -> bool:
        """Return True while the token is younger than its lifetime in seconds."""
        if self.last_refreshed is None or not self.expires_in:
            return False
        return self.expires_in > (utc_now() - self.last_refreshed).total_seconds()

    def update_access_token(self) -> None:
        """Request a new token and remember when it was issued."""
        request_time = utc_now()
        auth_request_payload = self.oauth_request_body
        token_response = requests.post(
            self.auth_endpoint, headers=self._oauth_headers, data=auth_request_payload, timeout=60
        )
        try:
            token_response.raise_for_status()
        except requests.HTTPError as ex:
            msg = f"Failed OAuth login, response was: {token_response.json()} - {ex}"
            raise RuntimeError(msg) from ex
        token_json = token_response.json()
        self.access_token = token_json["access_token"]
        expiration = token_json.get("expires_in", self.default_expiration)
        self.expires_in = int(expiration) if expiration else None
        self.last_refreshed = request_time

    @classmethod
    def create_for_stream(cls, stream) -> 'DreamlandAuthenticator':
        """Instantiate an authenticator for a specified Singer stream."""
        return cls(stream=stream, auth_endpoint=AUTH_ENDPOINT)

1. Imports: Necessary libraries for HTTP requests, date/time handling, and Singer SDK functionalities are imported.

2. Constants: `AUDIENCE` and `AUTH_ENDPOINT` specify the OAuth audience value and the authentication endpoint URL.

3. DreamlandAuthenticator Class:

   - Initialization: Sets up the authenticator with optional parameters like the authentication endpoint, OAuth scopes, default token expiration, and headers.

   - `auth_headers` Property: Generates authentication headers, updating the access token if it's invalid.

   - `auth_endpoint`, `oauth_scopes` Properties: Return the authentication endpoint and OAuth scopes, respectively.

   - `oauth_request_body` Property: Constructs the OAuth token request payload.

   - `username`, `password` Properties: Fetch user credentials from the configuration.

   - `is_token_valid` Method: Checks if the current access token is valid.

   - `update_access_token` Method: Refreshes the access token when it's invalid or expired.

   - `create_for_stream` Class Method: Instantiates the authenticator for a given Singer stream with predefined settings.
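
The token validity rule is simple enough to test on its own. Below is a minimal sketch of that rule as a plain function; the extra now parameter and the sample timestamps are my own additions so that the check can run without waiting for a real token to expire.

```python
from datetime import datetime, timedelta, timezone


def is_token_valid(last_refreshed, expires_in, now=None):
    """Same rule as DreamlandAuthenticator.is_token_valid, as a plain function."""
    if last_refreshed is None or not expires_in:
        return False
    now = now or datetime.now(timezone.utc)
    # Valid while fewer seconds have passed than the token's lifetime
    return expires_in > (now - last_refreshed).total_seconds()


refreshed_at = datetime(2024, 2, 4, 12, 0, tzinfo=timezone.utc)

fresh = is_token_valid(refreshed_at, 3600, now=refreshed_at + timedelta(minutes=30))
stale = is_token_valid(refreshed_at, 3600, now=refreshed_at + timedelta(hours=2))
never = is_token_valid(None, 3600)
print(fresh, stale, never)
```

A token that was never fetched counts as invalid, which is what triggers the very first call to update_access_token.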