In addition to password-based authentication, you can authenticate with Snowflake using a key pair. To set up private key authentication for your Snowflake account, see the instructions in the Snowflake docs. Currently, the Snowflake I/O manager only supports encrypted private keys.
You can provide the private key directly to the Snowflake I/O manager, or via a file containing the private key.
# Option 1: provide the private key directly
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "private_key": {"env": "SNOWFLAKE_PK"},
                "private_key_password": {"env": "SNOWFLAKE_PK_PASSWORD"},
                "database": "FLOWERS",
            }
        )
    },
)
# Option 2: provide the path to a file containing the private key
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "private_key_path": "/path/to/private/key/file.p8",
                "private_key_password": {"env": "SNOWFLAKE_PK_PASSWORD"},
                "database": "FLOWERS",
            }
        )
    },
)
Sometimes you may not want to fetch an entire table as the input to a downstream asset. With the Snowflake I/O manager, you can select specific columns to load by supplying metadata on the downstream asset.
import pandas as pd

from dagster import AssetIn, asset


# this example uses the iris_dataset asset from Step 2 of the Using Dagster with Snowflake tutorial
@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_dataset",
            metadata={"columns": ["Sepal length (cm)", "Sepal width (cm)"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    iris_sepal["Sepal area (cm2)"] = (
        iris_sepal["Sepal length (cm)"] * iris_sepal["Sepal width (cm)"]
    )
    return iris_sepal
In this example, we only use the columns containing sepal data from the IRIS_DATASET table created in Step 2: Create tables in Snowflake of the Using Dagster with Snowflake tutorial. Fetching the entire table would be unnecessarily costly, so to select specific columns, we can add metadata to the input asset. We do this in the metadata parameter of the AssetIn that loads the iris_dataset asset in the ins parameter. We supply the key columns with a list of names of the columns we want to fetch.
When Dagster materializes sepal_data and loads the iris_dataset asset using the Snowflake I/O manager, it will only fetch the Sepal length (cm) and Sepal width (cm) columns of the FLOWERS.IRIS.IRIS_DATASET table and pass them to sepal_data as a Pandas DataFrame.
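To make the effect of the columns metadata concrete, here is a minimal sketch of a query that is roughly equivalent to what gets fetched. The build_select helper is hypothetical and written only for illustration; it is not part of Dagster, and the exact SQL the I/O manager generates (including how it quotes identifiers) may differ.

```python
from typing import List, Optional


def build_select(table: str, columns: Optional[List[str]] = None) -> str:
    """Return a SELECT statement for `table`, limited to `columns` when given.

    Hypothetical helper for illustration; not Dagster's implementation.
    """
    if columns:
        # Double-quote each name because these column names contain
        # spaces and parentheses; embedded quotes are doubled.
        column_list = ", ".join('"' + c.replace('"', '""') + '"' for c in columns)
    else:
        # No column metadata supplied: fetch the whole table.
        column_list = "*"
    return f"SELECT {column_list} FROM {table}"


query = build_select(
    "FLOWERS.IRIS.IRIS_DATASET",
    ["Sepal length (cm)", "Sepal width (cm)"],
)
print(query)
```

Without the columns list, the same helper falls back to SELECT *, which corresponds to loading the entire table.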
The Snowflake I/O manager supports storing and loading partitioned data. In order to correctly store and load data from the Snowflake table, the Snowflake I/O manager needs to know which column contains the data defining the partition bounds. The Snowflake I/O manager uses this information to construct the correct queries to select or replace the data. In the following sections, we describe how the I/O manager constructs these queries for different types of partitions.
In order to store static partitioned assets in Snowflake, you must specify partition_expr metadata on the asset to tell the Snowflake I/O manager which column contains the partition data:
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the partition in the downstream asset. When loading a static partition (or multiple static partitions), the following statement is used:
SELECT * WHERE [partition_expr] in ([selected partitions])
When the partition_expr value is injected into this statement, the resulting SQL query must follow Snowflake's SQL syntax. Refer to the Snowflake documentation for more information.
When materializing the above assets, a partition must be selected, as described in Materializing partitioned assets. In this example, the query used when materializing the Iris-setosa partition of the above assets would be:
SELECT * WHERE SPECIES in ('Iris-setosa')
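The shape of the static-partition filter can be sketched in a few lines of Python. The static_partition_where function below is a hypothetical helper, not Dagster's implementation; it assumes the partition column is SPECIES, as in the multi-partition example later in this section.

```python
from typing import List


def static_partition_where(partition_expr: str, partitions: List[str]) -> str:
    """Build a WHERE clause matching rows in any of the selected partitions.

    Hypothetical helper for illustration; not Dagster's implementation.
    """
    if not partitions:
        raise ValueError("at least one partition must be selected")
    # Single-quote each partition key, doubling any embedded quotes.
    quoted = ", ".join("'" + p.replace("'", "''") + "'" for p in partitions)
    return f"WHERE {partition_expr} in ({quoted})"


query = "SELECT * " + static_partition_where("SPECIES", ["Iris-setosa"])
print(query)
```

Because partition_expr is inserted verbatim, it can be any expression that is valid in a Snowflake WHERE clause, not only a bare column name.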
As with static partitioned assets, you can specify partition_expr metadata on a time-partitioned asset to tell the Snowflake I/O manager which column contains the partition data:
import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "TO_TIMESTAMP(TIME::INT)"},
)
def iris_data_per_day(context) -> pd.DataFrame:
    partition = context.asset_partition_key_for_output()

    # get_iris_data_for_date fetches all of the iris data for a given date,
    # the returned dataframe contains a column named 'time' that stores
    # the time of the row as an integer of seconds since epoch
    return get_iris_data_for_date(partition)


@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
    return iris_data_per_day.dropna().drop_duplicates()
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in the downstream asset. When loading a time-based partition, the following statement is used:
SELECT * WHERE [partition_expr] >= [partition_start] AND [partition_expr] < [partition_end]
When the partition_expr value is injected into this statement, the resulting SQL query must follow Snowflake's SQL syntax. Refer to the Snowflake documentation for more information.
When materializing the above assets, a partition must be selected, as described in Materializing partitioned assets. The [partition_start] and [partition_end] bounds are of the form YYYY-MM-DD HH:MM:SS. In this example, the query used when materializing the 2023-01-02 partition of the above assets would be:
SELECT * WHERE TO_TIMESTAMP(TIME::INT) >= '2023-01-02 00:00:00' AND TO_TIMESTAMP(TIME::INT) < '2023-01-03 00:00:00'
In this example, the data in the TIME column are integers, so the partition_expr metadata includes a SQL expression that converts integers to timestamps. A full list of Snowflake functions is available in the Snowflake documentation.
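As a rough illustration of how the bounds for a daily partition translate into a filter, the sketch below derives the start and end timestamps from a partition key. The daily_partition_where function is a hypothetical helper, not Dagster's implementation, and it assumes daily partitions whose keys are formatted as YYYY-MM-DD.

```python
from datetime import datetime, timedelta


def daily_partition_where(partition_expr: str, partition_key: str) -> str:
    """Build a WHERE clause covering one daily partition.

    Hypothetical helper for illustration; not Dagster's implementation.
    """
    # The partition starts at midnight of the key's date and ends
    # (exclusively) at midnight of the following day.
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    fmt = "%Y-%m-%d %H:%M:%S"
    return (
        f"WHERE {partition_expr} >= '{start.strftime(fmt)}' "
        f"AND {partition_expr} < '{end.strftime(fmt)}'"
    )


query = "SELECT * " + daily_partition_where("TO_TIMESTAMP(TIME::INT)", "2023-01-02")
print(query)
```

The lower bound is inclusive and the upper bound is exclusive, so adjacent daily partitions never overlap.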
The Snowflake I/O manager can also store data partitioned on multiple dimensions. To do this, you must specify the column for each partition as a dictionary of partition_expr metadata:
import pandas as pd

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionsDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={
        "partition_expr": {"date": "TO_TIMESTAMP(TIME::INT)", "species": "SPECIES"}
    },
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    partition = context.partition_key.keys_by_dimension
    species = partition["species"]
    date = partition["date"]

    # get_iris_data_for_date fetches all of the iris data for a given date,
    # the returned dataframe contains a column named 'time' that stores
    # the time of the row as an integer of seconds since epoch
    full_df = get_iris_data_for_date(date)

    return full_df[full_df["Species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in a downstream asset. For multi-partitions, Dagster concatenates the WHERE statements described in the above sections to craft the correct SELECT statement.
When materializing the above assets, a partition must be selected, as described in Materializing partitioned assets. For example, when materializing the 2023-01-02|Iris-setosa partition of the above assets, the following query will be used:
SELECT *
WHERE SPECIES in ('Iris-setosa')
AND TO_TIMESTAMP(TIME::INT) >= '2023-01-02 00:00:00'
AND TO_TIMESTAMP(TIME::INT) < '2023-01-03 00:00:00'
You may want to have different assets stored in different Snowflake schemas. The Snowflake I/O manager allows you to specify the schema in several ways.
In this example, the iris_dataset asset will be stored in the IRIS schema, and the daffodil_dataset asset will be found in the DAFFODIL schema.
The two options for specifying schema are mutually exclusive. If you provide schema configuration to the I/O manager, you cannot also provide it via the asset key and vice versa. If no schema is provided, either from configuration or asset keys, the default schema PUBLIC will be used.
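The precedence rules above can be summarized in a short sketch. The resolve_schema function is a hypothetical helper, not part of Dagster; in particular, it assumes that a schema supplied through the asset key is the key component immediately before the asset name.

```python
from typing import Optional, Sequence

DEFAULT_SCHEMA = "PUBLIC"


def resolve_schema(config_schema: Optional[str], asset_key_path: Sequence[str]) -> str:
    """Pick the schema for an asset from I/O manager config or its asset key.

    Hypothetical helper for illustration; not Dagster's implementation.
    """
    # Assumption: with a key such as ["DAFFODIL", "daffodil_dataset"], the
    # component before the asset name is treated as the schema.
    key_schema = asset_key_path[-2] if len(asset_key_path) > 1 else None
    if config_schema is not None and key_schema is not None:
        # The two options are mutually exclusive.
        raise ValueError("schema was provided both in configuration and in the asset key")
    return config_schema or key_schema or DEFAULT_SCHEMA


print(resolve_schema("IRIS", ["iris_dataset"]))
print(resolve_schema(None, ["DAFFODIL", "daffodil_dataset"]))
print(resolve_schema(None, ["iris_dataset"]))
```

The three calls cover schema from configuration, schema from the asset key, and the PUBLIC fallback when neither is given.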
Due to a longstanding bug in the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame to Snowflake sometimes causes the data to be corrupted. Prior to dagster-snowflake version 0.19.0, we solved this issue by converting all timestamp data to strings before loading the data into Snowflake, and doing the opposite conversion when fetching a DataFrame from Snowflake. However, we can also avoid this issue by ensuring that all timestamp data has a timezone. This allows us to store the data as TIMESTAMP_NTZ(9) type in Snowflake.
To specify how you would like timestamp data to be handled, use the time_data_to_string configuration value for the Snowflake I/O manager. If True, the I/O manager will convert timestamp data to a string before loading it into Snowflake. If False, the I/O manager will ensure the data has a timezone (attaching the UTC timezone if necessary) before loading it into Snowflake.
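The two behaviors can be illustrated on a single value with the standard library. The prepare_timestamp function is a hypothetical helper, not Dagster's implementation, and the string format it uses is an assumption made for this sketch; the I/O manager itself operates on whole DataFrame columns.

```python
from datetime import datetime, timezone
from typing import Union


def prepare_timestamp(value: datetime, time_data_to_string: bool) -> Union[str, datetime]:
    """Mimic the two timestamp-handling modes for one value.

    Hypothetical helper for illustration; not Dagster's implementation.
    """
    if time_data_to_string:
        # Legacy behavior: store the timestamp as a string.
        # The exact format here is an assumption.
        return value.strftime("%Y-%m-%d %H:%M:%S.%f")
    if value.tzinfo is None:
        # Naive timestamps get the UTC timezone attached.
        return value.replace(tzinfo=timezone.utc)
    # Timezone-aware timestamps are passed through unchanged.
    return value


naive = datetime(2023, 1, 2, 3, 4, 5)
as_string = prepare_timestamp(naive, time_data_to_string=True)
as_aware = prepare_timestamp(naive, time_data_to_string=False)
print(as_string)
print(as_aware.isoformat())
```

Note that attaching UTC does not shift the wall-clock time; it only labels a previously naive value.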
If you would like to migrate a table created prior to 0.19.0 to one with a TIMESTAMP_NTZ(9) type, you can run the following SQL queries in Snowflake. In the example, our table is located at database.schema.table and the column we want to migrate is called time:
// Add a column of type TIMESTAMP_NTZ(9)
ALTER TABLE database.schema.table
ADD COLUMN time_copy TIMESTAMP_NTZ(9);

// copy the data from time and convert to timestamp data
UPDATE database.schema.table
SET time_copy = to_timestamp_ntz(time);

// drop the time column
ALTER TABLE database.schema.table
DROP COLUMN time;

// rename the time_copy column to time
ALTER TABLE database.schema.table
RENAME COLUMN time_copy TO time;
The time_data_to_string configuration value will be deprecated in version X.Y.Z of the dagster-snowflake library.
Using the Snowflake I/O manager with other I/O managers
You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the io_manager_key parameter in the asset decorator:
import pandas as pd
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from dagster import Definitions, asset


@asset(io_manager_key="warehouse_io_manager")
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )


@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset):
    # plot_data is a function we've defined somewhere else
    # that plots the data in a DataFrame
    return plot_data(iris_dataset)


defs = Definitions(
    assets=[iris_dataset, iris_plots],
    resources={
        "warehouse_io_manager": snowflake_pandas_io_manager.configured(
            {
                "database": "FLOWERS",
                "schema": "IRIS",
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
            }
        ),
        "blob_io_manager": s3_pickle_io_manager,
    },
)
In this example, the iris_dataset asset uses the I/O manager bound to the key warehouse_io_manager and iris_plots will use the I/O manager bound to the key blob_io_manager. In the Definitions object, we supply the I/O managers for those keys. When the assets are materialized, the iris_dataset will be stored in Snowflake, and iris_plots will be saved in Amazon S3.
Storing and loading PySpark DataFrames in Snowflake
The Snowflake I/O manager also supports storing and loading PySpark DataFrames. To use the snowflake_pyspark_io_manager, first install the dagster-snowflake-pyspark package, then add the I/O manager to your Definitions:
from dagster_snowflake_pyspark import snowflake_pyspark_io_manager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": snowflake_pyspark_io_manager.configured(
            {
                "account": "abc1234.us-east-1",  # required
                "user": {"env": "SNOWFLAKE_USER"},  # required
                "password": {"env": "SNOWFLAKE_PASSWORD"},  # required
                "database": "FLOWERS",  # required
                "warehouse": "PLANTS",  # required for pyspark
                "role": "writer",  # optional, defaults to the default role for the account
                "schema": "IRIS",  # optional, defaults to PUBLIC
            }
        )
    },
)
When using the snowflake_pyspark_io_manager the warehouse configuration is required.
The snowflake_pyspark_io_manager requires that a SparkSession be active and configured with the Snowflake connector for Spark. You can either create your own SparkSession or use the spark_resource.
Using Pandas and PySpark DataFrames with Snowflake
If you work with both Pandas and PySpark DataFrames and want a single I/O manager to handle storing and loading these DataFrames in Snowflake, you can construct a Snowflake I/O manager using build_snowflake_io_manager:
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler

from dagster import Definitions

snowflake_io_manager = build_snowflake_io_manager(
    [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]
)

defs = Definitions(
    assets=[iris_dataset, rose_dataset],
    resources={
        "io_manager": snowflake_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "warehouse": "PLANTS",
                "schema": "IRIS",
            }
        )
    },
)
Executing custom SQL commands with the Snowflake resource
In addition to the Snowflake I/O manager, Dagster also provides a Snowflake resource for executing custom SQL queries.
from dagster_snowflake import snowflake_resource

from dagster import Definitions, asset


# this example executes a query against the IRIS_DATASET table created in Step 2 of the
# Using Dagster with Snowflake tutorial
@asset(required_resource_keys={"snowflake"})
def small_petals(context):
    return context.resources.snowflake.execute_query(
        (
            'SELECT * FROM IRIS_DATASET WHERE "Petal length (cm)" < 1 AND "Petal width'
            ' (cm)" < 1'
        ),
        fetch_results=True,
        use_pandas_result=True,
    )


defs = Definitions(
    assets=[small_petals],
    resources={
        "snowflake": snowflake_resource.configured(
            {
                "account": "abc1234.us-east-1",
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "FLOWERS",
                "schema": "IRIS",
            }
        )
    },
)
In this example, we attach the Snowflake resource to the small_petals asset. In the body of the asset function, we use the execute_query method of the resource to execute a custom SQL query against the IRIS_DATASET table created in Step 2: Create tables in Snowflake of the Using Dagster with Snowflake tutorial.
For more information on the Snowflake resource, including additional configuration settings, see the Snowflake resource API docs.